Microsoft.Azure.Management.DataFactory.Models.RunFilterParameters
So I recently found myself with code withdrawal… Yes, its a real thing! Anyone in an architect type role will understand, I hope… You can’t spend all your time drawing pictures of how things “should” work without occasionally getting hands-on with some code. Anyway, my recent episode of code withdrawal led me to creating some simple, but very useful Azure Functions that I’ve wanted to work on for a while.
The following 3 Azure Functions allow me/you to return the status of any Azure Data Factory pipeline once you supply a few of the usual parameters. I created the set to give me options in behaviour depending on requirements. The following visual offers an overview, because I still had to draw a picture as well! In each case, a user or service can hit the functions via a URL and return the status of an Azure Data Factory pipeline using the pipeline name.
Filtering Pipeline Runs
Before going into the detail of the functions I firstly want to call out how I filtered the pipeline runs for a given Data Factory to ensure only the status of the provided pipeline name is returned. This involves two main filtering parts within the .Net client abstraction provided by Microsoft.
- Passing in a data range to the pipeline runs query (client.PipelineRuns.QueryByFactory).
- Passing in the Data Factory pipeline name, as provided by the function call.
Both filter parts are wrapped in 3 levels of lists:
- Firstly, a list of pipeline names,
- Next, in the RunQueryFilter, where the Pipeline Name is set as the filter criteria.
- Finally, in the RunFilterParameters, where the date range and pipeline all come together.
For the date range I’ve added an App Setting to the Azure Function App. This setting applies to the first two functions below where we need to handle the period of days we want to use when querying any Data Factory for its pipeline runs. As you probably know whenever a pipeline is triggered within Data Factory it is given a Run ID. This GUID value is critical when returning the status of a pipeline as its important to have the context of when the execution occurred. To simplify the functions I’ve resolved what the Run ID will be by returning all pipeline runs in a given time period and then taking the last value found for the named pipeline. Yes, this is a little crude, which is why I went on the create the third function which can handle the passing of a custom Run ID as well. But still, we can’t get the status of a pipeline without the curial context (Run ID).
The default value I’ve used for the time period filtering is 7 days. This used todays date as the start and todays date minus 7 for the end date. This can of course be changed if you have a larger or smaller window of time for your pipeline runs. But please consider the more executions you have the longer this will take for the function to return.
App setting name: DefaultDaysForPipelineRuns
For the pipeline name filtering this uses the inner part of the RunQueryFilter and passes the Operand, Operator Property and Values to ensure only pipeline runs are returned for our provided pipeline. The values part of the filtering does accept a list of pipeline names, but for my functions I’m expecting only a single value. For more details on these methods please check out the following Microsoft docs page.
In terms of the code, I created the RunFilterParameters as follows and using the constants RunQueryFilterOperand and RunQueryFilterOperator.
Next a quick overview of the functions themselves:
Get Pipeline Status by Name
Hopefully as the name suggests, this will return the status of any pipeline for a given Data Factory using the name of the pipeline. And as per the filtering details above this assumes that the pipeline has been executed in the last 7 days. Then takes the latest Run ID found.
Body JSON:
{
"tenantId": "1234-1234-1234-1234-1234",
"applicationId": "1234-1234-1234-1234-1234",
"authenticationKey": "Passw0rd123!",
"subscriptionId": "1234-1234-1234-1234-1234",
"resourceGroup": "CommunityDemos",
"factoryName": "PaulsFunFactoryV2",
"pipelineName": "WaitingPipeline"
}
Output
{
"PipelineName": "WaitingPipeline",
"RunIdUsed": "0d069026-bcbc-4356-8fe8-316ce5e07134",
"Status": "Succeeded"
}
Get and Wait For a Pipeline Status By Name
This function will return the status of any pipeline for a given Data Factory using the name of the pipeline. However, if the pipeline is still running or queued the function will block and wait for it to complete. On completion it will return the success status similar to the other functions, or if errored, the error message from the pipeline will be returned.
Body JSON:
{
"tenantId": "1234-1234-1234-1234-1234",
"applicationId": "1234-1234-1234-1234-1234",
"authenticationKey": "Passw0rd123!",
"subscriptionId": "1234-1234-1234-1234-1234",
"resourceGroup": "CommunityDemos",
"factoryName": "PaulsFunFactoryV2",
"pipelineName": "WaitingPipeline"
}
Output
{
"PipelineName": "WaitingPipeline",
"RunIdUsed": "0d069026-bcbc-4356-8fe8-316ce5e07134",
"Status": "Succeeded"
}
Get Pipeline Status by Name and Run ID
This function will return the status of any pipeline for a given Data Factory using the name of the pipeline and also the run ID to establish the context for the execution.
Body JSON:
{
"tenantId": "1234-1234-1234-1234-1234",
"applicationId": "1234-1234-1234-1234-1234",
"authenticationKey": "Passw0rd123!",
"subscriptionId": "1234-1234-1234-1234-1234",
"resourceGroup": "CommunityDemos",
"factoryName": "PaulsFunFactoryV2",
"pipelineName": "WaitingPipeline",
"runId": "1234-1234-1234-1234-1234" // <<< addition for this function
}
Output
{
"PipelineName": "WaitingPipeline",
"RunIdUsed": "0d069026-bcbc-4356-8fe8-316ce5e07134",
"Status": "Succeeded"
}
Other Thoughts
Just a couple of final thoughts related to the above post.
If you haven’t developed Azure Function Apps before you really should. The developer experience with Visual Studio 2017 onwards is really nice as it gives you a local functions emulator and mini web server to hit the functions from your local machine within a console. This is good because you can see the console WriteLine etc. Then add PostMan into the mix and its really easy to get going with the code. I’ve added a screen shot on the right of my experience to give you a quick view on how nice it is. Click to enlarge. This is also a symptom of code withdrawl… getting excited by dev tools! 🙂
In production we of course wouldn’t be passing service principal ID’s and keys around in function calls. We would use Azure Key Vault and/or Managed Service Identities. Especially if the function wasn’t being called externally to the Azure tenant and maybe triggered in some other way. Hopefully we can appreciate that I only did this in the body of this function request to make the code standalone for anyone that wanted to use it.
Just Give Me The Code!
Ok! Here you go…. Functions.cs 🙂
The full solution is in the same Blob Support Content GitHub repository if you’d like to use the Visual Studio Solution.
I hope you found this post helpful.
Many thanks for reading.
Hi Paul,
Suppose the Azure function is ready. We need to call it after every activity in Pipeline ? I need this function executed as and when there is an error in Activity. How to implement this?
LikeLike
In my processing framework I first check the status of a given pipeline. Then use a Switch activity. If failed, I get the error.
See adfprocfwk.com
LikeLike
Hey Paul, just a quick question, is there a list of all status from PipelineRuns objtect? I’ve trying to look for those on microsoft documentation but theres only runstate and runresult (https://docs.microsoft.com/en-us/rest/api/azure/devops/pipelines/runs/run%20pipeline?view=azure-devops-rest-6.0#runresult)
thank you!
LikeLike
Not that I’m aware of, you’d need to create it yourself by first listing all available pipelines. Then getting the status for each.
LikeLike
Hi
I want to run pipeline runs for each file synchrously in logic apps.
Using get pipeline run status, im not getting correct status of pipeline run.
How can I get pipeline status from logic apps.
Thanks
Kalyani
LikeLike
Why are you using a logic app?
LikeLike
Hi Paul. I tried implementing this with Python but i get internal server error when I use this import “azure.synapse.artifacts.operations.PipelineRunOperations”. Can this be achievable with Python?
LikeLike
Hi, yes definitely. However, I think the Synapse libraries your using here are still in beta. Maybe try with the Data Factory classes first as I’ve done this. Cheers
LikeLike
Hey Paul. Thank you for the quick response. I did try implementing with ADF using “import azure.mgmt.datafactory.models”. I am still getting the Internal Server Error by using this import in Azure Function. Are you sure that we can deploy this azure function using python to get the Pipeline Status by giving RunID as input? My language requirement is Python, so I just want to make sure. If not, I can use your code as reference and implement using CSharp.
LikeLike
Yes, maybe a silly question. But is you App Service setup with a Linux host. Last time I checked the Windows host didn’t support Python functions.
Maybe also check the SPN details your passing and permissions to the target ADF instance.
LikeLike
Hi Paul,
first time I’m programmatically accessing ADF. What is the appliationID? Is it the Managed Identity Application ID of the ADF factory? And where can I find the authenticationkey?
Regards,
Koen
LikeLike
Yes it can be. Or the ID of your SPN that has access to execute the target pipeline.
LikeLike
OK, I created an app registration. First it had access to the Azure Management API only, but that didn’t seem to work. I then added the app as contributor to ADF and now I can retrieve the statuses of my pipelines in the Azure Function. Thanks.
LikeLike
Yup, perfect. If you want to do the same for Synapse pipelines check out my procfwk.com 😉
LikeLiked by 1 person
Hi thanks for a this I have one simple question where do I get the setting applicationId for the json above?
LikeLike
This is the application ID of the Azure AD service principal that you use to authenticate against the target Data Factory.
LikeLike
.Net MVC, I’m not able to get token when my application is deployed on web.
also not able to generate token when i’m sign off from my VS 2019.
if you know this issue..
LikeLike
Sorry no.
Do you have any more information or error message details?
LikeLike
Hi All, I need to get all pipelines from ADF in c#. I have more around 92 pipelines. But client.Pipelines.ListByFactory only returning 50 pipelines only. Can you please provide any suggestions?
LikeLike
I have a requirement to trigger an Azure function as soon as the file is copied into the blob storage. Based on the blob storage location / structure, a unique python script to be attached to the ADF. The ADF should read the file from the specific bucket/folder and process it based on the attached python script and store the processed data into the Cosmos DB.
There will be multiple input files copied into the blob storage simultaneously by the multiple customers.
Will this solution scalable? Attaching the python script with ADF dynamically by the Azure function and run the ADF pipelines concurrently to process multiple input files?
LikeLike
RunFilterParameters(
last_updated_after=datetime.now(timezone.utc) – timedelta(7), last_updated_before=datetime.now(timezone.utc))
I am using this to get last 7 days runs , but it is giving only the one that is run on the day 7 day before . Can you please help in getting all the run id from that day till today?
LikeLike
Hi A great blog indeed! Also wanted to know how we can use managed identity to access ADF Rest API from web app… please add if any link for the same
LikeLike