Webhooks and Pushdown Python: Building Interactive and Efficient Data Applications

Welcome to Part 5 of this blog series, demonstrating the art of the possible, using Matillion products and features to build the MatiHelper Slack AI app. In this series, we’re taking a deep dive into how the MatiHelper Slack AI app was built, starting with design and now focusing on pipeline development. If you haven’t already seen MatiHelper in action, here’s that video to help set the stage!

In the first part of this series, I discussed how to approach designing data pipelines by highlighting design concepts that can be used to build simple and scalable pipelines. Part 2 focused on the data lifecycle and how it defines the journey of the data. I also showed what the data lifecycle looks like for the MatiHelper Slack AI app, and designed a tracking table and ancillary views that show the MatiHelper data at different points throughout its lifecycle. In Part 3, I showed how to use Matillion Flex Connectors as a way of integrating with Slack to fetch messages from a channel. In Part 4, I focused on the brains of MatiHelper through its integration with Generative AI.

In this next part, I’ll focus on integrating Data Productivity Cloud with external systems, using the action of replying to a Slack message as an example.

Integrating with External Systems via Webhook

In Part 3 of this series, Matillion’s Custom Connector framework was highlighted as a method to integrate with Slack APIs as a source of data. In this part of the MatiHelper pipeline, another common method of integration is used: webhooks. For those not familiar with webhooks, you might think of them as a type of API that allows an automated message to be sent to another system. Webhooks are frequently used for “event-driven” patterns of integration between different applications or systems. In Data Productivity Cloud, there is a Webhook Post component, which allows a Data Productivity Cloud pipeline to push a message to an external system’s webhook endpoint.
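To make the concept concrete: outside of any specific platform, a webhook call is just an HTTP POST carrying a JSON payload to a known URL. The sketch below uses only the Python standard library; the endpoint URL shown is a placeholder, not a real Slack webhook URL (those are generated when you configure the webhook in Slack).

```python
import json
import urllib.request

def build_webhook_request(url: str, payload: dict) -> urllib.request.Request:
    """Build an HTTP POST request carrying a JSON payload, as a webhook expects."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Sending is then a single call (commented out here to avoid a live request):
# with urllib.request.urlopen(build_webhook_request(url, {"text": "Hello!"})) as resp:
#     print(resp.status)
```

In Data Productivity Cloud, the Webhook Post component handles this HTTP plumbing for you; all you supply is the endpoint URL and the payload.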

There are various scenarios where webhooks might be interjected into a data pipeline. One of the most common is when multiple dependent processes run in different systems, and a webhook allows one system to trigger the execution of a process in another. Data Productivity Cloud allows one to orchestrate those enterprise process dependencies and push to different webhook endpoints at the appropriate times.

Looking at the MatiHelper pipelines, the pipeline that sends the AI response back to Slack uses a webhook to deliver that message. Realistically, I could have used a Custom Connector (see Part 3 in this series) to post a message back to Slack using the chat.postMessage API endpoint. But since Slack also supports webhooks as a way of receiving messages, in the spirit of the “art of the possible”, I elected to use the webhook method to demonstrate how that is done.

When integrating with a webhook, most of the complexity is in the initial setup of the webhook in the external system. Fortunately, Slack has some great documentation that walks through setting up a Slack webhook. As an outcome of setting up the webhook, there will be a unique URL generated. This unique URL is needed when configuring the Webhook Post component.

Formatting the Webhook Message

Another area of complexity around webhooks is formatting the “payload”, which is the message to be sent to the external system. The requirements around the payload format will be system specific and documentation is key to understanding those formatting requirements. When using a Slack webhook, the message format is JSON at its core (which is most common), but Slack also supports advanced formatting in the message. Another nuance of the Slack webhook payload is that the MatiHelper pipeline sends the message as a threaded reply, which requires an additional thread_ts attribute (which represents the timestamp of the original message) to be included in the payload. 

Irrespective of the contents of the payload, most webhooks expect a payload in JSON format. When sending a webhook message, if the payload is not formatted correctly, sending the message will fail, typically with an error message indicating a format issue. For MatiHelper, the Slack webhook payload message needs to be of this JSON structure:

{
    "text": "Response captured from GenAI",
    "thread_ts": 1234567890,
    "channel": "Slack Channel ID"
}

So, there are 3 primary attributes in the JSON structure:

  • “text” represents the actual message that will be posted as the reply to the original Slack message.
  • “thread_ts” represents the unix timestamp value that uniquely identifies the original Slack message being replied to.
  • “channel” is the Slack Channel ID that the original message was posted to (this is the same Slack Channel ID that was discussed in Part 3 of this series).

As you might expect, in the MatiHelper pipelines, the values for these attributes are defined via pipeline variables. The rest of the JSON structure is defined as a template in the preceding Python Pushdown component.

Python Pushdown

When developing this part of the MatiHelper pipelines, I had originally defined the JSON template structure directly in the Webhook Post component and swapped in variable syntax for values. But, when testing, I encountered errors indicating that I had formatted the JSON payload incorrectly. Now, there are many ways I could have ultimately addressed my formatting error. In the spirit of the “art of the possible”, I thought this would be a good opportunity to showcase some of Matillion’s high code capabilities. 

Data Productivity Cloud was created as a data platform that caters to different types of users, essentially democratizing data pipeline creation. When building data pipelines using Data Productivity Cloud, the drag and drop user interface and guided component configuration appeals to both “low code” users and “high code” users. A “low code” user may understand their data needs, but without programming language skills, may not know where to begin to build a data pipeline. Data Productivity Cloud makes data pipeline development accessible to these users. 

On the other hand, “high code” users, who are accustomed to building data pipelines from the ground up, still appreciate the easy-to-use user interface of Data Productivity Cloud. By removing the need to create and manage new code for common data engineering tasks, high code users are empowered to put their skills to better use solving more complex problems. 

In the world of data engineering, data analytics, and AI, Python and SQL are very common languages. For those scenarios where users want to leverage Python or SQL as part of their data pipeline, the Designer interface natively includes a high code editor, which assists in the development of Python and SQL scripts. And there are specific components that allow for the execution of custom SQL or Python code.

Looking at Python in particular, Data Productivity Cloud provides numerous ways, both directly and indirectly, to execute Python code. In the MatiHelper pipelines, I’m using the Python Pushdown component. The “pushdown” in the name means that the code defined in the component is pushed down to the cloud data platform (Snowflake, in this case) to execute, tapping into the near-infinite power of a Snowflake warehouse. Today, we support Snowflake for Python Pushdown, but we will be adding support for other cloud data platforms in the future!

While it’s easy to think of user personas as “low code” or “high code”, in reality, most users sit somewhere in between. And, over time, “low code” users tend to pick up some coding tricks along the way. There are a lot of helpful things one can do with just a little bit of Python syntax knowledge. In this part of the MatiHelper pipelines, we’re looking at one of the scenarios where a little bit of Python syntax can help to simplify things. Specifically, when there is a need for more advanced variable value manipulation, Python can come in handy.

Back to the problem at hand. In this step of the MatiHelper pipeline, the Slack webhook is being used to send a response back to the original message. The webhook payload must be a JSON object with the basic structure shown earlier. Python has a JSON encoder/decoder library that makes it very simple to create (encode) or parse (decode) data in JSON format. Here is the exact Python code that is used to construct the webhook payload:

import json

# Values come from pipeline variables defined in the Data Productivity Cloud pipeline.
data = {
    "text": pipe_text,
    "thread_ts": pipe_ts,
    "channel": pipe_channel_id
}

# Serialize the dictionary to a JSON string and write it back to the
# pipe_payload pipeline variable for the Webhook Post component to use.
data_string = json.dumps(data)
print(data_string)
context.updateVariable("pipe_payload", data_string)

In this little bit of Python code, the actual values passed into the “data” dictionary come from pipeline variables. The serialized JSON string is then written to another pipeline variable, pipe_payload, which is referenced in the subsequent Webhook Post component. Using Python, specifically its ability to create and serialize JSON objects, keeps this bit simple without overly complicated code.
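A big part of why json.dumps solves the formatting problem is that it handles escaping automatically. An AI-generated response can easily contain quotes or newlines, which would break a JSON string assembled by plain variable substitution. The snippet below illustrates this (the thread_ts and channel values are placeholders for illustration):

```python
import json

# An AI response containing characters that would break hand-built JSON.
pipe_text = 'She said "hello"\nSecond line'

data_string = json.dumps({
    "text": pipe_text,
    "thread_ts": "1234567890.123456",  # placeholder timestamp
    "channel": "C0123456789",          # placeholder channel ID
})
print(data_string)
# The embedded quotes and newline come out escaped, so the payload stays valid JSON.
```

This is likely the class of error you hit when pasting raw variable values directly into a JSON template: any special character in the value silently corrupts the payload.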

This is one of the simplest (but useful!) things that one can do in Python. To really tap into the power of Python Pushdown and become a true high-code user, dive into Snowflake’s Snowpark Developer Guide for Python!


Iterating Over Data

In the prior steps of the MatiHelper pipeline, all of the data being moved and transformed could be treated in the same way. MatiHelper first interacts with Slack to get all available messages in a specified channel. Then, any newly captured messages are all submitted to OpenAI with the same general prompt. Along each step of the way, the details of the source of the data and the target destination are the same for every row of data being processed.

Now, in this step of the journey, MatiHelper sends a response back to the originating message in Slack. Ideally, with scalability and performance in mind, data pipelines that work with cloud data platforms such as Snowflake should be designed to work with sets of data, as opposed to individual rows. In a perfect world, MatiHelper could send all of its responses back to Slack in one big request. Unfortunately, while the target is Slack for every response, Slack requires that each response be submitted individually. Due to this nuance, it’s at this point in the MatiHelper pipeline where a parent and child orchestration are added into the mix.

In the main MatiHelper pipeline, the main (parent) orchestration pipeline that supports this step of the journey is the “Process Slack AI Replies” pipeline. But, if you open up this pipeline, you will see that it executes another (child) pipeline, named “Reply Slack AI Responses”.

The other thing happening here is the use of a Table Iterator component. Iterator components allow users to loop over rows of data and do something for each record. What is done on each iteration is defined by the component the iterator is attached to. You can attach an Iterator to another Orchestration component, including one that executes another Orchestration or Transformation pipeline (making it a child pipeline). To learn more about using iterators, see our documentation here!

Typically, when adding iterations to a pipeline like this, multiple steps need to be performed per iteration. This is specifically the scenario where a child orchestration pipeline is introduced. The “child” orchestration represents all of the steps to be performed per iteration. In this case, the Table Iterator is configured to iterate over the “slack_api_replies_new_vw” View that was previously created (see Part 2 of this series for those details). This view returns all captured Slack messages that have an AI generated response and have not yet been sent back to the user.

By using variables, the details for each iteration can be passed to the execution of the child pipeline. For each iteration, the response is first formatted, then submitted to the webhook endpoint and finally marked as sent. Note also that the Table Iterator in the parent pipeline is configured in “sequential” mode, but all Iterator components can also be configured to run in “concurrent” mode. If set here, “concurrent” mode would allow for MatiHelper to send multiple responses simultaneously. To delve deeper into concurrency and how that works in Data Productivity Cloud, see our article on Scaling Best Practices.
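Conceptually, the parent/child pattern with a Table Iterator behaves like the loop below. This is a plain-Python sketch, not Data Productivity Cloud code; the row field names and the sample rows are illustrative, not the actual schema of the view.

```python
import json

def process_reply(row: dict) -> str:
    """The 'child pipeline' steps for one row: build the webhook payload.
    (In the real pipeline this would also post to the webhook and mark the
    row as sent.)"""
    payload = {
        "text": row["ai_response"],
        "thread_ts": row["thread_ts"],
        "channel": row["channel_id"],
    }
    return json.dumps(payload)

# "Sequential" mode is equivalent to a simple loop over the view's rows:
rows = [
    {"ai_response": "Answer 1", "thread_ts": "1.0", "channel_id": "C01"},
    {"ai_response": "Answer 2", "thread_ts": "2.0", "channel_id": "C01"},
]
payloads = [process_reply(row) for row in rows]

# "Concurrent" mode would instead dispatch the process_reply calls in
# parallel, e.g. via concurrent.futures.ThreadPoolExecutor.
```

The variables passed to each child execution play the role of the row dictionary here: each iteration gets its own text, thread_ts, and channel values.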

Conclusion

So, here concludes Part 5 of this blog series, where I focused on using webhooks as a method of integrating with external systems. I also highlighted the use of Python as a way of formatting a JSON payload and even touched on looping patterns and parent/child pipeline relationships. 

Stay tuned for the final article in this series that will focus on Microbatching for Continuously Running Pipelines!

Explore the rest of the series:

Downloads

You can find the MatiHelper Slack AI App pipelines available for download on the Matillion Exchange here!

Arawan Gajajiva

Principal Architect - Sales Engineering COE
