Snowflake Writer

Writes to one or more existing tables in Snowflake. Events are staged to local storage, AWS S3, or Azure Storage, then written to Snowflake as per the Upload Policy setting. Striim connects to Snowflake over JDBC with SSL enabled. Files are uploaded to Snowflake's staging area using Snowflake's PUT command and are encrypted using 128-bit keys.

If this adapter will be deployed to a Forwarding Agent, install the driver as described in Install the Snowflake JDBC driver.

To evaluate Striim with Snowflake, see Getting your free trial of Striim for Snowflake.

Snowflake Writer properties

property

type

default value

notes

Append Only

Boolean

False

With the default value of False, updates and deletes in the source are handled as updates and deletes in the target.

Set to True to handle updates and deletes as inserts in the target. With this setting:

  • Updates and deletes from DatabaseReader, IncrementalBatchReader, and SQL CDC sources are handled as inserts in the target.

  • Primary key updates result in two records in the target, one with the previous value and one with the new value. If the Tables setting has a ColumnMap that includes @METADATA(OperationName), the operation name for the first event will be DELETE and for the second INSERT.

Authentication Type

enum

Password

Selects the type of user authentication.

Select Password to use username/password pairs for authentication.

Select OAuth to use OAuth for authentication.

Select KeyPair to use private/public key pairs for authentication. Authenticating with a key pair removes the requirement to pass the Private Key and User Role properties separately when using streaming uploads. Key-pair authentication supports, but does not require, encrypted private keys.

CDDL Action

String

Process

See Handling schema evolution.

Client Configuration

String

If using a proxy, specify ProxyHost=<host name or IP address>,ProxyPort=<port number>.

Client ID

String

This property is required when OAuth authentication is selected.

Client Secret

encrypted password

This property is required when OAuth authentication is selected.

Column Delimiter

String

| (UTF-8 007C)

The character(s) used to delimit fields in the delimited text files in which the adapter accumulates batched data. If the data will contain the | character, change the default value to a sequence of characters that will not appear in the data.

Connection URL

String

The JDBC driver connection string for your Snowflake account. The syntax is jdbc:snowflake://<account identifier>.snowflakecomputing.com?db=<database>. The account identifier is part of the URL you use to log in to Snowflake: for example, if your login URL were https://ef12345.west-us-2.azure.snowflakecomputing.com/console/login, the account identifier would be ef12345.west-us-2.azure. (For more information, see Docs > Managing Your Snowflake Account > Account Identifiers.) The JDBC connection uses SSL.
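Putting the syntax above together with the example account identifier, a connection URL would look like the following (the database name MYDB is a placeholder):

```
jdbc:snowflake://ef12345.west-us-2.azure.snowflakecomputing.com?db=MYDB
```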

External Stage Type

String

local

With the default value of local, the adapter stages to a Snowflake internal named stage.

To stage to Azure Storage, set to AzureBlob and set the Azure Storage properties as described below.

To stage to S3, set to S3 and set the S3 properties as described below.

File Format Options

null_if = ""

Do not change unless instructed to by Striim support.

Ignorable Exception Code

String

Set to TABLE_NOT_FOUND to prevent the application from terminating when Striim tries to write to a table that does not exist in the target. See Handling "table not found" errors for more information.

Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE).

Null Marker

String

Optionally, specify a string inserted into fields in the stage files to indicate that a field has a null value. These are converted back to nulls in the target tables. If any field might contain the string NULL, change this to a sequence of characters that will not appear in the data.

When you set a value for Null Marker, set the same value for File Format Options. For example, if Null Marker is xnullx, File Format Options must be null_if="xnullx".
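For instance, the pairing described above would appear in a target's property list as follows (xnullx is the example marker from the text; property names follow the camelCase style used in the TQL examples on this page):

```
nullMarker: 'xnullx',
fileFormatOptions: 'null_if = "xnullx"'
```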

Optimized Merge

Boolean

false

Set to True only when Append Only is True and the target's input stream is the output of an HP NonStop reader, MySQL Reader, or Oracle Reader source and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false.

Parallel Threads

Integer

See Creating multiple writer instances.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Private Key

password

When Streaming Upload is True, specify the private key with which to authenticate the user. The key may be stored in a vault. This property is required when using public/private key pair authentication. This property supports, but does not require, encrypted private keys.

Private Key Passphrase

encrypted password

This property is required when using key pair authentication with an encrypted private key.

Refresh Token

password

This property is required when OAuth authentication is selected.

Streaming Configuration

String

MaxParallelRequests=5, MaxRequestSizeInMB=5, MaxRecordsPerRequest=10000

When Streaming Upload is False, this setting is ignored.

  • MaxParallelRequests:

    • Specifies the number of streaming requests (threads) that will be executed in parallel.

    • When Append Only is True and you are not doing an initial load, set to 1 to ensure records are not written out of order.

  • MaxRecordsPerRequest: the maximum number of records per streaming request

  • MaxRequestSizeInMB: the maximum size of each streaming request in MB
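As a sketch, a target streaming in append-only mode outside of an initial load might set MaxParallelRequests to 1 per the note above, leaving the other defaults unchanged (the camelCase property name follows the style of the TQL examples on this page):

```
streamingUpload: 'true',
streamingConfiguration: 'MaxParallelRequests=1, MaxRequestSizeInMB=5, MaxRecordsPerRequest=10000'
```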

Streaming Upload

Boolean

False

With the default value of False, Snowflake Writer will use the Snowflake JDBC driver.

Set to True to use the Snowpipe Streaming API.

When set to True, the adapter uses public/private key authentication, ignoring other settings that affect authentication type. Specify the Private Key and User Role properties and, optionally, adjust the Streaming Configuration property as appropriate.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in the DBMS and the user specified in Username must have insert permission.

Specify Snowflake target table names in uppercase as <SCHEMA>.<TABLE>. The database is specified in the connection URL.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use the % wildcard only for tables, not for schemas or databases. If the reader uses three-part names, you must use them here as well. Note that Oracle CDB/PDB source table names must be specified in two parts when the source is Database Reader or Incremental Batch Reader (schema.%,schema.%) but in three parts when the source is Oracle Reader or OJet (database.schema.%,schema.%). Note that SQL Server source table names must be specified in three parts when the source is Database Reader or Incremental Batch Reader (database.schema.%,schema.%) but in two parts when the source is MS SQL Reader or MS Jet (schema.%,schema.%). Examples:

source.emp,MYSCHEMA.EMP
source.%,MYSCHEMA.%

See Mapping columns for additional options.

Upload Policy

String

eventcount:10000, interval:5m

The upload policy may include eventcount and/or interval (see Setting output names and rollover / upload policies for syntax). Cached data is written to the storage account every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to the storage account.

Username

String

Specify the username you use to log in to Snowflake. Alternatively, specify the name of a Snowflake user with SELECT, INSERT, UPDATE, and DELETE privileges on the tables to be written to and the CREATE TABLE privilege on the database specified in the connection URL.

User Role

String

When Streaming Upload is True, specify the role to use for the session (see Docs » Managing Security in Snowflake » Administration & Authorization » Access Control in Snowflake » Overview of Access Control).

Azure Storage properties for Snowflake Writer

property

type

default value

notes

Azure Account Access Key

encrypted password

the account access key from Storage accounts > <account name> > Access keys

Azure Account Name

String

the name of the Azure storage account for the blob container

Azure Container Name

String

the blob container name from Storage accounts > <account name> > Containers

If it does not exist, it will be created.

S3 properties for Snowflake Writer

Specify either the access key and secret access key or an IAM role.

property

type

default value

notes

S3 Access Key

String

an AWS access key ID (created on the AWS Security Credentials page) for a user with read and write permissions on the bucket (leave blank if using an IAM role)

S3 Bucket Name

String

Specify the S3 bucket to be used for staging. If it does not exist, it will be created.

S3 IAM Role

String

an AWS IAM role with read and write permissions on the bucket (leave blank if using an access key)

S3 Region

String

the AWS region of the bucket

S3 Secret Access Key

encrypted password

the secret access key for the access key

Authentication mechanisms

This adapter supports the following authentication mechanisms:

  • Username and password

  • Public/private key pair

  • OAuth

Username and password authentication

Set the values of the username and password properties as normal.

Public/private key pair authentication

This procedure generates the public/private key pair. For details, see the Snowflake documentation on configuring key pair authentication.

  1. From a terminal, execute the following command.

    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out keyname.p8 -nocrypt

    This command generates an unencrypted private key. Remove the -nocrypt option to generate an encrypted private key. Replace keyname with a file name for the private key.

  2. From a terminal, execute the following command.

     openssl rsa -in keyname.p8 -pubout -out pubkeyname.pub

    Replace keyname with the name chosen in the previous step. Replace pubkeyname with a file name for the public key.

  3. In the console, execute the following command to assign the public key to the user account.

    ALTER USER any_snowflake_user SET RSA_PUBLIC_KEY='code string';

    Replace code string with the public key, not including the start and end key delimiters.
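The public key file produced in step 2 includes -----BEGIN PUBLIC KEY-----/-----END PUBLIC KEY----- delimiter lines and line breaks, all of which must be removed before pasting the key into the ALTER USER statement. A minimal sketch of that cleanup in Python (the key body here is dummy data, not a real key):

```python
# Strip the PEM delimiter lines and line breaks from a public key so the
# remaining Base64 string can be pasted into ALTER USER ... SET RSA_PUBLIC_KEY.
pem = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEA7a2b3c4d
-----END PUBLIC KEY-----"""

key_body = "".join(
    line for line in pem.splitlines()
    if "PUBLIC KEY" not in line          # drop the BEGIN/END delimiter lines
)

print(key_body)  # one continuous Base64 string, no delimiters or newlines
```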

Example 5. Example: Snowflake Writer with an encrypted private key
CREATE TARGET SfTgt USING SnowflakeWriter
(
  ConnectionURL:'jdbc:snowflake://striim.snowflakecomputing.com/?db=DEMO_DB',
  username:'infra_1780_oauth_bearer_encrypted',
  appendOnly:'true',
  IgnorableExceptionCode:'TABLE_NOT_FOUND',
  Tables:'QATEST.oracleRawSRC,QATEST1679489873.SNOWFLAKERAWTGT',
  uploadpolicy:'eventcount:1,interval:10s',
  privateKey:'keydata',
  streamingUpload:'TRUE',
  userRole:'SYSADMIN',
  authenticationType:'KeyPair',
  privateKeyPassphrase:'striim'
)
 INPUT FROM OracleInitStream;


Example 6. Example: Snowflake Writer with an unencrypted private key
CREATE OR REPLACE TARGET sf USING Global.SnowflakeWriter (
   userRole: 'sysadmin',
   connectionUrl: 'jdbc:snowflake://striim.snowflakecomputing.com/?db=SAMPLEDB&schema=SANJAYPRATAP',
   streamingUpload: 'true',
   tables: 'public.sample_pk,SAMPLEDB.SANJAYPRATAP.SAMPLE_TB',
   CDDLAction: 'Process',
   optimizedMerge: 'false',
   columnDelimiter: '|',
   privateKey_encrypted: 'true',
   appendOnly: 'false',
   authenticationType: 'KeyPair',
   username: 'rahul_mishra',
   uploadPolicy: 'eventcount:10000,interval:5m',
   privateKey: 'keydata',
   externalStageType: 'Local',
   adapterName: 'SnowflakeWriter',
   fileFormatOptions: 'null_if = \"\"' 
)
 INPUT FROM sysout;


OAuth authentication

Snowflake enables OAuth integration with Striim through the Security Integration Snowflake object.

The following procedure creates the Security Integration object.

  1. Log in to the Snowflake Web Interface with a user account with the privilege to create the Security Integration.

  2. Create the Security Integration object.

    create or replace security integration DEMO_OAUTH
        type=oauth
        enabled=true
        oauth_client=CUSTOM
        oauth_client_type='CONFIDENTIAL'
        oauth_redirect_uri='https://localhost.com:7734/striim-callback'
        oauth_issue_refresh_tokens = true
        oauth_refresh_token_validity = 7776000
        oauth_allow_non_tls_redirect_uri = true;
    

    Note

    When using a custom role, add the following line to grant usage on the integration to that role:

    grant usage on integration DEMO_OAUTH to role api_admin;
  3. In the Snowflake console, issue the following command:

    desc integration DEMO_OAUTH;

    Note the following values for later use.

    Postman name                Value name
    Client ID                   OAUTH_CLIENT_ID
    Authorization URL           OAUTH_AUTHORIZATION_ENDPOINT
    Token URL                   OAUTH_TOKEN_ENDPOINT
    Refresh Token Expires In    OAUTH_REFRESH_TOKEN_VALIDITY

  4. In the Snowflake console, issue the following command:

    select system$show_oauth_client_secrets('DEMO_OAUTH');

    The first value is the Client Secret. Note the Client Secret for future use.

  5. In Postman, create a new token using the noted values.

  6. Sign in to Snowflake.

    After authenticating, Snowflake sends an access token and a refresh token.

  7. Copy the refresh token.

The following procedure uses curl and the Web browser to fetch the refresh token.

  1. In the Snowflake Console, issue the following command:

    desc integration DEMO_OAUTH;

    Note the Client ID.

  2. URL encode the Client ID.

  3. Set the encoded Client ID as the value of the ENCODED_OAUTH_CLIENT_ID property.

  4. Construct the endpoint call.

    https://<snowflake_account_url>/oauth/authorize?
       response_type=code&
       client_id=<ENCODED_OAUTH_CLIENT_ID>&
       redirect_uri=https%3A%2F%2Flocalhost%3A7734%2Fstriim-callback

    Replace <snowflake_account_url> with your account URL in the format myorg-account_xyz.snowflakecomputing.com. Replace <ENCODED_OAUTH_CLIENT_ID> with the URL encoded Client ID. Optionally, add a scope to the URL with &scope=<session:role:R1>. URL encode the scope definition. The call uses the default scope for the user when no scope is provided.

  5. Open the endpoint URL in a browser.

    The Snowflake login screen appears.

  6. Authenticate to Snowflake. If a consent dialog box displays, click Allow.

    The web browser redirects to the specified redirect URI. The authorization code is the part of the URI after the code= string.

  7. Note the authorization code for future use.

  8. From a terminal shell, execute the curl command with the token endpoint call.

    curl -X POST 'https://<snowflake_account_url>/oauth/token-request' \
     -H 'Authorization: Basic <Base64 Encoded value of ClientID:ClientSecret>' \
     -H 'Content-Type: application/x-www-form-urlencoded' \
     -d 'grant_type=authorization_code&code=<authorization_code>&redirect_uri=https%3A%2F%2Flocalhost%3A7734%2Fstriim-callback'

    Replace <snowflake_account_url> with your account URL. Replace <Base64 Encoded value of ClientID:ClientSecret> with a Base64 encoding of the Client ID and Client Secret separated by a colon (:) character. Replace <authorization_code> with the previously noted authorization code.

    Snowflake sends the refresh token as a response to the command.

  9. Note the refresh token for future use.
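The encoding steps above (URL-encoding the Client ID for the authorize call and Base64-encoding ClientID:ClientSecret for the Basic Authorization header) can be sketched in Python; the client ID, client secret, and account URL values are placeholders:

```python
import base64
from urllib.parse import quote

client_id = "a1b2C3+dE4/f=="          # placeholder Client ID from DESC INTEGRATION
client_secret = "s3cretV4lue"         # placeholder from system$show_oauth_client_secrets
redirect_uri = "https://localhost:7734/striim-callback"

# Step 2: URL encode the Client ID (encode everything, including + / =).
encoded_client_id = quote(client_id, safe="")

# Step 4: construct the authorize endpoint call.
authorize_url = (
    "https://myorg-account_xyz.snowflakecomputing.com/oauth/authorize"
    f"?response_type=code&client_id={encoded_client_id}"
    f"&redirect_uri={quote(redirect_uri, safe='')}"
)

# Step 8: Base64 encode ClientID:ClientSecret for the Basic Authorization header.
basic_auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()

print(authorize_url)
print("Authorization: Basic " + basic_auth)
```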

Snowflake Writer sample application

The following sample application will write data from PosDataPreview.csv to Snowflake. The target table must exist.

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true
)
INPUT FROM PosSource_TransformedStream;

The above example shows how to use SnowflakeWriter with an input of a user-defined type. For examples of applications where the input is the output of a CDC reader, DatabaseReader, or IncrementalBatchReader, see Replicating Oracle data to Snowflake.

The following is an example of the properties required to test the Snowpipe Streaming API preview:

CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true,
  streamingUpload:'true',
  userRole: 'ACCOUNTADMIN',
  privateKey: '*****'
)
INPUT FROM PosSource_TransformedStream;

Snowflake Writer data type support and correspondence

See also Data type support & mapping for schema conversion & evolution.

TQL type

Snowflake type

Boolean

BOOLEAN

Byte, Integer, Long, Short

BIGINT, DOUBLE, FLOAT, INT, NUMBER, SMALLINT 

DateTime

DATE, DATETIME, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ

Double, Float

DOUBLE, FLOAT

String

CHAR, VARCHAR

When the input of a Snowflake Writer target is the output of an Oracle source (DatabaseReader, Incremental Batch Reader, or Oracle Reader):

Oracle type

Snowflake type

BINARY

BINARY

BINARY_DOUBLE

FLOAT

BINARY_FLOAT

FLOAT

BLOB

BINARY

CHAR

CHAR

CLOB

VARCHAR

DATE

DATE

FLOAT

FLOAT

INTEGER

NUMBER

INTERVAL DAY TO SECOND

VARCHAR

INTERVAL YEAR TO MONTH

VARCHAR

LONG

not supported

LONG RAW

not supported

NCHAR

CHAR

NCLOB

VARCHAR

NUMBER

NUMBER

NUMBER(<precision>,<scale>)

NUMBER(<precision>,<scale>)

NVARCHAR2

VARCHAR

RAW

BINARY

TIMESTAMP

TIMESTAMP_NTZ

TIMESTAMP WITH LOCAL TIMEZONE

TIMESTAMP_LTZ

TIMESTAMP WITH TIMEZONE

TIMESTAMP_TZ

VARCHAR2

VARCHAR

XMLTYPE

VARCHAR

See Oracle Reader and OJet WAEvent fields for additional information including limitations for some types.