- Blog
- 07.29.2024
- Product, Data Fundamentals
Enhance your data productivity with dynamic incremental data pipelines

This blog demonstrates how to build a dynamic, pattern-based data pipeline (one that applies the same pattern to many source tables) using Matillion's Data Productivity Cloud. The pattern shown here identifies incremental changes in a SQL Server database, stages them into Snowflake, and appends the staged data to a raw target table. However, any repeatable pattern can be built on these same principles, including data sources such as API calls or files.
This pattern involves three major concepts:
- Creating and maintaining an audit table to keep track of highwater mark timestamps
- Using variables and iterators to make your pipelines dynamic
- Moving data from a transient stage table to a persistent raw target table in Snowflake
The foundation of a dynamic data pipeline is the use of pipeline variables and iteration. This article demonstrates how to define, use, and pass variables between pipelines. It shows how to iterate through variable values and run the same pattern for each value.
This article also demonstrates how to use an audit table to track the latest load timestamp and identify incremental changes in your source tables. The audit table stores a highwater mark field used to find data that has changed since the last time the data was loaded into Snowflake.
Part 1: Creating and Maintaining an Audit Table
This pipeline uses an audit table with a highwater mark field to track the latest timestamp and identify incremental changes in your source tables. Using the timestamp saved in the audit table, it queries the source database for data that has been added or changed since the last load (the value of the stored timestamp).
Step 1: Create the Audit table
My audit table is created with a batch_id that acts as the primary key of the table and is defined as an AUTOINCREMENT field in Snowflake, so it is populated automatically each time a row is added to the table.
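The DDL below is a hedged sketch of such an audit table. Only batch_id is described in the text; the remaining column names (source_table, stg_last_load_ts, stg_row_count, load_ts) are illustrative assumptions, not the exact schema used in the pipeline.

```sql
-- Illustrative Snowflake DDL; all columns except batch_id are assumed names
CREATE TABLE IF NOT EXISTS INCR_AUDIT (
    batch_id         NUMBER AUTOINCREMENT PRIMARY KEY, -- auto-populated per row
    source_table     VARCHAR,        -- table pulled from the source database
    stg_last_load_ts TIMESTAMP_NTZ,  -- highwater mark: max source timestamp staged
    stg_row_count    NUMBER,         -- rows written by the previous staged load
    load_ts          TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
```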
Step 2: Initializing the Audit Table
This pipeline initializes the audit table by pre-populating a row for each table to be pulled from the source database using the incremental timestamp method. It defines a pipeline variable, ${pipe_source_table}, and the iterator runs the child pipeline once for each value set in the fixed iterator.
Write_Incr_audit_table
This pipeline has a variable defined for each column in the INCR_AUDIT table. The ‘Audit Record’ component is a fixed flow component that uses the defined variables.
Variables defined with default values:
Audit Record: Fixed Flow component - define each column in the INCR_AUDIT table and map the variables into each column.
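The Fixed Flow insert is roughly equivalent to the SQL sketch below, assuming the column names from the audit table above; the '1900-01-01' seed timestamp is an assumed "load everything on the first run" default, not a value from the article.

```sql
-- One seed row per source table; the early timestamp forces a full first load
INSERT INTO INCR_AUDIT (source_table, stg_last_load_ts, stg_row_count)
VALUES ('${pipe_source_table}', '1900-01-01 00:00:00', 0);
```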
Part 2: Using Variables and Iterators to create dynamic patterns
As with Part 1, this pipeline starts with defining variables to use with iterators to loop through the variable values and perform the same pattern for each variable value. In this step, the pipeline is reading from the audit table, querying the source database and writing the incremental data to Snowflake first to a transient stage table and then to a persistent target table.
Incremental_database_query_using_audit
Step 1: Define your variables:
Variables are defined with a valid (testable) default value that will be used throughout the pipeline and passed into other pipelines.
Step 2: Get the max last load timestamp to use in the Source query
Use the ‘Query result to scalar’ component to query the INCR_AUDIT table and store the stg_last_load_ts into the variable ${pipe_stg_last_load_ts}. Then use a database query component to query the source database where the highwater mark field is >= last load timestamp.
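The two lookups in this step can be sketched in SQL as follows. The stage table prefix STG_ and the highwater mark column last_modified_ts are illustrative assumptions; your source schema will differ.

```sql
-- Query Result To Scalar: latest highwater mark for this table,
-- stored into ${pipe_stg_last_load_ts}
SELECT MAX(stg_last_load_ts)
FROM   INCR_AUDIT
WHERE  source_table = '${pipe_source_table}';

-- Database Query against SQL Server: only rows added or changed
-- since the last load
SELECT *
FROM   dbo.${pipe_source_table}
WHERE  last_modified_ts >= '${pipe_stg_last_load_ts}';
```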
Query result to scalar:

Database query:

Step 3: Check for new incremental records before proceeding
This step uses the ‘Query result to scalar’ component again to query the stage table that was the destination for the previous database query. If there are 0 rows, it ends the process. If there are more than 0 rows, it proceeds through the pipeline.
This step gets the maximum highwater mark from the source database to store as the stg_last_load_ts variable and write to the audit table.

It also gets the last load timestamp and the row count from the load_history table in Snowflake to capture that metadata about the previous staged load into the audit table.
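A hedged sketch of the two scalar lookups described above. The stage table name, the highwater column, and reading the maximum from the staged rows (equivalent to the source query, since the stage table holds the same increment) are all assumptions; the LOAD_HISTORY view is Snowflake's standard load-metadata view, shown here illustratively.

```sql
-- New highwater mark to write back to the audit table
SELECT MAX(last_modified_ts) FROM STG_${pipe_source_table};

-- Load metadata for the previous staged load
SELECT last_load_time, row_count
FROM   INFORMATION_SCHEMA.LOAD_HISTORY
WHERE  table_name = 'STG_${pipe_source_table}'
ORDER  BY last_load_time DESC
LIMIT  1;
```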

Step 4: Write to the incr_audit table
The variables set throughout the pipeline are now passed to the ‘write_incr_audit_record’ pipeline that writes the record into the audit table.
In parallel with the audit table write, the orchestration pipeline ‘Dynamic_create_target_tables’ runs.
Part 3: Moving data from the transient stage table to the persistent target
Dynamic Create Target Tables
If this is the first time through, the target table needs to be created. The Table Metadata to Grid component stores the stage table's metadata in a grid variable for use in the Create Table component that follows, which is set to ‘create if not exists’. Afterwards, the ‘Dynamic Source to Target’ transformation pipeline runs.
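In plain SQL, the ‘create if not exists’ behavior amounts to something like the sketch below; the RAW_ and STG_ table name prefixes are assumptions for illustration.

```sql
-- Create the persistent target with the stage table's shape
-- only if it does not already exist
CREATE TABLE IF NOT EXISTS RAW_${pipe_source_table}
  LIKE STG_${pipe_source_table};
```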
Dynamic Source to Target
This is the final step to write the contents of the transient stage table to the persistent target table.
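The stage-to-target append can be sketched as a single SQL statement, again assuming the illustrative STG_ and RAW_ naming from the earlier examples.

```sql
-- Append the staged increment to the persistent raw target
INSERT INTO RAW_${pipe_source_table}
SELECT * FROM STG_${pipe_source_table};
```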
Summary
This article demonstrated how Matillion’s Data Productivity Cloud can make data engineering teams more productive through reusable data patterns that can be applied to many tables in a single data source or many files in a storage bucket. This pattern-based development technique lets teams manage one pipeline per pattern instead of one pipeline per table, making them more efficient and productive.
The technique showcased three foundational principles:
- Using Variables and Iterators to create a repeatable pipeline that will run for each value
- Storing metadata such as audit information, timestamps, and highwater mark fields to more easily populate those variables
- Building and maintaining a raw layer and a dynamic, persistent data layer to isolate downstream consumers from schema changes in the source systems
Using these techniques, data teams can be more productive by building and maintaining fewer pipelines and designing them to be dynamic and resilient to source changes. This article showcased a common pattern of identifying incremental changes from a source database and moving those changes to persistent tables within Snowflake, while keeping track of key timestamps and load history metadata for pipeline audit purposes.
Angie Hastings
Senior Sales Engineer