Schema Drift and Dynamic S3 Files

This article is part 2 of a three-part series on schema drift, demonstrating how to design data pipelines in Matillion Data Productivity Cloud when the data source format changes. It covers two scenarios: processing many files with different formats dynamically, and handling S3 files whose format changes over time.

The design pattern is for Snowflake. It requires that your S3 files be in CSV format, using any delimiter, and have a header line defining the data columns in the file.

Dynamic S3 File Loading

To load data from an S3 file into Snowflake, you first need a table to load it into. That becomes challenging when you have a large number of files with different schemas, or when file schemas keep changing. This design pattern provides a dynamic pipeline for processing large numbers of such files.

The dynamic S3 file design pattern has four steps:

  1. Create a custom file format and a table with a single data column
  2. Parse the S3 file header and create a new table with the correct file schema
  3. Load the S3 file into the new stage table
  4. Compare the stage and target tables to identify metadata drift
Step 1:  Create a custom file format and raw data table
  • The first component creates a custom file format that defines the field delimiter as null. With a null delimiter, every row of the file is loaded into a single column. If the file delimiter is not a comma, you may also need to create another file format with the correct delimiter for the load in Step 3.
  • Create the first landing table, s3_dynamic_header, which is used to parse the header. This table contains a single varchar column of the maximum length (16 MB) and is used to stage the S3 file so the header row can be extracted. A sketch of the equivalent Snowflake DDL follows this list.
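
As a rough illustration, the Snowflake objects behind this step might be created with DDL like the following. The name s3_raw_format and the exact option values are assumptions based on the description above, not the pipeline's own definitions.

create or replace file format s3_raw_format
  type = csv
  field_delimiter = none  -- null delimiter: each file row lands in a single column
  skip_header = 0;        -- keep the header row so it can be parsed later

create or replace table s3_dynamic_header (
  "header" varchar(16777216)  -- single max-length varchar column
);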
Step 2: Parse the S3 file header and create a new table with the file schema
  • Truncate the dynamic header table. Because this table is reused for every file, truncating it ensures that only the file currently being processed is staged into it when the header is extracted.
  • Load the S3 file into the dynamic table created in the first step. With the null-delimiter file format, all of the data is loaded into the single column (see the COPY sketch after this list).
  • Run a child pipeline, ‘023_create_s3_drift_table’, that parses the header and captures the S3 file metadata. 
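
For reference, the raw load into the dynamic table corresponds roughly to a Snowflake COPY INTO statement like the one below. The stage name @s3_stage and the file path are placeholders, and the file format is the null-delimiter format sketched in Step 1.

copy into s3_dynamic_header
  from @s3_stage/incoming/customer_file.csv       -- placeholder stage and path
  file_format = (format_name = 's3_raw_format');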
Step 2a: 023_create_s3_drift_table
  • The first component queries the dynamic table and writes only the header row to a new table (a sketch follows this list).
  • Then, a child transformation called ‘split_file_header’ parses the header and rotates the columns to rows to build the metadata defining the final stage table for the S3 file.
  • A Query Result To Grid component queries the table containing the metadata from the split header and stores it in a grid variable, to be used when defining the new table.
  • Create csv_file_table creates a new table with all of the columns from the header, using the metadata produced by the split_file_header transformation pipeline.
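
A minimal sketch of the header extraction in Snowflake SQL is below. It assumes the header is the first row staged from the file; in practice the pipeline may rely on file row metadata captured at load time to identify it reliably, and the table name file_header is a placeholder.

create or replace table file_header as
select "header"
from s3_dynamic_header
limit 1;  -- assumes the first row staged is the file header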
Step 2b: split_file_header
  • In the child pipeline ‘split_file_header’, the header, now stored in a new table, is parsed with Snowflake’s SPLIT() function via an SQL component.
  • The SQL component runs the following query:

select SPLIT("header", '${pipe_custom_delimiter}') as split_header
from "${proj_default_schema}"."${pipe_dynamic_header_table}"

This query will result in an array of column names.  Example: 

[ "First_Name", "Last_Name", "Street Address", "City", "State", "Country", "Postal code" ]

  • Flatten variant: this component turns the array of column names into separate rows.
  • Add table metadata is a Calculator component that creates metadata for each column, defining every column as varchar(16777216). The expressions correspond to the metadata required to create a table, and they are the same fields held in the grid variable (a SQL sketch of the flatten and metadata steps follows this list).
  • Using the grid variable and the metadata defined above, create the stage table for the S3 file.
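
Together, the flatten and metadata steps behave roughly like the query below. This is a sketch rather than the pipeline’s exact SQL; split_header_table is a placeholder for wherever the split array is stored.

select
    f.value::string as column_name,
    'VARCHAR'       as data_type,
    16777216        as column_size              -- every column defaults to a max-length varchar
from split_header_table,
     lateral flatten(input => split_header) f;  -- one row per column name in the array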
Step 3:  Load the S3 file into the newly defined stage table
  • Query result to grid queries the newly created S3 stage table to get the columns. 
  • The S3 stage table is then truncated to make sure the new file loads into a clean table.
  • The S3 file is loaded into the stage table, using the custom file format if the delimiter is not a comma (see the sketch after this list).
  • Next, the new table is checked to ensure some data was loaded before proceeding. If the row count is 0, the job ends in failure. If the row count is greater than zero, this pipeline ends successfully and control passes to the next pipeline, which checks whether the metadata matches the previous run.
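
As a rough illustration of this step (the table, stage, and delimiter values below are placeholders):

truncate table customer_stage;  -- dynamically created stage table from Step 2
copy into customer_stage
  from @s3_stage/incoming/customer_file.csv
  file_format = (type = csv field_delimiter = '|' skip_header = 1);  -- custom delimiter, skip the header row

select count(*) as row_count from customer_stage;  -- a result of 0 fails the job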

Identifying Schema Drift

Now that the data has been loaded, it's time to check for schema drift.

Step 4:  Identify schema / metadata drift or run the S3 stage to target transformation 
  • The first step queries the S3 stage table and then saves the schema into a grid variable.  
  • Next, it checks whether the target table exists. If it doesn’t, it runs ‘046_s3_stage_to_target’ for the first time and ends successfully.
  • If the S3 file has been processed before (for instance, when the same file pattern is processed daily), then the target table schema is queried and saved to a grid variable (see the sketch after this list).
  • The last step in this process is to run the child pipeline ‘042_flag_or_run_stage_to_target’. 
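
Capturing a table’s schema into a grid variable corresponds to a metadata query along these lines; the schema and table names are placeholders.

select column_name, data_type, character_maximum_length
from information_schema.columns
where table_schema = 'PUBLIC'           -- placeholder schema
  and table_name   = 'CUSTOMER_STAGE'   -- placeholder stage (or target) table
order by ordinal_position;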
Step 4a:  046_s3_stage_to_target

The dynamic S3 file pattern will handle changing schemas until it is time for the stage data to be written to the target table.  Typically, these stage-to-target transformation pipelines will contain business logic and transform the data to a target model purpose-built for an analytic use case. 
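As an illustrative placeholder only (a real stage-to-target pipeline would carry business-specific logic), the final step might reduce to SQL like this:

insert into customer_target (first_name, last_name, city, country)  -- placeholder target model
select "First_Name", "Last_Name", "City", "Country"
from customer_stage;                                                 -- placeholder stage table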

Step 4b: 042_flag_or_run_stage_to_target

For this step, we can identify whether there has been a change to the stage table and exit the pipeline gracefully with a notification that a change is required.

  • Run 045_compare_source_target_metadata
  • Count rows from new_columns_metadata: queries the new_columns_metadata table for a row count and saves the result into the pipe_changes_needed variable.
  • Count rows from delete_columns_metadata: queries the delete_columns_metadata table for a row count and saves the result into the pipe_changes_needed variable.
  • Changes? checks the pipe_changes_needed variable for a non-zero value (see the sketch after this list). If there are changes, an alert is sent to a webhook post indicating that a change is needed, and the job fails to raise the alarm that the process did not complete all the way to the target. If there are no changes, the pipeline proceeds to run 046_s3_stage_to_target.
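
The row counts that feed pipe_changes_needed amount to something like the query below (a sketch; the variable assignment itself is handled by the pipeline components).

select
    (select count(*) from new_columns_metadata)
  + (select count(*) from delete_columns_metadata) as changes_needed;  -- non-zero means schema drift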
Step 4c:  Run 045_compare_source_target_metadata
  • Compares the stage and target metadata tables to determine changes (a sketch of the comparison follows this list)
  • New and changed columns are written to the new_columns_metadata table
  • Deleted columns are written to the delete_columns_metadata table
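
A minimal sketch of the comparison, assuming both metadata tables share columns such as column_name, data_type, and column_size (placeholder names):

-- Columns in the stage metadata but not the target: new or changed columns
insert into new_columns_metadata
select column_name, data_type, column_size from stage_metadata
minus
select column_name, data_type, column_size from target_metadata;

-- Columns in the target metadata but not the stage: deleted columns
insert into delete_columns_metadata
select column_name, data_type, column_size from target_metadata
minus
select column_name, data_type, column_size from stage_metadata;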

Summary

Schema drift is a critical challenge in data management, particularly when dealing with CSV files in cloud storage such as Amazon S3. This article has demonstrated a way to detect and manage schema drift, focusing on files stored in S3 with potentially varied and evolving formats. To address the challenge, it has outlined a design pattern that supports dynamic S3 file loading.

Identifying schema drift within this system involves several steps focused on dynamic file handling and schema comparison. Initially, a custom file format treats all incoming data as a single column, irrespective of the actual delimiter. This raw data populates a preliminary staging table, which isolates the header for schema extraction. Using child pipelines and transformations, the header is parsed and translated into metadata that reflects the current schema of the S3 file. This metadata is then used to create a stage table that matches the data's current structure, ready for further processing.

Schema drift is detected through a series of automated checks that compare the schema saved from previous loads with the schema extracted from the current file. If differences are found, the pipeline flags the schema drift and can halt the process to alert system administrators or data engineers for manual intervention. This approach ensures that schema changes do not go unnoticed and that data integrity is maintained, which is critical for analytical accuracy and operational efficiency in data-driven businesses.

Check out part one of the Schema Drift series, covering Excel files, here

If you'd like to try out the Matillion Data Productivity Cloud for yourself, follow the link below to sign up for a free trial. 

Angie Hastings

Senior Sales Engineer