Microbatching for Continuously Running Pipelines: Ensuring Smooth and Reliable Operation

Welcome to the final article in this series. Throughout the series, the focus has been to demonstrate the art of the possible, using Matillion products and features to build the MatiHelper Slack AI app. We’ve taken a deep dive into how MatiHelper was built, starting with design and moving through pipeline development. If you haven’t already seen MatiHelper in action, here’s that video to help set the stage!

In Part 1 of this series, I discussed how to approach designing data pipelines, highlighting design concepts that can be used to build simple and scalable pipelines. Part 2 focused on the data lifecycle and how it defines the journey of the data; I showed what the data lifecycle looks like for the MatiHelper Slack AI app, and designed a tracking table and ancillary views that show the MatiHelper data at different points throughout its lifecycle. In Part 3, I showed how to use Matillion Flex Connectors as a way of integrating with Slack to fetch messages from a channel. Part 4 focused on the brains of MatiHelper through its integration with Generative AI. And Part 5 covered integrating Data Productivity Cloud with external systems, using the action of replying to a Slack message as an example, along with some advanced concepts: using webhooks to integrate with external platforms, using Python to easily construct JSON objects, and nesting pipeline executions.

Planning Pipeline Executions

In the previous articles in this series, I’ve walked through in detail the Data Productivity Cloud pipelines that usher MatiHelper data through its lifecycle.

All of the pipelines that support this flow are orchestrated via a main pipeline, Slack AI - Main. Each individual execution of this main pipeline fetches all newly available messages from a Slack channel, sends those messages to OpenAI as prompts, and replies to the originating Slack message(s) with the OpenAI-generated response.

At this point, I have a complete end-to-end pipeline that performs all of the steps required for this particular dataset. The next thing to think about is how the pipeline should be executed when deploying it as a production pipeline.

Very frequently, data pipelines need to run on a set schedule, similar to how a passenger train operates on a schedule. In the Data Productivity Cloud, scheduling pipelines can be done simply through the UI. Another common scenario is to have the Data Productivity Cloud Pipeline execute as part of a larger workflow that spans multiple systems or platforms. To help support these and more advanced DataOps scenarios, there are various Data Productivity Cloud API endpoints that can be used. The available API endpoints include the ability to remotely execute a pipeline.

Sometimes, the nature of the data lifecycle requires the continuous ingestion of data from a source. Within the Data Productivity Cloud platform, there is support for Streaming data sources, which supports near real-time data ingestion. Today, Streaming pipelines include support for frequently used relational database sources, with additional sources being introduced over time.

Continuously Running Pipelines

In the case of the MatiHelper Slack AI app, the main pipeline is designed to run continuously so that any new messages posted to Slack are consumed shortly after they are created. Because the data journey starts in Slack, where data can only be ingested via Slack’s available API endpoints, a “microbatch” pattern has been designed into the main pipeline’s orchestration logic.

Microbatch Replication

The premise behind a “microbatch” pattern is to design a data pipeline to execute itself as a final step. To make this work, the data pipeline must be designed with idempotence in mind: each individual execution must be able to process data at any point in its lifecycle, including identifying new data that should be ingested. Designing pipelines in this way makes the architecture more resilient to failure and lends itself to running the pipeline continuously. The tracking table at the core of the MatiHelper pipelines provides insight into the most recent message captured from Slack and also tracks those messages through every step of the MatiHelper data journey. This metadata is key to allowing the MatiHelper pipeline to run continuously.
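As a minimal sketch of the pattern in plain Python (a simple dict stands in for the tracking table, and the function names are illustrative only — the real implementation is built from Data Productivity Cloud components, not Python):

```python
def microbatch_step(tracking, source_messages, process):
    """One idempotent pipeline execution: consume only the messages
    newer than the high-water mark recorded in the tracking table."""
    new = [m for m in source_messages if m["ts"] > tracking["latest_ts"]]
    for m in new:
        process(m)                       # safe to re-run: keyed off the mark
        tracking["latest_ts"] = m["ts"]  # advance the high-water mark
    return len(new)

def run_continuously(tracking, source_messages, process, microbatch_enabled):
    """Keep re-executing the step until the operator clears the flag,
    mirroring the pipeline launching itself as its final component."""
    while microbatch_enabled():
        microbatch_step(tracking, source_messages, process)
```

Because each step trusts only the high-water mark, re-running it over the same source is harmless — which is exactly what makes it safe for the pipeline to relaunch itself.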

Having a pipeline restart itself upon successful completion creates an endless loop. As such, when implementing a microbatch pattern, mechanisms are required to ensure that executions can be stopped upon operator request. These mechanisms are needed to support things like planned system maintenance windows or general scheduling requirements (perhaps the continuous pipeline only needs to run during business hours).

When using Data Productivity Cloud, data pipeline executions translate into task hours, and those task hours translate into Matillion credit consumption. Given the consumption-based nature of pipeline execution, be mindful of how running a pipeline continuously translates into credit usage.

Adding Microbatch Logic to a Pipeline

In the MatiHelper Slack AI app, the mechanics of the microbatch pattern are represented by the 4 components at the end of the main pipeline. 

Project Variable

To allow the MatiHelper pipeline to exit continuous running mode, I’ve added a Project Variable, proj_microbatch, which has a default value of 0.

The entry point into the microbatch logic is an If component that evaluates the value of proj_microbatch. The type of variable being used here is important and a project variable was chosen purposefully. To learn more about different variable types in Data Productivity Cloud, see our documentation on Variables!

When using a pipeline variable, the scope of the variable is limited to that specific pipeline’s execution. During a run, the value of a pipeline variable is either its default value or a value derived during execution. A project variable, by contrast, is scoped across the entire project, and default values can additionally be defined at the Environment level.

An important nuance of project variables is that their default values are not stored as part of the pipeline code itself. If the default value of a project variable is updated, the next execution of any pipeline that references it will see the updated default value. This nuance is the mechanism that enables and disables a pipeline’s continuous running mode. To enable continuous running mode for the MatiHelper pipelines, simply set the default value of proj_microbatch to a value other than 0. To turn continuous running mode off, or exit out of it while running, set the value of proj_microbatch back to 0.

Data Productivity Cloud Flex Connector

In Part 3 of this series, I touched upon using the Slack Flex Connector as a way of easily integrating with Slack. Similarly, there is a Data Productivity Cloud Flex Connector that includes an endpoint to execute a pipeline. This Flex Connector makes it quite simple to use the Data Productivity Cloud API. The logic for the MatiHelper pipeline to continuously call itself therefore boils down to two components: an If component that determines whether microbatch mode is enabled, and a Data Productivity Cloud Flex Connector configured to execute the pipeline again.

Before a pipeline can be executed via the API, it must first be published to an environment; this is the key prerequisite. The documentation on Artifacts details how to publish pipelines from the Designer UI.

When configuring the authentication to be used with the Data Productivity Cloud Flex Connector, you must first provision a Client ID and Client Secret for your Matillion Hub account. To learn more about this step, see the documentation here. Once armed with the Matillion Hub account API Client ID and Client Secret, an OAuth connection must then be created in the Project.

In addition to the required authentication, when using the Data Productivity Cloud API endpoint to execute a published pipeline, a request body is required. This request body defines which pipeline to execute and allows you to set the value of any runtime variables. The request body as configured in the MatiHelper pipeline is specified as follows:

{
  "pipelineName": "SlackAI App/Slack AI - Main",
  "environmentName": "${pipe_environment}",
  "scalarVariables": {
    "pipe_channel_id": "${pipe_channel_id}",
    "pipe_environment": "${pipe_environment}",
    "pipe_notify_webhook_url": "${pipe_notify_webhook_url}",
    "pipe_webhook_url": "${pipe_webhook_url}",
    "pipe_dpc_server": "${pipe_dpc_server}",
    "pipe_dpc_project_id": "${pipe_dpc_project_id}",
    "pipe_dpc_api_version": "${pipe_dpc_api_version}"
  }
}

The pipelineName attribute maps to the name of the MatiHelper pipeline (Slack AI - Main). Note that if the pipeline is located within a folder, the folder name is also required as part of the pipelineName value. In this case, the pipeline is located in a folder named SlackAI App.

The environmentName attribute must also be set to the name of the environment that the pipeline has been published against.

Not defined in the above request body is an additional versionName attribute. When set, this ensures that a specifically named version (created when publishing pipelines) of the pipeline is executed. When this is not set, the most recent artifact is used for the pipeline execution.

Note also the scalarVariables section of the request body. This section allows variable values to be defined at runtime. If not explicitly defined at runtime, any referenced pipeline or project variables will use their default values when the pipeline executes. The very first execution of the MatiHelper pipeline defines the values of the referenced pipeline variables, and as MatiHelper continues to call itself, it passes those initial variable values along to the next execution. This allows multiple concurrent executions of the MatiHelper pipeline to run, each monitoring a different Slack channel by using different pipeline variable values.
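To make the mechanics concrete, here is a rough Python sketch of assembling such a request. The server URL, endpoint path, and token are placeholders for illustration only; the Flex Connector handles all of this for you, and the Data Productivity Cloud API documentation is the authority on the real endpoint shape.

```python
import json
import urllib.request

def build_execution_request(base_url, access_token, pipeline_name,
                            environment_name, scalar_variables):
    """Assemble a pipeline-execution request. The URL path below is a
    placeholder, not the documented Data Productivity Cloud endpoint."""
    body = {
        "pipelineName": pipeline_name,
        "environmentName": environment_name,
        "scalarVariables": scalar_variables,
    }
    return urllib.request.Request(
        f"{base_url}/pipeline-executions",  # placeholder path
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",  # OAuth bearer token
            "Content-Type": "application/json",
        },
        method="POST",
    )

# Build (but don't send) a request that re-launches the main pipeline.
req = build_execution_request(
    "https://example-dpc-server/api",  # placeholder server
    "my-oauth-token",                  # placeholder token
    "SlackAI App/Slack AI - Main",
    "prod",
    {"pipe_channel_id": "C0123456789"},
)
# urllib.request.urlopen(req) would then submit the execution
```

The important part is the body: it carries the folder-qualified pipeline name, the target environment, and the scalar variable values that the next execution will inherit.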

Monitoring Continuously Running Pipelines

As highlighted earlier, the mechanism to exit MatiHelper out of its continuously running pattern is to update the default value of the proj_microbatch project variable to 0. The next execution of the pipeline will recognize the updated default value and exit out of its loop. Each time a pipeline runs, it consumes task hours, which cost credits. Although each individual execution of the MatiHelper pipeline runs quickly and uses very few credits, if it is running continuously, the task hours and credit use can add up. To help manage the overall credit consumption, I might have MatiHelper run only during business hours. Or, if speed of response is not critical, I could keep microbatch mode turned off and simply have the MatiHelper pipeline run on a set schedule.

It is also useful to have MatiHelper monitor itself. For example, it can send a notification if it is running in microbatch mode at the top of the hour. That gives visibility into whether MatiHelper is running when you might not expect it to, and provides another method of monitoring credit use.

In addition to being prudent, this is also a good opportunity to show another high code feature of Data Productivity Cloud: the use of JavaScript expressions. In the MatiHelper pipeline, the entry point into the microbatch logic is the initial If component, which evaluates the value of the proj_microbatch variable. After that step, two things happen simultaneously: the Data Productivity Cloud Flex Connector is executed to trigger MatiHelper to run again, and a second If component is executed. This second If component determines whether the current time is the top of the hour. This is done by configuring the If component using Advanced mode, which allows JavaScript expressions to define more complex boolean logic. (The Update Scalar component also supports JavaScript expression syntax for defining variable values.) In this case, to check whether the current time is the top of the hour, the JavaScript expression is:

new Date().getMinutes() == 0

The actual notification sent from MatiHelper in this scenario is another Slack message using the same webhook method detailed in the prior article in this series. However, the webhook URL used for the notification is defined using a separate variable, allowing for a different Slack channel to be used for monitoring and notifications than the primary channel that MatiHelper is configured to monitor.
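The monitoring logic can be sketched in ordinary Python as well (the webhook URL is a placeholder; Slack incoming webhooks accept a simple JSON payload with a text field):

```python
import datetime
import json
import urllib.request

def is_top_of_hour(now=None):
    """Python equivalent of the pipeline's JavaScript check:
    new Date().getMinutes() == 0."""
    now = now or datetime.datetime.now()
    return now.minute == 0

def notify_slack(webhook_url, text):
    """Post a plain-text message to a Slack incoming webhook."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": text}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status

# At the top of the hour, one would post to the monitoring webhook, e.g.:
# if is_top_of_hour():
#     notify_slack(monitoring_webhook_url,  # separate channel's webhook
#                  "MatiHelper is running in microbatch mode.")
```

Keeping the monitoring webhook in its own variable, as described above, is what lets the notification land in a different channel from the one MatiHelper watches.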

Starting MatiHelper

When testing the MatiHelper pipeline, it’s sensible to keep microbatch mode disabled (set proj_microbatch = 0). Then, the MatiHelper pipeline, Slack AI - Main, can be run manually directly from the Designer UI.

Alternatively, if you turn on microbatch mode (set proj_microbatch = 1) and then run the MatiHelper pipeline, it will continue to run until turned off (set proj_microbatch = 0). This is one way of manually starting MatiHelper.

A more systematic way of starting the MatiHelper pipeline would be to use the same API endpoint that the Data Productivity Cloud Flex Connector utilizes. See our guide on Executing and managing a pipeline for some additional details and examples of how that can work.

Conclusion

And so concludes the final article in this series. In this article, I focused on how to implement a continuously running pipeline pattern, along with other tips and tricks for managing and executing pipelines.

The intent of this series has been to showcase the “art of the possible” when using Matillion to help power your data pipelines. Each article in this series is applicable on its own to numerous data use cases. And seeing how they all relate to each other as part of the larger data lifecycle should get you thinking about what your own data journey might look like.

Explore the rest of the series:

Downloads


You can find the MatiHelper Slack AI App pipelines available for download on the Matillion Exchange here!

Arawan Gajajiva

Principal Architect - Sales Engineering COE
