Hi friends, just a very quick how-to guide on something I had to build in Azure Data Factory.
Scenario: I want to trigger a Data Factory pipeline, but when I do I want the pipeline to know if it’s already running. If it is already running, stop the new run.
Sounds simple enough right?
Wrong!
But it's now simple for you, because I've done it for you, yay! 🙂
Solution:
This is intended to be a utilities pipeline, wrapped up by an Execute Pipeline Activity and provided with the following pipeline parameters:
- PipelineName – the pipeline that called it, passed using the system variable: @pipeline().Pipeline
- ThisRunId – from the pipeline that called it, passed using the system variable: @pipeline().RunId
Going through these activities in order, left to right from the image above.
Get Subscription
Assuming nothing, I first used a Web Activity to hit the Azure Management API and return the subscription ID for the Data Factory I'm working in.
https://management.azure.com/subscriptions?api-version=2020-01-01
Nothing special here in terms of the URL or request, and I'm using Data Factory's own MSI to authenticate.
Set Variable
For ease of debugging I used a variable, parsing the output of the first Web Activity to give me the clean Subscription ID value. I was also expecting to have to deal with a long list of subscriptions; as it turned out, value[0] was good enough.
@replace(activity('Get Subscription').output.value[0].id,'/subscriptions/','')
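If you want to sanity check that expression outside of Data Factory, here's a minimal Python sketch of the same parsing logic, assuming the Web Activity output has the management API's usual shape (a `value` array of subscription objects, each with an `id` property):

```python
def extract_subscription_id(activity_output):
    """Mimic the ADF expression: take the first subscription's resource id
    (e.g. "/subscriptions/<guid>") and strip the "/subscriptions/" prefix."""
    return activity_output["value"][0]["id"].replace("/subscriptions/", "")

# Example with a made-up subscription id:
sample = {"value": [{"id": "/subscriptions/0a1b2c3d-aaaa-bbbb-cccc-000011112222"}]}
print(extract_subscription_id(sample))  # 0a1b2c3d-aaaa-bbbb-cccc-000011112222
```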
Get Resource Group Name
Next, I cheated slightly here by simply storing the Resource Group Name in a SQL database table. I'm sure it can be returned via the management API, but I gave up searching the MS Docs site and went for the easy option with a Lookup Activity.
Everything I found required the Resource Group Name to be provided; as far as I could see, you couldn't reverse engineer it from the resource. Happy to be told otherwise.
Check For Valid Pipeline Name
Armed with the following values:
- Pipeline Name – from the parameter.
- Subscription Id – from the variable.
- Resource Group Name – from the database table lookup.
- Data Factory name – from its own system variable.
I decided to be a good developer and do an initial defensive check to ensure the pipeline name provided was in fact valid for this Data Factory. This requires the pipeline to be deployed, not just running in Debug mode.
https://management.azure.com/subscriptions/@{variables('SubscriptionId')}/resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/providers/Microsoft.DataFactory/factories/@{pipeline().DataFactory}/pipelines/@{pipeline().parameters.PipelineName}?api-version=2018-06-01
If not valid, the exception returned isn't very informative, but it's better than assuming it's not running at all when the pipeline succeeds.
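As a rough illustration, the URL above can be composed like this in Python; the argument values below are placeholders for whatever your variables and lookups return, not real resource names:

```python
def pipeline_check_url(subscription_id, resource_group, data_factory, pipeline_name):
    """Build the management API URL that returns the pipeline definition,
    used here purely as an existence/validity check (GET request)."""
    return (
        "https://management.azure.com"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.DataFactory"
        f"/factories/{data_factory}"
        f"/pipelines/{pipeline_name}"
        "?api-version=2018-06-01"
    )

# Hypothetical names, just to show the shape of the final URL:
print(pipeline_check_url("<sub-id>", "<rg-name>", "<adf-name>", "<pipeline-name>"))
```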
Get Query Run Days Value
As we are running a query against Data Factory for any pipeline runs by name, some extra filtering is required. Basically, we provide start and end date/time values for the pipeline run information we return. This uses the management API RunQueryFilter options that get passed in the request body.
https://docs.microsoft.com/en-us/rest/api/datafactory/pipelineruns/querybyfactory
To give me the flexibility to change this in the future I decided to once again get this value from a SQL database table. In my case returning ‘-10’, to be used below to define the start window date/time.
Get Pipeline Runs
Now the fun part.
I hit the Azure Management API again with a Web Activity. Performing the following POST request to query the pipeline runs:
URL: https://management.azure.com/

Body:
{
  "lastUpdatedAfter": "@{adddays(utcnow(),int(activity('Get Query Run Days Value').output.firstRow.PropertyValue))}",
  "lastUpdatedBefore": "@{utcnow()}",
  "filters": [
    {
      "operand": "PipelineName",
      "operator": "Equals",
      "values": [ "@{pipeline().parameters.PipelineName}" ]
    }
  ]
}
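To make the request body easier to reason about, here's a hedged Python sketch that builds the same JSON shape, assuming a negative "query run days" value like the -10 returned from the properties table:

```python
import json
from datetime import datetime, timedelta, timezone

def query_pipeline_runs_body(pipeline_name, query_run_days, now=None):
    """Build the queryPipelineRuns request body: a lastUpdatedAfter/Before
    window (query_run_days is negative, so 'after' lands in the past) plus
    a filter restricting results to a single pipeline name."""
    now = now or datetime.now(timezone.utc)
    return {
        "lastUpdatedAfter": (now + timedelta(days=query_run_days)).isoformat(),
        "lastUpdatedBefore": now.isoformat(),
        "filters": [
            {
                "operand": "PipelineName",
                "operator": "Equals",
                "values": [pipeline_name],
            }
        ],
    }

# Hypothetical pipeline name, purely for illustration:
body = query_pipeline_runs_body("Wait 1", -10)
print(json.dumps(body, indent=2))
```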
To my surprise this request failed due to an authentication issue.
Annoyingly the Data Factory’s own MSI does not have access to query its own Pipeline Runs via the Management API. You have to explicitly grant it access to itself! Madness.
Anyway, a few grumpy tweets later, moving on 🙂
Filter Running Pipelines
Now to filter the results to establish if the target pipeline is running.
Using the Filter Activity is a nice way to handle this, firstly providing the array of results:
@activity('Get Pipeline Runs').output.value
Then using the condition expression, this had to exclude its own Run ID (provided as a pipeline parameter), then consider a status of either 'InProgress' or 'Queued' as ultimately 'Running'. Expressed below:
@and(not(equals(item().runId,pipeline().parameters.ThisRunId)),or(equals(item().status,'InProgress'),equals(item().status,'Queued')))
If Pipeline Is Running
Finally, an If Activity. In my case, if the result of the filtering was a count greater than or equal to one, the pipeline is already running.
@greaterOrEquals(int(activity('Filter Running Pipelines').output.FilteredItemsCount),1)
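Putting the Filter Activity condition and the If Activity condition together, here's a self-contained Python sketch of the decision, using made-up run records with the runId and status properties the API returns:

```python
def pipeline_already_running(runs, this_run_id):
    """True when at least one *other* run of the pipeline has a status of
    InProgress or Queued, i.e. the Filter Activity condition followed by
    the greaterOrEquals(..., 1) check in the If Activity."""
    others = [
        r for r in runs
        if r["runId"] != this_run_id
        and r["status"] in ("InProgress", "Queued")
    ]
    return len(others) >= 1

# Hypothetical run records to illustrate:
runs = [
    {"runId": "run-1", "status": "Succeeded"},
    {"runId": "run-2", "status": "InProgress"},
    {"runId": "run-3", "status": "Queued"},   # this run
]
print(pipeline_already_running(runs, this_run_id="run-3"))  # True (because of run-2)
```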
Within the If True outcome I used another Lookup Activity to throw the exception. Data Factory doesn't currently have a means of throwing its own exceptions, so I used my SQL database a third time to handle this with a T-SQL RAISERROR statement, shown below.
RAISERROR('@{concat('Provided pipeline name (',pipeline().parameters.PipelineName,') still has a run in progress or queued given the query range parameters set in the properties table.')}',16,1);
Outcome:
The triggered parent pipeline hits this utility pipeline to check if it's already running; if it is, an exception is thrown by the utility pipeline, stopping the parent.
Code is in my Blog Supporting Content GitHub repository: https://github.com/mrpaulandrew/BlogSupportingContent
Hopefully someone else will find this helpful.
Many thanks for reading.
Hey Paul, can you let us know how you fixed the API issue?
Annoyingly the Data Factory’s own MSI does not have access to query its own Pipeline Runs via the Management API. You have to explicitly grant it access to itself! Madness.
I didn’t fix it. I just gave its MSI owner access to itself.
How did you do that? I think I've done it, but I'm not sure, as I'm getting "Authentication failed. The 'Authorization' header is missing".
Did you have to provide any auth headers for the web call?
In the Advanced section, set the authentication to MSI for the Web Activity.
Did you do that in the portal? It would be helpful for those who are new to this. Did you have to add auth headers? I think I’ve given the contributor role to the MI but I’m getting “Authentication failed. The ‘Authorization’ header is missing.”
Yeah, through the portal.
But treat ADF's own identity like any other.
Hi Paul, in the GET PIPELINE RUNS activity, I'm getting the error {"message":"The requested resource does not support http method 'POST'."}. What is the resource URL we have to mention in MSI authentication?
Hi, the method should be GET, the MSI auth URL should be https://management.core.windows.net/. You can see the complete pipeline JSON used for the blog here: https://github.com/mrpaulandrew/BlogSupportingContent/blob/master/Get%20Data%20Factory%20to%20Check%20Itself%20for%20a%20Running%20Pipeline%20via%20the%20Azure%20Management%20API/Pipeline.json
Thanks
Paul
I got the same error. @mrpaulandrew's "GET PIPELINE RUNS" section has a URL which, if you check the JSON source, actually ends with "….pipeline().DataFactory}/queryPipelineRuns?api-version=2018-06-01". He must've copy-pasted the bit from "CHECK FOR VALID PIPELINE NAME" by mistake 😀
Hi Paul,
I was wondering why you would want to do this. Would it not be easier to set the concurrency option to 1 in the pipeline settings? Or am I missing something?
Hey, yes, that would also work. I did this post as a precursor blog/test for my processing framework (procfwk.com), where I need the combination of pipeline name and the parameters that have been passed to it to establish concurrency. Thanks
Correct me if I'm wrong, but the concurrency behaviour doesn't work if the same pipeline is run from a different trigger. Say I have a large ETL run and a small ETL run on a high frequency; if they are on separate triggers they run the risk of overlapping. This way is more flexible. Your processing framework looks great, Paul.
Thank you, didn’t know!
Hi Paul,
Is there any means by which we can return the pipeline status via the ADF REST API?
I'm trying to invoke ADF using the REST API; it returns a RunId, but I want to know the status of the pipeline, whether it's a success or failure.
Is it possible to return the status of a pipeline from an ADF call?
Not that I've tried.
I just use a layer of Azure Functions as the middleware to execute pipelines, get statuses and return error messages.
Check out the Functions app in my procfwk.com
Pipeline status is absolutely available to return. You can find it in the Data Factory API documentation.
Hi Paul, I am trying this approach in Azure Synapse Analytics pipelines. I tried providing multiple roles to my Synapse workspace but none of them work for the Web Activity. Every time it's giving me the error: The client does not have authorization to perform action.
Can you please specify which particular role you provided to the ADF, and whether we can implement the same in Synapse or not.
Hi Paul
I am trying to implement this in Azure Synapse Analytics. I am getting an error:
Invoking Web Activity failed with HttpStatusCode – 'NotFound', message – 'The requested resource does not exist on the server. Please verify the request server and retry'
It seems we can't get details of pipeline runs using Synapse. Please let me know a way if you know it's possible.
Hi,
I have the same error 'HttpStatusCode – 'NotFound', message – 'The requested resource does not exist on the server. Please verify the request server and retry' using ADF.
Maybe something has changed and now it's not possible to get these logs.
I've found the solution to the issue below:
'HttpStatusCode – 'NotFound', message – 'The requested resource does not exist on the server. Please verify the request server and retry'
The pipeline should be triggered instead of run in Debug.
Hi,
Thank you for this article. I would like to query all historical pipeline runs, but I have an issue where the API call returns only 100 pipeline runs even when there are more. Did you have this issue, and do you know how to change this limitation?
Hi,
I would like to check whether this pipeline has already run once with exactly the same call parameters.
How can I implement such a check?
Does anyone have an example?
Hi,
I am getting this error: "Invoking Web Activity failed with HttpStatusCode – '405 : MethodNotAllowed', message – ''"
Any idea?
Hi Paul, I am working on Synapse; what would be the equivalent URL for Synapse? The following uses Data Factory, and I'm not sure if this works for Synapse. Please advise.
https://management.azure.com/
subscriptions/@{variables('SubscriptionId')}/
resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/
providers/Microsoft.DataFactory/
factories/@{pipeline().DataFactory}/
pipelines/@{pipeline().parameters.PipelineName}?api-version=2018-06-01