Adding Cursor Functionality to Python Scripts

Migrating to Matillion Data Productivity Cloud

When migrating from Matillion ETL (METL) to Matillion Data Productivity Cloud (DPC), one key difference you may encounter is how Python scripts interact with your data warehouse. In METL, Python Script components using the Jython interpreter could easily execute SQL queries against the environment warehouse using the context.cursor() function. However, DPC does not support Jython and doesn't provide this cursor functionality out of the box.

In this blog post, we'll show you how to maintain compatibility with your existing Python scripts by implementing cursor functionality in DPC, making your migration process smoother and more efficient.
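To ground the discussion: the cursors these snippets return follow the standard Python DB-API, so existing METL-style code keeps the familiar execute/fetch shape. The sketch below is illustrative only, with sqlite3 standing in for the warehouse connection:

```python
# Illustrative only: sqlite3 stands in for the warehouse connection here.
# In METL (and with the snippets in this post) the cursor would come from
# context.cursor() instead.
import sqlite3

connection = sqlite3.connect(":memory:")
cursor = connection.cursor()

cursor.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
cursor.execute("INSERT INTO orders VALUES (1, 9.99), (2, 25.00)")
cursor.execute("SELECT COUNT(*), SUM(amount) FROM orders")
row_count, total = cursor.fetchone()
print(row_count, total)  # 2 34.99
```

The goal of the snippets that follow is to preserve exactly this calling pattern in DPC.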

Understanding the Challenge

While best practice would be to refactor your Python scripts to use components like Query Result to Grid, Query Result to Scalar, or SQL Script, we understand that migration timelines often require quicker solutions. To enable faster migrations to DPC, we've developed code snippets that replicate the context.cursor() function from METL.

Important Notes Before You Begin

  • This method will only work with a Hybrid SaaS agent, as the Python Script component is not available on Full SaaS.
  • The implementation uses your agent's cloud credentials to access either AWS Secrets Manager or Azure Key Vault.
  • We recommend creating the required variables as project variables for better management.
  • For Azure-based implementations, you'll need to add specific Python packages to the agent via the Extension Library Location parameter in the deployment template.

Implementation Options

We've prepared cursor implementations for various data platforms. Select the one that matches your environment:

Databricks on AWS (using a personal access token)

Required Project Variables:

  • databricks_server_hostname: The Server Hostname value for your cluster or SQL warehouse
  • databricks_http_path: The HTTP Path value for your cluster or SQL warehouse
  • databricks_pat_secret_name: The name of the secret in AWS Secrets Manager containing the personal access token
  • databricks_pat_secret_key: The key within the secret which has the personal access token as its value
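The setup code below reads the secret's SecretString and parses it as JSON before looking up the token by key, so the secret is expected to be a JSON object along these lines (all values here are hypothetical):

```python
import json

# Hypothetical secret contents: a JSON object whose "pat" key holds the
# personal access token. Here databricks_pat_secret_key would be "pat".
secret_string = '{"pat": "dapiEXAMPLETOKEN"}'
databricks_pat_secret_key = "pat"

access_token = json.loads(secret_string)[databricks_pat_secret_key]
print(access_token)  # dapiEXAMPLETOKEN
```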

Implementation Code:

################### cursor setup ###################
import boto3
from databricks import sql as dbks_sql
import json
def dpc_cursor():
    client = boto3.client('secretsmanager')
    secret_value_response = client.get_secret_value(SecretId=databricks_pat_secret_name)
    secret_dictionary = json.loads(secret_value_response['SecretString'])
    access_token = secret_dictionary[databricks_pat_secret_key]
    connection = dbks_sql.connect(
        server_hostname=databricks_server_hostname,
        http_path=databricks_http_path,
        access_token=access_token,
    )
    return connection.cursor()
context.cursor = dpc_cursor
################### end of cursor setup ###################

Databricks on Azure (using a personal access token)

Required Custom Python Packages:

  • azure-identity
  • azure-keyvault-secrets

Required Project Variables:

  • databricks_server_hostname: The Server Hostname value for your cluster or SQL warehouse
  • databricks_http_path: The HTTP Path value for your cluster or SQL warehouse
  • azure_key_vault_url: The URL of an Azure Key Vault containing a secret for the personal access token
  • databricks_pat_secret_name: The name of the secret in the Azure Key Vault containing the personal access token

Implementation Code:

################### cursor setup ###################
import os
from azure.identity import ManagedIdentityCredential
from azure.keyvault.secrets import SecretClient
from databricks import sql as dbks_sql
def dpc_cursor():
    client_id = os.environ["AZURE_CLIENT_ID"]
    credential = ManagedIdentityCredential(client_id=client_id)
    key_vault_client = SecretClient(
        vault_url=azure_key_vault_url,
        credential=credential
    )
    key_vault_secret = key_vault_client.get_secret(databricks_pat_secret_name)
    access_token = key_vault_secret.value
    connection = dbks_sql.connect(
        server_hostname=databricks_server_hostname,
        http_path=databricks_http_path,
        access_token=access_token,
    )
    return connection.cursor()
context.cursor = dpc_cursor
################### end of cursor setup ###################

Redshift on AWS (using username/password)

Required Project Variables:

  • redshift_endpoint: Your Redshift cluster endpoint
  • redshift_database: Your database name
  • redshift_port: The port number for your Redshift cluster
  • redshift_username: Your Redshift username
  • redshift_password_secret_name: The name of the secret in AWS Secrets Manager containing the password
  • redshift_password_secret_key: The key within the secret which has the password as its value

Implementation Code:

################### cursor setup ###################
import boto3
import json
import redshift_connector
def dpc_cursor():
    client = boto3.client('secretsmanager')
    secret_value_response = client.get_secret_value(SecretId=redshift_password_secret_name)
    secret_dictionary = json.loads(secret_value_response['SecretString'])
    redshift_password = secret_dictionary[redshift_password_secret_key]
    connection = redshift_connector.connect(
        host=redshift_endpoint,
        database=redshift_database,
        port=int(redshift_port),
        user=redshift_username,
        password=redshift_password
    )
    return connection.cursor()
context.cursor = dpc_cursor
################### end of cursor setup ###################

Snowflake on AWS (using a private key)

Required Project Variables:

  • snowflake_account: Your Snowflake account identifier
  • snowflake_username: Your Snowflake username
  • snowflake_role: The role to use for the connection
  • snowflake_warehouse: The warehouse to use
  • snowflake_database: The database to connect to
  • snowflake_schema: The schema to use
  • snowflake_private_key_secret_name: The name of the secret in AWS Secrets Manager containing the private key

Implementation Code:

################### cursor setup ###################
import boto3
import json
import re
import snowflake.connector
def extract_key_content(key_string):
    pattern = r'-----BEGIN PRIVATE KEY-----\s*(.*?)\s*-----END PRIVATE KEY-----'
    match = re.search(pattern, key_string, re.DOTALL)
    if match:
        return match.group(1).replace('\n', '').replace(' ', '')
    return None
def dpc_cursor():
    client = boto3.client('secretsmanager')
    secret_value_response = client.get_secret_value(SecretId=snowflake_private_key_secret_name)
    snowflake_private_key = extract_key_content(secret_value_response['SecretString'])
    connection = snowflake.connector.connect(
        account=snowflake_account,
        user=snowflake_username,
        private_key=snowflake_private_key,
        role=snowflake_role,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema
    )
    return connection.cursor()
context.cursor = dpc_cursor
################### end of cursor setup ###################
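As a quick illustration of the key-handling helper above, here is what extract_key_content produces for a PEM-formatted key (the function is reproduced from the snippet, and the key body is a dummy value, not a real key):

```python
import re

def extract_key_content(key_string):
    # Strip the PEM armor and whitespace, leaving only the base64 body
    pattern = r'-----BEGIN PRIVATE KEY-----\s*(.*?)\s*-----END PRIVATE KEY-----'
    match = re.search(pattern, key_string, re.DOTALL)
    if match:
        return match.group(1).replace('\n', '').replace(' ', '')
    return None

# Dummy PEM for illustration only -- not a real key
pem = """-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkq
hkiG9w0BAQEFAASC
-----END PRIVATE KEY-----"""

print(extract_key_content(pem))  # MIIEvQIBADANBgkqhkiG9w0BAQEFAASC
```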

Snowflake on Azure (using a private key)

Required Custom Python Packages:

  • azure-identity
  • azure-keyvault-secrets

Required Project Variables:

  • snowflake_account: Your Snowflake account identifier
  • snowflake_username: Your Snowflake username
  • snowflake_role: The role to use for the connection
  • snowflake_warehouse: The warehouse to use
  • snowflake_database: The database to connect to
  • snowflake_schema: The schema to use
  • azure_key_vault_url: The URL of an Azure Key Vault containing a secret for the private key
  • snowflake_private_key_secret_name: The name of the secret in Azure Key Vault containing the private key

Implementation Code:

################### cursor setup ###################
import os
from azure.identity import ManagedIdentityCredential
from azure.keyvault.secrets import SecretClient
import re
import snowflake.connector
def extract_key_content(key_string):
    pattern = r'-----BEGIN PRIVATE KEY-----\s*(.*?)\s*-----END PRIVATE KEY-----'
    match = re.search(pattern, key_string, re.DOTALL)
    if match:
        return match.group(1).replace('\n', '').replace(' ', '')
    return None
def dpc_cursor():
    client_id = os.environ["AZURE_CLIENT_ID"]
    credential = ManagedIdentityCredential(client_id=client_id)
    key_vault_client = SecretClient(
        vault_url=azure_key_vault_url,
        credential=credential
    )
    key_vault_secret = key_vault_client.get_secret(snowflake_private_key_secret_name)
    snowflake_private_key = extract_key_content(key_vault_secret.value)
    connection = snowflake.connector.connect(
        account=snowflake_account,
        user=snowflake_username,
        private_key=snowflake_private_key,
        role=snowflake_role,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema
    )
    return connection.cursor()
context.cursor = dpc_cursor
################### end of cursor setup ###################

Implementation Process

To implement cursor functionality in your DPC Python scripts:

  1. Identify which database platform and cloud provider you're using from the options above.
  2. Create the required project variables in your DPC project.
  3. If using Azure, make sure to add the necessary Python packages to your agent.
  4. Copy the appropriate code snippet and add it to the beginning of your Python Script component.

Once you've added the cursor setup code, you can continue using context.cursor() in your Python scripts just as you did in METL. The function connects to your data warehouse and returns a cursor object you can use to execute SQL queries. Note that each call to context.cursor() opens a new connection, so call it once and reuse the cursor where you can.
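The mechanism is plain attribute assignment: the setup snippets bind a factory function to context.cursor, so downstream code is unchanged. A minimal stand-in shows the idea, using sqlite3 and a dummy context object in place of the DPC runtime (both are stand-ins, not the real API):

```python
import sqlite3
from types import SimpleNamespace

# Stand-in for the DPC script context; the real one is provided by the runtime.
context = SimpleNamespace()

def dpc_cursor():
    # In the real snippets this opens a warehouse connection instead.
    connection = sqlite3.connect(":memory:")
    return connection.cursor()

context.cursor = dpc_cursor  # same assignment as in the setup snippets

# Existing METL-style code keeps working unchanged:
cursor = context.cursor()
cursor.execute("SELECT 1 + 1")
print(cursor.fetchone()[0])  # 2
```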

Long-Term Considerations

While this approach provides a quick solution for migration, we recommend gradually refactoring your Python scripts to use the native DPC components (Query Result to Grid, Query Result to Scalar, or SQL Script) for better performance and maintainability. This cursor implementation should be seen as a migration aid rather than a permanent solution.

Conclusion

By implementing cursor functionality in Matillion DPC, you can significantly ease the migration process from Matillion ETL. This approach allows you to maintain compatibility with your existing Python scripts while you transition to DPC's native components at your own pace.

For teams with tight migration timelines or extensive Python Script usage in METL, these cursor implementations provide a practical pathway to DPC adoption without requiring immediate refactoring of all Python scripts.

Need more help with your migration to Matillion Data Productivity Cloud? Contact our support team for personalized assistance.

Chris Upton

Staff Data Engineer

An experienced data professional, I've been working with databases for over 25 years. I have deep knowledge of SQL and have worked with a variety of relational and NoSQL databases, both on-prem and in the cloud.
