ADF.procfwk v1.4 – Enhancements for Long Running Pipelines

Code Project Overview

This open source code project delivers a simple metadata driven processing framework for Azure Data Factory (ADF). The framework is delivered by coupling ADF with an Azure SQL Database that houses execution stage and pipeline information that is later called using an Azure Functions App. The parent/child metadata structure firstly allows stages of dependencies to be executed in sequence. Secondly, it allows all pipelines within a stage to be executed in parallel, offering scaled out control flows where no inter-dependencies exist for a given stage.
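As a rough illustration of the stage/pipeline idea, the sketch below runs stages in order and the pipelines inside each stage in parallel. It is not the framework's code: the metadata rows, `run_worker` and `run_framework` are hypothetical names, and a thread pool stands in for the ADF ForEach activity.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical metadata rows: (stage_id, pipeline_name).
METADATA = [
    (1, "Extract A"),
    (1, "Extract B"),
    (2, "Transform"),
    (3, "Load"),
]


def run_worker(pipeline_name: str) -> str:
    """Stand-in for triggering one pipeline; returns a fake outcome."""
    return f"{pipeline_name}: Succeeded"


def run_framework(metadata):
    """Run stages in ascending order; run each stage's pipelines in parallel."""
    stages = defaultdict(list)
    for stage_id, pipeline_name in metadata:
        stages[stage_id].append(pipeline_name)

    results = []
    for stage_id in sorted(stages):
        # Leaving the with-block waits for every pipeline in the stage,
        # so the next stage cannot start early.
        with ThreadPoolExecutor() as pool:
            outcomes = list(pool.map(run_worker, stages[stage_id]))
        results.append((stage_id, outcomes))
    return results
```

Calling `run_framework(METADATA)` returns one entry per stage, in stage order, each holding the outcomes of that stage's pipelines.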

The framework is designed to integrate with any existing Data Factory solution by making the lowest level executor a stand alone processing pipeline that is wrapped in a higher level of controlled (sequential) dependencies. This level of abstraction means operationally nothing about the monitoring of orchestration processes is hidden in multiple levels of dynamic activity calls. Instead, everything from the processing pipeline doing the work can be inspected using out-of-the-box ADF features.

This framework can also be used in any Azure Tenant and allows the creation of complex control flows across multiple Data Factory resources by connecting Service Principal details to targeted Subscriptions > Resource Groups > Data Factories and Pipelines. This offers very granular administration over data processing components in a given environment.

Thank you for visiting. Details of this latest release can be found below.

ADF.procfwk content and resources:

Blogs mrpaulandrew.com/ADF.procfwk
GitHub github.com/mrpaulandrew/ADF.procfwk
Twitter #ADFprocfwk

Version 1.4 of ADF.procfwk is ready!

Before diving into the release details, a very quick terminology update. So far, I’ve referred to the ‘processing pipelines’ as the things created outside of the framework that are called by the Child pipeline in a given processing stage. Going forward it feels better to call these ‘processing pipelines’ the ‘workers’. I’m mindful of overusing the words ‘pipeline’ and ‘processing’, which is easily done given the context! To further support this terminology I’ve added a glossary to the solution as a Markdown page, here. Thanks. Now, onto the good stuff 🙂

Release Overview

For this release of the framework, the problem of resiliently handling long running Worker pipelines has been addressed. Previously, only a single Azure Function was used to execute a given worker pipeline, and internally it entered a while loop checking for the pipeline to complete before returning. Although this worked, it wasn’t good practice and didn’t follow common patterns for durable functions. Furthermore, in some cases the Function would result in a failure being reported by the Data Factory activity simply because the HTTP request to the Function didn’t return an accepted status (202) or similar. This meant the client (ADF in this case) would time out waiting for a response to the request despite the Function still running.

To overcome these issues, a new ‘Async HTTP API’ style pattern has been baked into this latest version of the processing framework, which now uses two Azure Functions. The first Function calls the pipeline and returns a response to Data Factory. The second Function then polls the Worker’s status to establish when it completes before allowing the Child to continue. More details can be found in the Data Factory Changes section of this blog below.
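A minimal sketch of the two-function split, assuming an in-memory `FakeFactory` in place of the real Data Factory management client. The Python function names only mirror the Function names for readability, and the returned dictionary shapes are assumptions rather than the Functions' actual output.

```python
import uuid


class FakeFactory:
    """In-memory stand-in for Data Factory; each status read advances the run."""

    def __init__(self, lifecycle):
        self._lifecycle = list(lifecycle)
        self._position = {}

    def create_run(self, pipeline_name):
        run_id = str(uuid.uuid4())
        self._position[run_id] = 0
        return run_id

    def get_status(self, run_id):
        index = self._position[run_id]
        # Move one step along the lifecycle per read, stopping at the end.
        self._position[run_id] = min(index + 1, len(self._lifecycle) - 1)
        return self._lifecycle[index]


def execute_pipeline_v2(factory, pipeline_name):
    """First call: start the run and return once it is no longer Queued."""
    run_id = factory.create_run(pipeline_name)
    status = factory.get_status(run_id)
    while status == "Queued":
        # A real implementation would pause briefly between reads.
        status = factory.get_status(run_id)
    return {"pipelineName": pipeline_name, "runId": run_id, "status": status}


def check_pipeline_status(factory, run_id):
    """Second call: one quick status read for a given run ID."""
    return {"runId": run_id, "status": factory.get_status(run_id)}
```

The point of the split is that neither call blocks for the lifetime of the worker: the first returns as soon as the run has started, and the caller decides how often to make the second.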

The main reason for wrapping this capability in a dedicated ‘Infant’ pipeline is that the ‘Child’ is already running as an iterator itself within an ADF ForEach activity, which allows the workers to be scaled out within a processing stage. Data Factory does not currently allow a second level iterator, in this case an Until activity, to be nested within an existing ForEach activity. Hence the need to put the Until activity in its own pipeline.

At runtime we can therefore state the following for these new Infant level pipelines within the processing framework.

  1. The Grandparent bootstrap optionally sets up the environment for processing and then calls the Parent.
  2. The Parent sequentially calls our metadata stages to be executed and passes off each stage to the Child.
  3. The Child executes all Worker pipelines for the given stage in parallel and then passes off the status checking to the Infant.
  4. The Infant checks the given Worker pipeline using its Run ID for a completion status and waits in each iteration until the Worker finishes.

On the right is a crude processing chain for each pipeline type. Learning the colours is a test; see the repository images folder or the previous blog below on the different generations of pipelines 🙂

Azure Data Factory – Pipeline Hierarchies (Generation Control)

Function Changes

The Azure Function App now contains the following three Functions:

  1. ExecutePipeline – This is the original Function used by the framework which I’ve left in place for now. It will be removed in a later version of the framework.
  2. ExecutePipelineV2 – This updated executor Function is now used to start the worker pipelines. This will only wait until the worker pipeline is ‘InProgress’ and no longer ‘Queued’ before returning. This Function accepts exactly the same Body request as the previous version so switching the activity call within Data Factory is very simple, just add V2.
  3. CheckPipelineStatus – This Function has been taken from my previous blog post (here) and is used to get the status of the Worker pipeline for a given Run ID. The Run ID is provided by the ExecutePipelineV2 Function as part of its returned output. If you want to test the check status Function via Postman, it expects the following Body request:

{
  "tenantId": "1234-1234-1234-1234-1234",
  "applicationId": "1234-1234-1234-1234-1234",
  "authenticationKey": "Passw0rd123!",
  "subscriptionId": "1234-1234-1234-1234-1234",
  "resourceGroup": "ADF.procfwk",
  "factoryName": "FrameworkFactory",
  "pipelineName": "Wait1",
  "runId": "1234-1234-1234-1234-1234"
}
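For anyone scripting the same test rather than using Postman, the sketch below builds that Body request with the Python standard library. The helper names and the Function URL in the usage note are hypothetical; only the eight body fields come from the request above.

```python
import json
import urllib.request


def build_status_check_body(*, tenant_id, application_id, authentication_key,
                            subscription_id, resource_group, factory_name,
                            pipeline_name, run_id):
    """Return the Body request using the field names shown above."""
    return {
        "tenantId": tenant_id,
        "applicationId": application_id,
        "authenticationKey": authentication_key,
        "subscriptionId": subscription_id,
        "resourceGroup": resource_group,
        "factoryName": factory_name,
        "pipelineName": pipeline_name,
        "runId": run_id,
    }


def build_request(function_url, body):
    """Wrap the body in a JSON POST request; nothing is sent at this point."""
    return urllib.request.Request(
        function_url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the result of `build_request` to `urllib.request.urlopen` would send it. The URL depends on your own Function App (for example a placeholder such as `https://<your-app>.azurewebsites.net/api/CheckPipelineStatus`, plus whatever function key your app requires).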

For both new Functions the DataFactoryManagementClient has been refactored into an internal class to minimise code duplication.

Finally, given the new Function App requirements, where each Function call should be very quick to return a result, a dedicated App Service Plan is no longer required. The Function App can use a much cheaper (probably free, depending on execution levels) Consumption Plan. Free vs around £65 per month for the smallest app service plan is hopefully another good reason to upgrade to this latest release of the ADF.procfwk.

Data Factory Changes

The pipeline activities have now evolved into the following for version 1.4 of the processing framework:

The full diagram is also available as a PowerPoint file in the repository images folder.

The key change in ADF is the introduction of the Infant pipeline and how this is called for each worker pipeline executed (in parallel) by the Child.

Firstly, be aware that the Switch activity used to report the worker pipeline outcome status has now been moved into the Infant pipeline. Its case conditions remain the same as the previous release.

Next, the Infant contains an Until activity which iterates until the worker pipeline completes. The exact logic for the looping uses the new ‘SimpleStatus’ value added to the second Azure Function (CheckPipelineStatus). The SimpleStatus in the context of the Until condition means:

  • Running = perform another iteration. Or, logic test = false.
  • Done = break from the Until activity. Or, logic test = true.
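The two bullet points can be sketched as a pair of small functions. Which underlying pipeline run statuses count as still running is an assumption here (Queued, InProgress and Canceling); the post only states the Running/Done meaning, so check the Function code for the exact mapping.

```python
# Assumption: these run statuses mean the worker is still in flight.
IN_FLIGHT_STATUSES = {"Queued", "InProgress", "Canceling"}


def simple_status(pipeline_status: str) -> str:
    """Collapse a detailed run status into 'Running' or 'Done'."""
    return "Running" if pipeline_status in IN_FLIGHT_STATUSES else "Done"


def until_condition(simple: str) -> bool:
    """Logic test for the Until activity: True breaks the loop."""
    return simple == "Done"
```

With this mapping a failed or cancelled worker also ends the loop, leaving the outcome to be reported afterwards.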

Finally, to avoid time wasting, for each Until iteration an If activity is used to house the eventual Wait activity. This uses the same logic as the Until bullet points above, except that a simple status of Running results in the Wait activity being used. Otherwise, no Wait will be called and it is expected that the Until can break.

In addition to the above, on each iteration of the Infant Until activity the database table [procfwk].[CurrentExecution] is updated with a new attribute [LastStatusCheckDateTime] just to provide some operational visibility to the looping behaviour for each running worker pipeline.

For the Wait activity, the duration has been added as a property within the metadata database and retrieved using a Lookup activity only once per Infant pipeline call. The default value is 60 seconds between worker pipeline status checks. This is set using the SSDT project post deployment script file, as per the other framework properties.
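Putting the Until, If and Wait activities together, the Infant's behaviour can be approximated as below. This is a sketch only: `run_infant` and its parameters are hypothetical, `check_status` stands in for the CheckPipelineStatus call, and `record_check` stands in for the [LastStatusCheckDateTime] update.

```python
import time
from datetime import datetime, timezone


def run_infant(check_status, wait_seconds, record_check, sleep=time.sleep):
    """Check until the worker is Done, waiting only while it is Running.

    wait_seconds is passed in once, mirroring the single Lookup per call.
    Returns the number of status checks made.
    """
    checks = 0
    while True:
        simple = check_status()
        checks += 1
        # Record when the status was last checked (UTC).
        record_check(datetime.now(timezone.utc))
        if simple == "Running":
            sleep(wait_seconds)  # the If activity's true branch
        else:
            return checks  # no Wait; the Until can break
```

For example, `run_infant(check, 60, log_check)` with the default 60 second property would pause for a minute after every Running result and return immediately on the first Done, where `check` and `log_check` are callables you supply.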

Other Changes

There are several other small tweaks in this release to be aware of:

  • Any stored procedures in the database that were using the T-SQL function GETDATE() have been updated to now use GETUTCDATE(). The main reason for this is to align the data reported in the log for start and end times with those seen via the Data Factory monitoring portal, which operates on UTC.
  • A Glossary has been added to the root of the code repository to offer some clarification on terms used when working with the processing framework.
  • The database diagram found in the repository images folder has been updated with all the latest table changes and the v1.1 file removed.
  • A run duration calculation attribute has been added to the database view [procfwk].[LastExecution].
  • Several new database views have been added to query and aggregate different aspects of the metadata. These are:
    • [procfwk].[CurrentExecutionSummary]
    • [procfwk].[AverageStageDuration]
    • [procfwk].[CompleteExecutionLog]
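To illustrate why consistent UTC timestamps matter for the logged start and end times and for a run duration calculation, here is a small sketch. The function name and the choice of whole minutes are assumptions; the view's actual unit is not stated in these notes.

```python
from datetime import datetime, timedelta, timezone


def run_duration_minutes(start, end):
    """Whole minutes between two timezone-aware timestamps."""
    return int((end - start).total_seconds() // 60)


# Start and end recorded in UTC, as GETUTCDATE() would produce.
start_utc = datetime(2020, 5, 1, 9, 0, tzinfo=timezone.utc)
end_utc = datetime(2020, 5, 1, 9, 45, tzinfo=timezone.utc)

# The same start instant seen from a UTC+1 local clock reads one hour later,
# which is the kind of mismatch with the monitoring portal that UTC avoids.
local_zone = timezone(timedelta(hours=1))
start_local = start_utc.astimezone(local_zone)
```

Because both values are timezone-aware, the duration comes out the same whichever representation is used.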

That concludes the release notes for this version of ADF.procfwk.

Please reach out if you have any questions or want help updating your implementation from the previous release.

Many thanks
