Tutorial

Capture Schema Evolution from a PostgreSQL CDC Source and Stream Changes to Snowflake with Striim

Use Striim to handle schema changes on the source database in real time

Benefits

Capture DDL Changes while replicating data from source to target

Split the static table writers into another Striim app without affecting the other writers

Automatically keep schemas and models in sync with your operational database.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

PostgreSQL

PostgreSQL is an open-source relational database management system.

Snowflake

Snowflake is a cloud-native relational data warehouse that offers flexible and scalable architecture for storage, compute and cloud services.

Overview

Striim is a next-generation unified data streaming product that offers change data capture (CDC) from popular databases such as Oracle, SQL Server, PostgreSQL, and many others. To maximize uptime, Striim can capture common DDL statements on the source tables and replicate those changes to the target tables, or take other actions such as quiescing or halting the application. To learn more about the supported CDC sources and adapters, please follow this link.

In one of our previous recipes, we showed how to create a replication slot and a CDC user to stream changes from a PostgreSQL source table in real time. In this tutorial, we configure a Striim app that captures schema evolution, such as CREATE TABLE, ALTER TABLE (e.g., adding a column), and DROP TABLE, and propagates those changes to the target. When a new column is added, the target table is updated with the new column. For CREATE TABLE and DROP TABLE, Striim’s message log notifies the user of the DDL change so they can take further action. Follow the steps below to configure your source database and Striim app for capturing schema evolution. All code, datasets, and the TQL file for this app are available in our GitHub repository.

Step 1: Create a Replication Slot and Replication User

For a CDC application on a PostgreSQL database, make sure the flags required for logical decoding are enabled on the PostgreSQL instance.
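On Cloud SQL for PostgreSQL, logical decoding is turned on with the `cloudsql.logical_decoding` database flag. Once the flag change has taken effect, a query like the one below is one way to confirm the server settings that logical decoding depends on. This is a sketch against the standard `pg_settings` view; the exact flags and values your instance needs may differ.

```sql
-- Inspect the server settings that logical decoding depends on.
-- wal_level must report 'logical' before a wal2json slot can be created.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')
ORDER BY name;
```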

Create a user with the REPLICATION attribute by running the following command from the Google Cloud console (Cloud SQL):

CREATE USER replication_user WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD 'yourpassword';
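To check that the role was created with the attributes CDC needs, you can query the standard `pg_roles` view. This sketch assumes the role name `replication_user` from the command above.

```sql
-- Both rolreplication and rolcanlogin should be true for the CDC user.
SELECT rolname, rolreplication, rolcanlogin
FROM pg_roles
WHERE rolname = 'replication_user';
```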

Follow the steps below to set up your replication slot for change data capture:

Create a logical replication slot with the wal2json plugin.
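A logical slot can be created from SQL with PostgreSQL's built-in `pg_create_logical_replication_slot()` function. The slot name `striim_slot` below is a hypothetical placeholder; use the name you will later enter in the Striim source adapter. The sketch assumes the wal2json output plugin is available on the server (Cloud SQL for PostgreSQL supports it) and that you are connected as a user with replication privileges.

```sql
-- Create a logical replication slot that decodes WAL changes with wal2json.
-- 'striim_slot' is a hypothetical name; choose your own.
SELECT *
FROM pg_create_logical_replication_slot('striim_slot', 'wal2json');

-- Confirm the slot exists and uses the expected output plugin.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'striim_slot';
```

Because an inactive slot keeps the server from recycling WAL, drop slots you no longer need with `SELECT pg_drop_replication_slot('striim_slot');`.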

Step 2: Configure PostgreSQL CDDL Capture Procedure and CDDL Tracking Table

CDDL stands for “Common Data Definition Language”. For handling schema evolution (CDDL), Striim supports CREATE TABLE, ALTER TABLE (add, modify, and drop column, add primary key, and add unique constraint), and DROP TABLE. The source adapter must be able to capture the CDDL statements executed on the tables and schemas of interest, and the target adapter must be able to process the DDL WAEvents sent by the source adapter.

To set up CDDL capture and tracking for PostgreSQL, a set of SQL scripts must be executed on the source database by a user with the superuser role. First, create the striim schema if it does not already exist, along with an empty tracking table whose columns record each DDL change.

CREATE SCHEMA IF NOT EXISTS striim;
CREATE TABLE IF NOT EXISTS striim.ddlcapturetable
  (
    event           TEXT,
    tag             TEXT,
    classid         OID,
    objid           OID,
    objsubid        INT,
    object_type     TEXT,
    schema_name     TEXT,
    object_identity TEXT,
    is_extension    BOOL,
    query           TEXT,
    username        TEXT DEFAULT CURRENT_USER,
    db_name         TEXT DEFAULT Current_database(),
    client_addr     INET DEFAULT Inet_client_addr(),
    creation_time   TIMESTAMP DEFAULT now()
  ); 

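Next, grant access to the striim schema and the tracking table, and define a PostgreSQL function that records each DDL change in ddlcapturetable. The function, ‘ddl_capture_command()’, reads the statement text from pg_stat_activity and the object details from pg_event_trigger_ddl_commands() or pg_event_trigger_dropped_objects(). It is executed by the two event triggers created below.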

GRANT USAGE ON SCHEMA striim TO PUBLIC;
GRANT SELECT, INSERT ON TABLE striim.ddlcapturetable TO PUBLIC;

create or replace function striim.ddl_capture_command() returns event_trigger as $$
declare
    v1 text;    -- full text of the statement that fired the trigger
    r  record;  -- one row of object details from the event-trigger helper functions
begin
    -- Read the current statement's text for this backend from pg_stat_activity
    select query into v1 from pg_stat_activity where pid = pg_backend_pid();

    -- CREATE / ALTER: record the first object reported for the command
    if TG_EVENT = 'ddl_command_end' then
        SELECT * into r FROM pg_event_trigger_ddl_commands();
        -- Only record rows with a valid catalog class id
        if r.classid > 0 then
            insert into striim.ddlcapturetable(event, tag, classid, objid, objsubid, object_type, schema_name, object_identity, is_extension, query)
            values(TG_EVENT, TG_TAG, r.classid, r.objid, r.objsubid, r.object_type, r.schema_name, r.object_identity, r.in_extension, v1);
        end if;
    end if;

    -- DROP: record the first dropped object; pg_event_trigger_dropped_objects()
    -- has no in_extension column, so 'f' (false) is stored for is_extension
    if TG_EVENT = 'sql_drop' then
        SELECT * into r FROM pg_event_trigger_dropped_objects();
        insert into striim.ddlcapturetable(event, tag, classid, objid, objsubid, object_type, schema_name, object_identity, is_extension, query)
        values(TG_EVENT, TG_TAG, r.classid, r.objid, r.objsubid, r.object_type, r.schema_name, r.object_identity, 'f', v1);
    end if;
end;
$$ language plpgsql strict;

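Once the ddlcapturetable table is created and the ddl_capture_command() function is defined, create two event triggers that execute the function: pg_get_ddl_command fires at the end of each DDL command (ddl_command_end), and pg_get_ddl_drop fires when objects are dropped (sql_drop).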

CREATE EVENT TRIGGER pg_get_ddl_command ON ddl_command_end EXECUTE PROCEDURE striim.ddl_capture_command();
CREATE EVENT TRIGGER pg_get_ddl_drop ON sql_drop EXECUTE PROCEDURE striim.ddl_capture_command();
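To confirm the setup, you can list the event triggers in the `pg_event_trigger` catalog and then run a throwaway DDL statement to see whether rows land in the tracking table. The table name `public.cddl_smoke_test` is hypothetical, and this sketch assumes the Step 2 scripts above have already been run as a superuser.

```sql
-- Both triggers should be listed; evtenabled = 'O' means enabled (origin/local mode).
SELECT evtname, evtevent, evtfoid::regproc AS function_name, evtenabled
FROM pg_event_trigger
WHERE evtname IN ('pg_get_ddl_command', 'pg_get_ddl_drop');

-- Fire both triggers with a throwaway table (hypothetical name).
CREATE TABLE public.cddl_smoke_test (id INT);
DROP TABLE public.cddl_smoke_test;

-- The newest rows should include a ddl_command_end event tagged CREATE TABLE
-- and a sql_drop event tagged DROP TABLE.
SELECT event, tag, object_type, object_identity, query
FROM striim.ddlcapturetable
ORDER BY creation_time DESC
LIMIT 5;
```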

Step 3: Create the CDC App that Handles Schema Evolution on Striim SaaS

Compared with a traditional CDC app that streams only DML, this app requires additional CDDL configuration on both the source and the target. To configure CDDL on the source, click ‘Show Advanced Setting’ in the source adapter properties.

Enable ‘SCHEMA EVOLUTION-CDDL CAPTURE’ and enter the name of the CDDL TRACKING TABLE created in Step 2 (striim.ddlcapturetable). Set the CDDL ACTION property to ‘Process’.

For the Snowflake target, open ‘Show Advanced Setting’ and select ‘Process’ under CDDL ACTION. This ensures that ALTER TABLE changes on the source table are replicated to the target table in Snowflake.

Step 4: Run the App and Check the Message Logs and Target Table for DDL Changes

For this tutorial, I used a sample table ‘data1’ with two columns, ‘Name’ and ‘Salary’. When a new column ‘Sex’ is added to the source table, the DDL change is streamed and the target table in Snowflake is updated with the new column.
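The schema change in this walkthrough can be reproduced on the source with statements like the following. This is a sketch: the column types, the sample values, and the `public` schema are assumptions, since the table definition is not shown, and unquoted identifiers are assumed so that PostgreSQL folds them to lowercase.

```sql
-- Hypothetical definition of the sample table (types are assumptions).
CREATE TABLE IF NOT EXISTS public.data1 (
    name   TEXT,
    salary NUMERIC
);

-- The DDL change that Striim captures and replays on Snowflake.
ALTER TABLE public.data1 ADD COLUMN IF NOT EXISTS sex TEXT;

-- A DML row that uses the new column, so the change is visible downstream.
INSERT INTO public.data1 (name, salary, sex) VALUES ('Alice', 50000, 'F');
```

On the Snowflake side, `DESCRIBE TABLE data1;` lists the target table's columns, so you can check that the new column appears (adjust the database, schema, and identifier case to match how the target table was created).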

When we add a new table ‘data3’ to the schema, the DDL operation is not applied to the target, but the message log notifies the user about the change. The metadata in the message log can be used to set alerts for different types of DDL operations.
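The same DDL events are also recorded on the source in the tracking table from Step 2, which gives a quick way to see which kinds of DDL have been run. The sketch below creates a hypothetical `data3` table (its columns are assumptions) and then summarizes the tracking table by event and tag. It is only a source-side cross-check, not Striim's message-log alerting.

```sql
-- Hypothetical new table; per this tutorial, Striim reports this CREATE TABLE
-- in the message log instead of applying it to the target.
CREATE TABLE public.data3 (
    id   INT,
    note TEXT
);

-- Count captured DDL events by type, most recent activity first.
SELECT event, tag, count(*) AS events, max(creation_time) AS last_seen
FROM striim.ddlcapturetable
GROUP BY event, tag
ORDER BY last_seen DESC;
```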

Static Tables

Striim can also handle replication sources that contain tables with a static schema. Source and target adapters that support CDDL accept four action labels, which determine what happens when a CDDL is captured:

Process: This is the default behavior. The DDL query is parsed and streamed to the target table.

Ignore: DDL events are captured and stored in the internal metadata repository but are not sent to downstream consumers.

Quiesce: When a DDL event is captured on the tables of interest, the source adapter issues the Quiesce command and the DDL operation is not sent to downstream consumers. This action label applies to source adapters only.

Halt: When an adapter receives a DDL event from upstream, it halts the app. Use this action when the application must stop if DDL is executed on static tables.

Consider an example Striim app that includes one static table. Because the source contains multiple tables, the action label ‘Halt’ is specified on the target adapter.

You can also create a separate app for the static tables that reads from the same source stream. That way, DDL executed on a static table halts only the separate app and does not stop the data streams for the other tables. In this setup, the source app keeps running while the app containing the target for the static table is halted.

Wrapping Up: Start your Free Trial Today

This tutorial showed you how to handle schema evolution in a PostgreSQL database and stream CDC changes through Striim SaaS to Snowflake, a leading cloud data warehouse. By continuously moving your data into Snowflake, you can track schema changes and build analytics or machine learning models, all with minimal impact on your current systems. You can also start ingesting and normalizing more datasets with Striim to take full advantage of your data.

As always, feel free to reach out to our integration experts to schedule a demo, or try Striim for free here.