Enhance your data productivity with dynamic incremental data pipelines

This blog will demonstrate how to create a dynamic, pattern-based data pipeline (one that applies the same pattern to many source tables) using Matillion’s Data Productivity Cloud. The pattern shown here identifies incremental changes in a SQL Server database, stages them in Snowflake, and appends the staged data to a raw target table.  However, any repeatable pattern can be built on these basic principles, including data sources such as API calls or files.  

This pattern involves 3 major concepts:  

  1. Creating and maintaining an audit table to keep track of highwater mark timestamps
  2. Using variables and iterators to make your pipelines dynamic 
  3. Moving data from a transient stage table to a persistent raw target table in Snowflake

The foundation of a dynamic data pipeline is the use of pipeline variables and iteration. This article demonstrates how to define variables, pass them between pipelines, iterate through their values, and run the same pattern for each value.  

This article also demonstrates how to use an audit table to track the latest load timestamp and identify incremental changes in your source tables. The audit table stores a highwater mark field used to find data that has changed since the data was last loaded to Snowflake.  

Part 1: Creating and Maintaining an Audit Table

This pipeline uses an audit table that stores a highwater mark field to track the latest load timestamp and identify incremental changes in your source tables.  Using the timestamp saved in the audit table, it queries the source database for data that has been added or changed since the last load (the value of the stored timestamp).  

Step 1: Create the Audit table

My audit table is created with a batch_id column that acts as the primary key of the table. It is defined as an AUTOINCREMENT field in Snowflake, so it is automatically populated each time a row is added to the table.  
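As an illustration, the audit table might look something like the following sketch, written in Python with SQLite standing in for Snowflake. The columns beyond batch_id are assumptions, not the article's exact DDL; in Snowflake, batch_id would be declared with AUTOINCREMENT.

```python
import sqlite3

# Illustrative sketch of the INCR_AUDIT table; column names are assumptions.
# SQLite's AUTOINCREMENT plays the role of Snowflake's AUTOINCREMENT here.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE INCR_AUDIT (
        batch_id         INTEGER PRIMARY KEY AUTOINCREMENT,  -- auto-populated key
        source_table     TEXT NOT NULL,                      -- table being tracked
        stg_last_load_ts TEXT,                               -- highwater mark timestamp
        row_count        INTEGER,                            -- rows staged in the batch
        load_ts          TEXT DEFAULT CURRENT_TIMESTAMP      -- when the audit row was written
    )
""")
conn.execute(
    "INSERT INTO INCR_AUDIT (source_table, stg_last_load_ts, row_count) VALUES (?, ?, ?)",
    ("dbo.orders", "1900-01-01 00:00:00", 0),
)
batch_id = conn.execute("SELECT batch_id FROM INCR_AUDIT").fetchone()[0]
print(batch_id)  # → 1
```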

Step 2: Initializing the Audit Table 

This pipeline initializes the audit table by pre-populating a row for each table that will be pulled from the source database using the incremental timestamp method.  It has a pipeline variable defined for ${pipe_source_table}, and the iterator runs the child pipeline once for each value set in the fixed iterator. 
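Conceptually, the fixed iterator plus child pipeline behaves like a simple loop. A minimal sketch, with illustrative table names and a stand-in function for the child pipeline:

```python
# Sketch of the fixed-iterator pattern: run the same child pipeline once per
# source table. Table names and the child function body are assumptions.
def write_incr_audit_record(pipe_source_table: str) -> dict:
    # Stand-in for the Write_Incr_audit_table child pipeline: seed one audit
    # row per table with a default (epoch-style) highwater timestamp.
    return {"source_table": pipe_source_table,
            "stg_last_load_ts": "1900-01-01 00:00:00"}

# The 'fixed iterator' is just a list of values for ${pipe_source_table}.
fixed_iterator_values = ["dbo.orders", "dbo.customers", "dbo.products"]

audit_rows = [write_incr_audit_record(t) for t in fixed_iterator_values]
print(len(audit_rows))  # → 3
```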

Write_Incr_audit_table

This pipeline has a variable defined for each column in the INCR_AUDIT table. The ‘Audit Record’ component is a Fixed Flow component that uses the defined variables.  

Variables defined with default values:  

Audit Record: Fixed Flow component - define each column in the INCR_AUDIT table and map the variables into each column. 

Part 2: Using Variables and Iterators to create dynamic patterns

As in Part 1, this pipeline starts by defining variables to use with iterators, looping through the variable values and applying the same pattern to each one.  In this step, the pipeline reads from the audit table, queries the source database, and writes the incremental data to Snowflake: first to a transient stage table, and then to a persistent target table.  

Incremental_database_query_using_audit

Step 1: Define your variables:  

Variables are defined with a valid (testable) default value that will be used throughout the pipeline and passed into other pipelines. 
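For example, the pipeline variables could be held as name/value pairs with testable defaults. The names below follow the ${pipe_...} convention used in the article; the specific names and values are assumptions:

```python
# Illustrative pipeline variables with testable defaults; names and values
# are assumptions, not the article's exact variable list.
pipeline_variables = {
    "pipe_source_table": "dbo.orders",               # which source table to pull
    "pipe_stg_last_load_ts": "1900-01-01 00:00:00",  # default highwater mark
    "pipe_stage_table": "STG_ORDERS",                # transient stage table in Snowflake
    "pipe_target_table": "RAW_ORDERS",               # persistent raw target table
}
print(pipeline_variables["pipe_source_table"])  # → dbo.orders
```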

Step 2: Get the max last load timestamp to use in the Source query

Use the ‘Query result to scalar’ component to query the INCR_AUDIT table and store the stg_last_load_ts value in the variable ${pipe_stg_last_load_ts}.  Then use a database query component to query the source database for rows where the highwater mark field is greater than or equal to the last load timestamp. 
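The two components in this step amount to "read a scalar, then filter by it." A sketch using SQLite in place of both the source database and Snowflake (table and column names are assumptions):

```python
import sqlite3

# Sketch of Step 2: read the stored highwater timestamp, then pull only the
# source rows at or after it. All table and column names are illustrative.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE INCR_AUDIT (source_table TEXT, stg_last_load_ts TEXT);
    INSERT INTO INCR_AUDIT VALUES ('dbo.orders', '2024-01-01 00:00:00');

    CREATE TABLE source_orders (order_id INTEGER, updated_at TEXT);
    INSERT INTO source_orders VALUES
        (1, '2023-12-31 23:59:59'),   -- already loaded
        (2, '2024-01-02 08:00:00'),   -- new since last load
        (3, '2024-01-03 09:30:00');   -- new since last load
""")

# 'Query result to scalar': latest highwater mark for this table.
(pipe_stg_last_load_ts,) = conn.execute(
    "SELECT MAX(stg_last_load_ts) FROM INCR_AUDIT WHERE source_table = ?",
    ("dbo.orders",),
).fetchone()

# 'Database query': incremental rows where the highwater field >= last load.
incremental = conn.execute(
    "SELECT * FROM source_orders WHERE updated_at >= ?",
    (pipe_stg_last_load_ts,),
).fetchall()
print(len(incremental))  # → 2
```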

Query result to scalar:  

 

Database query: 

Step 3: Check for new incremental records before proceeding 

This step uses the ‘Query result to scalar’ component again, this time to query the stage table that was the destination of the previous database query.  If the query returns 0 rows, the process ends; if it returns more than 0 rows, the pipeline proceeds. 

This step gets the maximum highwater mark from the source database to store as the stg_last_load_ts variable and write to the audit table. 
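The row-count gate and the new highwater mark calculation can be sketched in plain Python, with a list standing in for the stage table (column names are assumptions):

```python
# Sketch of Step 3: gate on the staged row count, then capture the new
# highwater mark for the audit record. Column names are illustrative.
staged_rows = [
    {"order_id": 2, "updated_at": "2024-01-02 08:00:00"},
    {"order_id": 3, "updated_at": "2024-01-03 09:30:00"},
]

row_count = len(staged_rows)      # 'Query result to scalar' on the stage table
if row_count == 0:
    print("no new rows; ending")  # end the pipeline run
else:
    # New highwater mark: max of the highwater field in the staged batch,
    # stored back into ${pipe_stg_last_load_ts} for the audit record.
    pipe_stg_last_load_ts = max(r["updated_at"] for r in staged_rows)
    print(pipe_stg_last_load_ts)  # → 2024-01-03 09:30:00
```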

 

It also gets the last load timestamp and the row count from the load_history table in Snowflake, capturing that metadata about the previous staged load in the audit table.  

Step 4: Write to the incr_audit table

The variables set throughout the pipeline are now passed to the ‘write_incr_audit_record’ pipeline that writes the record into the audit table. 

While the audit record is being written, the orchestration pipeline ‘Dynamic_create_target_tables’ runs concurrently. 

Part 3: Moving data from the transient stage table to the persistent target

Dynamic Create Target Tables

If this is the first run, the target table needs to be created. The Table Metadata to Grid component stores the stage table's metadata in a grid variable, which is used by the Create Table component that follows; that component is set to ‘create if not exists’. Afterwards, the ‘Dynamic Source to Target’ transformation pipeline runs.
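Under some assumed names, the metadata-to-grid and create-if-not-exists steps work roughly like the following sketch, with SQLite's PRAGMA table_info standing in for Matillion's Table Metadata to Grid component:

```python
import sqlite3

# Sketch of 'Dynamic Create Target Tables': read the stage table's column
# metadata and create the target with the same shape if it does not yet
# exist. Table names are illustrative.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE STG_ORDERS (order_id INTEGER, updated_at TEXT)")

# 'Table Metadata to Grid': capture (name, type) pairs from the stage table.
columns = [(row[1], row[2]) for row in conn.execute("PRAGMA table_info(STG_ORDERS)")]

# 'Create Table' set to create-if-not-exists, built from the grid variable.
ddl = ", ".join(f"{name} {ctype}" for name, ctype in columns)
conn.execute(f"CREATE TABLE IF NOT EXISTS RAW_ORDERS ({ddl})")
print([name for name, _ in columns])  # → ['order_id', 'updated_at']
```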

Dynamic Source to Target

This is the final step to write the contents of the transient stage table to the persistent target table. 
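The append itself reduces to an INSERT ... SELECT from stage to target. A sketch with SQLite and illustrative table names:

```python
import sqlite3

# Sketch of 'Dynamic Source to Target': append the transient stage table's
# contents to the persistent raw target. Table names are assumptions.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE STG_ORDERS (order_id INTEGER, updated_at TEXT);
    CREATE TABLE RAW_ORDERS (order_id INTEGER, updated_at TEXT);
    INSERT INTO STG_ORDERS VALUES (2, '2024-01-02 08:00:00'),
                                  (3, '2024-01-03 09:30:00');
""")
conn.execute("INSERT INTO RAW_ORDERS SELECT * FROM STG_ORDERS")
(target_count,) = conn.execute("SELECT COUNT(*) FROM RAW_ORDERS").fetchone()
print(target_count)  # → 2
```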

Summary

This article demonstrated how Matillion’s Data Productivity Cloud can make data engineering teams more productive through data patterns that can be reused across many tables in a single data source, or many files in a storage bucket.  This pattern-based development technique lets teams manage one pipeline per pattern instead of one pipeline per table.  

The technique showcased 3 foundational principles: 

  • Using Variables and Iterators to create a repeatable pipeline that will run for each value
  • Storing metadata such as audit information, timestamps, and highwater mark fields to more easily populate those variables 
  • Building and maintaining a raw layer and a dynamic, persistent data layer to isolate downstream data from schema changes in the source systems

Using these techniques, data teams can be more productive by building and maintaining fewer pipelines and designing them to be dynamic and resilient to source changes.  This article showcased a common pattern of identifying incremental changes from a source database and moving those changes to persistent tables within Snowflake, while keeping track of key timestamps and load history metadata for pipeline audit purposes.  

Angie Hastings

Senior Sales Engineer
