Historical data overload? Tips for smooth and reliable large volume data transfers

Analytic insights often require large amounts of historical data. Moving these large datasets from their source into a consolidated platform, where they can be blended with other data for enrichment and integration, can be a lengthy process. The longer a data extract takes, the more susceptible it is to connection interruptions and other errors that can cause the process to fail, requiring the entire load to be re-extracted. Extracting these datasets can also cause unwanted contention and resource pressure on production systems.

Chunking these datasets into smaller, more manageable pieces not only allows for checkpoints in case of a load failure but also allows for loading each chunk concurrently. This, in turn, reduces the overall process time as well as the time that the pipeline is exposed to errors and interruptions.

By leveraging the parameterization of properties and flow components in Matillion and the Data Productivity Cloud, pipelines can be architected to dynamically partition and extract large datasets at runtime. Segmenting data by date range or geographic location is a great way to create small slices of data on fields that are likely already indexed on the source system.

The advanced mode setting available on ingest components allows the user to write their own SQL statement for extraction. Incorporating variables into this SQL statement lets the user dynamically define the where clause, specifying a number of smaller chunks to pull from the source. One key aspect of this architecture is to also dynamically define the name of the staging table. By default, Matillion's ingest components will create a new staging table in the target system based on the structure of the source dataset. This allows the pipeline to feed separate structures that can be merged downstream and enables restartability of individual chunks in case of failure.

Placing an iterator over the configured component allows the user to spin through a predefined set of values that define these chunks. Finally, setting the iterator's concurrency to concurrent instead of serial allows the component to extract all of these chunks at the same time, decreasing the overall duration by issuing several smaller requests against the source system instead of one large, hard-to-manage request.

Pipeline Example

To implement an example of this architecture, we will build a pipeline with two child pipelines: an orchestration pipeline to ingest our partitioned data and a transformation pipeline to consolidate our structures. Each data partition will be loaded into a separate staging structure, and the transformation job will then pull all the data into a single conformed table. Driving the partitioning is a load control table created to segment our data and track the loading of each partition. We create this table in a separate pipeline so it is not overwritten with each load.

In our example, we will be loading data from our orders table. To create our load control table we will use a Database Query Component with the following SQL statement:

SELECT count(*) as ROW_COUNT, cast (date_part('year', "ORDER_DATE") as integer) as YEAR_PARTITION, 0 as COMPLETE
FROM demo.store."ORDERS"
GROUP BY date_part('year', "ORDER_DATE")

This produces a control table with one row per year: a ROW_COUNT, the YEAR_PARTITION value, and a COMPLETE flag initialized to 0.

Our Table Iterator will loop through each record in this table and map the year partition into a variable. The variable is passed into our child orchestration, where it is used to dynamically generate the SQL that extracts from the source. Adding a where clause that checks the table's complete flag lets us checkpoint the success of each partition and restart the job at the point of failure if one occurs.
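One way to apply that complete-flag check is to drive the Table Iterator from a view over the control table, so only pending partitions are iterated. A minimal sketch, assuming an illustrative view name:

```sql
-- Expose only the partitions not yet flagged complete, so a rerun
-- after a failure resumes with the partitions that did not load.
-- The view name SPLIT_LOAD_PENDING is an assumption for this example.
CREATE OR REPLACE VIEW SPLIT_LOAD_PENDING AS
SELECT "YEAR_PARTITION"
FROM SPLIT_LOAD_CONTROL
WHERE "COMPLETE" = 0;
```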

The child orchestration has two components: a Database Query Component to stage the partition and a SQL Script Component to update our control table upon success.

Dynamic Extract SQL in Stage Orders task:

SELECT *
FROM   demo.store."ORDERS"
WHERE date_part('year', "ORDER_DATE") = ${child_year}

Update SQL in the Update Control Table task:

UPDATE SPLIT_LOAD_CONTROL
SET "COMPLETE" = 1
WHERE "YEAR_PARTITION" = ${child_year}

To dynamically create a staging structure for each partition, we only need to add the variable to our Target Table property, for example STG_ORDERS_${child_year} (the table-name prefix is illustrative).

Once the load is complete, we will have a separate staging orders table for each partition. The final step is to invoke a transformation pipeline that merges the data in these structures into our final destination.

The Multi Table Input Component allows us to define our source tables with a wildcard pattern that matches every partition's staging table.
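Conceptually, this matches and unions every staging table produced by the load. A hand-written equivalent, assuming illustrative staging table names such as STG_ORDERS_2021, would be:

```sql
-- What a wildcard pattern like STG_ORDERS_% resolves to at runtime
-- (the specific table names here are illustrative).
SELECT * FROM STG_ORDERS_2021
UNION ALL
SELECT * FROM STG_ORDERS_2022
UNION ALL
SELECT * FROM STG_ORDERS_2023
```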

Once the transformation job is completed, all of our data will be loaded into our TARGET_ORDERS table in Snowflake, ready for our downstream analytics and AI workloads.
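As a final sanity check, the ROW_COUNT captured in the control table at extract time can be compared against the rows that actually landed. A sketch of that validation query:

```sql
-- Compare rows counted when partitioning with rows loaded into the target;
-- the two totals should match when every partition loaded successfully.
SELECT
    (SELECT SUM("ROW_COUNT") FROM SPLIT_LOAD_CONTROL) AS EXPECTED_ROWS,
    (SELECT COUNT(*) FROM TARGET_ORDERS) AS LOADED_ROWS;
```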

Takeaways 

Managing large volumes of historical data for analytics requires strategic planning and robust architecture. By chunking data into smaller, more manageable pieces and leveraging dynamic partitioning, organizations can streamline the data extraction process, minimize resource contention, and ensure reliability even in the face of potential interruptions. Matillion's advanced features, such as parameterization and dynamic SQL statements, facilitate efficient and concurrent data loading, significantly reducing overall transfer times. Implementing an orchestration pipeline for partitioned data ingestion and a transformation pipeline for data consolidation, as illustrated in our example, ensures a smooth and reliable data transfer process. 

Jason Kane

Sales Architect
