11.28.2024
Product, Data Fundamentals
How to trigger pipelines via SQS

We know that users want to have different options for invoking their pipelines. Of course, you can schedule a run or trigger manually (via the user interface or the API), but what if we want to make things more automated?
Well, an idea on Matillion's public roadmap has amassed quite a lot of support - so we decided to write this blog to give you some pointers and show you just how easy it is to trigger your Data Productivity Cloud (DPC) pipelines using services in AWS. Of course, the same is possible with other cloud providers, and we’d love to hear if you’re interested in more of these types of blogs!
Before getting started, we must consider a few prerequisites - how we interact with the DPC API and how we want to trigger our pipelines.
Interacting with the Matillion Data Productivity Cloud API
Matillion publishes helpful documentation on interacting with the DPC API. You can find it here, along with a previous blog post on creating custom connector endpoints.
Taking a read of these documents, we know that we need a few things set up to get started:
- First and foremost, we need a Matillion DPC account (sign up at https://matillion.com/) containing a pipeline we want to trigger.
- We must also set up the API credentials for this account, as outlined here.
Triggering Pipelines with External Services
Again, following the previous documentation, there is a helpful example outlining exactly what needs to be done to trigger a pipeline here. This example helps clarify the flow and the requirements to make it work.
However, we want some automation magic, so let us look at AWS's services. Namely:
- Amazon Simple Queue Service (SQS) - https://aws.amazon.com/sqs/
- AWS Lambda - https://aws.amazon.com/lambda/
Amazon Simple Queue Service
Amazon Simple Queue Service (SQS) lets us quickly set up a message queue that AWS Lambda can read from and another application can write to. We can choose between a standard or FIFO (first-in, first-out) queue. The difference is that in a standard queue, message ordering is not guaranteed, but in a FIFO queue, it is. We can get a message onto the queue from somewhere and then trigger a pipeline using…
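To make the producer side concrete, here is a minimal sketch of how another application might place a trigger message onto a FIFO queue with boto3. The queue URL and message fields are illustrative (the message format itself is the one described later in this post), and `build_trigger_message` is a hypothetical helper, not part of Matillion's tooling:

```python
import json
import uuid


def build_trigger_message(project_name, environment_name, pipeline_name,
                          scalar_variables=None):
    """Build the SQS send_message parameters for a pipeline trigger request.

    MessageGroupId and MessageDeduplicationId are required for FIFO queues;
    grouping by project name preserves ordering per project.
    """
    body = {
        "projectName": project_name,
        "environmentName": environment_name,
        "pipelineName": pipeline_name,
        "scalarVariables": scalar_variables or {},
    }
    return {
        "MessageBody": json.dumps(body),
        "MessageGroupId": project_name,
        "MessageDeduplicationId": str(uuid.uuid4()),
    }


def send_trigger_message(queue_url, **kwargs):
    """Send a trigger message to the FIFO queue (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the builder above stays dependency-free

    sqs = boto3.client("sqs")
    return sqs.send_message(QueueUrl=queue_url, **build_trigger_message(**kwargs))
```

Any process that can call the SQS API — another pipeline, an application, a cron job — can use this pattern to enqueue a trigger.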
AWS Lambda
AWS Lambda is a serverless computing service that allows us to respond to a message on the queue with some simple logic. Many languages can be used within Lambda; for this example, we’ll use Python. There are some gotchas when working with AWS Lambda, so for this blog we’ll outline the main ones:
- Permissions
- We must ensure that our Lambda function can communicate with the DPC, the AWS SQS queue itself, and AWS Secrets Manager. The latter is where we will choose to store our private values for authentication.
- You can see the minimum permissions for the Lambda function here.
- Dependencies
- In a Lambda function, we don’t have access to pip to handle our dependencies. Instead, we need to provide these in a zip file to AWS.
- You do this by creating a Layer with the dependencies within it. See the AWS documentation on this here.
- Concurrency and Queue Depth
- We assume this will not be a high-throughput usage activity, so we won’t be too concerned about queue depth or concurrency. However, if you’d like to read more about this, we’re keen to hear and can write another blog!
Step one - set up our AWS SQS FIFO queue
We can easily set up our queue in AWS via the management console. So, let us jump in and create an SQS FIFO (first-in, first-out) queue. This allows us to preserve our message ordering, so we do things in the order they are received:
From the console, we can see that we can set up a Lambda function that will be automatically triggered on receipt of a message. So, let’s get our Lambda function set up!
Step two - implementing our AWS Lambda function
We need our Lambda function to perform a few steps once a message is received, and we need to decide upon a structure for the message that makes sense and provides sufficient information to trigger our pipeline run. Looking at the previously referenced documentation here, we can see that we need the following:
- projectName: The name of the project where the pipeline is located.
- environmentName: The name of the environment in which the pipeline will be executed.
- pipelineName: The name of the pipeline to be triggered.
- scalarVariables: The scalar variables we want to pass to the pipeline we trigger.
We won’t consider grid variables at this stage, but this would be a sensible extension point. Now that we understand the message format, we can determine the functions that the Lambda will perform:
- Receive and Parse SQS Messages: The Lambda function needs to loop through each SQS message it receives and extract the relevant information for execution: the project name, the environment to use, the pipeline to trigger, and any required variables.
- Obtain Access Token from the Data Productivity Cloud: Using the client_id and client_secret created when setting up the API credentials, the Lambda function requests an OAuth2 access token from the authentication server; this token is then used to authorise API requests.
- Trigger Pipeline in Matillion DPC: The Lambda function constructs a payload based on the SQS message and sends a POST request to Matillion’s DPC API to trigger the pipeline execution in the specified project using the given environment.
- Handle Response: Upon success, the function confirms the pipeline trigger. If an error occurs, we need to be made aware so we can perform further troubleshooting.
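The four steps above can be sketched as a handler skeleton. This is an illustrative outline rather than the full script from this post: `get_matillion_access_token` and `trigger_pipeline` stand in for the functions described in the next section, and `parse_record` is a hypothetical helper for the message format we defined:

```python
import json


def parse_record(record):
    """Extract the pipeline-trigger fields from one SQS record's JSON body."""
    body = json.loads(record["body"])
    return {
        "projectName": body["projectName"],
        "environmentName": body["environmentName"],
        "pipelineName": body["pipelineName"],
        "scalarVariables": body.get("scalarVariables", {}),
    }


def lambda_handler(event, context):
    """Loop over the batch of SQS records and trigger one pipeline run each."""
    results = []
    for record in event["Records"]:
        message = parse_record(record)
        # These two calls correspond to the functions covered later in the post.
        token = get_matillion_access_token()
        results.append(trigger_pipeline(token, message))
    return {"statusCode": 200, "body": json.dumps({"triggered": len(results)})}
```

Note that the SQS event source delivers records under the `Records` key, each with the message content in a `body` string, so the handler must parse each body back into JSON.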
Let us look at the Python code we’ll invoke within our Lambda function here. Let’s go section by section to understand what is happening!
Our configuration
We must update the Python code our Lambda function will run so that it contains the required values for our use case. These are:
- MATILLION_REGION may need to be updated to us1 if our account is set up in the US.
- MATILLION_API_SECRET is a dictionary that contains:
- aws-secret-name contains the name of the secret we want our Lambda function to retrieve.
- aws-region contains the AWS region where our secret is available.
- client_id_key contains the key in our secret where our client ID value is stored. You can leave this as MATILLION_CLIENT_ID if you store your value under this key.
- client_id_secret contains the key in our secret where our value for the client ID secret is stored. You can leave this as MATILLION_CLIENT_SECRET if you store your value under this key.
- Finally, DPC_PROJECT_NAME_TO_ID_MAP is a dictionary that contains a mapping between a project name and a project ID. You can remove this, but your SQS message must then provide the project ID instead of the project name.
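Put together, the configuration block might look something like this. The secret name, region, and project ID shown are placeholders for illustration, not values from the post:

```python
# Illustrative configuration; the names follow the settings described above.
MATILLION_REGION = "eu1"  # change to "us1" if your account is US-hosted

MATILLION_API_SECRET = {
    "aws-secret-name": "matillion/dpc-api-credentials",  # hypothetical secret name
    "aws-region": "eu-west-1",                           # region holding the secret
    "client_id_key": "MATILLION_CLIENT_ID",
    "client_id_secret": "MATILLION_CLIENT_SECRET",
}

# Maps a human-readable project name (as sent in the SQS message) to its ID.
DPC_PROJECT_NAME_TO_ID_MAP = {
    "My first project": "00000000-0000-0000-0000-000000000000",  # placeholder ID
}
```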
Now, let’s look at the other functions within the Python code to understand what will happen when our Lambda is invoked. Let’s start at the end since it contains the lambda_handler function.
The remainder of the Lambda
The lambda_handler is a standard way to define the function that will be called when the Lambda is invoked. See the AWS documentation on this subject here.
In this function, we loop through the messages we receive from the SQS queue (it’s worth noting that from a FIFO queue we can receive a batch of messages, so we must iterate over them). For each message, we get the access token (the call to the get_matillion_access_token function) and send a request to the Data Productivity Cloud to execute our pipeline (the trigger_pipeline function call).
The other functions do what they say on the tin! So:
- get_matillion_oauth retrieves the secret from AWS Secrets Manager, where we securely store the client ID and client secret used to interact with the Data Productivity Cloud API.
- get_matillion_access_token uses the OAuth details outlined above and turns this into a bearer token. See step two here.
- trigger_pipeline requests the Data Productivity Cloud to execute the pipeline based on the parameters passed into the function.
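The three functions above could be sketched roughly as follows, using only the standard library for the HTTP calls. Note the hedges: the token and pipeline-execution URLs shown here are assumptions based on Matillion's public API documentation and should be verified against the current docs, and `build_trigger_payload` is a hypothetical helper that maps the parsed SQS message onto the request body:

```python
import json
import urllib.parse
import urllib.request

MATILLION_REGION = "eu1"
# Assumed OAuth token endpoint; confirm against the Matillion DPC API docs.
TOKEN_URL = "https://id.core.matillion.com/oauth/dpc/token"


def get_matillion_oauth(secret_name, region):
    """Fetch the stored client ID/secret pair from AWS Secrets Manager."""
    import boto3  # lazy import: only available inside the Lambda runtime

    client = boto3.client("secretsmanager", region_name=region)
    return json.loads(client.get_secret_value(SecretId=secret_name)["SecretString"])


def get_matillion_access_token(client_id, client_secret):
    """Exchange the OAuth client credentials for a bearer token."""
    data = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode()
    with urllib.request.urlopen(urllib.request.Request(TOKEN_URL, data=data)) as resp:
        return json.load(resp)["access_token"]


def build_trigger_payload(message):
    """Map the parsed SQS message onto the pipeline-execution request body."""
    return {
        "pipelineName": message["pipelineName"],
        "environmentName": message["environmentName"],
        "scalarVariables": message.get("scalarVariables", {}),
    }


def trigger_pipeline(token, project_id, message):
    """POST the execution request to the DPC API (URL shape is an assumption)."""
    url = (f"https://{MATILLION_REGION}.api.matillion.com"
           f"/dpc/v1/projects/{project_id}/pipeline-executions")
    req = urllib.request.Request(
        url,
        data=json.dumps(build_trigger_payload(message)).encode(),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Error handling is deliberately omitted here; in practice you would let exceptions surface (so failed messages return to the queue and can be retried or dead-lettered) or log them explicitly for troubleshooting, per the "Handle Response" step above.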
Step three - triggering the Lambda from the SQS message
We’ve now arrived at the central section, triggering the pipeline from the SQS message. To do this, we’ve set up two example pipelines:
1. Trigger SQS is a simple pipeline that places a message onto our queue in the format we outlined above. It looks like this:

```json
{
  "projectName": "My first project",
  "environmentName": "My first project-snowflake",
  "pipelineName": "SQS Triggering/2 - Do Work",
  "scalarVariables": {
    "name": "Johnathan"
  }
}
```
2. Do Work is an even simpler pipeline that prints the variables provided to it! In our case, on execution it will print the value we’ve set in the name scalar variable.
Now, let’s show this off in a quick video!
In this blog, we've explored how to automate triggering Data Productivity Cloud pipelines using AWS services, specifically Amazon SQS and AWS Lambda. With a simple setup and a few customizations, you can enable seamless automation tailored to your needs.
While this example focuses on AWS, similar approaches can be applied using other cloud providers, offering flexibility and scalability for various architectures. If you found this helpful or have ideas for future topics, we’d love to hear from you. Stay tuned for more insights on optimizing your Matillion experience!
Johnathan Law
Senior Developer Relations Manager