
Snowflake Reader programmer's reference

Snowflake Reader properties

property

type

default value

notes

Connection Profile Name

Enum

Snowflake Reader requires a connection profile. See Introducing connection profiles.

Connection Retry Policy

String

initialRetryDelay=10, retryDelayMultiplier=1.0, maxRetryDelay=30, maxAttempts=3, totalTimeout=600

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 10 seconds (initialRetryDelay=10). If that attempt is unsuccessful, it will try again every 10 seconds (initialRetryDelay=10 multiplied by retryDelayMultiplier=1.0) until a total of three connection attempts have been made (maxAttempts=3), after which the adapter will halt and log an exception.

The adapter will halt when either maxAttempts or totalTimeout is reached.

initialRetryDelay, maxRetryDelay, and totalTimeout may be specified in milliseconds (ms), seconds (s, the default), or minutes (m).

If retryDelayMultiplier is set to 1, connections will be attempted at the fixed interval set by initialRetryDelay.

To disable connection retry, set maxAttempts=0.

Negative values are not supported.
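
As a sketch, the retry policy can be tuned inline when creating the source. The property name connectionRetryPolicy and the values below are illustrative, following the camelCase naming style of the sample TQL later on this page:

CREATE SOURCE RetryTuningSource USING SnowflakeReader ( 
  connectionProfileName: 'admin.snowflakekeypair', 
  tables: 'PUBLIC.%', 
  connectionRetryPolicy: 'initialRetryDelay=5s, retryDelayMultiplier=2.0, maxRetryDelay=60s, maxAttempts=5, totalTimeout=300s')
OUTPUT TO RetryTuningStream;

With retryDelayMultiplier=2.0, the delays grow as 5s, 10s, 20s, 40s (capped at maxRetryDelay=60s) until either maxAttempts or totalTimeout is reached.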

Excluded Tables

String

Data for any tables specified here will not be returned. For example, if Tables uses a wildcard, data from any tables specified here will be omitted. Multiple table names (separated by semicolons) and wildcards may be used exactly as for Tables.

Polling Interval

String

60s

With the default value of 60s, Striim will check the Snowflake source for new data every 60 seconds. This may be specified as seconds or minutes, for example, 5m for five minutes.
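
For example, a source that polls every five minutes might be defined as follows (a hypothetical definition; the property name pollingInterval is assumed to follow the camelCase naming of the sample TQL later on this page):

CREATE SOURCE FiveMinutePollSource USING SnowflakeReader ( 
  connectionProfileName: 'admin.snowflakekeypair', 
  tables: 'PUBLIC.%', 
  pollingInterval: '5m')
OUTPUT TO FiveMinutePollStream;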

Schema Evolution - CDDL Action

Enum

Process

See Handling schema evolution.

Schema Evolution - CDDL Capture

Boolean

False

See Handling schema evolution. Supports only ALTER TABLE ... ADD COLUMN and ALTER TABLE ... DROP COLUMN.

Snowflake Configuration

String

ThreadPoolSize=1, EnableQuickCatchUp=true, YieldAfter=0 (disabled by default)

You can configure the following properties for Snowflake Configuration:

  • ThreadPoolSize: controls the number of threads for concurrently capturing changes from Snowflake tables. Only one thread at a time can read from each table. For optimal performance this property should be equal to the number of tables.

  • EnableQuickCatchUp: when the reader starts from a point in the past, it tries to catch up to the current time by making frequent calls to the Snowflake endpoint, which can have a high cost impact on Snowflake. When this property is set to true, the adapter calls the Snowflake endpoint less frequently, reducing cost while improving performance and shortening catch-up time.

    This property is not exposed in the UI by default. You can add this property using the Add Property option under Snowflake Configuration.

  • YieldAfter: this property is beneficial when the ThreadPoolSize is less than the number of tables. In situations where some tables experience heavy traffic (one example is when a table is in a catchup phase while polling data from the past), tables with less traffic may not get a chance to process the polled results. This issue can be addressed by limiting each table's event processing to the count specified in YieldAfter. Once a table reaches its YieldAfter limit, the resources are yielded to the next table waiting in the queue.

    This property is not exposed in the UI by default. You can add this property using the Add Property option under Snowflake Configuration.
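
The properties above can be combined in a single comma-separated value. A sketch (the values are illustrative, and the property name snowflakeConfiguration is assumed to follow the camelCase naming of the sample TQL later on this page):

CREATE SOURCE MultiThreadSource USING SnowflakeReader ( 
  connectionProfileName: 'admin.snowflakekeypair', 
  tables: 'PUBLIC.ORDERS;PUBLIC.CUSTOMERS;PUBLIC.ITEMS;PUBLIC.SHIPMENTS', 
  snowflakeConfiguration: 'ThreadPoolSize=4, EnableQuickCatchUp=true, YieldAfter=1000')
OUTPUT TO MultiThreadStream;

Here ThreadPoolSize=4 matches the four tables being read, and YieldAfter=1000 makes a heavily trafficked table yield its thread after 1000 events so lower-traffic tables are not starved.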

Start Timestamp

String

NOW

With the default value of NOW, reading will begin with data added after the application is started. Optionally specify a timestamp in the Snowflake instance's time zone to begin reading from an earlier point.

To set the timestamp for all tables in all schemas, use the format %.%:'<timestamp>'. You may set the timestamp for an individual table using the format <schema>.<table>:'<timestamp>' or for all tables in a schema using <schema>.%:'<timestamp>'. You may specify multiple timestamps by separating them with semicolons, for example, test_schema.table1:'<timestamp>'; public_schema.%:'<timestamp>'.

The Start Timestamp must be within the Time Travel range (by default 24 hours) or the application will halt. See Guides > Business Continuity & Data Recovery > Time Travel > Understanding & using Time Travel.

If a table has not had change tracking enabled (see Snowflake initial setup), Snowflake Reader will enable it and start reading that table from the time that change tracking was enabled rather than from the specified timestamp.

The supported time formats are:

YYYY-MM-DDThh:mmTZD
YYYY-MM-DDThh:mm:ssTZD
YYYY-MM-DDThh:mm:ss.sssTZD
  • YYYY = four-digit year

  • MM = two-digit month

  • DD = two-digit day

  • hh = two digits of hour

  • mm = two digits of minute

  • ss = two digits of second

  • sss = one to three digits representing a decimal fraction of a second

  • TZD = time zone designator (Z or -hh:mm or +hh:mm)
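
Putting the format together, a source that backfills all tables from a point in the past might look like the following sketch (the property name startTimestamp is assumed to follow the camelCase naming of the sample TQL later on this page, and the backslash escaping of the embedded single quotes is an assumption):

CREATE SOURCE BackfillSource USING SnowflakeReader ( 
  connectionProfileName: 'admin.snowflakekeypair', 
  tables: 'TEST.%', 
  startTimestamp: '%.%:\'2024-05-13T22:00:00-07:00\'')
OUTPUT TO BackfillStream;

Note that the timestamp must be within the Time Travel range at the time the application starts, or the application will halt.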

Tables

String

Specify the tables to be read in the format <schema>.<table>. (The database is specified as part of the Connection URL property in the connection profile.) Multiple tables can be specified using the % wildcard, for example, myschema.inc% to read all tables with names that start with inc, or myschema.% to read all tables in myschema. Specify multiple selections by separating them with semicolons, for example, schema1.%;schema2.%. Wildcards may not be used for schemas.

Do not modify this property when recovery is enabled for the application.

Reading from tables with the same name in different cases is not supported. Reading from a table having columns with the same name in different cases is not supported.

Snowflake Reader sample TQL

CREATE SOURCE SnowflakeReaderSource USING SnowflakeReader ( 
  connectionProfileName: 'admin.snowflakekeypair', 
  tables: 'PUBLIC.%', 
  excludedTables: 'PUBLIC.SAMPLETABLE1', 
  CDDLCapture: 'true') 
OUTPUT TO SnowflakeReaderOutputStream;

Snowflake Reader WAEvent fields

The output data type for Snowflake Reader is WAEvent. The elements are:

metadata: a map including:

  • TableName: the fully qualified name of the table on which the operation was performed

  • ChangeWindow_StartTimestamp: start time (using the Snowflake service's time zone) of the interval in which the data was committed in Snowflake

  • ColumnCount: the number of columns in the table when the change data was captured

  • Rowid: unique identifier for each event

  • OperationName: for DML events, INSERT, UPDATE, or DELETE; for DDL events, the operation name (such as AlterColumns)

  • EventType: DML or DDL

  • ChangeWindow_EndTimestamp: end time (using the Snowflake service's time zone) of the interval in which the data was committed in Snowflake

  • DatabaseName: the Snowflake database name

  • SourceName: value is always SNOWFLAKE

  • SourceNamespace: the Striim namespace of the Snowflake Reader instance

  • Timestamp: Striim system time when the event was received from Snowflake

data: an array of fields, numbered from 0, containing:

  • for an INSERT operation, the values that were inserted

  • for an UPDATE, the values after the operation was completed

  • for a DELETE, the value of the primary key and nulls for the other fields

To retrieve the values for these fields, use SELECT ... (DATA[]). See Parsing the fields of WAEvent for CDC readers.

before: for UPDATE operations, contains the primary key value from before the update. When an update changes the primary key value, you may retrieve the previous value using the BEFORE() function.

dataPresenceBitMap, beforePresenceBitMap, and typeUUID are reserved and should be ignored.
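
A CQ along these lines pulls individual fields out of the WAEvent (a sketch: the stream name matches the sample TQL on this page, ParsedStream is a hypothetical output stream, and the data[] and META() usage follows the general WAEvent parsing pattern for CDC readers):

CREATE CQ ParseSampleTable 
INSERT INTO ParsedStream 
SELECT data[0] AS col1, 
       data[2] AS col3, 
       META(s, 'TableName') AS tableName, 
       META(s, 'OperationName') AS opName 
FROM SnowflakeReaderOutputStream s;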

Snowflake Reader sample output

The following examples are based on this table:

CREATE TABLE AATHITHYA.TEST.SAMPLETABLE (
COL1 NUMBER PRIMARY KEY, 
COL2 FLOAT, 
COL3 VARCHAR, 
COL4 BINARY, 
COL5 BOOLEAN, 
COL6 DATE, 
COL7 TIME, 
COL8 TIMESTAMP_LTZ, 
COL9 TIMESTAMP_NTZ, 
COL10 TIMESTAMP_TZ, 
COL11 OBJECT,
COL12 ARRAY,
COL13 VARIANT
);

For the following INSERT:

INSERT INTO AATHITHYA.TEST.SAMPLETABLE VALUES (1, 0.2, 'Striim', '48454C50', TRUE, '2002-03-05', 
  '23:59:59.9999', '2020-03-12 01:02:03.123456789', '2020-03-12 01:02:03.123456789', 
  '2020-03-12 01:02:03.123456789 -0600', NULL, NULL, 1);

The output would be:

T1: WAEvent{
  data: [1,0.2,"Striim","48454C50",true,[ 2002, 3, 5 ],"23:59:59.9999","2020-03-12T01:02:03.123456789-07:00",
    "2020-03-12T01:02:03.123456789","2020-03-12T01:02:03.123456789-06:00",null,null,"1"]
  metadata: {
  "TableName" : "TEST.SAMPLETABLE",
  "ChangeWindow_StartTimestamp" : "2024-05-13T22:50:37.431-07:00",
  "ColumnCount" : 13,
  "RowId" : "74aa5e2e7039d0361c5923b41dae2bc8daced633",
  "OperationName" : "INSERT",
  "ChangeWindow_EndTimestamp" : "2024-05-13T22:50:47.431-07:00",
  "DatabaseName" : "AATHITHYA",
  "SourceName" : "SNOWFLAKE",
  "SourceNamespace" : "admin",
  "Timestamp" : 1715665848.535279000,
  "EventType":"DML"
  }
  userdata: null
  before: null
  dataPresenceBitMap: "fz8="
  beforePresenceBitMap: "AAA="
  typeUUID: {
  "uuidstring" : "01ef11b5-ddd9-4051-9835-3ee77256adfd"
  }
};

For the following UPDATE:

UPDATE AATHITHYA.TEST.SAMPLETABLE SET COL12 = ARRAY_CONSTRUCT(12, 'twelve') WHERE COL1 = 1;

The output would be:

T1: WAEvent{
  data: [1,0.2,"Striim","48454C50",true,[ 2002, 3, 5 ],"23:59:59.9999","2020-03-12T01:02:03.123456789-07:00",
    "2020-03-12T01:02:03.123456789","2020-03-12T01:02:03.123456789-06:00",null,"[\n  12,\n  \"twelve\"\n]","1"]
  metadata: {
  "TableName" : "TEST.SAMPLETABLE",
  "ChangeWindow_StartTimestamp" : "2024-05-13T22:52:47.431-07:00",
  "ColumnCount" : 13,
  "RowId" : "74aa5e2e7039d0361c5923b41dae2bc8daced633",
  "OperationName" : "UPDATE",
  "ChangeWindow_EndTimestamp" : "2024-05-13T22:52:57.431-07:00",
  "DatabaseName" : "AATHITHYA",
  "SourceName" : "SNOWFLAKE",
  "SourceNamespace" : "admin",
  "Timestamp" : 1715665978.893219000,
  "EventType":"DML"
  }
  userdata: null
  before: [ 1, 0.2, "Striim", "48454C50", true, [ 2002, 3, 5 ], "23:59:59.9999", 1584000123.123456789, 
    [ 2020, 3, 12, 1, 2, 3, 123456789 ], 1583996523.123456789, null, null, "1" ]
  dataPresenceBitMap: "fz8="
  beforePresenceBitMap: "fz8="
  typeUUID: {
  "uuidstring" : "01ef11b5-ddd9-4051-9835-3ee77256adfd"
  }
};

For the following DELETE:

DELETE FROM AATHITHYA.TEST.SAMPLETABLE WHERE COL1 = 1;

The output would be:

T1: WAEvent{
  data: [1,0.2,"Striim","48454C50",true,[ 2002, 3, 5 ],"23:59:59.9999","2020-03-12T01:02:03.123456789-07:00",
    "2020-03-12T01:02:03.123456789","2020-03-12T01:02:03.123456789-06:00",null,"[\n  12,\n  \"twelve\"\n]","1"]
  metadata: {
  "TableName" : "TEST.SAMPLETABLE",
  "ChangeWindow_StartTimestamp" : "2024-05-13T22:55:17.431-07:00",
  "ColumnCount" : 13,
  "RowId" : "74aa5e2e7039d0361c5923b41dae2bc8daced633",
  "OperationName" : "DELETE",
  "ChangeWindow_EndTimestamp" : "2024-05-13T22:55:27.431-07:00",
  "DatabaseName" : "AATHITHYA",
  "SourceName" : "SNOWFLAKE",
  "SourceNamespace" : "admin",
  "Timestamp" : 1715666128.735648000,
  "EventType":"DML"
  }
  userdata: null
  before: null
  dataPresenceBitMap: "fz8="
  beforePresenceBitMap: "AAA="
  typeUUID: {
  "uuidstring" : "01ef11b5-ddd9-4051-9835-3ee77256adfd"
  }
};

For the following DDL:

alter table AATHITHYA.TEST.SAMPLETABLE DROP COLUMN COL13;

The output would be:

T1: WAEvent{
  data: ["ALTER TABLE \"TEST\".\"SAMPLETABLE\" DROP COLUMN \"COL13\" ;"]
  metadata: {"CDDLMetadata":{"table":{"fullyQualifiedName":"\"TEST\".\"SAMPLETABLE\"","name":"\"SAMPLETABLE\"",
"dbEntityType":"Table","tableMappingId":null,"type":"TABLE","identifier":{"plainName":"\"TEST\".\"SAMPLETABLE\"",
"firstPart":null,"secondPart":{"plainName":"\"TEST\"","name":"\"TEST\"","metadata":{"metaEscape":null,
"multiWildcard":"%","singleWildcard":"_","bugEscape":"","encloser":"\"","backSlashEscapeSequence":"\\",
"databaseName":"","escapeSequence":"\\","case":"IGNORE_CASE"}},"thirdPart":{"plainName":"\"SAMPLETABLE\"",
"name":"\"SAMPLETABLE\"","metadata":{"metaEscape":null,"multiWildcard":"%","singleWildcard":"_",
"bugEscape":"","encloser":"\"","backSlashEscapeSequence":"\\","databaseName":"","escapeSequence":"\\",
"case":"IGNORE_CASE"}},"name":"\"TEST\".\"SAMPLETABLE\"","metadata":{"metaEscape":null,"multiWildcard":"%",
"singleWildcard":"_","bugEscape":"","encloser":"\"","backSlashEscapeSequence":"\\","databaseName":"",
"escapeSequence":"\\","case":"IGNORE_CASE"}},"schemaName":"\"TEST\"","databaseName":null,
"columns":[{"fullyQualifiedName":null,"name":"\"COL13\"","dbEntityType":"Column",
"identifier":{"plainName":"\"COL13\"","name":"\"COL13\"","metadata":{"metaEscape":null,
"multiWildcard":"%","singleWildcard":"_","bugEscape":"","encloser":"\"","backSlashEscapeSequence":"\\",
"databaseName":"snowflake","escapeSequence":"\\","case":"UPPER_CASE"}},"index":null,
"striimType":"null_STRIIM_UNKNOWN","sourceDataType":null,"targetDataType":null,
"targetDataTypeSyntax":null,"sourceDataLength":null,"targetDataLength":null,"sourcePrecision":null,
"targetPrecision":null,"sourceScale":null,"targetScale":null,"nullable":false,"unique":false,
"stringRepresentation":"Column{name='\"COL13\"', identifier=\"COL13\", index=null,
 striimType='null_STRIIM_UNKNOWN', sourceDataType='null', targetDataType='null', 
targetDataTypeSyntax='null', sourceDataLength=null, targetDataLength=null, sourcePrecision=null, 
targetPrecision=null, sourceScale=null, targetScale=null, nullable=false, unique=false}"}],
"foreignKeys":[],"pkColumns":[],"uqConstraints":{},"notNullConstraints":[],"recordCount":0,
"size":0,"sourceDDL":"ALTER TABLE \"TEST\".\"SAMPLETABLE\" DROP COLUMN \"COL13\" ;"},"violationList":[],
"finalCategory":"Green","sourceDbType":"snowflake","sourceDbSubType":null,"operation":"AlterColumns",
"subOperation":"DropColumn","exception":null,
"sql":"ALTER TABLE \"TEST\".\"SAMPLETABLE\" DROP COLUMN \"COL13\" ;","metaObject":{},
"objectPresenceMap":{"Table":true,"Column":true},"objectRequirementMap":{"Table":true,"Column":true},
"tableValid":true,"successful":true},"ChangeWindow_StartTimestamp":"2024-05-13T22:55:27.431-07:00",
"OperationName":"AlterColumns","EventType":"DDL","DatabaseName":"AATHITHYA","SourceName":"SNOWFLAKE",
"OperationSubName":"DropColumn","SourceNamespace":"admin","Timestamp":1715669128.735648000,
"TableName":"TEST.SAMPLETABLE","ChangeWindow_EndTimestamp":"2024-05-13T22:55:37.431-07:00",
"OperationType":"DDL","PREVIOUS_TYPE_UUID":{"uuidstring":"01ef11b5-ddd9-4051-9835-3ee77256adfd"}}
  userdata: null
  before: null
  dataPresenceBitMap: "AQ=="
  beforePresenceBitMap: "AA=="
  typeUUID: {"uuidstring":"01ef3775-09cf-a841-a1ea-0282642f9e1e"}
};

Snowflake Reader data type support and correspondence

See also Target data type support & mapping for Snowflake sources.

Note

If a table contains a column of a geospatial type (GEOGRAPHY or GEOMETRY), Snowflake Reader cannot capture UPDATE or DELETE operations. This is a limitation of Snowflake's change streams (see Guides > Data Loading > Streams > Introduction to Streams > Types of Streams and the Snowflake article Compilation errors when select from a Stream : Invalid argument types for function 'EQUAL_NULL': (GEOMETRY, GEOMETRY)).

Snowflake type

aliases

Striim type

notes

ARRAY

java.lang.String

BINARY

VARBINARY

java.lang.String

BOOLEAN

java.lang.Boolean

DATE

java.time.LocalDate

FLOAT

DOUBLE, DOUBLE PRECISION, FLOAT4, FLOAT8, REAL

java.lang.Double

GEOGRAPHY

java.lang.String

see note above

GEOMETRY

java.lang.String

see note above

NUMBER

BIGINT, BYTEINT, DECIMAL, INT, INTEGER, NUMERIC, SMALLINT, TINYINT

java.lang.Integer

OBJECT

java.lang.String

TIME

java.lang.String

TIMESTAMP_LTZ

java.time.ZonedDateTime

TIMESTAMP_NTZ

java.time.LocalDateTime

TIMESTAMP_TZ

java.time.ZonedDateTime

VARCHAR

CHAR, CHAR VARYING, CHARACTER, NCHAR, NCHAR VARYING, NVARCHAR, NVARCHAR2, STRING, TEXT

java.lang.String

VARIANT

java.lang.String