Snowflake Writer
Writes to one or more existing tables in Snowflake. Events are staged to local storage, AWS S3, or Azure Storage, then written to Snowflake as per the Upload Policy setting. Striim connects to Snowflake over JDBC with SSL enabled. Files are uploaded to Snowflake's staging area using Snowflake's PUT command and are encrypted using 128-bit keys.
If this adapter will be deployed to a Forwarding Agent, install the driver as described in Install the Snowflake JDBC driver.
To evaluate Striim with Snowflake, see Getting your free trial of Striim for Snowflake.
Snowflake Writer properties
property | type | default value | notes |
---|---|---|---|
Append Only | Boolean | False | With the default value of False, updates and deletes in the source are handled as updates and deletes in the target. Set to True to handle updates and deletes as inserts in the target. |
Authentication Type | enum | | Selects the type of user authentication: username and password, public/private key pair, or OAuth (see Authentication mechanisms below). |
CDDL Action | String | Process | |
Client Configuration | String | | If using a proxy, specify the required proxy settings. |
Client ID | String | | This property is required when OAuth authentication is selected. |
Client Secret | encrypted password | | This property is required when OAuth authentication is selected. |
Column Delimiter | String | \| (UTF-8 007C) | The character(s) used to delimit fields in the delimited text files in which the adapter accumulates batched data. If the data will contain the default delimiter character, change this to a sequence of characters that will not appear in the data. |
Connection URL | String | | The JDBC driver connection string for your Snowflake account. The syntax is jdbc:snowflake://<account>.snowflakecomputing.com/?db=<database> (see the sample applications below). |
External Stage Type | String | local | With the default value of local, events are staged to local storage. To stage to Azure Storage, set to AzureBlob and set the Azure Storage properties. To stage to S3, set to S3 and set the S3 properties. |
File Format Options | String | null_if = "" | Do not change unless instructed to by Striim support. |
Ignorable Exception Code | String | | Set to TABLE_NOT_FOUND to prevent the application from terminating when Striim tries to write to a table that does not exist in the target. See Handling "table not found" errors for more information. Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE). |
Null Marker | String | | Optionally, specify a string to be inserted into fields in the stage files to indicate that a field has a null value. These are converted back to nulls in the target tables. If any field might contain the string NULL, change this to a sequence of characters that will not appear in the data. When you set a value for Null Marker, set the same value in File Format Options: for example, if Null Marker is NULL_MARKER, set File Format Options to null_if = "NULL_MARKER". |
Optimized Merge | Boolean | false | Set to True only when Append Only is False, the target's input stream is the output of an HP NonStop reader, MySQL Reader, or Oracle Reader source, and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false. |
Parallel Threads | Integer | | |
Password | encrypted password | | The password for the specified user. See Encrypted passwords. |
Private Key | password | | When Streaming Upload is True, specify the private key with which to authenticate the user. The key may be stored in a vault. This property is required when using public/private key pair authentication. This property supports, but does not require, encrypted private keys. |
Private Key Passphrase | encrypted password | | This property is required when using key pair authentication with an encrypted private key. |
Refresh Token | password | | This property is required when OAuth authentication is selected. |
Streaming Configuration | String | MaxParallelRequests=5, MaxRequestSizeInMB=5, MaxRecordsPerRequest=10000 | When Streaming Upload is False, this setting is ignored. |
Streaming Upload | Boolean | False | With the default value of False, Snowflake Writer will use the Snowflake JDBC driver. Set to True to use the Snowpipe Streaming API. When set to True, the adapter uses public/private key authentication, ignoring other settings that affect authentication type. When set to True, specify the Private Key and User Role properties and, optionally, adjust the Streaming Configuration property as appropriate. |
Tables | String | | The name(s) of the table(s) to write to. The table(s) must exist in the DBMS and the user specified in Username must have insert permission. Specify Snowflake target table names in uppercase, as <database>.<schema>.<TABLE>. When the target's input stream is a user-defined event, specify a single table. When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables, for example, source.emp,MYSCHEMA.EMP. You may use the % wildcard, for example, source.%,MYSCHEMA.%. See Mapping columns for additional options. |
Upload Policy | String | eventcount:10000, interval:5m | The upload policy may include eventcount and/or interval (see Setting output names and rollover / upload policies for syntax). Cached data is written to the storage account every time any of the specified values is exceeded. With the default value, data will be written every five minutes or sooner if the cache contains 10,000 events. When the app is undeployed, all remaining data is written to the storage account. |
Username | String | | Specify the username you use to log in to Snowflake or, alternatively, the name of a Snowflake user with SELECT, INSERT, UPDATE, and DELETE privileges on the tables to be written to and the CREATE TABLE privilege on the database specified in the connection URL. |
User Role | String | | When Streaming Upload is True, specify the role to use for the session (see Docs » Managing Security in Snowflake » Administration & Authorization » Access Control in Snowflake » Overview of Access Control). |
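The Upload Policy semantics described in the table above can be sketched as a simple batcher that signals a flush when either threshold is exceeded. This is a minimal Python sketch of the documented behavior, not Striim's implementation; the function and class names are illustrative:

```python
import time

def parse_upload_policy(policy: str) -> dict:
    """Parse a policy string such as 'eventcount:10000,interval:5m'."""
    units = {"s": 1, "m": 60, "h": 3600}
    result = {}
    for part in policy.split(","):
        key, _, value = part.strip().partition(":")
        if key == "eventcount":
            result["eventcount"] = int(value)
        elif key == "interval":
            # Convert e.g. '5m' to seconds.
            result["interval"] = int(value[:-1]) * units[value[-1]]
    return result

class Batcher:
    """Accumulates events; reports when a flush is due per the policy."""
    def __init__(self, policy: str):
        p = parse_upload_policy(policy)
        self.max_events = p.get("eventcount")
        self.max_age = p.get("interval")
        self.events = []
        self.started = time.monotonic()

    def add(self, event) -> bool:
        """Add an event; return True if the batch should be uploaded now."""
        self.events.append(event)
        return self.flush_due()

    def flush_due(self) -> bool:
        if self.max_events and len(self.events) >= self.max_events:
            return True
        if self.max_age and time.monotonic() - self.started >= self.max_age:
            return True
        return False
```

With the default policy, a batch is uploaded when it reaches 10,000 events or five minutes have elapsed, whichever comes first.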
Azure Storage properties for Snowflake Writer
property | type | default value | notes |
---|---|---|---|
Azure Account Access Key | encrypted password | | the account access key from Storage accounts > <account name> > Access keys |
Azure Account Name | String | | the name of the Azure storage account for the blob container |
Azure Container Name | String | | the blob container name from Storage accounts > <account name> > Containers. If it does not exist, it will be created. |
S3 properties for Snowflake Writer
Specify either the access key and secret access key or an IAM role.
property | type | default value | notes |
---|---|---|---|
S3 Access Key | String | | an AWS access key ID (created on the AWS Security Credentials page) for a user with read and write permissions on the bucket (leave blank if using an IAM role) |
S3 Bucket Name | String | | the S3 bucket to be used for staging; if it does not exist, it will be created |
S3 IAM Role | String | | an AWS IAM role with read and write permissions on the bucket (leave blank if using an access key) |
S3 Region | String | | the AWS region of the bucket |
S3 Secret Access Key | encrypted password | | the secret access key for the access key |
Authentication mechanisms
This adapter supports the following authentication mechanisms:
Username and password
Public/private key pair
OAuth
Username and password authentication
Set the values of the username and password properties as normal.
Public/private key pair authentication
This procedure generates the public/private key pair. For details, see the Snowflake documentation on configuring key pair authentication.
From a terminal, execute the following command.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out keyname.p8 -nocrypt
This command generates an unencrypted private key. Remove the -nocrypt option to generate an encrypted private key. Replace keyname with a file name for the private key.
From a terminal, execute the following command.
openssl rsa -in keyname.p8 -pubout -out pubkeyname.pub
Replace keyname with the name chosen in the previous step. Replace pubkeyname with a file name for the public key.
In the console, execute the following command to assign the public key to the user account.
ALTER USER any_snowflake_user SET RSA_PUBLIC_KEY='code string';
Replace code string with the public key, not including the start and end key delimiters.
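To produce that code string, remove the BEGIN/END delimiter lines and line breaks from the public key file. A minimal Python sketch, assuming the pubkeyname.pub file from the previous step and standard PEM delimiters:

```python
def pem_body(pem_text: str) -> str:
    """Return the Base64 body of a PEM block, with the
    -----BEGIN/END----- delimiter lines and newlines removed."""
    lines = [
        line.strip()
        for line in pem_text.splitlines()
        if line.strip() and not line.startswith("-----")
    ]
    return "".join(lines)

# Usage: paste the result into
#   ALTER USER any_snowflake_user SET RSA_PUBLIC_KEY='<result>';
# with open("pubkeyname.pub") as f:
#     print(pem_body(f.read()))
```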
The following sample TQL creates a target that uses key pair authentication with Snowpipe Streaming:
CREATE TARGET SfTgt USING SnowflakeWriter
(
ConnectionURL:'jdbc:snowflake://striim.snowflakecomputing.com/?db=DEMO_DB',
username:'infra_1780_oauth_bearer_encrypted',
appendOnly:'true',
IgnorableExceptionCode:'TABLE_NOT_FOUND',
Tables:'QATEST.oracleRawSRC,QATEST1679489873.SNOWFLAKERAWTGT',
uploadpolicy:'eventcount:1,interval:10s',
privateKey:'keydata',
streamingUpload:'TRUE',
userRole:'SYSADMIN',
authenticationType:'KeyPair',
privateKeyPassphrase:'striim'
)
INPUT FROM OracleInitStream;
Another example, using key pair authentication with local staging:
CREATE OR REPLACE TARGET sf USING Global.SnowflakeWriter (
userRole: 'sysadmin',
connectionUrl: 'jdbc:snowflake://striim.snowflakecomputing.com/?db=SAMPLEDB&schema=SANJAYPRATAP',
streamingUpload: 'true',
tables: 'public.sample_pk,SAMPLEDB.SANJAYPRATAP.SAMPLE_TB',
CDDLAction: 'Process',
optimizedMerge: 'false',
columnDelimiter: '|',
privateKey_encrypted: 'true',
appendOnly: 'false',
authenticationType: 'KeyPair',
username: 'rahul_mishra',
uploadPolicy: 'eventcount:10000,interval:5m',
privateKey: 'keydata',
externalStageType: 'Local',
adapterName: 'SnowflakeWriter',
fileFormatOptions: 'null_if = \"\"'
)
INPUT FROM sysout;
OAuth authentication
Snowflake enables OAuth integration with Striim through the Security Integration Snowflake object.
The following procedure creates the Security Integration object.
Log in to the Snowflake Web Interface with a user account with the privilege to create the Security Integration.
Create the Security Integration object.
create or replace security integration DEMO_OAUTH
  type = oauth
  enabled = true
  oauth_client = CUSTOM
  oauth_client_type = 'CONFIDENTIAL'
  oauth_redirect_uri = 'https://localhost.com:7734/striim-callback'
  oauth_issue_refresh_tokens = true
  oauth_refresh_token_validity = 7776000
  oauth_allow_non_tls_redirect_uri = true;
Note
When using a custom role, add the following line to grant usage on the integration to that role:
grant usage on integration DEMO_OAUTH to role api_admin;
In the Snowflake console, issue the following command:
desc integration DEMO_OAUTH;
Note the following values for later use.
Postman name | Value name |
---|---|
Client ID | OAUTH_CLIENT_ID |
Authorization URL | OAUTH_AUTHORIZATION_ENDPOINT |
Token URL | OAUTH_TOKEN_ENDPOINT |
Refresh Token Expires In | OAUTH_REFRESH_TOKEN_VALIDITY |
In the Snowflake console, issue the following command:
select system$show_oauth_client_secrets('DEMO_OAUTH');
The first value is the Client Secret. Note the Client Secret for future use.
In Postman, create a new token using the noted values.
Sign in to Snowflake.
After authenticating, Snowflake sends an access token and a refresh token.
Copy the refresh token.
The following procedure uses curl and a web browser to fetch the refresh token.
In the Snowflake Console, issue the following command:
desc integration DEMO_OAUTH;
Note the Client ID.
URL encode the Client ID.
Set the encoded Client ID as the value of the ENCODED_OAUTH_CLIENT_ID property.
Construct the endpoint call.
https://<snowflake_account_url>/oauth/authorize?response_type=code&client_id=<ENCODED_OAUTH_CLIENT_ID>&redirect_uri=https%3A%2F%2Flocalhost%3A7734%2Fstriim-callback
Replace <snowflake_account_url> with your account URL in the format myorg-account_xyz.snowflakecomputing.com. Replace <ENCODED_OAUTH_CLIENT_ID> with the URL encoded Client ID. Optionally, add a scope to the URL with &scope=<session:role:R1>, URL encoding the scope definition. The call uses the default scope for the user when no scope is provided.
Open the endpoint URL in a browser.
The Snowflake login screen appears.
Authenticate to Snowflake. If a consent dialog box displays, click Allow.
The web browser redirects to the specified redirect URI. The authorization code is the part of the URI after the code= string.
Note the authorization code for future use.
From a terminal shell, execute the curl command with the token endpoint call.
curl -X POST 'https://<snowflake_account_url>/oauth/token-request' \
  -H 'Authorization: Basic <Base64 Encoded value of ClientID:ClientSecret>' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=authorization_code&code=<authorization_code>&redirect_uri=https%3A%2F%2Flocalhost%3A7734%2Fstriim-callback'
Replace <snowflake_account_url> with your account URL. Replace <Base64 Encoded value of ClientID:ClientSecret> with a Base64 encoding of the Client ID and Client Secret separated by a colon (:) character. Replace <authorization_code> with the previously noted authorization code.
Snowflake sends the refresh token as a response to the command.
Note the refresh token for future use.
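The two encoding steps in the procedure above (URL encoding the Client ID and Base64 encoding ClientID:ClientSecret for the Authorization header) can be sketched in Python. The account URL, client ID, and secret values are placeholders:

```python
import base64
from urllib.parse import quote, urlencode

def encoded_client_id(client_id: str) -> str:
    """URL encode the Client ID for the /oauth/authorize call."""
    return quote(client_id, safe="")

def basic_auth_header(client_id: str, client_secret: str) -> str:
    """Build the 'Authorization: Basic ...' value for the token request:
    Base64 of ClientID and ClientSecret joined by a colon."""
    raw = f"{client_id}:{client_secret}".encode()
    return "Basic " + base64.b64encode(raw).decode()

def authorize_url(account_url: str, client_id: str, redirect_uri: str) -> str:
    """Construct the authorization endpoint call from the procedure above."""
    params = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
    })
    return f"https://{account_url}/oauth/authorize?{params}"
```

For example, basic_auth_header("id", "secret") returns "Basic aWQ6c2VjcmV0", which is the header value the curl token request expects.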
Snowflake Writer sample application
The following sample application will write data from PosDataPreview.csv to Snowflake. The target table must exist.
CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF: false
)
PARSE USING DSVParser (
  header: Yes,
  trimquote: false
)
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true
)
INPUT FROM PosSource_TransformedStream;
The above example shows how to use SnowflakeWriter with an input of a user-defined type. For examples of applications where the input is the output of a CDC reader, DatabaseReader, or IncrementalBatchReader, see Replicating Oracle data to Snowflake.
The following is an example of the properties required to test the Snowpipe Streaming API preview:
CREATE TARGET SnowflakeDemo USING SnowflakeWriter (
  ConnectionURL: '<JDBC connection string>',
  username: 'striim',
  password: '********',
  Tables: 'MYDB.MYSCHEMA.POSDATA',
  appendOnly: true,
  streamingUpload: 'true',
  userRole: 'ACCOUNTADMIN',
  privateKey: '*****'
)
INPUT FROM PosSource_TransformedStream;
Snowflake Writer data type support and correspondence
See also Data type support & mapping for schema conversion & evolution.
TQL type | Snowflake type |
---|---|
Boolean | BOOLEAN |
Byte, Integer, Long, Short | BIGINT, DOUBLE, FLOAT, INT, NUMBER, SMALLINT |
DateTime | DATE, DATETIME, TIMESTAMP_LTZ, TIMESTAMP_NTZ, TIMESTAMP_TZ |
Double, Float | DOUBLE, FLOAT |
String | CHAR, VARCHAR |
When the input of a Snowflake Writer target is the output of an Oracle source (DatabaseReader, Incremental Batch Reader, or Oracle Reader):
Oracle type | Snowflake type |
---|---|
BINARY | BINARY |
BINARY_DOUBLE | FLOAT |
BINARY_FLOAT | FLOAT |
BLOB | BINARY |
CHAR | CHAR |
CLOB | VARCHAR |
DATE | DATE |
FLOAT | FLOAT |
INTEGER | NUMBER |
INTERVAL DAY TO SECOND | VARCHAR |
INTERVAL YEAR TO MONTH | VARCHAR |
LONG | not supported |
LONG RAW | not supported |
NCHAR | CHAR |
NCLOB | VARCHAR |
NUMBER | NUMBER |
NUMBER(<precision>,<scale>) | NUMBER(<precision>,<scale>) |
NVARCHAR2 | VARCHAR |
RAW | BINARY |
TIMESTAMP | TIMESTAMP_NTZ |
TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP_LTZ |
TIMESTAMP WITH TIMEZONE | TIMESTAMP_TZ |
VARCHAR2 | VARCHAR |
XMLTYPE | VARCHAR |
See Oracle Reader and OJet WAEvent fields for additional information including limitations for some types.