
Realizing Real-Time Data Warehousing on Snowflake with Striim

 

Introduction

Snowflake is one of the most popular data warehouses built for the cloud. It enables organizations to be data-driven and to make better, faster business decisions.

Typically, the data in an organization is generated by diverse data sources and applications deployed in various locations, including on-premises data centers, major public clouds, and devices. For organizations to be data-driven and responsive, the real-time availability of that data in Snowflake for making time-sensitive, operational decisions is of utmost importance.

Striim provides a next-generation solution for data integration that meets these needs by continuously collecting, processing, and moving any enterprise data in real time. Customers use Striim to stream large volumes of data into Snowflake from a variety of sources, including OLTP databases with very large schemas containing hundreds to thousands of tables.

Unlike the traditional batch and schedule-based ETL solutions, the Striim platform continuously integrates data from on-premises and cloud data sources into Snowflake. This capability of Striim enables real-time data warehousing solutions on Snowflake.

The key objective of this technical guide is to describe the solution offered by Striim to support and enable real-time data warehousing solutions on Snowflake.

Striim’s Real-Time Data Integration Solution for Snowflake

Striim for Snowflake enables the rapid development and deployment of a continuous, real-time data integration solution into Snowflake from a variety of data sources deployed on-premises or in the cloud. The following diagram illustrates Striim’s high-level solution architecture for building real-time data feeds from a variety of data sources into Snowflake.

Key Features of Striim Platform

  • Rich set of out-of-the-box (OOTB) adapters including Database Readers and Snowflake Writer that can be used for rapid development of continuous data flows into Snowflake
  • Native support for scalability, data reliability (no data loss, no duplicates), security (authorization and encryption), and high performance (in-memory, distributed) processing and delivery
  • OOTB processors, in-memory data windows, built-in caches, and event tables that can be used to apply rich transformations and enrichments to events as they flow into Snowflake
  • Real-time monitoring, alerting and dashboarding capabilities
  • Multiple interfaces including rich, web-based drag-and-drop UI, command line interface, and REST APIs
  • Available as a PaaS offering on major cloud platforms including AWS, Azure, and Google Cloud, enabling rapid integration with Snowflake. Striim PaaS solutions enable seamless integration between on-premises and cloud-based databases and data warehouses.

Striim Solution Architecture for Snowflake

Striim offers a flexible solution architecture that can be configured based on custom use case requirements for achieving real-time, continuous integration from on-premises and cloud-based sources into Snowflake.

Referring to the above diagram, Striim solutions can be configured to continuously collect data flowing from a variety of sources, process the data in flight, and deliver it into Snowflake.

Striim offers a wide variety of readers (including bulk readers, change data capture (CDC) readers, Kafka/JMS readers, etc.) that continuously capture changes from relational databases, NoSQL stores such as MongoDB, messaging systems, logs, network/sensors, IoT protocols (such as MQTT and OPC-UA), and others. Refer to the Striim documentation for details about the various OOTB readers.

The captured data can be transformed in-stream, filtered, aggregated, masked, and enriched prior to delivery into corresponding tables on Snowflake. Striim optimizes the delivery into Snowflake by batching continuous data streams using client-side buffering. Striim provides multiple options for staging the delivery into Snowflake including Amazon S3, Azure Blob, or using Snowflake-native staging.
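As a simple illustration of in-flight processing, the following hypothetical CQ sketch (stream names are illustrative; it assumes the OperationName metadata key used later in this guide and Java-style string conversion in the CQ) keeps only insert operations from a CDC stream before they reach the Snowflake writer:

CREATE CQ filter_inserts_cq
INSERT INTO filtered_str
SELECT * FROM ora_cdc_str x
WHERE META(x, 'OperationName').toString() = 'INSERT';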

Refer to Appendix A for a brief description of Striim concepts which will help in understanding some of the Striim terminology used in the following sections.

Striim Solution Development and Deployment Options

Striim provides a rich web-based UI for developing, testing, and deploying integration solutions, and offers flexible deployment options when integrating between on-premises environments and the cloud. Depending upon the use-case requirements – which include resource availability, integration topology, security restrictions, etc. – the appropriate deployment option can be chosen. The options include:

  • Develop and deploy solutions entirely on the cloud
  • Develop and deploy entirely on-premises
  • Develop on the cloud and deploy portions of the solution in a hybrid on-premises / on-cloud topology

The following diagram illustrates the approach of developing the application in the cloud and then deploying portions of it on-premises and in the cloud. This deployment approach, also referred to as the agent-based approach, is suitable for integrating on-premises data sources with the cloud when those sources have no direct network access to the cloud.

This diagram also highlights the various security features supported by the platform, including source and target authentication, and data encryption support when replicating from on-premises into the cloud.

In the following sections, we illustrate various use cases and the steps required to address each of them. The solutions use Oracle DB as the data source deployed on-premises, with Snowflake deployed on Azure.

Use case #1 – Initial Load and Synchronization

Objective: Keep Snowflake on Azure in-sync with on-premises Oracle DB

Integration Requirements:

  • Bulk load Customer table from Oracle into the corresponding table on Snowflake.
  • Continuously stream changes from Oracle into Snowflake – replicate Inserts, Updates and Deletes as is on the target.

Data Integration Solution for Bulk Loading

The following diagram illustrates the application for bulk loading from Oracle into Snowflake. The TQL associated with this application is presented in Appendix B.

The application uses the OOTB Database reader which is configured to bulk read from Oracle database tables, and the Snowflake writer which is configured to write to the corresponding tables on Snowflake. The adapter documentation provides detailed information about the configuration parameters for each of the adapters.

Key Striim configuration parameters for Snowflake writer

The following table highlights some of the key configuration parameters for the Snowflake writer when configuring for bulk loading.

 

Parameter Name | Description | Value
External Stage Type | The Snowflake writer provides the option of using Snowflake-native staging or Azure Blob for intermediate storage prior to loading into the Snowflake tables. | Choose Local (Snowflake staging) or AzureBlob.
Append Only | Determines whether records are appended or merged when applied to the target table. | Set to Append (non-default value).
Upload Policy | Determines when to upload the data from staging. Two variables are provided: EventCount and Interval. The values set for these parameters apply individually to each table the Snowflake writer is configured to process. | Set appropriate values for EventCount and Interval. Use higher EventCount values for larger tables than for smaller ones, and set a reasonable Interval to ensure the writer flushes events for tables that accumulate fewer than EventCount events.
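For reference, the Upload Policy and Append Only settings appear as ordinary writer properties. The following fragment is a minimal, illustrative sketch of a bulk-load Snowflake writer using the same property syntax as Appendix B; the connection details are placeholders and the EventCount value should be tuned per table group:

CREATE OR REPLACE TARGET SF_bulk_writer USING SnowflakeWriter (
  connectionUrl: 'jdbc:snowflake://<account>.snowflakecomputing.com/?db=DEMO_DB&schema=public',
  username: 'test',
  password: 'xxxx',
  tables: 'QATEST.CUSTOMER,DEMO_DB.PUBLIC.CUST',
  externalStageType: 'Local',
  appendOnly: 'true',
  uploadPolicy: 'eventcount:100000,interval:5m'
)
INPUT FROM ora_reader_str;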

 

Performance tuning tip

  • When bulk loading a large number of tables, it is recommended to design multiple applications such that each application processes a group of tables that share similar characteristics (e.g. table size – large, medium, and small tables), and to configure the Snowflake writer in each application with appropriate Upload Policy values as discussed above.
  • Turning on the Append Only parameter provides better performance than using Merge (the default, where Append Only is off). However, if the bulk load application fails before completing, truncate the affected Snowflake tables before restarting the application.

Data Integration Solution for Continuous Replication

The following diagram illustrates the application for continuously replicating data from Oracle into Snowflake. The TQL associated with this application is presented in Appendix C.

The application uses Striim’s OOTB Oracle DB CDC reader, which is configured to continuously read the changes from Oracle database tables, along with Striim’s Snowflake writer, which is configured to write to the corresponding tables on Snowflake.

Prior to starting the application, it is necessary to ensure the change data capture (CDC) mechanism is enabled on the source database. It is also necessary to check that the change record captured for update operations has the complete image of the record. The following points highlight the steps required for enabling the CDC on Oracle:

  • Enable archivelog on the Oracle database, if not already enabled
  • Enable full supplemental logging on the Oracle database, if not already enabled

Refer to Appendix D for instructions on enabling archivelog and full supplemental logging.
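Before starting the CDC application, one quick way to confirm these prerequisites from SQL*Plus is shown below (the second query is a common check for database-level supplemental logging; adapt it if supplemental logging was enabled only for specific tables):

select log_mode from v$database;                  -- expect ARCHIVELOG
select supplemental_log_data_all from v$database; -- expect YES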

Key Striim Configuration Parameters for Snowflake Writer

Parameter Name | Value
External Stage Type | Choose Local or AzureBlob
Append Only | Set to Merge (default value)
Upload Policy | See the note below

 

Note regarding setting Upload Policy

Use the following guidelines when configuring the Upload Policy parameter. These guidelines are particularly useful when replicating a large number of tables, each having different transactional volumes.

  • Frequently uploading small batches of records is far less efficient than uploading larger batches at a fixed interval.
  • Design multiple applications such that each application processes a group of tables that share similar transactional characteristics (e.g. tables with a consistently large number of transactions versus tables with a small number of transactions). Set the Upload Policy of the Snowflake writer in each application to values that ensure an optimal upload batch size, and tune the Interval value to achieve the desired performance, as shown in the sketch below.
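For example, the Snowflake writer in the application handling high-volume tables might use a larger event count, while the writer in the low-volume application relies mainly on the interval. The values below are illustrative fragments of the SnowflakeWriter configuration (same uploadPolicy syntax as Appendices B and C), not tuned recommendations:

High-transaction-volume table group:
  uploadPolicy: 'eventcount:100000,interval:5m'

Low-transaction-volume table group:
  uploadPolicy: 'eventcount:1000,interval:1m'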

For more information, refer to Appendix F, which illustrates the source database operations on a sample table on Oracle, and the result on the corresponding table on Snowflake for this scenario.

Refer to Appendix G for an example of the Striim Monitor UI output listing various details about the replication into Snowflake.

Use Case #2 – Audit Operations

Objective: Audit (capture) all DML operations occurring on Oracle DB (on-premises) in Snowflake

Data Integration Requirements:

  • Bulk load Customer table from Oracle into the corresponding table on Snowflake.
  • Continuously stream changes from Oracle into Snowflake – capture Inserts, Updates and Deletes as inserted (appended) records on the target.

The solution approach for bulk loading in this use case is the same as in use case #1.

The solution for continuously replicating changes so that Inserts, Updates and Deletes are all inserted on the target will require the following configuration setting in the Snowflake writer.

Append Only | This option determines whether records are appended or merged when applied to the target table. | Set to Append.
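In the TQL for the continuous replication application, this corresponds to the following Snowflake writer property (the rest of the writer configuration is unchanged from use case #1; Appendix C shows a complete example with this setting):

appendOnly: 'true'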

For more detail, refer to Appendix F, which illustrates the source database operations on a sample table on Oracle, and the result on the corresponding table on Snowflake for this scenario.

Use Case #3 – Audit Operations with Details

Objective: Audit (capture) all DML operations occurring on Oracle DB (on-premises) in Snowflake with details about the operations.

Data Integration Requirements:

  • Bulk load Customer table from Oracle into the corresponding table on Snowflake.
  • Continuously replicate changes from Oracle into Snowflake – replicate Inserts, Updates and Deletes as INSERTS on the target AND map the operation type (I, U, D) into a column op_type on the target.

The solution approach for bulk loading in this use case is the same as in use case #1, with the following changes to the Snowflake writer:

Include ColumnMap(OP_TYPE='INSERT') in the source-to-target table mapping property.

Example:

QATEST.CUSTOMER,DEMO_DB.PUBLIC.CUST ColumnMap(OP_TYPE='INSERT')

However, we modify the solution for continuously replicating changes so that Inserts, Updates, and Deletes are all inserted on the target, and the operation type metadata is mapped to the OP_TYPE column in the table. The TQL associated with this application is presented in Appendix E.

The solution (see the illustration below) involves using Striim’s OOTB Database Event Transformer “To Staging” which captures the operation type in the event user data, and then maps to the OP_TYPE column in the Snowflake writer.

CQ generated by the DB Event "To Staging" transformer:

SELECT ChangeOperationToInsert(PutUserData(x, 'OrigOperationName', META(x, 'OperationName'))) FROM customer_str x;

Edit the Table Mapping in the Snowflake writer as follows:

QATEST.CUSTOMER,DEMO_DB.PUBLIC.CUST ColumnMap(OP_TYPE=@USERDATA(OrigOperationName))

Refer to Appendix F which illustrates the source database operations on a sample table on Oracle, and the result on the corresponding table on Snowflake for this scenario.

Conclusion

In this technical guide, we introduced Striim’s real-time data integration solution for Snowflake, which helps enable real-time data warehousing applications. We touched upon Striim’s solution architecture, illustrated sample use cases, and briefly highlighted key capabilities of the Striim platform.

We invite you to test drive Striim’s solution for Snowflake, either by downloading the platform from www.striim.com, or by provisioning the Striim PaaS solution in your preferred cloud marketplace on AWS, Azure, or Google Cloud.

 

Appendix A: Striim Key Concepts

Refer to Striim documentation for further details.

Concept | Brief Description
Application | Applications are custom solutions; e.g., Oracle to Snowflake is considered an application. Applications may be created graphically using the web client or coded using the Tungsten Query Language (TQL), a SQL-like language that can be extended with Java.
Flow | Flows define what data an application receives, how it processes the data, and what it does with the results. Flows are made up of several kinds of components:

  • Sources to receive real-time event data from adapters (e.g. Oracle)
  • Streams to define the flow of data among the other components
  • Windows to bound the event data by time or count
  • Continuous Queries (CQ) to filter, aggregate, join, enrich, and transform the data
  • Processors – pre-built processors that generate CQs for processing and transforming events, including DB events
  • Caches – Caches of historical, or reference data to enrich the event data
  • WActionStores to populate the built-in reports and visualizations and persist the processed data
  • Targets to pass data to external applications (e.g. Snowflake)
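As a minimal, illustrative sketch of how these components fit together (component names are placeholders, and the "..." stands for the adapter properties shown in Appendices B and C), a TQL application is structured as follows:

CREATE APPLICATION SampleApp;

CREATE SOURCE sample_source USING DatabaseReader ( ... )
OUTPUT TO raw_str;

CREATE CQ sample_cq
INSERT INTO processed_str
SELECT * FROM raw_str;

CREATE TARGET sample_target USING SnowflakeWriter ( ... )
INPUT FROM processed_str;

END APPLICATION SampleApp;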

 

Appendix B: TQL for Bulk Load Application

CREATE APPLICATION OracleToSnowflakeBL;

CREATE OR REPLACE SOURCE Oracle_DB_Reader USING DatabaseReader (
  DatabaseProviderType: 'Oracle',
  FetchSize: 10,
  Username: 'test',
  ConnectionURL: 'jdbc:oracle:thin:@10.0.0.161:1521:orcl',
  Tables: 'QATEST.CUSTOMER',
  adapterName: 'DatabaseReader',
  Password: 'abcd',
  Password_encrypted: 'true'
)
OUTPUT TO ora_reader_str;

CREATE OR REPLACE TARGET SF_writer USING SnowflakeWriter (
  s3Region: 'us-west-1',
  azureContainerName: 'striim-snowflake-container',
  uploadPolicy: 'eventcount:10,interval:5m',
  password: 'abcdefg',
  password_encrypted: 'true',
  tables: 'QATEST.CUSTOMER,DEMO_DB.PUBLIC.CUST',
  azureAccountAccessKey_encrypted: 'true',
  s3SecretAccessKey_encrypted: 'true',
  s3BucketName: 'striim-snowflake-bucket',
  connectionUrl: 'jdbc:snowflake://abc.snowflakecomputing.com/?db=DEMO_DB&schema=public&CLIENT_SESSION_KEEP_ALIVE=true',
  adapterName: 'SnowflakeWriter',
  externalStageType: 'Local',
  appendOnly: 'false',
  columnDelimiter: '|',
  username: 'test'
)
INPUT FROM ora_reader_str;

END APPLICATION OracleToSnowflakeBL;

 

Appendix C: TQL for Continuous Replication

CREATE APPLICATION OracleToSnowflakeCDC;

CREATE OR REPLACE SOURCE Oracle_CDC_Reader USING OracleReader (
  Compression: false,
  StartTimestamp: 'null',
  SupportPDB: false,
  TransactionBufferDiskLocation: '.striim/LargeBuffer',
  FetchSize: 2,
  DDLCaptureMode: 'All',
  QuiesceMarkerTable: 'QUIESCEMARKER',
  CommittedTransactions: true,
  QueueSize: 2048,
  FilterTransactionBoundaries: true,
  SendBeforeImage: true,
  XstreamTimeOut: 600,
  ConnectionURL: '10.0.0.161:1521:orcl',
  Tables: 'QATEST.CUSTOMER',
  adapterName: 'OracleReader',
  Password: 'abcd',
  Password_encrypted: 'true',
  TransactionBufferType: 'Memory',
  DictionaryMode: 'OnlineCatalog',
  connectionRetryPolicy: 'timeOut=30, retryInterval=30, maxRetries=3',
  StartSCN: 'null',
  ReaderType: 'LogMiner',
  Username: 'test',
  TransactionBufferSpilloverSize: '1MB',
  OutboundServerProcessName: 'WebActionXStream'
)
OUTPUT TO customer_str ;

CREATE OR REPLACE TARGET SF_writer USING SnowflakeWriter (
  s3Region: 'us-west-1',
  azureContainerName: 'striim-snowflake-container',
  uploadPolicy: 'eventcount:2,interval:1m',
  password: 'abcd',
  password_encrypted: 'true',
  tables: 'QATEST.CUSTOMER,DEMO_DB.PUBLIC.CUST',
  azureAccountAccessKey_encrypted: 'true',
  s3SecretAccessKey_encrypted: 'true',
  s3BucketName: 'abc-snowflake-bucket',
  connectionUrl: 'jdbc:snowflake://abc.snowflakecomputing.com/?db=DEMO_DB&schema=public&CLIENT_SESSION_KEEP_ALIVE=true',
  adapterName: 'SnowflakeWriter',
  externalStageType: 'Local',
  appendOnly: 'true',
  columnDelimiter: '|',
  username: 'test'
)
INPUT FROM customer_str;
END APPLICATION OracleToSnowflakeCDC;

 

Appendix D: Enabling archive log and supplemental log on Oracle

  • Enabling archivelog
    The following steps check whether archivelog is enabled and, if not, enable it.

    • Log in to SQL*Plus as the sys user and enter:
      select log_mode from v$database;
      If the command returns ARCHIVELOG, it is enabled; skip ahead to Enabling full supplemental logging. If the command returns NOARCHIVELOG, enter:
      shutdown immediate
      Wait for the message ORACLE instance shut down, then enter:
      startup mount
      Wait for the message Database mounted, then enter:
      alter database archivelog;
      alter database open;

    • To verify that ARCHIVELOG has been enabled, enter select log_mode from v$database; again. This time it should return ARCHIVELOG.
  • Enabling full supplemental logging
    alter database add supplemental log data (all) columns;

    Optionally, instead of enabling supplemental logging for the entire database, supplemental logging can be
    enabled for the specific set of tables that are being replicated.
  • Activate changes
    alter system switch logfile;

 

Appendix E: TQL for Continuous Replication with Processing

CREATE APPLICATION OracleToSnowflakeCDC;
CREATE STREAM tostage_str OF Global.WAEvent;
CREATE OR REPLACE TARGET SF_writer USING SnowflakeWriter  (
  s3Region: 'us-west-1',
  azureContainerName: 'striim-snowflake-container',
  uploadPolicy: 'eventcount:4,interval:1m',
  password: 'abcde',
  password_encrypted: 'true',
  tables: 'QATEST.CUSTOMER,DEMO_DB.PUBLIC.CUST ColumnMap(OP_TYPE=@USERDATA(OrigOperationName) )',
  azureAccountAccessKey_encrypted: 'true',
  s3SecretAccessKey_encrypted: 'true',
  s3BucketName: 'abc-snowflake-bucket',
  connectionUrl: 'jdbc:snowflake://abc.snowflakecomputing.com/?db=DEMO_DB&schema=public&CLIENT_SESSION_KEEP_ALIVE=true',
  adapterName: 'SnowflakeWriter',
  externalStageType: 'Local',
  appendOnly: 'true',
  columnDelimiter: '|',
  username: 'test'
 )
INPUT FROM tostage_str;
CREATE OR REPLACE SOURCE Oracle_CDC_Reader USING OracleReader  (
  Compression: false,
  StartTimestamp: 'null',
  SupportPDB: false,
  TransactionBufferDiskLocation: '.striim/LargeBuffer',
  FetchSize: 4,
  DDLCaptureMode: 'All',
  QuiesceMarkerTable: 'QUIESCEMARKER',
  CommittedTransactions: true,
  QueueSize: 2048,
  FilterTransactionBoundaries: true,
  SendBeforeImage: true,
  XstreamTimeOut: 600,
  ConnectionURL: '10.0.0.161:1521:orcl',
  Tables: 'QATEST.CUSTOMER',
  adapterName: 'OracleReader',
  Password: 'test',
  TransactionBufferType: 'Memory',
  DictionaryMode: 'OnlineCatalog',
  connectionRetryPolicy: 'timeOut=30, retryInterval=30, maxRetries=3',
  StartSCN: 'null',
  ReaderType: 'LogMiner',
  Username: 'test',
  TransactionBufferSpilloverSize: '1MB',
  OutboundServerProcessName: 'WebActionXStream',
  Password_encrypted: true
 )
OUTPUT TO customer_str ;
WITH UICONFIG = "{\"subType\":\"To Staging\",\"config\":{\"stream\":\"admin.STREAM.customer_str\",\"columns\":[{\"column\":\"admin.STREAM.customer_str\"}]}}"
CREATE CQ to_stage
INSERT INTO tostage_str
SELECT ChangeOperationToInsert(PutUserData(x, 'OrigOperationName', META(x, 'OperationName'))) FROM customer_str x;
END APPLICATION OracleToSnowflakeCDC;

 

Appendix F: Use case scenarios sample outputs

This section illustrates the source database operations on a sample table on Oracle and the result on the corresponding table on Snowflake for each use case scenario.

Schema of the sample table on Oracle:
SQL> desc customer;

Name      Null?      Type
--------  ---------  --------------
CUSTID    NOT NULL   NUMBER
NAME                 VARCHAR2(255)
DOB                  DATE
ORDERID              NUMBER

Source Database operations on customer table:

SQL>

insert into customer values (1, 'cust1', to_date('2012/01/22', 'yyyy/mm/dd'), 100);
insert into customer values (2, 'cust2', to_date('2012/02/22', 'yyyy/mm/dd'), 101);
insert into customer values (3, 'cust3', to_date('2012/03/22', 'yyyy/mm/dd'), 102);
insert into customer values (4, 'cust4', to_date('2012/04/22', 'yyyy/mm/dd'), 103);
insert into customer values (5, 'cust5', to_date('2012/05/22', 'yyyy/mm/dd'), 104);
commit;
update customer set dob=to_date('2013/01/22', 'yyyy/mm/dd') where custid =1;
update customer set dob=to_date('2013/02/22', 'yyyy/mm/dd') where custid =2;
commit;
delete from customer where custid=5;
commit;

Use case #1: Continuous Replication Result

CUSTID NAME DOB ORDERID
1 cust1 2013-01-22 100
2 cust2 2013-02-22 101
3 cust3 2012-03-22 102
4 cust4 2012-04-22 103

Use case #2: Continuous Replication Result

CUSTID NAME DOB ORDERID
1 cust1 2012-01-22 100
2 cust2 2012-02-22 101
3 cust3 2012-03-22 102
4 cust4 2012-04-22 103
5 cust5 2012-05-22 104
1 cust1 2013-01-22 100
2 cust2 2013-02-22 101
5 cust5 2012-05-22 104

Use case #3: Continuous Replication Result with in-line processing

CUSTID NAME DOB ORDERID OP_TYPE
1 cust1 2012-01-22 100 INSERT
2 cust2 2012-02-22 101 INSERT
3 cust3 2012-03-22 102 INSERT
4 cust4 2012-04-22 103 INSERT
5 cust5 2012-05-22 104 INSERT
1 cust1 2013-01-22 100 UPDATE
2 cust2 2013-02-22 101 UPDATE
5 cust5 2012-05-22 104 DELETE
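As a hypothetical way to summarize the audited operations once they land in Snowflake, a simple query against the target table (table name as mapped in this guide) can group the rows by operation type:

select op_type, count(*) from DEMO_DB.PUBLIC.CUST group by op_type;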

 

Appendix G: Striim Monitor Output