Get Data Factory to Check Itself for a Running Pipeline via the Azure Management API

Hi friends, just a quick how-to-guide-style post on something I had to build in Azure Data Factory.


Scenario: I want to trigger a Data Factory pipeline, but when I do I want the pipeline to know if it’s already running. If it is already running, stop the new run.

Sounds simple enough right?

Wrong!

But it's now simple for you, because I've done it for you, yay! 🙂

Solution:

This is intended to be a utility pipeline that is wrapped up by an Execute Pipeline Activity. When called, it is provided with the following pipeline parameters:

  • PipelineName
    • (the pipeline that called it, so using the system variable: @pipeline().Pipeline)
  • ThisRunId
    • (from the pipeline that called it, so using the system variable: @pipeline().RunId)

Going through these activities in order, left to right from the image above.

Get Subscription

Assuming nothing, I first used a Web Activity to hit the Azure Management API and return the subscription ID for the Data Factory I'm working in.

https://management.azure.com/subscriptions?api-version=2020-01-01

Nothing special here in terms of the URL or request, and I'm using Data Factory's own managed service identity (MSI) to authenticate.

Set Variable

For ease of debugging I used a variable and parsed the output of the first Web Activity to give me the clean subscription ID value. I was also expecting to have to deal with a long list of subscriptions; as it turned out, value[0] was good enough.

@replace(activity('Get Subscription').output.value[0].id,'/subscriptions/','')
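The same parsing can be sketched in Python. This is a minimal sketch assuming a response shaped like the Management API's subscription list (the sample GUID below is a placeholder):

```python
# Sketch of the Set Variable parsing, assuming a response shaped like
# GET /subscriptions: {"value": [{"id": "/subscriptions/<guid>", ...}]}
def extract_subscription_id(response: dict) -> str:
    # Mirrors the ADF expression:
    # @replace(activity('Get Subscription').output.value[0].id, '/subscriptions/', '')
    return response["value"][0]["id"].replace("/subscriptions/", "")

sample = {"value": [{"id": "/subscriptions/0000-1111", "displayName": "Dev"}]}
print(extract_subscription_id(sample))  # 0000-1111
```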

Get Resource Group Name

Next, I cheated slightly here by simply storing the Resource Group name in a SQL database table. I'm sure it can be returned via the Management API, but I gave up searching the MS Docs site and went for the easy option with a Lookup Activity.

Everything I found required the Resource Group name to be provided; as far as I could see, you can't reverse engineer it from the resource itself. Happy to be told otherwise.

Check For Valid Pipeline Name

Armed with the following values:

  • Pipeline Name – from the parameter.
  • Subscription Id – from the variable.
  • Resource Group Name – from the database table lookup.
  • Data Factory name – from its own system variable.

I decided to be a good developer and do an initial defensive check to ensure the pipeline name provided was in fact valid for this Data Factory. This requires the pipeline to be deployed, not just run in Debug mode.

https://management.azure.com/subscriptions/@{variables('SubscriptionId')}/resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/providers/Microsoft.DataFactory/factories/@{pipeline().DataFactory}/pipelines/@{pipeline().parameters.PipelineName}?api-version=2018-06-01

If the name isn't valid, the exception returned isn't very informative, but it's better than the activity succeeding and us wrongly assuming the pipeline isn't running at all.
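As a sketch, the URL for this check can be assembled from the values gathered so far. The parameter values are hypothetical stand-ins for the variable, Lookup output, and system variables used in the pipeline:

```python
# Sketch of the Web Activity URL used for the defensive pipeline-name check.
# In the pipeline, these values come from the SubscriptionId variable, the
# Resource Group lookup, and the system variables respectively.
def pipeline_get_url(subscription_id: str, resource_group: str,
                     factory: str, pipeline_name: str) -> str:
    return (
        "https://management.azure.com"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        "/providers/Microsoft.DataFactory"
        f"/factories/{factory}"
        f"/pipelines/{pipeline_name}"
        "?api-version=2018-06-01"
    )

url = pipeline_get_url("0000-1111", "my-rg", "my-adf", "MyPipeline")
```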

Get Query Run Days Value

As we are running a query against Data Factory for any pipeline runs by name, some extra filtering will be required. Basically, this provides start and end date/time values for the pipeline run information we return. It uses the Management API RunQueryFilter options that get passed in the request body.

https://docs.microsoft.com/en-us/rest/api/datafactory/pipelineruns/querybyfactory

To give me the flexibility to change this in the future I decided to once again get this value from a SQL database table. In my case it returns '-10', used below to define the start of the query window.
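The window calculation is simple date arithmetic. A minimal sketch, assuming the '-10' value from the properties table:

```python
from datetime import datetime, timedelta, timezone

# Sketch of the query window used for lastUpdatedAfter/lastUpdatedBefore,
# assuming the properties table returns -10 (days).
query_run_days = -10
last_updated_before = datetime.now(timezone.utc)          # @{utcnow()}
last_updated_after = last_updated_before + timedelta(days=query_run_days)  # @{adddays(utcnow(), -10)}
```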

Get Pipeline Runs

Now the fun part.

I hit the Azure Management API again with a Web Activity. Performing the following POST request to query the pipeline runs:

URL https://management.azure.com/

subscriptions/@{variables('SubscriptionId')}/

resourceGroups/@{activity('Get Resource Group').output.firstRow.PropertyValue}/

providers/Microsoft.DataFactory/

factories/@{pipeline().DataFactory}/

queryPipelineRuns?api-version=2018-06-01

Body

{
    "lastUpdatedAfter": "@{adddays(utcnow(),int(activity('Get Query Run Days Value').output.firstRow.PropertyValue))}",
    "lastUpdatedBefore": "@{utcnow()}",
    "filters": [
        {
            "operand": "PipelineName",
            "operator": "Equals",
            "values": [
                "@{pipeline().parameters.PipelineName}"
            ]
        }
    ]
}

To my surprise this request failed due to an authentication issue.

Annoyingly the Data Factory’s own MSI does not have access to query its own Pipeline Runs via the Management API. You have to explicitly grant it access to itself! Madness.

Anyway, a few grumpy tweets later, moving on 🙂
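For anyone hitting the same wall: the fix is a role assignment granting the factory's managed identity access to itself. With the Azure CLI it looks something like the sketch below, where all the IDs are placeholders. The built-in Data Factory Contributor role works, though a narrower custom role covering the pipeline runs read operation would also do; treat the exact role choice as an assumption to check against your own security requirements.

```shell
# Placeholder IDs throughout; grant the factory's managed identity a role
# scoped to the factory itself so it can query its own pipeline runs.
az role assignment create \
  --assignee "<factory-managed-identity-object-id>" \
  --role "Data Factory Contributor" \
  --scope "/subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.DataFactory/factories/<factory-name>"
```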

Filter Running Pipelines

Now to filter the results to establish if the target pipeline is running.

Using the Filter Activity is a nice way to handle this, firstly providing the array of results:

@activity('Get Pipeline Runs').output.value

Then comes the condition expression. This had to exclude the pipeline's own Run ID (provided as a pipeline parameter), then treat a status of either InProgress or Queued as ultimately 'running'. Expression below:

@and(not(equals(item().runId,pipeline().parameters.ThisRunId)),or(equals(item().status,'InProgress'),equals(item().status,'Queued')))

If Pipeline Is Running

Finally, an If Activity. In my case, the condition checks whether the filtered result count is greater than or equal to one, meaning the pipeline is already running.

@greaterOrEquals(int(activity('Filter Running Pipelines').output.FilteredItemsCount),1)
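The Filter and If logic together can be sketched in Python. This assumes the queryPipelineRuns response shape (a "value" array of runs with runId and status fields); the sample run IDs are made up:

```python
# Sketch of the Filter Activity condition plus the If Activity check,
# assuming runs shaped like: {"runId": "...", "status": "..."}
def is_already_running(runs: list, this_run_id: str) -> bool:
    # Mirrors the ADF expression: exclude our own run, then treat a
    # status of InProgress or Queued as 'running'.
    others = [
        r for r in runs
        if r["runId"] != this_run_id
        and r["status"] in ("InProgress", "Queued")
    ]
    # Mirrors @greaterOrEquals(FilteredItemsCount, 1)
    return len(others) >= 1

runs = [
    {"runId": "a", "status": "InProgress"},
    {"runId": "b", "status": "Succeeded"},
]
print(is_already_running(runs, this_run_id="a"))  # False: only our own run is active
```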

Within the If True outcome I used another Lookup Activity to throw the exception. Data Factory doesn't currently have a means of throwing its own exceptions, so I used my SQL database a third time to handle this with a T-SQL RAISERROR statement, shown below.

RAISERROR('@{concat('Provided pipeline name (',pipeline().parameters.PipelineName,') still has a run in progress or queued given the query range parameters set in the properties table.')}',16,1);

Outcome:

The triggered parent pipeline hits this utility pipeline to check if it's already running; if it is, an exception is thrown by the utility pipeline, stopping the parent.


Code is available in my Blog Supporting Content GitHub repository: https://github.com/mrpaulandrew/BlogSupportingContent


Hopefully someone else will find this helpful.

Many thanks for reading.
