Get Data Factory to Check Itself for a Running Pipeline via the Azure Management API

Hi friends, just a quick how-to-guide-style post on something I had to build in Azure Data Factory.


Scenario: I want to trigger a Data Factory pipeline, but when I do I want the pipeline to know if it’s already running. If it is already running, stop the new run.

Sounds simple enough right?

Wrong!

But it's now simple for you, because I've done it for you, yay! 🙂

Solution:

This is intended to become a utilities pipeline that is wrapped up by an Execute Pipeline Activity. Once called, it is provided with the following pipeline parameters:

  • PipelineName
    • (the pipeline that called it, so using the system variable: @pipeline().Pipeline)
  • ThisRunId
    • (from the pipeline that called it, so using the system variable: @pipeline().RunId)

Going through these activities in order, left to right from the image above.

Get Subscription

Assuming nothing, I first used a Web Activity to hit the Azure Management API and return the subscription ID for the Data Factory I'm working in.

https://management.azure.com/subscriptions?api-version=2020-01-01

Nothing special here in terms of the URL or request, and I'm using Data Factory's own MSI (managed identity) to authenticate.

Set Variable

For ease of debugging I used a variable and parsed the output of the first Web Activity to give me a clean Subscription ID value. I was also expecting to have to deal with a long list of subscriptions; as it turned out, value[0] was good enough.

@replace(activity('Get Subscription').output.value[0].id,'/subscriptions/','')
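Outside Data Factory, the same call and parsing can be sketched in Python. This is a minimal sketch, not production code: the bearer token acquisition is assumed (inside the Web Activity the MSI handles it for you), and `extract_subscription_id` simply mirrors the replace() expression above.

```python
# Sketch: call the Azure Management API to list subscriptions, then strip
# the '/subscriptions/' prefix the same way the Set Variable expression does.
import json
import urllib.request

MGMT = "https://management.azure.com"

def extract_subscription_id(response_json: dict) -> str:
    """Mirror @replace(...id, '/subscriptions/', '') on the first result."""
    full_id = response_json["value"][0]["id"]  # e.g. '/subscriptions/<guid>'
    return full_id.replace("/subscriptions/", "")

def list_subscriptions(token: str) -> dict:
    """GET /subscriptions, authenticated with an already-acquired bearer token."""
    req = urllib.request.Request(
        f"{MGMT}/subscriptions?api-version=2020-01-01",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

If you do have multiple subscriptions, you'd loop over `value` rather than trust index zero.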

Get Resource Group Name

Next, I slightly cheated by simply storing the Resource Group Name in a SQL database table. I'm sure it can be returned via the management API, but I gave up searching the MS Docs site and went for the easy option with a Lookup Activity.

Everything I found required the Resource Group Name to be provided; as far as I could see, you can't reverse engineer it from the resource. Happy to be told otherwise.

Check For Valid Pipeline Name

Armed with the following values:

  • Pipeline Name – from the parameter.
  • Subscription Id – from the variable.
  • Resource Group Name – from the database table lookup.
  • Data Factory name – from its own system variable.

Deciding to be a good developer, I do an initial defensive check to ensure the pipeline name provided is in fact valid for this Data Factory. This requires the pipeline to be deployed, not just running in Debug mode.

https://management.azure.com/subscriptions/@{variables('SubscriptionId')}/resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/providers/Microsoft.DataFactory/factories/@{pipeline().DataFactory}/pipelines/@{pipeline().parameters.PipelineName}?api-version=2018-06-01

If not valid, the exception returned isn't very informative, but it's better than the pipeline succeeding and us wrongly assuming the target isn't running at all.
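As a rough illustration of what this Web Activity does, a small Python helper can build the same validation URL (the helper name is my own; the GET itself would carry the MSI's bearer token, as in the earlier sketch):

```python
# Sketch: build the management API URL used to validate that a pipeline
# exists (and is deployed) in this factory. A successful GET means the
# name is valid; an error response means it isn't.
MGMT = "https://management.azure.com"

def pipeline_url(subscription_id: str, resource_group: str,
                 factory: str, pipeline: str) -> str:
    return (
        f"{MGMT}/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.DataFactory/factories/{factory}"
        f"/pipelines/{pipeline}?api-version=2018-06-01"
    )
```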

Get Query Run Days Value

As we are querying Data Factory for pipeline runs by name, some extra filtering is required: basically, providing start and end date/time values for the pipeline run information we want returned. This uses the management API RunQueryFilter options that get passed in the request body.

https://docs.microsoft.com/en-us/rest/api/datafactory/pipelineruns/querybyfactory

To give myself the flexibility to change this in the future, I decided to once again get this value from a SQL database table. In my case it returns '-10', used below to define the start of the query window.

Get Pipeline Runs

Now the fun part.

I hit the Azure Management API again with a Web Activity, performing the following POST request to query the pipeline runs:

URL https://management.azure.com/

subscriptions/@{variables('SubscriptionId')}/

resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/

providers/Microsoft.DataFactory/

factories/@{pipeline().DataFactory}/

queryPipelineRuns?api-version=2018-06-01

Body {
  "lastUpdatedAfter": "@{adddays(utcnow(),int(activity('Get Query Run Days Value').output.firstRow.PropertyValue))}",
  "lastUpdatedBefore": "@{utcnow()}",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": [
        "@{pipeline().parameters.PipelineName}"
      ]
    }
  ]
}
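For reference, the request body can be reproduced with a small Python sketch; `runs_query_body` is a hypothetical helper of my own that applies the '-10' day window via a timedelta, mirroring the adddays(utcnow(), ...) expression:

```python
# Sketch: build the queryPipelineRuns request body, mirroring the
# lastUpdatedAfter/lastUpdatedBefore window and PipelineName filter above.
from datetime import datetime, timedelta, timezone

def runs_query_body(pipeline_name: str, query_run_days: int) -> dict:
    now = datetime.now(timezone.utc)
    return {
        "lastUpdatedAfter": (now + timedelta(days=query_run_days)).isoformat(),
        "lastUpdatedBefore": now.isoformat(),
        "filters": [
            {
                "operand": "PipelineName",
                "operator": "Equals",
                "values": [pipeline_name],
            }
        ],
    }

# This body is POSTed to:
#   {MGMT}/subscriptions/{sub}/resourceGroups/{rg}
#     /providers/Microsoft.DataFactory/factories/{factory}
#     /queryPipelineRuns?api-version=2018-06-01
```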

To my surprise this request failed due to an authentication issue.

Annoyingly the Data Factory’s own MSI does not have access to query its own Pipeline Runs via the Management API. You have to explicitly grant it access to itself! Madness.

Anyway, a few grumpy tweets later, moving on 🙂

Filter Running Pipelines

Now to filter the results to establish if the target pipeline is running.

Using the Filter Activity is a nice way to handle this, firstly providing the array of results:

@activity('Get Pipeline Runs').output.value

Then the condition expression: this had to exclude its own Run ID (provided as a pipeline parameter), then consider a status of either InProgress or Queued as ultimately 'Running'. Expression below:

@and(not(equals(item().runId,pipeline().parameters.ThisRunId)),or(equals(item().status,'InProgress'),equals(item().status,'Queued')))
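For anyone who finds the expression hard to read, the same condition expressed in Python terms (a sketch, with my own names) is just:

```python
# Sketch: replicate the Filter Activity condition. Exclude the current
# run's id and keep runs whose status means the pipeline is still running.
RUNNING_STATUSES = {"InProgress", "Queued"}

def running_runs(runs: list, this_run_id: str) -> list:
    return [
        run for run in runs
        if run["runId"] != this_run_id and run["status"] in RUNNING_STATUSES
    ]
```

The If Activity check that follows then only needs to establish that this filtered list is non-empty.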

If Pipeline Is Running

Finally, an If Activity: in my case, checking whether the count of filtered results was greater than or equal to one, meaning the pipeline is already running.

@greaterOrEquals(int(activity('Filter Running Pipelines').output.FilteredItemsCount),1)

Within the If True outcome I used another Lookup Activity to throw the exception. Data Factory doesn’t currently have a means of throwing its own exceptions so I used my SQL database a third time to handle this with a T-SQL RAISERROR statement, shown below.

RAISERROR('@{concat('Provided pipeline name (',pipeline().parameters.PipelineName,') still has a run in progress or queued given the query range parameters set in the properties table.')}',16,1);

Outcome:

The triggered parent pipeline hits this utility pipeline to check if it's already running; if it is, an exception is thrown by the utility pipeline, stopping the parent.


The code is in my Blog Supporting Content GitHub repository: https://github.com/mrpaulandrew/BlogSupportingContent


Hopefully someone else will find this helpful.

Many thanks for reading.

31 thoughts on “Get Data Factory to Check Itself for a Running Pipeline via the Azure Management API”

  1. Hey Paul, can you let us know how you fixed the API issue?
    Annoyingly the Data Factory’s own MSI does not have access to query its own Pipeline Runs via the Management API. You have to explicitly grant it access to itself! Madness.


      1. How did you manage to do that? I think I’ve done it, but I'm not sure, as I’m getting “Authentication failed. The ‘Authorization’ header is missing”
        Did you have to provide any auth headers for the web call?


      2. Did you do that in the portal? It would be helpful for those who are new to this. Did you have to add auth headers? I think I’ve given the contributor role to the MI but I’m getting “Authentication failed. The ‘Authorization’ header is missing.”


  2. Hi Paul, in the GET PIPELINE RUNS activity, I’m getting the error {"message":"The requested resource does not support http method 'POST'."}. What is the resource URL we have to mention in MSI authentication?


    1. I got the same error. @mrpaulandrew‘s “GET PIPELINE RUNS” section has a URL which, if you check the JSON source, actually ends with “….pipeline().DataFactory}/queryPipelineRuns?api-version=2018-06-01”. He must’ve copy-pasted the bit from “CHECK FOR VALID PIPELINE NAME” by mistake 😀


  3. Hi Paul,
    I was wondering why you would want to do this. Would it not be easier to set the concurrency option to 1 in the pipeline settings? Or am I missing something?


    1. Hey, yes, that would also work. I did this post as a precursor blog/test for my processing framework (procfwk.com) where I need the combination of pipeline name and the parameters that have been passed to it to establish concurrency. Thanks


    2. Correct me if i’m wrong but the concurrency behaviour doesn’t work if the same pipeline is run from a different trigger. Say I have a large ETL run and a small ETL run on a high frequency, if they are on separate triggers they run the risk of overlapping. This way is more flexible. Your processing framework looks great Paul.


  4. Hi Paul ,
    Is there any means by which we can return the pipeline status via the ADF REST API?
    I’m trying to invoke ADF using the REST API; it returns a RunId, but I want to know the status of the pipeline, whether it’s a success or failure.
    Is it possible to return the status of the pipeline from the ADF call?


    1. Not that I tried.
      I just use a layer of Azure Functions as the middle ware to execute pipelines, get status and return error messages.
      Check out the functions app in my procfwk.com


      1. Pipeline status is absolutely available to return. You can find it in the Data Factory API documentation.


  5. Hi Paul, I am trying this approach in Azure Synapse Analytics pipelines. I tried providing multiple roles to my Synapse workspace but none of them is working in the case of the Web Activity. Every time it’s giving me the error: The client does not have authorization to perform action.
    Can you please specify which particular role you provided to the ADF, and whether we can implement the same in Synapse or not?


  6. Hi Paul
    I am trying to implement this in Azure Synapse analytics . I am getting an error :
    Invoking Web Activity failed with HttpStatusCode – ‘NotFound’, message – ‘The requested resource does not exist on the server. Please verify the request server and retry

    It seems we can’t get details of pipeline runs using synapse. Please let me know a way if you know it’s possible


  7. Hi,
    I have the same error ‘HttpStatusCode – ‘NotFound’, message – ‘The requested resource does not exist on the server. Please verify the request server and retry’ by using ADF.

    Maybe something has changed and now it’s not possible to get these logs.


  8. I’ve found the solution to the below issue :
    ‘HttpStatusCode – ‘NotFound’, message – ‘The requested resource does not exist on the server. Please verify the request server and retry’

    Pipeline should be triggered instead of Debug.


  9. Hi,
    Thank you for this article. I would like to query all historical pipeline runs and have an issue that API call return only 100 pipeline runs even if there are more. Did you have this issue and know how to change this limitation?


  10. Hi,
    I would like to check whether this pipeline has already run once with exactly the same call parameters.
    How can I implement such a check?

    Does anyone have an example?


  11. Hi,
    I am getting this error: “Invoking Web Activity failed with HttpStatusCode – ‘405 : MethodNotAllowed’, message – ‘’”
    Any idea?


  12. Hi Paul, I am working on Synapse; what would be the equivalent URL for Synapse? In the following it is using Data Factory, so I'm not sure if this works for Synapse. Please advise.
    https://management.azure.com/
    subscriptions/@{variables('SubscriptionId')}/

    resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/

    providers/Microsoft.DataFactory/

    factories/@{pipeline().DataFactory}/

    pipelines/@{pipeline().parameters.PipelineName}?api-version=2018-06-01


  13. I am using databases for several Data Factories. When running an ADF pipeline the database used is scaled up by the first pipeline and scaled down by the last pipeline.
    For debugging purposes it is sometimes necessary to manually upscale a database.
    In order to prevent excessive costs the database should be downscaled manually again afterwards, which can be forgotten.

    I want to use your solution to scan for non-running pipelines and downscale the corresponding databases. Of course, when a pipeline is running, the database may not be scaled down.

    I am getting an error at getting the pipeline name (Check for Valid Pipeline Name).
    “does not have authorization to perform action ‘Microsoft.DataFactory/factories/pipelines/read”

    I am already owner of the subscription and therefor owner of all child resources.
    Which role should I add to solve this error?


  14. Hi Niek,

    I have access to the MSI but am still facing the below issue when running the Web Activity.

    ADF : Invoking endpoint failed with HttpStatusCode – ‘405 : MethodNotAllowed’, message – ‘Endpoint(url) knows the request method, but the target endpoint(url) doesn’t support this method. Please verify, if the endpoint supports the method you have selected in Data Factory.’


  15. Hi Paul,

    I am facing the below issue while running the Web Activity, and I have MSI access.

    ADF : Invoking endpoint failed with HttpStatusCode – ‘405 : MethodNotAllowed’, message – ‘Endpoint(url) knows the request method, but the target endpoint(url) doesn’t support this method. Please verify, if the endpoint supports the method you have selected in Data Factory.’

