Get Any Azure Data Factory Pipeline Run Status with Azure Functions

Microsoft.Azure.Management.DataFactory.Models.RunFilterParameters

So I recently found myself with code withdrawal… Yes, it’s a real thing! Anyone in an architect type role will understand, I hope… You can’t spend all your time drawing pictures of how things “should” work without occasionally getting hands-on with some code. Anyway, my recent episode of code withdrawal led me to create some simple, but very useful, Azure Functions that I’ve wanted to work on for a while.

The following 3 Azure Functions allow me/you to return the status of any Azure Data Factory pipeline once you supply a few of the usual parameters. I created the set to give me options in behaviour depending on requirements. The following visual offers an overview, because I still had to draw a picture as well! In each case, a user or service can hit the functions via a URL and return the status of an Azure Data Factory pipeline using the pipeline name.

Filtering Pipeline Runs

Before going into the detail of the functions I first want to call out how I filtered the pipeline runs for a given Data Factory to ensure only the status of the provided pipeline name is returned. This involves two main filtering parts within the .NET client abstraction provided by Microsoft.

  1. Passing in a date range to the pipeline runs query (client.PipelineRuns.QueryByFactory).
  2. Passing in the Data Factory pipeline name, as provided by the function call.

Both filter parts are wrapped in 3 levels of lists:

  • Firstly, a list of pipeline names.
  • Next, the RunQueryFilter, where the pipeline name is set as the filter criteria.
  • Finally, the RunFilterParameters, where the date range and pipeline name filter all come together.

For the date range I’ve added an App Setting to the Azure Function App. This setting applies to the first two functions below, where we need to handle the period of days we want to use when querying any Data Factory for its pipeline runs. As you probably know, whenever a pipeline is triggered within Data Factory it is given a Run ID. This GUID value is critical when returning the status of a pipeline, as it’s important to have the context of when the execution occurred. To simplify the functions I’ve resolved what the Run ID will be by returning all pipeline runs in a given time period and then taking the last value found for the named pipeline. Yes, this is a little crude, which is why I went on to create the third function, which can handle the passing of a custom Run ID as well. But still, we can’t get the status of a pipeline without the crucial context (Run ID).

The default value I’ve used for the time period filtering is 7 days. The query window therefore starts at today’s date minus 7 days and ends at today’s date. This can of course be changed if you have a larger or smaller window of time for your pipeline runs. But please consider that the more executions you have in the window, the longer the function will take to return.

App setting name: DefaultDaysForPipelineRuns
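As a rough illustration of how that setting turns into a date window, here is a minimal Python sketch (the functions themselves are C#). It relies on the fact that Function App settings are exposed to the running code as environment variables. The function name run_window, the fallback to 7 when the setting is missing or invalid, and the use of UTC are assumptions made for the example.

```python
import os
from datetime import datetime, timedelta, timezone

DEFAULT_DAYS = 7  # fallback when the setting is absent or unusable (assumption)


def run_window(settings=os.environ, now=None):
    """Return (start, end) of the pipeline run query window.

    `settings` is any mapping of app setting names to string values;
    in a Function App this would be the process environment.
    """
    raw = settings.get("DefaultDaysForPipelineRuns", "")
    try:
        days = int(raw)
    except ValueError:
        days = DEFAULT_DAYS
    if days <= 0:
        days = DEFAULT_DAYS

    end = now or datetime.now(timezone.utc)
    start = end - timedelta(days=days)
    return start, end


if __name__ == "__main__":
    start, end = run_window()
    print("Querying runs updated between", start.isoformat(), "and", end.isoformat())
```

Passing the settings mapping and the current time in as parameters keeps the calculation easy to check locally without touching a real Function App.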

For the pipeline name filtering, this uses the inner part of the RunQueryFilter and passes the Operand, Operator and Values properties to ensure only pipeline runs are returned for our provided pipeline. The Values part of the filter does accept a list of pipeline names, but for my functions I’m expecting only a single value. For more details on these methods please check out the following Microsoft docs page.

https://docs.microsoft.com/en-us/rest/api/datafactory/pipelineruns/querybyfactory#runqueryfilteroperand

In terms of the code, I created the RunFilterParameters using the constants RunQueryFilterOperand and RunQueryFilterOperator.
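For anyone working outside the .NET client, the same three-level nesting can be sketched in Python as the JSON body for the REST query documented at the link above. This is an illustration rather than the C# from Functions.cs: the function name build_run_filter is hypothetical, while the field names (lastUpdatedAfter, lastUpdatedBefore, filters, operand, operator, values) follow the REST request body.

```python
import json
from datetime import datetime, timezone


def build_run_filter(pipeline_name, updated_after, updated_before):
    """Build the request body for a 'query pipeline runs by factory' call."""
    if updated_after >= updated_before:
        raise ValueError("updated_after must be earlier than updated_before")

    # Level 1: the list of pipeline names (a single name here).
    pipeline_names = [pipeline_name]

    # Level 2: the filter that says 'PipelineName equals one of these names'.
    name_filter = {
        "operand": "PipelineName",
        "operator": "Equals",
        "values": pipeline_names,
    }

    # Level 3: the filter parameters, combining the date range and filters.
    return {
        "lastUpdatedAfter": updated_after.isoformat(),
        "lastUpdatedBefore": updated_before.isoformat(),
        "filters": [name_filter],
    }


if __name__ == "__main__":
    body = build_run_filter(
        "WaitingPipeline",
        datetime(2020, 1, 1, tzinfo=timezone.utc),
        datetime(2020, 1, 8, tzinfo=timezone.utc),
    )
    print(json.dumps(body, indent=2))
```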

Next a quick overview of the functions themselves:


Get Pipeline Status by Name

Hopefully as the name suggests, this will return the status of any pipeline for a given Data Factory using the name of the pipeline. As per the filtering details above, this assumes that the pipeline has been executed in the last 7 days, and then takes the latest Run ID found.

Body JSON:

{
"tenantId": "1234-1234-1234-1234-1234",
"applicationId": "1234-1234-1234-1234-1234",
"authenticationKey": "Passw0rd123!",
"subscriptionId": "1234-1234-1234-1234-1234",
"resourceGroup": "CommunityDemos",
"factoryName": "PaulsFunFactoryV2",
"pipelineName": "WaitingPipeline"
}

Output

{
"PipelineName": "WaitingPipeline",
"RunIdUsed": "0d069026-bcbc-4356-8fe8-316ce5e07134",
"Status": "Succeeded"
}
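The "take the latest run" step can be sketched as follows, assuming the query has already returned a list of runs shaped like the REST response (runId, pipelineName, runStart, status). This Python version picks the run with the most recent runStart rather than relying on the order of the results, which is a slightly different choice from simply taking the last value found. The function names are hypothetical, and timestamps are compared at whole-second precision only.

```python
from datetime import datetime


def _run_start(run):
    # Keep only 'YYYY-MM-DDTHH:MM:SS'; fractional seconds and the zone
    # suffix are ignored, so all timestamps are assumed to be UTC.
    return datetime.strptime(run["runStart"][:19], "%Y-%m-%dT%H:%M:%S")


def latest_run_status(runs, pipeline_name):
    """Return the output payload for the most recent run of a pipeline,
    or None when no run of that pipeline is in the list."""
    matching = [r for r in runs if r.get("pipelineName") == pipeline_name]
    if not matching:
        return None
    latest = max(matching, key=_run_start)
    return {
        "PipelineName": pipeline_name,
        "RunIdUsed": latest["runId"],
        "Status": latest["status"],
    }


if __name__ == "__main__":
    # Made-up sample data for illustration only.
    sample_runs = [
        {"runId": "run-a", "pipelineName": "WaitingPipeline",
         "runStart": "2020-01-02T09:00:00.1234567Z", "status": "Failed"},
        {"runId": "run-b", "pipelineName": "WaitingPipeline",
         "runStart": "2020-01-05T09:00:00.7654321Z", "status": "Succeeded"},
        {"runId": "run-c", "pipelineName": "OtherPipeline",
         "runStart": "2020-01-06T09:00:00.0000000Z", "status": "InProgress"},
    ]
    print(latest_run_status(sample_runs, "WaitingPipeline"))
```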


Get and Wait For a Pipeline Status By Name

This function will return the status of any pipeline for a given Data Factory using the name of the pipeline. However, if the pipeline is still running or queued the function will block and wait for it to complete. On completion it will return the success status similar to the other functions, or if errored, the error message from the pipeline will be returned.

Body JSON:

{
"tenantId": "1234-1234-1234-1234-1234",
"applicationId": "1234-1234-1234-1234-1234",
"authenticationKey": "Passw0rd123!",
"subscriptionId": "1234-1234-1234-1234-1234",
"resourceGroup": "CommunityDemos",
"factoryName": "PaulsFunFactoryV2",
"pipelineName": "WaitingPipeline"
}

Output

{
"PipelineName": "WaitingPipeline",
"RunIdUsed": "0d069026-bcbc-4356-8fe8-316ce5e07134",
"Status": "Succeeded"
}
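The blocking behaviour boils down to a polling loop. Below is a minimal Python sketch of that idea, not the C# implementation: get_run is a hypothetical callable that fetches the current run (a dict with at least a "status" key), and the poll interval and overall timeout are assumptions added so the loop cannot wait forever.

```python
import time

# Statuses for which the function keeps waiting, per the description above.
WAITING_STATUSES = {"Queued", "InProgress"}


def wait_for_run(get_run, poll_seconds=15.0, timeout_seconds=600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_run() until the run leaves the queued/in-progress states.

    Returns the final run dict; raises TimeoutError if the run is still
    waiting once timeout_seconds have elapsed.
    """
    deadline = clock() + timeout_seconds
    while True:
        run = get_run()
        if run["status"] not in WAITING_STATUSES:
            return run
        if clock() >= deadline:
            raise TimeoutError("pipeline run did not complete in time")
        sleep(poll_seconds)


if __name__ == "__main__":
    # Simulated sequence of statuses standing in for real service calls.
    responses = iter([
        {"status": "Queued"},
        {"status": "InProgress"},
        {"status": "Failed", "message": "Example error text"},
    ])
    final = wait_for_run(lambda: next(responses), poll_seconds=0.0)
    # On failure the run's message is what would be handed back to the caller.
    print(final["status"], final.get("message", ""))
```

The sleep and clock parameters are only there so the loop can be exercised without real delays.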


Get Pipeline Status by Name and Run ID

This function will return the status of any pipeline for a given Data Factory using the name of the pipeline and also the run ID to establish the context for the execution.

Body JSON:

{
"tenantId": "1234-1234-1234-1234-1234",
"applicationId": "1234-1234-1234-1234-1234",
"authenticationKey": "Passw0rd123!",
"subscriptionId": "1234-1234-1234-1234-1234",
"resourceGroup": "CommunityDemos",
"factoryName": "PaulsFunFactoryV2",
"pipelineName": "WaitingPipeline",
"runId": "1234-1234-1234-1234-1234" // <<< addition for this function
}

Output

{
"PipelineName": "WaitingPipeline",
"RunIdUsed": "0d069026-bcbc-4356-8fe8-316ce5e07134",
"Status": "Succeeded"
}
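When the Run ID is already known, no date-range query is needed; the run can be fetched directly. As a hedged Python sketch of that path, the following builds the management REST URL for a single pipeline run and shapes the response into the output above. The helper names are hypothetical, authentication is left out entirely, and the check that the run belongs to the named pipeline is an assumption added for safety rather than something the post describes.

```python
from urllib.parse import quote

API_VERSION = "2018-06-01"


def pipeline_run_url(subscription_id, resource_group, factory_name, run_id):
    """Build the REST URL that returns a single pipeline run by its Run ID."""
    parts = [quote(p, safe="") for p in
             (subscription_id, resource_group, factory_name, run_id)]
    return (
        "https://management.azure.com/subscriptions/{0}"
        "/resourceGroups/{1}"
        "/providers/Microsoft.DataFactory/factories/{2}"
        "/pipelineruns/{3}?api-version={4}"
    ).format(*parts, API_VERSION)


def status_from_run(run, pipeline_name):
    """Turn a pipeline run response into the function's output payload."""
    if run.get("pipelineName") != pipeline_name:
        raise ValueError("run does not belong to pipeline " + pipeline_name)
    return {
        "PipelineName": pipeline_name,
        "RunIdUsed": run["runId"],
        "Status": run["status"],
    }


if __name__ == "__main__":
    print(pipeline_run_url(
        "1234-1234-1234-1234-1234", "CommunityDemos", "PaulsFunFactoryV2",
        "0d069026-bcbc-4356-8fe8-316ce5e07134"))
```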


Other Thoughts

Just a couple of final thoughts related to the above post.

  • If you haven’t developed Azure Function Apps before, you really should. The developer experience with Visual Studio 2017 onwards is really nice, as it gives you a local functions emulator and mini web server so you can hit the functions from your local machine within a console. This is good because you can see the console WriteLine output etc. Then add Postman into the mix and it’s really easy to get going with the code. I’ve added a screenshot on the right of my experience to give you a quick view of how nice it is. This is also a symptom of code withdrawal… getting excited by dev tools! 🙂
  • In production we of course wouldn’t be passing service principal IDs and keys around in function calls. We would use Azure Key Vault and/or Managed Service Identities, especially if the function wasn’t being called from outside the Azure tenant and was maybe triggered in some other way. Hopefully we can appreciate that I only did this in the body of the function request to make the code standalone for anyone that wanted to use it.

Just Give Me The Code!

Ok! Here you go…. Functions.cs 🙂

The full solution is in the same Blob Support Content GitHub repository if you’d like to use the Visual Studio Solution.

I hope you found this post helpful.

Many thanks for reading.

26 thoughts on “Get Any Azure Data Factory Pipeline Run Status with Azure Functions”

  1. Hi Paul,
    Suppose the Azure function is ready. Do we need to call it after every activity in the pipeline? I need this function executed as and when there is an error in an activity. How do I implement this?

    1. In my processing framework I first check the status of a given pipeline. Then use a Switch activity. If failed, I get the error.
      See adfprocfwk.com

    1. Not that I’m aware of, you’d need to create it yourself by first listing all available pipelines. Then getting the status for each.

  2. Hi
    I want to run pipeline runs for each file synchronously in Logic Apps.
    Using get pipeline run status, I’m not getting the correct status of the pipeline run.
    How can I get the pipeline status from Logic Apps?
    Thanks
    Kalyani

  3. Hi Paul. I tried implementing this with Python but i get internal server error when I use this import “azure.synapse.artifacts.operations.PipelineRunOperations”. Can this be achievable with Python?

    1. Hi, yes definitely. However, I think the Synapse libraries you’re using here are still in beta. Maybe try with the Data Factory classes first, as I’ve done this. Cheers

      1. Hey Paul. Thank you for the quick response. I did try implementing with ADF using “import azure.mgmt.datafactory.models”. I am still getting the Internal Server Error by using this import in Azure Function. Are you sure that we can deploy this azure function using python to get the Pipeline Status by giving RunID as input? My language requirement is Python, so I just want to make sure. If not, I can use your code as reference and implement using CSharp.

      2. Yes. Maybe a silly question, but is your App Service set up with a Linux host? Last time I checked, the Windows host didn’t support Python functions.
        Maybe also check the SPN details you’re passing and the permissions to the target ADF instance.

  4. Hi Paul,

    first time I’m programmatically accessing ADF. What is the applicationId? Is it the Managed Identity Application ID of the ADF factory? And where can I find the authenticationKey?

    Regards,
    Koen

      1. OK, I created an app registration. First it had access to the Azure Management API only, but that didn’t seem to work. I then added the app as contributor to ADF and now I can retrieve the statuses of my pipelines in the Azure Function. Thanks.

    1. This is the application ID of the Azure AD service principal that you use to authenticate against the target Data Factory.

  5. .NET MVC: I’m not able to get a token when my application is deployed on the web.
    I’m also not able to generate a token when I’m signed out of VS 2019.
    Do you know this issue?

  6. Hi All, I need to get all pipelines from ADF in C#. I have around 92 pipelines, but client.Pipelines.ListByFactory is only returning 50 pipelines. Can you please provide any suggestions?

  7. I have a requirement to trigger an Azure function as soon as the file is copied into the blob storage. Based on the blob storage location / structure, a unique python script to be attached to the ADF. The ADF should read the file from the specific bucket/folder and process it based on the attached python script and store the processed data into the Cosmos DB.
    There will be multiple input files copied into the blob storage simultaneously by the multiple customers.
    Will this solution scalable? Attaching the python script with ADF dynamically by the Azure function and run the ADF pipelines concurrently to process multiple input files?

  8. RunFilterParameters(
    last_updated_after=datetime.now(timezone.utc) - timedelta(7), last_updated_before=datetime.now(timezone.utc))
    I am using this to get last 7 days runs , but it is giving only the one that is run on the day 7 day before . Can you please help in getting all the run id from that day till today?

  9. Hi A great blog indeed! Also wanted to know how we can use managed identity to access ADF Rest API from web app… please add if any link for the same

  10. Hi, There are already 200+ pipelines already in production. We cannot afford to change any existing pipeline. So without changing the existing pipeline, can I use Azure Functions to retrieve the appropriate error even from activities within nested pipelines?
