Handling Schema Drift in Variant Data: A Step-by-Step Guide

Adapting to changes in upstream data schemas is a fact of life for data teams. Whether you’re dealing with new fields in a REST API or variant data types in Kafka streams, schema drift can pose significant challenges. To address this, I’ll walk through a schema drift pipeline that dynamically adjusts to upstream changes, ensuring a seamless ingestion process into downstream data layers.

Here’s a comprehensive walkthrough of the pipeline and its key components.

What Is Schema Drift?

Schema drift occurs when the structure of incoming data changes—new fields are added, removed, or updated—without prior notice. This can disrupt data processing pipelines, especially when dealing with loosely defined schemas like JSON or other semi-structured formats. This pipeline is designed to handle these changes automatically, focusing on variant data types, particularly in Kafka streams.
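As a minimal illustration of what drift looks like in practice (the field names here are invented for the example), all it takes is a producer adding one key to its JSON payloads:

```python
import json

# Yesterday's payload and today's payload from the same topic; the
# producer added ip_address without notice (names are illustrative).
before = json.loads('{"user_id": 42, "event": "login"}')
after = json.loads('{"user_id": 42, "event": "login", "ip_address": "10.0.0.1"}')

drifted_fields = set(after) - set(before)
print(sorted(drifted_fields))  # ['ip_address']
```

A pipeline that assumes a fixed column list would either drop that new key or fail outright; the approach below detects it instead.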

Pipeline Overview

Source Data: Kafka Streams

In this demo, I consume data from a Kafka source, where I’m not the producer. This means upstream changes, like the addition of new fields, occur without my direct control. My goal is to:

  1. Identify new fields in the Kafka source.
  2. Dynamically map and add them to the downstream silver layer in my data pipeline.

For this example, I’m using Confluent Kafka with JSON as the data format. The pipeline simulates multiple Kafka consumers to efficiently handle large volumes of data.

How It Works

1. Dynamic Detection of New Fields

  • Incoming Kafka packets are sampled to detect new fields added by upstream producers.
  • For instance, in one sample a new IP address field appeared that was not part of the existing schema in my silver layer.
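A hedged sketch of this sampling step is below; the silver-layer columns and the sampled messages are stand-ins for illustration, not the pipeline's actual tables:

```python
# Known silver-layer columns (assumed for illustration).
known_columns = {"user_id", "event", "event_ts"}

# A sampled batch of incoming Kafka payloads, already parsed from JSON.
sampled_messages = [
    {"user_id": 1, "event": "login", "event_ts": "2024-01-01T00:00:00Z"},
    {"user_id": 2, "event": "click", "event_ts": "2024-01-01T00:01:00Z",
     "ip_address": "10.0.0.1"},
]

# Union every key seen in the sample, then diff against the known schema.
seen_fields = set()
for msg in sampled_messages:
    seen_fields.update(msg)

new_fields = sorted(seen_fields - known_columns)
print(new_fields)  # ['ip_address']
```

Sampling a batch rather than inspecting every message keeps the detection step cheap at high throughput, at the cost of potentially noticing a rare field a little later.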

2. Schema Mapping and Transformation

  • The pipeline compares the current schema in the silver layer with the detected fields in the raw layer.
  • If discrepancies (new fields) are found, the pipeline constructs SQL queries to dynamically update the schema.

3. Leveraging SQL and Variables

  • SQL statements for ALTER TABLE and INSERT INTO are generated dynamically with Cortex AI and executed in Snowflake.
  • These statements update the silver layer schema, ensuring new fields like IP address are seamlessly incorporated.
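As a rough sketch of what one such generated statement looks like (the database, schema, table names, and the default-to-VARCHAR typing rule are all assumptions for the example, not the pipeline's actual logic):

```python
def build_alter_sql(database, schema, table, field, col_type="VARCHAR"):
    """Hypothetical helper: emit an ALTER TABLE for one newly detected field."""
    return (f'ALTER TABLE {database}.{schema}.{table} '
            f'ADD COLUMN "{field.upper()}" {col_type};')

sql = build_alter_sql("ANALYTICS", "SILVER", "EVENTS", "ip_address")
print(sql)
# ALTER TABLE ANALYTICS.SILVER.EVENTS ADD COLUMN "IP_ADDRESS" VARCHAR;
```

In the real pipeline the statement text comes from Cortex AI rather than a hand-written template, but the shape of the output is the same.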

4. Orchestration and Execution

  • The pipeline executes the SQL commands, updates the living SQL file (used for subsequent runs), and ingests the new data into the silver layer.
  • Any temporary tables or artifacts created during the process are automatically cleaned up for a streamlined workflow.
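One way to sketch that execute-then-clean-up flow is a try/finally wrapper; the execute callback and table names below are placeholders for whatever database client the pipeline actually uses:

```python
def run_with_cleanup(statements, execute, temp_tables):
    """Run each generated statement, then drop temp artifacts even on failure."""
    try:
        for stmt in statements:
            execute(stmt)
    finally:
        for table in temp_tables:
            execute(f"DROP TABLE IF EXISTS {table};")

executed = []
run_with_cleanup(
    ['ALTER TABLE SILVER.EVENTS ADD COLUMN "IP_ADDRESS" VARCHAR;',
     "INSERT INTO SILVER.EVENTS SELECT * FROM RAW.EVENTS_STAGE;"],
    executed.append,          # stand-in for a real database cursor
    ["RAW.EVENTS_STAGE"],
)
print(len(executed))  # 3
```

Putting the drops in a finally block means a failed INSERT still leaves the environment clean for the next run.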

Key Features

Multi-Consumer Kafka Simulation

To handle high data volumes, the pipeline simulates multiple Kafka consumers. This approach ensures load balancing and faster data ingestion.
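A toy version of that fan-out can be simulated with threads, assuming one worker per partition (in the real pipeline each worker would be a Kafka consumer in the same consumer group; the partition contents here are invented):

```python
from concurrent.futures import ThreadPoolExecutor

# Simulated topic partitions; real consumers would poll the broker instead.
partitions = {
    0: ["msg-1", "msg-2"],
    1: ["msg-3"],
    2: ["msg-4", "msg-5", "msg-6"],
}

def consume(partition_id):
    # Drain one partition, tagging each record with its source partition.
    return [(partition_id, m) for m in partitions[partition_id]]

with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    batches = pool.map(consume, partitions)
    consumed = [record for batch in batches for record in batch]

print(len(consumed))  # 6
```

Because Kafka assigns each partition to at most one consumer in a group, scaling the worker count up to the partition count divides the load without duplicating records.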

Cortex AI Integration

Cortex plays a pivotal role in:

  • Identifying new fields.
  • Constructing the appropriate SQL queries.
  • Updating the schema dynamically.

Shared Pipeline for Reusability

The pipeline is designed as a shared component, allowing users to configure raw and target data layers dynamically. Once set up, it can be reused across different projects and data sources.

Challenges and Limitations

While the pipeline is effective, there are some caveats:

  • Single-Field Updates: Currently, the pipeline handles one new field at a time. Multiple new fields may require additional testing.
  • Nested JSON Structures: Deeply nested JSON structures pose challenges for SQL generation. The pipeline is best suited for simpler semi-structured data.
  • Performance Variability: Execution times can vary depending on the complexity of the schema updates.

Setting It Up

Initial Configuration

  1. Define the Desired Silver Layer Schema: Use an initial SQL script to establish the baseline schema.
  2. Set Variables: Configure variables for raw and target databases, schemas, and tables.
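As an illustrative variable set (every name here is an assumption to be swapped for your own databases, schemas, and tables):

```python
# Hypothetical pipeline variables: raw (landing) layer and silver target.
pipeline_vars = {
    "raw_database": "RAW_DB",
    "raw_schema": "KAFKA_LANDING",
    "raw_table": "EVENTS_RAW",
    "target_database": "ANALYTICS",
    "target_schema": "SILVER",
    "target_table": "EVENTS",
}

# The pipeline can assemble fully qualified names from these variables.
fq_target = ".".join(
    pipeline_vars[k] for k in ("target_database", "target_schema", "target_table")
)
print(fq_target)  # ANALYTICS.SILVER.EVENTS
```

Keeping these as variables rather than hard-coded names is what makes the shared pipeline reusable across projects.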

Pipeline Execution

  • Execute the pipeline to detect schema changes, update the silver layer, and ingest new data.
  • Use monitoring tools like Slack notifications to alert teams when new fields are ingested.

Reusability

With shared job components, the pipeline can be easily adapted to different data sources, making it a versatile tool for handling schema drift.

Results

After running the pipeline:

  1. The new IP address field was dynamically added to the silver layer schema.
  2. Historical data without the new field remained unaffected, while new rows with the IP address field were correctly ingested.
  3. The process was completed in minutes, demonstrating its efficiency and practicality.


Next Steps

  • Enhance the pipeline to handle multiple new fields and complex nested JSON structures.
  • Integrate Slack notifications to inform teams about schema changes in real time.
  • Optimize performance for larger data streams and schemas.
Joe Herbet

Enterprise Sales Engineer
