- 08.22.2024
- Product, Data Fundamentals
How to create a metadata-driven pipeline using Variables

As discussed in part one, Matillion Data Productivity Cloud (DPC) allows for the creation and use of variables in a pipeline.
There are two levels of variables: Project and Pipeline, each with a different scope. Project variables support TEXT and NUMBER, while pipeline variables support TEXT, NUMBER, and GRID.
In this blog, we’ll show you how to create a metadata-driven pipeline using Pipeline variables. By the end, you will have one orchestration pipeline and one transformation pipeline. Let’s dive in!
Parameter datatypes in a metadata-driven pipeline
Before you start developing a metadata-driven pipeline using variables, you need to decide each variable's data type and whether it will be SCALAR or GRID.
For our sample pipeline, we’ll use SCALAR and GRID variables of type TEXT. The goal is to create an incremental SQL Server load using metadata from the SQL Server INFORMATION_SCHEMA views.
The name of the table to load incrementally is stored in a SCALAR pipeline variable. From INFORMATION_SCHEMA we need the table's column names and its primary key columns. Because each of these is a list, they are stored in GRID pipeline variables, populated by queries that filter on the table name variable.
A second SCALAR variable holds the last update date retrieved from the Snowflake table named by the table name variable. Finally, a Python script builds the join expression for the Table Update component and stores the result in a third SCALAR variable.
Now, let’s see how everything factors into our orchestration pipeline.
The orchestration pipeline
This is the command and control logic of the incremental load.
The sample orchestration pipeline we configured uses five variables, starting with three SCALAR variables:
- pipe_tablename, which stores the name of the table on which the incremental load will occur. Note that you may choose to add a variable for the target schema too.
- pipe_last_updated, which stores the maximum LAST_UPDATED value in the Snowflake table.
- pipe_sql, which stores the join criteria for the Table Update component.
And two GRID variables:
- gv_columns, which stores the list of column names for the table.
- gv_pk, which is used to store the primary keys for the table on which the incremental load will occur.
Default values can be set to populate the variables, or components within the pipeline can set the values at run time. In this case, pipe_tablename has the default value CUSTOMERS.
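As a rough mental model, the five variables can be pictured as plain Python values: a SCALAR holds one TEXT value, while a GRID holds a list of rows, each row itself a list of values. This is an illustration only, not how DPC stores variables internally, and the sample column names are hypothetical.

```python
# Illustrative model of the orchestration pipeline's variables (not DPC internals).
# SCALAR variables: one TEXT value each.
scalars = {
    "pipe_tablename": "CUSTOMERS",  # default value set on the variable
    "pipe_last_updated": "",        # filled in later by Query Result To Scalar
    "pipe_sql": "",                 # filled in later by the Python Script component
}

# GRID variables: a list of rows; here each row has a single column (a column name).
grids = {
    "gv_columns": [],  # all column names of the table
    "gv_pk": [],       # primary key column names of the table
}

def is_single_column_grid(rows):
    """Check that every grid row is a list holding exactly one TEXT value."""
    return all(isinstance(r, list) and len(r) == 1 and isinstance(r[0], str) for r in rows)

# Hypothetical contents once the metadata queries have run.
grids["gv_pk"] = [["CUSTOMER_ID"]]
grids["gv_columns"] = [["CUSTOMER_ID"], ["NAME"], ["LAST_UPDATED"]]
print(all(is_single_column_grid(g) for g in grids.values()))  # True
```

The one-value-per-row shape is why the Python script in step 4 reads the first element of each gv_pk row to get the column name.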
The Database Query component runs a query directly against the SQL Server database to extract metadata about the columns in the target table, and loads the results into the sqlserver_columns table in the default Snowflake schema and database.
Here’s a sample query that returns the column names for the table named in the ${pipe_tablename} variable:
SELECT c.COLUMN_NAME COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLES t
INNER JOIN INFORMATION_SCHEMA.COLUMNS c
ON t.TABLE_NAME=c.TABLE_NAME
AND t.table_schema = c.table_schema
WHERE t.table_schema = 'store' AND t.table_name IN ('${pipe_tablename}')
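Pipeline variables are referenced with the ${name} syntax. Assuming DPC substitutes each reference into the query as plain text before the query runs, the effect on the query above can be sketched with a small, hypothetical substitute helper:

```python
import re

def substitute(sql: str, variables: dict) -> str:
    """Replace each ${name} placeholder with the variable's value as plain text."""
    def lookup(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"undefined variable: {name}")
        return str(variables[name])
    return re.sub(r"\$\{(\w+)\}", lookup, sql)

query = (
    "SELECT c.COLUMN_NAME COLUMN_NAME\n"
    "FROM INFORMATION_SCHEMA.TABLES t\n"
    "INNER JOIN INFORMATION_SCHEMA.COLUMNS c\n"
    "ON t.TABLE_NAME=c.TABLE_NAME\n"
    "AND t.table_schema = c.table_schema\n"
    "WHERE t.table_schema = 'store' AND t.table_name IN ('${pipe_tablename}')"
)

# Print only the WHERE clause after substitution.
print(substitute(query, {"pipe_tablename": "CUSTOMERS"}).splitlines()[-1])
# WHERE t.table_schema = 'store' AND t.table_name IN ('CUSTOMERS')
```

Because the value is pasted in as text, a table name containing a single quote would break the query, which is one reason to keep table names in controlled metadata rather than free-form input.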
The Query Result To Grid component queries the sqlserver_columns table in Snowflake and loads its COLUMN_NAME column into a GRID variable called gv_columns, which now contains the table's column names.
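The Query Result To Grid step amounts to turning each result row into a grid row. The sketch below uses an in-memory SQLite table as a stand-in for the Snowflake sqlserver_columns table; the query_result_to_grid helper and the sample column names are hypothetical.

```python
import sqlite3

def query_result_to_grid(conn, sql):
    """Run a query and return each result row as a list: one grid row per result row."""
    return [list(row) for row in conn.execute(sql)]

# In-memory SQLite table standing in for the Snowflake sqlserver_columns table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sqlserver_columns (COLUMN_NAME TEXT)")
conn.executemany(
    "INSERT INTO sqlserver_columns (COLUMN_NAME) VALUES (?)",
    [("CUSTOMER_ID",), ("NAME",), ("LAST_UPDATED",)],  # hypothetical column names
)

gv_columns = query_result_to_grid(
    conn, "SELECT COLUMN_NAME FROM sqlserver_columns ORDER BY rowid"
)
print(gv_columns)  # [['CUSTOMER_ID'], ['NAME'], ['LAST_UPDATED']]
```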
With the column names captured, the pipeline tests whether the target table already exists in Snowflake.
The Assert Table component validates that a table named by the ${pipe_tablename} variable exists in the default Snowflake schema and database. Its row count check is configured with the Equal To condition and an empty row count value.
If the validation succeeds (the table exists), the pipeline takes the top branch and transitions to the Database Query SQLSERVER PK component to begin the incremental load.
If it fails (the table doesn’t exist), the pipeline takes the bottom branch and transitions to the Database Query SQLSERVER component to perform a full load of the table.
We’ll detail the steps in the top branch below.
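The branching decision boils down to a single existence check. In this sketch, SQLite's sqlite_master catalog stands in for Snowflake (where INFORMATION_SCHEMA.TABLES would play that role), and choose_branch is a hypothetical helper, not part of DPC.

```python
import sqlite3

def choose_branch(conn, table_name):
    """Return 'incremental' if the target table exists, else 'full' (mirrors the two branches)."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    return "incremental" if row else "full"

conn = sqlite3.connect(":memory:")
print(choose_branch(conn, "CUSTOMERS"))  # full
conn.execute("CREATE TABLE CUSTOMERS (CUSTOMER_ID INTEGER PRIMARY KEY)")
print(choose_branch(conn, "CUSTOMERS"))  # incremental
```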
Step 1
The first step in performing an incremental load is to get the primary keys for the table. The Database Query SQLSERVER PK component queries the SQL Server database to retrieve the primary key columns for the target table.
The query results are stored in the sqlserver_pk_columns table in Snowflake, providing the primary key information needed for the incremental load.
The component uses Advanced mode to enable custom SQL. Here’s some sample code using the ${pipe_tablename} variable:
SELECT c.COLUMN_NAME COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLES t
JOIN INFORMATION_SCHEMA.COLUMNS c
ON t.TABLE_NAME=c.TABLE_NAME
AND t.table_schema = c.table_schema
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
ON c.column_name = kcu.COLUMN_NAME AND c.TABLE_NAME = kcu.TABLE_NAME
AND kcu.table_schema = c.table_schema
JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME AND kcu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA
AND tc.CONSTRAINT_TYPE='PRIMARY KEY'
WHERE t.table_schema = 'store' AND t.table_name = ('${pipe_tablename}')
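When joining KEY_COLUMN_USAGE to TABLE_CONSTRAINTS, matching on the constraint name matters. KEY_COLUMN_USAGE also lists columns that belong to foreign key and unique constraints, so a join on the table name alone lets those columns through whenever the table has a primary key. The sketch below replays both join conditions over hypothetical in-memory rows (the constraint and column names are made up).

```python
# Hypothetical INFORMATION_SCHEMA rows for one table with a primary key and a foreign key.
key_column_usage = [  # (CONSTRAINT_NAME, TABLE_NAME, COLUMN_NAME)
    ("PK_CUSTOMERS", "CUSTOMERS", "CUSTOMER_ID"),
    ("FK_CUSTOMERS_REGION", "CUSTOMERS", "REGION_ID"),
]
table_constraints = [  # (CONSTRAINT_NAME, TABLE_NAME, CONSTRAINT_TYPE)
    ("PK_CUSTOMERS", "CUSTOMERS", "PRIMARY KEY"),
    ("FK_CUSTOMERS_REGION", "CUSTOMERS", "FOREIGN KEY"),
]

def pk_columns_by_table(table):
    """Join on TABLE_NAME only: every key column on the table matches the PK constraint row."""
    return [col for (_, t, col) in key_column_usage if t == table
            for (_, tt, ctype) in table_constraints
            if tt == table and ctype == "PRIMARY KEY"]

def pk_columns_by_constraint(table):
    """Join on CONSTRAINT_NAME: only columns that belong to the PK constraint match."""
    return [col for (cname, t, col) in key_column_usage if t == table
            for (tcname, _, ctype) in table_constraints
            if tcname == cname and ctype == "PRIMARY KEY"]

print(pk_columns_by_table("CUSTOMERS"))       # ['CUSTOMER_ID', 'REGION_ID']
print(pk_columns_by_constraint("CUSTOMERS"))  # ['CUSTOMER_ID']
```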
Step 2
The second step is to get the last_updated date. The Query Result To Scalar component (named Get max last_updated) runs a custom SQL query in Advanced mode to retrieve the maximum LAST_UPDATED value from the Snowflake table named by the ${pipe_tablename} variable:
SELECT MAX(LAST_UPDATED) LAST_UPDATED FROM "<your database>"."<your schema>"."${pipe_tablename}"
The result is mapped to the ${pipe_last_updated} SCALAR variable.
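One edge case is worth planning for: if the target table exists but is empty, MAX(LAST_UPDATED) returns NULL, and a later filter such as LAST_UPDATED > NULL matches no rows. The sketch below uses SQLite as a stand-in for Snowflake and falls back to a hypothetical default watermark (DEFAULT_WATERMARK) in that case; the table contents are made up.

```python
import sqlite3

# Hypothetical fallback used when the target table is empty and MAX() returns NULL.
DEFAULT_WATERMARK = "1900-01-01 00:00:00"

def get_max_last_updated(conn, table_name):
    """Return MAX(LAST_UPDATED) as text, or the fallback when the table has no rows."""
    # Identifiers can't be bound as parameters, so the table name is quoted into the SQL.
    sql = f'SELECT MAX(LAST_UPDATED) FROM "{table_name}"'
    (value,) = conn.execute(sql).fetchone()
    return value if value is not None else DEFAULT_WATERMARK

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "CUSTOMERS" (CUSTOMER_ID INTEGER, LAST_UPDATED TEXT)')
print(get_max_last_updated(conn, "CUSTOMERS"))  # 1900-01-01 00:00:00

conn.executemany(
    'INSERT INTO "CUSTOMERS" VALUES (?, ?)',
    [(1, "2024-08-01 09:00:00"), (2, "2024-08-20 17:30:00")],
)
print(get_max_last_updated(conn, "CUSTOMERS"))  # 2024-08-20 17:30:00
```

Storing timestamps as ISO-formatted text keeps the string MAX consistent with chronological order in this stand-in; in Snowflake the column would normally be a TIMESTAMP type.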
Step 3
The third step is to populate a GRID variable with the primary keys. The Query Result To Grid SQL SERVER PK component queries the COLUMN_NAME column of the sqlserver_pk_columns table in Snowflake and loads the results into a GRID variable called gv_pk.
With that, gv_pk now contains the primary keys for the table.
Step 4
The fourth step is to build the join expression for the Table Update component in the sqlserverincr_merge transformation pipeline. The Python Script component, Create JOIN for Table Update, runs a Python script that creates the join condition needed for the incremental update of the target table.
The script reads the primary key columns from the gv_pk grid variable, generates one equality condition per key column, combines them with AND, and stores the resulting expression in the ${pipe_sql} variable.
Here’s some sample code:
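# Read the primary key columns; each grid row is a list, so row[0] is the column name
source_list = context.getGridVariable('gv_pk')
# Build one equality condition per key column, e.g. "TARGET"."ID"="INPUT"."ID"
conditions = ['"TARGET"."{0}"="INPUT"."{0}"'.format(row[0]) for row in source_list]
# Combine the conditions with AND and store the join expression in pipe_sql
context.updateVariable('pipe_sql', ' AND '.join(conditions))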
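To try the join-building logic outside DPC, the script can be wrapped in a function and run against a stub object that mimics the two context methods it calls. StubContext and build_join_expression are hypothetical test scaffolding, not part of DPC, and the composite key is a made-up example.

```python
class StubContext:
    """Hypothetical stand-in for the DPC Python Script `context` object."""

    def __init__(self, grids):
        self.grids = grids
        self.variables = {}

    def getGridVariable(self, name):
        return self.grids[name]

    def updateVariable(self, name, value):
        self.variables[name] = value

def build_join_expression(context):
    """Same logic as the script above: one equality per key column, joined with AND."""
    conditions = ['"TARGET"."{0}"="INPUT"."{0}"'.format(row[0])
                  for row in context.getGridVariable('gv_pk')]
    context.updateVariable('pipe_sql', ' AND '.join(conditions))

ctx = StubContext({"gv_pk": [["ORDER_ID"], ["LINE_NO"]]})
build_join_expression(ctx)
print(ctx.variables["pipe_sql"])
# "TARGET"."ORDER_ID"="INPUT"."ORDER_ID" AND "TARGET"."LINE_NO"="INPUT"."LINE_NO"
```

If gv_pk is empty (a table without a primary key), the script produces an empty pipe_sql, so it may be worth failing the pipeline early in that case.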
Step 5
The fifth step is to stage the changed rows from the source table (CUSTOMERS in this example). The Database Query Stage Incremental Rows component queries the source Microsoft SQL Server database to retrieve rows from the source table that were updated since the last run.
The rows are filtered on the LAST_UPDATED column being greater than the value stored in the pipe_last_updated variable. The retrieved rows are then loaded into the staging table named ${pipe_tablename}_STG (for example, CUSTOMERS_STG).
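The staging query itself isn't shown here. A minimal sketch of what it could look like follows, assuming the source table lives in the store schema used by the metadata queries and that LAST_UPDATED can be compared against the watermark as a string literal; build_stage_query and staging_table_name are hypothetical helpers.

```python
def build_stage_query(table_name, last_updated, source_schema="store"):
    """Hypothetical incremental extract: source rows changed since the watermark."""
    # Assumes the source table has a LAST_UPDATED column comparable to a timestamp literal.
    return (
        f"SELECT * FROM {source_schema}.{table_name} "
        f"WHERE LAST_UPDATED > '{last_updated}'"
    )

def staging_table_name(table_name):
    """Staging table naming convention used in this pipeline: <table>_STG."""
    return f"{table_name}_STG"

print(build_stage_query("CUSTOMERS", "2024-08-20 17:30:00"))
# SELECT * FROM store.CUSTOMERS WHERE LAST_UPDATED > '2024-08-20 17:30:00'
print(staging_table_name("CUSTOMERS"))  # CUSTOMERS_STG
```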
Step 6
The sixth step runs the transformation pipeline. The sqlserverincr_merge component runs the SQLServerINCR/sqlserverincr_merge transformation pipeline, which performs an incremental merge of new or updated rows from the staging table into the target table.
It matches staged rows to target rows using the primary key columns from gv_pk, passed in as the join expression in pipe_sql, and performs the corresponding DML operation on the target table.
In order to use the variables populated in the orchestration pipeline, the variables need to be defined in the transformation pipeline as well. In the transformation pipeline, there are two SCALAR variables:
- pipe_tablename, which is used to store the table name on which the incremental load will occur.
- pipe_sql, which stores the join criteria for the table update component.
And one GRID variable, gv_columns, which has the list of columns for the table on which the incremental load will occur. To pass the values, set the SCALAR and GRID variables in the properties of the transformation pipeline component.
The transformation pipeline
Here’s a more in-depth look at the transformation pipeline run in the sixth step.
Step 1
The first step in the transformation pipeline is a Table Input component, which reads data from the staging table named by the ${pipe_tablename} variable with _STG appended (for example, CUSTOMERS_STG).
It reads the columns listed in the gv_columns variable, and no transformation is performed on the data at this stage.
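Conceptually, the Table Input step reads the staging table restricted to the columns in gv_columns. The hypothetical helper below builds an equivalent SELECT to make that concrete; it is not the SQL DPC generates.

```python
def table_input_select(table_name, gv_columns):
    """Approximate SELECT for a Table Input over <table>_STG limited to the gv_columns list."""
    # Each gv_columns row holds one column name; quote it as a Snowflake identifier.
    cols = ", ".join('"{}"'.format(row[0]) for row in gv_columns)
    return 'SELECT {} FROM "{}_STG"'.format(cols, table_name)

print(table_input_select("CUSTOMERS", [["CUSTOMER_ID"], ["NAME"], ["LAST_UPDATED"]]))
# SELECT "CUSTOMER_ID", "NAME", "LAST_UPDATED" FROM "CUSTOMERS_STG"
```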
Step 2
The second step is to perform the table update for the incremental load. The Table Update component takes the data from the Table Input source and updates the target table specified in the ${pipe_tablename} variable.
It will join the source and target tables on the expression defined in ${pipe_sql} and update the target table columns mapped in gv_columns where there is a match. Any rows in the source that do not match will be inserted into the target table based on the mappings.
This allows the pipeline to upsert data from the staging table into the target table in the warehouse based on the primary key.
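For readers more familiar with SQL, the upsert performed by Table Update is comparable to a Snowflake MERGE statement. The sketch below builds one from pipe_tablename, pipe_sql, and gv_columns; build_merge is hypothetical, and the statement is an approximation rather than the SQL that DPC generates.

```python
def build_merge(table_name, join_expr, gv_columns):
    """Hypothetical Snowflake MERGE approximating the Table Update upsert."""
    cols = [row[0] for row in gv_columns]
    # Matched rows: overwrite every mapped column with the staged value.
    set_clause = ", ".join('"{0}" = "INPUT"."{0}"'.format(c) for c in cols)
    # Unmatched rows: insert all mapped columns from the staging row.
    col_list = ", ".join('"{}"'.format(c) for c in cols)
    values = ", ".join('"INPUT"."{}"'.format(c) for c in cols)
    return (
        'MERGE INTO "{t}" AS "TARGET"\n'
        'USING "{t}_STG" AS "INPUT"\n'
        "ON {j}\n"
        "WHEN MATCHED THEN UPDATE SET {s}\n"
        "WHEN NOT MATCHED THEN INSERT ({c}) VALUES ({v})"
    ).format(t=table_name, j=join_expr, s=set_clause, c=col_list, v=values)

print(build_merge(
    "CUSTOMERS",
    '"TARGET"."CUSTOMER_ID"="INPUT"."CUSTOMER_ID"',
    [["CUSTOMER_ID"], ["NAME"]],
))
```

Setting the key columns to their own matched values in the UPDATE is harmless; a refinement could exclude the gv_pk columns from the SET list.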
Conclusion
Congratulations! You have built a flexible, generic incremental load by parameterizing Data Productivity Cloud pipelines with scalar and grid variables.
Matillion ETL users may be interested in viewing the same techniques in this article on How to use Matillion ETL Grid Variables to Incrementally Load Insurance Claim Data.
Mike Terrell
Sales Engineer