Solve data ingestion challenges: Incremental data loading with medallion architecture on Databricks

High watermark loading: The problem

As data volumes continue to grow exponentially, full loads from sources into a target data platform like Databricks are becoming increasingly less viable.

Full loads are expensive in compute, storage, and time. Therefore, incremental load strategies, which pick up only the records we haven't already loaded, are becoming the norm.

This is done in many ways, one of which is called “high watermarking.” High watermarking is a technique used in incremental data loading where the last loaded timestamp or identifier (the "high watermark") is tracked and used as a threshold to identify and load only new or updated records with timestamps or identifiers greater than the previous high watermark. 

This approach ensures efficient incremental loading by excluding already loaded data, minimizing duplication, and reducing load times. Some vendors offer incremental loading (sometimes labeling it as ‘change data capture’) and manage all of this for you. 
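The mechanics described above can be sketched in a few lines of Python. This is an illustration only; in the pipeline we build below, the watermark lives in Matillion pipeline variables and a Databricks table rather than application code, and the source rows here are a made-up in-memory list.

```python
from datetime import datetime

# Hypothetical source rows as (id, last_modified) pairs; in practice this
# would be a query against the source system, not an in-memory list.
source = [
    ("A1", datetime(2024, 1, 1)),
    ("A2", datetime(2024, 1, 5)),
    ("A3", datetime(2024, 2, 3)),
]

def incremental_load(high_watermark):
    """Return rows modified after the stored high watermark, plus the
    new watermark to persist for the next run."""
    new_rows = [r for r in source if r[1] > high_watermark]
    new_hwm = max((r[1] for r in new_rows), default=high_watermark)
    return new_rows, new_hwm

# First run: a deliberately old default watermark pulls everything (bulk load).
rows, hwm = incremental_load(datetime(1990, 1, 1))
# Second run: nothing changed at the source, so nothing is reloaded.
rows2, hwm2 = incremental_load(hwm)
```

The key property is that the watermark only moves forward when new rows arrive, so re-running the load is safe and cheap.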

However, this hands-off approach tends to be expensive, limited in the visibility and customization your organization needs, and often requires a trade-off in data sovereignty.

Fortunately, Matillion offers a hybrid deployment model that keeps data within your organization, along with a code-optional approach that combines the ease of use of managed data loading with the flexibility of custom coding frameworks.

So, how do incremental load strategies work in Matillion? It’s pretty straightforward, and this blog and supporting video will cover it in detail! If you prefer watching instead of reading, a demo is also available at the end of this blog!

Note: This strategy can work across various sources. Salesforce was chosen as an example, but you can apply this approach to databases like Oracle/Teradata/MS SQL or SaaS applications like ServiceNow, Marketo, and more.

Configuring our pipeline

For this demonstration, we are looking to incrementally load our data from our Salesforce instance into a Delta table in our bronze schema.

Then, we want the pipeline to immediately transform that new bronze data into our ODS table in our silver schema and build a virtual fact and dimension in our gold schema in Databricks.

Creating our pipeline variables

For this job to work, we will need to create two pipeline variables that will store newly populated values at runtime. Here is a breakdown of the two variables that we will create:

1. HighWaterMark: This variable will hold the value of the high watermark column from our ODS table.

2. MaxSurrogateKey: This variable will take the max ID from our ODS table to ensure our IDENTITY column has no duplicate values (this can occur due to the distributed MPP nature of Databricks). This approach may result in some skipped values in the sequence, but we are okay with that.
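To illustrate why seeding from the current maximum avoids duplicate keys, here is a minimal Python sketch. The row values and the starting key are hypothetical; in the actual pipeline, Databricks generates the keys and the MaxSurrogateKey variable supplies the offset.

```python
def assign_surrogate_keys(new_rows, max_surrogate_key):
    """Assign keys starting above the current max so new rows can never
    collide with existing ones; gaps in the sequence are acceptable."""
    return [
        {"surrogate_key": max_surrogate_key + i, **row}
        for i, row in enumerate(new_rows, start=1)
    ]

existing_max = 42  # hypothetical MaxSurrogateKey captured from the ODS table
incoming = [{"id": "A7"}, {"id": "A8"}]
keyed = assign_surrogate_keys(incoming, existing_max)
```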

Creating our ODS table in our silver schema

Matillion supports DDL executions in low-code and high-code fashion, using the Create Table or SQL Script components. For simplicity, we used the SQL Script component to generate our table using the script below:

CREATE TABLE IF NOT EXISTS `se-unity-catalog`.`silver`.mtln_qs_sf_ods_account (
  surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,
  id STRING,
  account_name STRING,
  annual_revenue DECIMAL(20, 0),
  last_modified_timestamp TIMESTAMP
)
USING DELTA

This script will run every time the job runs but will only create the table once due to the CREATE TABLE IF NOT EXISTS syntax used in the script.
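That idempotency can be demonstrated with a local SQLite stand-in for the Databricks DDL above (SQLite has no Delta format or IDENTITY columns, so the types are simplified):

```python
import sqlite3

# IF NOT EXISTS makes the statement idempotent, so the pipeline can
# safely run the same DDL on every execution.
ddl = """
CREATE TABLE IF NOT EXISTS mtln_qs_sf_ods_account (
    surrogate_key INTEGER PRIMARY KEY,
    id TEXT,
    account_name TEXT,
    annual_revenue NUMERIC,
    last_modified_timestamp TEXT
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(ddl)
conn.execute(ddl)  # second run is a no-op rather than an error

tables = [r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'")]
```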

Querying our ODS table for the latest high watermark

Using our Query Result to Scalar component, we can easily execute a query against our silver ODS table in Databricks to capture the maximum high watermark value. The query that we will run is: 

SELECT 
COALESCE(DATE_FORMAT(MAX(last_modified_timestamp), 'yyyy-MM-dd HH:mm:ss'), '1990-01-01 00:00:00') AS HWM
, NVL(MAX(surrogate_key), 1) AS MSK
FROM `se-unity-catalog`.silver.mtln_qs_sf_ods_account

Our mapping from this query to our variables looks like this:

In our first run, the ODS table is empty, so the query returns the fallback values defined in COALESCE and NVL ('1990-01-01 00:00:00' and 1), which drive the initial bulk load. Any subsequent runs will have runtime values passed through to the respective variables.
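That fallback behavior is easy to verify with a SQLite stand-in for the query above (SQLite supports COALESCE but not NVL or DATE_FORMAT, so both fallbacks use COALESCE and timestamps are stored as text):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE ods (surrogate_key INTEGER,
                                  last_modified_timestamp TEXT)""")

query = """
SELECT COALESCE(MAX(last_modified_timestamp), '1990-01-01 00:00:00') AS HWM,
       COALESCE(MAX(surrogate_key), 1) AS MSK
FROM ods
"""

# Empty table: MAX() yields NULL, so the fallbacks drive the initial bulk load.
hwm, msk = conn.execute(query).fetchone()

# Once rows exist, the real maxima come back instead.
conn.executemany("INSERT INTO ods VALUES (?, ?)",
                 [(1, "2024-01-01 00:00:00"), (2, "2024-02-01 00:00:00")])
hwm2, msk2 = conn.execute(query).fetchone()
```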

Querying data from our source (Salesforce)

Matillion offers 100+ out-of-the-box connectors that load directly into Delta tables in Databricks, one of which is Salesforce. For this example, we configured the component to use standard username/password authentication, but other authentication methods, like OAuth, are supported.

In the Salesforce component properties, under the Configure tab, we need to specify a filter for which we will pass in the HighWaterMark variable value at runtime. For this example, we are incrementally loading data from the Account table in SFDC, and the high watermark column that we will filter against is the LastModifiedDate found in the Account table.

To set the filter, we enter the following into the Data Source Filter field in the component: 

Notice how we can reference the value of a variable by using the ${variable_name} syntax in the Value field.
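As a rough illustration of that interpolation, consider the sketch below. It is hypothetical: the real substitution happens inside Matillion before the Salesforce query runs, and the exact filter expression is configured through the component's Configure tab rather than written as a string.

```python
from string import Template

# Hypothetical approximation of Matillion's ${variable} interpolation:
# the ${HighWaterMark} placeholder is resolved at runtime, narrowing the
# source query to records modified after the last successful load.
filter_template = Template("LastModifiedDate > '${HighWaterMark}'")
resolved = filter_template.substitute(HighWaterMark="1990-01-01 00:00:00")
```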

For the Destination tab, we will want to point this component to our bronze schema to a table called stg_sfdc_incrload_account. Matillion will create this table if it doesn’t exist, so there’s no need to create it in advance!

Processing newly loaded data from bronze to gold

As records trickle into our bronze stg_sfdc_incrload_account table, we must process them through our transformation process to merge them into our silver schema. One option for doing this is via a Databricks native transformation leveraging things like workflows + notebooks and/or delta live tables.

The other option is using native tooling from Matillion in conjunction with our built-in orchestration and scheduling features. Transformation jobs in Matillion allow users to create Spark SQL-based pipelines in a low-code fashion.

Rather than writing this complex SQL statement to merge into our silver mtln_qs_sf_ods_account table, we can use the graphical interface to build the SQL and execute it against our Databricks SQL warehouse or all-purpose compute cluster.

Bronze → silver

Matillion’s orchestration layer allows users to dynamically pass variables to sub-jobs at run time. In our example, we pass down the MaxSurrogateKey value from the top-level orchestration job to our transformation job by setting the Scalar Variable mapping.

Once this value is passed in, we can execute our transformation job and reference the runtime value when the job is running. This allows for dynamic workflows and modular frameworks to be easily built in Matillion while still leveraging the power and scale of Databricks.

This job runs directly after we ingest Salesforce data and populate new records from our bronze to silver tables.

Here’s the SQL generated by the transformation job:

MERGE INTO `se-unity-catalog`.`silver`.`mtln_qs_sf_ods_account` AS `target` USING (SELECT 
  * 
FROM (SELECT 
  `Id` AS `id`, 
  `Name` AS `account_name`, 
  `AnnualRevenue` AS `annual_revenue`, 
  `LastModifiedDate` AS `last_modified_timestamp`, 
  `surrogate_key` AS `surrogate_key` 
FROM (SELECT 
  `Id`, 
  `Name`, 
  `AnnualRevenue`, 
  `LastModifiedDate`, 
  `rn`, 
  monotonically_increasing_id()+1 AS `surrogate_key` 
FROM (SELECT 
  * 
FROM (SELECT 
  *, 
  ROW_NUMBER() OVER (PARTITION BY `Id` 
                     ORDER BY `LastModifiedDate` DESC) AS `rn` 
FROM (SELECT 
  `Id`, 
  `Name`, 
  `AnnualRevenue`, 
  `LastModifiedDate` 
FROM `se-unity-catalog`.`bronze`.`stg_sfdc_incrload_account`)) 
WHERE (`rn` = 1))))) AS `input` ON `input`.`id`=`target`.`id` 
WHEN MATCHED
AND true THEN
UPDATE
SET `account_name` = `input`.`account_name`,
  `annual_revenue` = `input`.`annual_revenue`,
  `last_modified_timestamp` = `input`.`last_modified_timestamp`
  WHEN NOT MATCHED THEN
INSERT (
    `id`,
    `account_name`,
    `annual_revenue`,
    `last_modified_timestamp`
  )
VALUES (
    `input`.`id`,
    `input`.`account_name`,
    `input`.`annual_revenue`,
    `input`.`last_modified_timestamp`);
Silver → gold

Like in our previous sample transformation, we will create virtualized dimensions and fact tables through transformations and views. You can build whatever transformation logic you need, such as SCD2 or materialized views, but to keep things brief, we'll demonstrate the following silver-to-gold transformation.

Dim logic:

SELECT 
  `surrogate_key` AS `dim_account_sk`, 
  `id` AS `natural_key`, 
  `account_name` AS `account_name`, 
  `annual_revenue` AS `annual_revenue`, 
  `last_modified_timestamp` AS `last_modified_timestamp` 
FROM (SELECT 
  `surrogate_key`, 
  `id`, 
  `account_name`, 
  `annual_revenue`, 
  `last_modified_timestamp` 
FROM `se-unity-catalog`.`silver`.`mtln_qs_sf_ods_account`)

Fact logic:

SELECT 
  `surrogate_key`, 
  `id`, 
  `account_name`, 
  `last_modified_timestamp`, 
  `surrogate_key` AS `dim_account_sk`, 
  `annual_revenue` AS `annual_revenue`, 
  CAST(DATE_FORMAT(`last_modified_timestamp`, "yyyyMMdd") AS INT) AS `dim_date_sk`, 
  CAST(DATE_FORMAT(`last_modified_timestamp`, "HHmmss") AS INT) AS `dim_time_sk`, 
  `id` AS `natural_key` 
FROM (SELECT 
  `surrogate_key`, 
  `id`, 
  `account_name`, 
  `annual_revenue`, 
  `last_modified_timestamp` 
FROM `se-unity-catalog`.`silver`.`mtln_qs_sf_ods_account`)
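The dim_date_sk and dim_time_sk derivations in the fact logic above pack a timestamp into integer keys. A small Python sketch of the same transformation (strftime patterns replacing Spark's DATE_FORMAT):

```python
from datetime import datetime

def date_time_keys(ts):
    """Derive integer date and time surrogate keys from a timestamp,
    matching the CAST(DATE_FORMAT(...) AS INT) expressions above:
    yyyyMMdd for the date key, HHmmss for the time key."""
    return int(ts.strftime("%Y%m%d")), int(ts.strftime("%H%M%S"))

dim_date_sk, dim_time_sk = date_time_keys(datetime(2024, 3, 1, 14, 30, 5))
```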

Summary

Matillion offers a code-optional visual interface that allows you to build ingestion and transformation pipelines. Our platform gives you the flexibility found in high-code frameworks coupled with the ease of use of no-code ingestion platforms.

Additionally, we provide a visual transformation UI that generates Spark SQL that executes locally on your Databricks compute, whether that is serverless SQL warehouses or all-purpose compute clusters.

If you’re a visual learner or prefer to watch a video walkthrough of this blog, please check out the link below!

 
Konrad Bafia

Manager, Sales Engineering
