Striim 3.10.1 documentation

Cassandra Writer

Writes to tables in Apache Cassandra or DataStax. The Cassandra JDBC driver is included with Striim.

If writing to tables in Cosmos DB using the Cassandra API, use Cassandra Cosmos DB Writer.

property

type

default value

notes

Batch Policy

String

eventcount:1000, interval:60

The batch policy includes eventcount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to BatchPolicy:'-1'.

With the default setting, events will be sent every 60 seconds or sooner if the buffer accumulates 1,000 events.

Checkpoint Table

String

CHKPOINT

The table where DatabaseWriter will store recovery information when recovery is enabled. See Creating the checkpoint table below for DDL to create the table. Multiple instances of DatabaseWriter may share the same table. If the table is not in the Oracle or SQL/MX schema being written to, or the same MySQL or SQL Server database specified in the connection URL, specify a fully qualified name.

Column Name Escape Sequence

String

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, you may use this property to specify which characters Striim will use to escape column names that are on the List of reserved keywords. You may specify two characters to be added at the start and end of the name (for example, [] ), or one character to be added at both the start and end.

If this value is blank, Striim will use the following escape characters for the specified target databases:

  • Oracle: " (ASCII / UTF-8 22)

  • MySQL: ` (ASCII / UTF-8 60)

  • PostgreSQL: " (ASCII / UTF-8 22)

  • SQL Server: []

Commit Policy

String

eventcount:1000, interval:60

The commit policy controls how often transactions are committed in the target database. The syntax is the same as for BatchPolicy. CommitPolicy values must always be equal to or greater than BatchPolicy values. To disable CommitPolicy, set to CommitPolicy:'-1'.

If BatchPolicy is disabled, each event is sent to the target database immediately and the transactions are committed as specified by CommitPolicy.

If BatchPolicy is enabled and CommitPolicy is disabled, each batch is committed as soon as it is received by the target database.

If BatchPolicy and CommitPolicy are both disabled, each event received by DatabaseWriter will be committed immediately. This may be useful in development and testing, but is inappropriate for a production environment.

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval. If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL

String

Use the syntax jdbc:cassandra://<host name>:<port>/<keyspace>

See cassandra-jdbc-wrapper for additional options and more information.

Excluded Tables

String

When a wildcard is specified for Tables, you may specify here any tables you wish to exclude. Specify the value as for Tables. For example, to include data from HR_EMPLOYEES and HR_DEPTS but not from HRMASTER when writing to SQL Server (since you cannot specify a literal underscore in the Tables string):

Tables='HR%',
ExcludedTables='HRMASTER'

Ignorable Exception Code

String

By default, if the target DBMS returns an error, Cassandra Writer crashes the application. Use this property to specify errors to ignore, separated by commas. For example, to ignore "com.datastax.driver.core.exceptions.InvalidQueryException: PRIMARY KEY part id found in SET part," specify:

IgnorableExceptionCode: 'PRIMARY KEY'

When an ignorable exception occurs, Striim will write an "Ignoring VendorExceptionCode" message to the log, including the error number, and increment the "Number of exceptions ignored" value for the target.

To view the number of exceptions ignored in the web UI, go to the Monitor page, click the application name, click Targets, and click More Details next to the target.

For details of the individual ignored events, search striim.server.log for VendorExceptionCode (see Reading log files). Alternatively, to capture the ignored exceptions, Writing exceptions to a WActionStore.

When replicating from MySQL/MariaDB, Oracle 12c, PostgreSQL, and SQL Server CDC readers, the following three generic (that is, not corresponding to any database-specific error code) exceptions can be specified:

  • NO_OP_UPDATE: could not update a row in the target (typically because there was no corresponding primary key)

  • NO_OP_PKUPDATE: could not update the primary key of a row in the target (typically because the "before" primary key could not be found); not supported when source is PostgreSQLReader

  • NO_OP_DELETE: could not delete a row in the target (typically because there was no corresponding primary key)

These exceptions typically occur when other applications besides Striim are writing to the target database. The unwritten events will be captured to the application's exception store, if one exists (see CREATE EXCEPTIONSTORE).

Parallel Threads

Integer

See Creating multiple writer instances.

Enabling recovery for the application disables parallel threads.

Password

encrypted password

The password for the specified user. See Encrypted passwords.

Tables

String

Specify the name(s) of the table(s) to write to. Cassandra table names must be lowercase. The tables must exist in Cassandra.

Since columns in Cassandra tables are not usually created in the same order they are specified in the CREATE TABLE statement, when the input stream of the target is the output of a DatabaseReader or CDC source, the ColumnMap option is usually required (see Mapping columns). You may omit ColumnMap if you verify that the Cassandra columns are in the same order as the source columns.

If a specified target table does not exist, the application will crash with an error. To skip writes to missing tables without crashing, specify TABLE_NOT_FOUND as an Ignorable Exception Code.

When the target's input stream is a user-defined event, specify a single table.

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. Wildcards are not supported.

source.emp,target.emp
source.db1,target.db1;source.db2,target.db2

MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

Username

String

The DBMS user name the adapter will use to log in to the server specified in Connection URL. The specified user must have MODIFY permission on the tables to be written to.

Vendor Configuration

String

Reserved.

Limitations:

  • If the Cassandra host goes offline, DatabaseWriter will log a "No host reachable" error and the application will crash.

  • Primary keys may not be updated (a limitation of Cassandra) and attempts to do so will crash the application. With DatabaseWriter and CDC sources, one way to avoid this is to filter out events where the PK_UPDATE metadata field is True. For example:

    CREATE CQ FilterPKUpdateCQ
    INSERT into CassandraStream
    SELECT * FROM CDCStream x WHERE META(x,PK_UPDATE) != True;

The sample application below assumes that you have created the following table in Cassandra:

CREATE TABLE mykeyspace.testtable (
  merchantid text PRIMARY KEY,
  datetime timestamp, 
  authamount decimal, 
  zip text;

The following TQL will write to that table:

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
    positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET CassandraTarget USING CassandraWriter(
  connectionurl: 'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username:'striim',
  Password:'******',
  Tables: 'mykeyspace.testtable'
)
INPUT FROM PosSource_TransformedStream;