Integrating with Slack - Fetching Messages and Implementing Incremental Loading

Welcome to Part 3 of this blog series, demonstrating the art of the possible, using Matillion products and features to build the MatiHelper Slack AI app. In this series, we’re taking a deep dive into how the MatiHelper Slack AI app was built, starting with design and now focusing on pipeline development. If you haven’t already seen MatiHelper in action, here’s that video to help set the stage!

In the first part of this series, I discussed how to approach designing data pipelines by highlighting design concepts that can be used to build simple and scalable pipelines. Part two focused on the data lifecycle and how it defines the journey of the data. I also showed what the data lifecycle looks like for the MatiHelper Slack AI app and designed a tracking table and ancillary views that show the MatiHelper data at different points throughout its lifecycle. The next few articles will provide a detailed look into building Data Productivity Cloud pipelines that usher data through the MatiHelper Slack AI data lifecycle.

Flex Connectors

The first step in the MatiHelper data journey is to retrieve any new messages posted by users in a Slack channel. As such, this will be the first and primary focus of this data pipeline. 

Data Productivity Cloud provides users with various types of components to help build data pipelines. When building a data pipeline, one typically begins by identifying sources of data and evaluating how best to integrate with those sources. As can be seen in the MatiHelper data lifecycle diagram, the pipeline needs to start by integrating with Slack. As is common for most modern applications, Slack provides REST APIs as a method of allowing external applications and processes to integrate with the Slack platform.

Among the various types of components that Data Productivity Cloud provides, Load components, which ingest data from external source systems, are where one typically starts the development of a data pipeline. The native Load components available in the Data Productivity Cloud connect to their associated source systems in various ways. While many of the native Load components are based on integrations with REST APIs for their source systems, there is no native Slack component today. However, the lack of a native Load component for Slack is not a blocker to building this particular data pipeline. In fact, it helps to highlight a powerful feature of the Data Productivity Cloud platform: Custom Connectors, or more specifically, Flex Connectors, which are a type of Custom Connector.

Custom Connector gives users the ability to create their own custom Load component based on an integration with a REST API. Users create their own Custom Connector via an easy-to-use wizard interface. This helps to fill the gap for those source systems where a native component may not exist but the source system has a REST API that can be used as an integration point. For a more in-depth overview of Custom Connector, have a look at my other blog, Matillion patterns for working with complex JSON data from an API Source.

Flex Connectors are Custom Connectors that Matillion has pre-built for a curated set of third-party endpoints and made available to be used immediately. In our case, we are using the Slack Flex Connector as the method of capturing new messages posted to a Slack channel.

Get Slack Queries Pipeline

In the previous blog article, I started by creating an initialization pipeline, which creates the MatiHelper tracking table and associated views. This next pipeline, named Get Slack Queries, will now focus on getting new messages posted to a Slack channel and inserting that data into the MatiHelper tracking table. 

The following sections of this article focus on some important aspects of the Get Slack Queries pipeline. However, I won’t touch upon every detail of the pipeline. To see all of the details of this and other MatiHelper pipelines, head on over to the Matillion Exchange, where the full set of MatiHelper Slack AI pipelines are available for download!

Slack Flex Connector

One of the key roles of this pipeline is to integrate with Slack to fetch any messages posted to a Slack channel. Matillion Flex Connectors provide pre-built integrations to some, but not all, API endpoints for a curated set of third-party sources. The pre-built integrations typically represent the most commonly used API endpoints for that source. If a required API endpoint is not one of the included pre-built ones, you can convert the Flex Connector into a standard Custom Connector and extend it by adding additional API endpoints. When extending a Flex Connector this way, the pre-built endpoints can be used as examples of how to configure the new ones.

So, the first thing we need to do is determine whether the pre-built endpoints in the Slack Flex Connector include one that suits our use case. As I highlight how to use the Slack Flex Connector, I'll note some nuances specific to Slack; however, the general approach to configuring the Slack Flex Connector is representative of using any Matillion Flex Connector.

When using a Flex Connector, a good place to start is to review the API documentation provided by the third-party source. In this case, we are looking at Slack’s Web API endpoints. From this documentation, we’ve identified the conversations.history API as the Slack API that allows us to fetch messages (conversations) posted to a Slack channel.

To review the available endpoints in the Slack Flex Connector, we first need to navigate to the Matillion Custom Connector area of Data Productivity Cloud. From your Hub Account’s start page, click on the menu icon on the top left and select Manage → Custom Connectors. This will bring up the Manage Custom Connectors dashboard, which lists any Custom Connectors that exist for your Hub Account. Under the section showing Custom Connectors, there is the Flex Connector Library, which is what we are looking for. In the Flex Connector Library, click on the Slack icon, which now brings us to the details of the Slack Flex Connector. 

In reviewing the available endpoints in the Slack Flex Connector, we can see that the last endpoint in the list, Fetch a Conversation’s History, is using the same conversations.history API endpoint that we need.  

Now that we’ve identified that the Slack Flex Connector includes the API endpoint we need, we need to understand how to make it work. The Usage Info section in Slack’s API documentation highlights two important areas around using this API.

  1. It lists the acceptable ways to authenticate when using the API. Specifically, there is a table of token types that can be used with this API endpoint and the scopes required for it. In this case, we’ve elected to use an app token.
  2. There are oldest and latest parameters that can be set when calling the API. If neither is specified, the API returns all messages. We want our interactions with Slack to be as lightweight and quick as possible, so we want to fetch only messages posted since the last time we fetched messages from the Slack channel. This is commonly thought of as an incremental load pattern, which was also discussed in Part 1 of this blog series.

 

Authentication: app token

When configuring a Flex Connector, the first thing to tackle is any dependencies around authentication. To set up a Slack app token for use with the Slack Flex Connector, there is this Slack tutorial on How to quickly get and use a Slack API app token. Once the setup is complete, we have a Slack app token that needs to be added as a new Secret Definition in the Data Productivity Cloud Project being used. To set up a Secret Definition, follow the steps outlined in our documentation that aligns with your operating model (Matillion Full SaaS or Hybrid SaaS).

Finding the Slack Channel ID

As highlighted in the documentation, the conversations.history API endpoint has two required inputs: a token and a channel. We’ve already set up the token, so next we need the required channel value. The channel value that the conversations.history API expects is an internal ID that uniquely identifies a Slack channel. The easiest way to get this Channel ID is via your Slack client (web browser or Slack desktop app). In your Slack client, right-click on the Slack channel you wish to use and select “View channel details”. This brings up a dialog containing the details for that Slack channel. At the bottom of this dialog, you can find the Channel ID value, which is the second required value that will be used with the Slack Flex Connector.

Testing the Slack Flex Connector

Now that I have a Slack token for authentication and a Slack channel ID to work with, the next step is to test the Slack Flex Connector, confirming it returns the data we expect. To test the Slack Flex Connector, navigate back to Slack Flex Connector within Matillion Custom Connector and select the Fetch a Conversation’s History endpoint. 

Previously, I looked at the Slack Flex Connector to verify the conversations.history API endpoint is in the list of pre-built endpoints. Now, I use the same dialog to test the Slack Flex Connector. Under the Authentication tab, the Slack Flex Connector is already configured with Bearer Token as the Authentication Type. In the Token box, paste in the Slack token provisioned previously.

Next, click on the Parameters tab, where there is a channel query parameter already defined. In the Value box, provide the Slack channel ID for the Slack channel identified previously. Note that the value provided will also be appended to the API URL that is being built.

For this API endpoint, the Authentication and Slack Channel ID were the only requirements to execute the API. So, I’m now ready to click on the Send button, which will execute the API call and show the results.

The main points of interest here are the Raw response and Structure sections. The Raw response section shows the actual response from the API endpoint. The Structure section, built by inspecting the response data, provides metadata about the API response, highlighting its structure and data types. Slack’s API documentation provides example responses to validate against. When testing the Slack Flex Connector, I should see a similar response to what is detailed in Slack’s documentation, which is the case here.

If there are any failures when testing a Flex Connector, the Raw response will typically contain an error message from the API endpoint, which indicates what caused the failure.

Using the Slack Flex Connector

After successfully testing the Slack Flex Connector, it’s time to create a new orchestration pipeline that will handle this first part of the MatiHelper data lifecycle. At the core of this pipeline is the Slack Flex Connector, so this will be the first component added to the new orchestration pipeline. All Flex Connectors are available as components to be used in orchestration pipelines. To find the Slack Flex Connector, search for “Slack” to find the specific Slack Flex Connector, or search for “Source” to see all available Flex Connectors (Flex Connectors are categorized as “Source” type connectors). 

With the Slack connector added to the orchestration pipeline, I’ve configured it as follows:

Connect

This section of the component configuration defines the details of how to connect to a Slack endpoint. In the Data Source drop-down, choose Fetch a Conversation’s History. For Authentication Type, choose Bearer token, which should be the only option. And, for the Token, choose the Secret Definition created earlier when storing the Slack Token.

Configure

This section of the component allows for the configuration of any parameters or inputs to be included in the API call.
In the case of the conversations.history Slack API, the Channel ID must be provided as a Query Parameter. To make this pipeline reusable (not specific to a particular Slack channel), instead of hardcoding a Slack Channel ID value here, I’ve set the parameter value to reference a pipeline variable, ${pipe_channel_id}. When using variables like this, it’s best practice to initially set the default value to a real value (the MatiHelper Slack Channel ID in this case) and replace it with a dummy default value when deploying the pipeline. Ultimately, the Slack Channel ID will be passed in at execution time.

Destination

The Destination section of the component defines where the API response should be stored. All Flex Connectors allow for the destination to be a file in cloud storage (AWS S3 or Azure Blob Storage) or a cloud data platform (Snowflake, Redshift, Databricks). This pipeline has been configured to load the API response data into a Snowflake staging table named stg_slack_matihelper_queries. By setting the Load Strategy to Replace, there is no need to pre-create this table as the connector will drop and recreate it upon every execution. 

Advanced Settings

The Advanced Settings section allows for more detailed logging when executing the component. This can be configured via the Log Level property, which is particularly helpful for debugging unexpected behavior and can provide insight into the actual API calls executed by the component. While verbose logging is helpful when developing a pipeline, be sure to set this back to the default level of Error when deploying the pipeline for real, so as to reduce logging overhead.

Processing Slack Messages

With the Slack Flex Connector now configured, it’s time to run the component. This serves multiple purposes. First, running the component will confirm that the configuration is correct. Second, by running the component, the staging table will be created and populated with data. This allows us to develop the next part of this pipeline, which processes the data fetched from Slack.

A typical sequence of events in a data pipeline is to first fetch the data from a source system and then transform that raw data to make it more usable. In typical business analytics workloads, that transformation step often involves applying business rules, aggregating metrics, or transforming the data into more analytics-friendly structures, like fact and dimension tables.

In the case of the MatiHelper data lifecycle, this data pipeline needs to flatten the API response data, which in its raw state is semi-structured data stored in a JSON object. Additionally, the pipeline needs to add metadata before ultimately adding the new data to the slack_ai_queries tracking table.

Transformation Pipeline Overview

Here’s a look at the transformation pipeline that processes the data fetched from the Slack API.

The purpose of this transformation pipeline is to take the raw incoming data from Slack, which looks like this:

This data is then flattened and metadata is added, so that the data can then be inserted into the primary slack_ai_queries tracking table.

Sampling Data

The previous two screenshots show the data in this pipeline in its initial raw state and its final flattened state. Both of these screenshots came directly from the Data Productivity Cloud Designer UI. A really helpful feature that the Designer interface provides is the ability to sample the data in the pipeline as it’s being developed. This allows for quick validation of data transformation logic and ultimately leads to faster and more efficient pipeline development.

Transformation Components

In this section, I’m highlighting a couple of powerful transformation components used in this transformation pipeline.

Extract Nested Data

With the proliferation of software-as-a-service (SaaS) platforms, REST APIs have flourished as a means of integrating with third-party platforms. Most REST APIs use JSON (JavaScript Object Notation) as a standard data format: JSON is lightweight, readable, language-independent, and can easily represent structured data. Despite these benefits, JSON’s semi-structured nature can cause data engineers headaches. This is where the Extract Nested Data component comes into play. This component provides a way of visually defining semi-structured data, like JSON objects and arrays, and flattening it to a specified level of granularity. It is often one of the first transformation components used when loading data with a Custom Connector or Flex Connector.
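Conceptually, the flattening that the Extract Nested Data component performs looks like this Python sketch. The field names mirror Slack's documented response shape, but the message contents here are invented for illustration.

```python
# The raw API response nests each message inside a "messages" array.
response = {
    "ok": True,
    "messages": [
        {"type": "message", "user": "U012AB3CDE", "text": "Hello", "ts": "1712345678.000100"},
        {"type": "message", "user": "U061F7AUR", "text": "Hi there", "ts": "1712345679.000200"},
    ],
}

# Flattening produces one flat row per message at the chosen level of granularity.
rows = [
    {"user": m["user"], "text": m["text"], "msg_create_ts": m["ts"]}
    for m in response["messages"]
]
```

In the pipeline itself this is defined visually rather than in code, but the output is the same: one tabular row per nested array element.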

Calculator

The Calculator component is a powerful and flexible component. Common uses of this component are to apply business logic to data and to add metadata to a dataset. This component taps into the native and user-defined functions available in the cloud data platform being used. 

In this particular transformation pipeline, the Calculator component is used to add important tracking metadata to the Slack dataset. In the incoming raw JSON data from Slack, there is a field named “ts” that is present for every Slack message contained in the response data. This field represents the timestamp of when the Slack message was created. The value of the “ts” field is a numeric value, or more specifically a Unix timestamp (also known as Unix epoch time). A Unix timestamp represents the number of seconds elapsed since January 1, 1970 00:00:00 UTC, but the value can also carry a higher level of granularity, such as milliseconds or microseconds. The level of granularity can usually be determined by the number of digits in the value.

The first use of the Calculator component in this pipeline is to convert the unix timestamp value stored in the “ts” field, which has been renamed to msg_create_ts, into a standard DateTime format. In this case, the conversion is done using Snowflake’s to_timestamp() function. By naming the expression the same as the input field (msg_create_ts), the effect of this expression is to transform the data in this column at this point in the pipeline.

There is also an expression named msg_receipt_ts. As this is not the name of an incoming field, it becomes a new field output from the Calculator component. Snowflake’s current_timestamp() function is used here, with the purpose being to have a record of the date and time when the Slack message was ingested from Slack.
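The effect of these two Calculator expressions can be sketched in Python for illustration; the actual expressions use Snowflake's to_timestamp() and current_timestamp() functions.

```python
from datetime import datetime, timezone

# Slack's "ts" value is a Unix timestamp with fractional seconds.
def to_msg_create_ts(ts):
    """Convert a Unix timestamp into a standard UTC datetime."""
    return datetime.fromtimestamp(float(ts), tz=timezone.utc)

msg_create_ts = to_msg_create_ts("1712345678.000100")   # when the message was created
msg_receipt_ts = datetime.now(timezone.utc)             # when the message was ingested
```

The same rename-in-place behavior applies: because the first expression is named msg_create_ts, it overwrites that column rather than adding a new one.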

Pro tip: Try using the Copilot feature available in the Calculator component! This AI-backed feature translates a natural language request into the cloud data platform’s Calculator Expression syntax!

Incremental Data Loading

As discussed in the first article in this series, incremental loading is a very common pattern when getting data from a source system. The premise of the pattern is that for every data load from the source system, the pipeline first determines the most current data that has been loaded previously and fetches from the source everything since the prior load. When implementing an incremental data loading pattern, only new or changed data is fetched, which leads to better performance of the data pipeline. Typically, databases are the most common source system where this pattern is used, however, the pattern can be applied to any source system that has a field that can be used as a high watermark. The source system must also provide the ability to fetch new data based on that high watermark field’s value. 
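The pattern can be sketched generically in Python: keep the high-watermark value from the previous load, fetch only records beyond it, then advance the watermark. The tiny in-memory source here just stands in for any real system (such as a Slack channel).

```python
def incremental_load(fetch_since, watermark):
    """Fetch records newer than `watermark` and return them with the new watermark."""
    records = fetch_since(watermark)
    if records:
        watermark = max(r["ts"] for r in records)
    return records, watermark

# Tiny in-memory "source system"; `ts` is the high-watermark field.
source = [{"ts": 100}, {"ts": 200}, {"ts": 300}]
fetch = lambda wm: [r for r in source if r["ts"] > wm]

batch1, wm = incremental_load(fetch, 0)   # first run: everything is new
batch2, wm = incremental_load(fetch, wm)  # second run: nothing new since ts=300
```

The rest of this section shows how the Get Slack Queries pipeline implements each half of this loop: reading the watermark and passing it to the source.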

Getting the High Watermark Value

In the Get Slack Queries Pipeline, the very first component, named Get Max Message Timestamp, uses a Query Result to Scalar component to get a high watermark value and sets the value of a pipeline variable, ${pipe_max_ts}, with the returned value. Here is the SQL query used at this step in the pipeline:

SELECT COALESCE("max_msg_ts",DATE_PART(epoch_second, current_timestamp() - INTERVAL '10 minutes')) as "MAX_TS" 
FROM "slack_api_queries_max_msg_ts_vw"

There are a couple things to highlight about this SQL query. The first thing to highlight is that this SQL query is selecting from a view and not a table. The view, named slack_api_queries_max_msg_ts_vw, was created as part of the initialization pipeline, which is one of the Views highlighted in the previous blog in this series on The Data Lifecycle. The second thing to highlight is the syntax of the SELECT portion of the SQL:

COALESCE("max_msg_ts",DATE_PART(epoch_second, current_timestamp() - INTERVAL '10 minutes'))

The COALESCE function used here is available in most database and data warehouse platforms. It accounts for the scenario where a value might be null. In this case, that is the scenario where the MatiHelper pipeline is being run for the first time against a Slack channel: there is no high watermark value yet, as no Slack messages have been loaded. In that scenario, the ${pipe_max_ts} variable is set to a value that represents “10 minutes ago”. The Slack API requires a Unix-timestamp-formatted value, so the DATE_PART function is used to convert the timestamp into epoch seconds. While this SQL uses Snowflake-specific syntax, it can be easily converted to work on other supported data warehouse platforms, such as Databricks or Redshift.

Incremental Data Loading from a Slack Channel

The Slack conversations.history API endpoint has an optional query parameter named oldest. Using this parameter in the API call will limit the results to any messages that were created after the time defined by the provided value.  This is exactly what is needed to allow for incremental data loading from a Slack Channel. As this is an optional parameter, it isn’t defined in the Slack Flex Connector. The conversations.history API has just one required query parameter, channel. When configuring the Slack connector, “channel” will appear in the parameter drop-down because it is required (and defined in the Slack Flex Connector). For the optional “oldest” parameter, the parameter name can be manually typed in and it will be added to the API call that is executed. Here, you can see both query parameters as they are defined in the component, using the related pipeline variables to dynamically define the actual value at execution time.
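As a sketch, the two query parameters resolve at execution time to something like the following. The variable names mirror the pipeline variables used in the component; the values are placeholders.

```python
# Values that would come from the pipeline variables at execution time.
pipe_channel_id = "C0123456789"  # ${pipe_channel_id}
pipe_max_ts = "1712345678"       # ${pipe_max_ts}, the high-watermark value

query_params = {
    "channel": pipe_channel_id,  # required; pre-defined in the Slack Flex Connector
    "oldest": pipe_max_ts,       # optional; typed in manually to enable incremental loading
}
```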

Conclusion

So, here concludes Part 3 of this blog series, where I focused on integrating with Slack to get messages posted to a Slack channel. This pipeline helps to demonstrate the power of Flex Connectors and provides an example of using an incremental data loading pattern in a data pipeline!

Here’s a quick peek at the upcoming parts of this blog series; you can also catch up on previous ones you may have missed!

Downloads

You can find the MatiHelper Slack AI App pipelines available for download on the Matillion Exchange here!

Arawan Gajajiva

Principal Architect - Sales Engineering COE
