Striim 3.9.7 documentation

Notes on writing to Cassandra with DatabaseWriter

Requires Apache Cassandra 3.11.2. Starting with Striim 3.9.6, the Cassandra JDBC wrapper is included with the Striim server.

  • ConnectionURL: use the syntax

    jdbc:cassandra://<host name>:<port>/<keyspace>

    See cassandra-jdbc-wrapper for additional options and more information.

  • IgnorableExceptionCode: To ignore the error "com.datastax.driver.core.exceptions.InvalidQueryException: PRIMARY KEY part id found in SET part" (where id is the name of a primary key column), specify

    IgnorableExceptionCode: 'PRIMARY KEY'

  • PreserveSourceTransactionBoundary: must be False (the default).

  • Tables: Cassandra table names must be lowercase, and the tables must already exist in Cassandra. Cassandra does not necessarily keep columns in the order in which they appear in the CREATE TABLE statement (SELECT * returns the partition key columns first, then any clustering columns, then the remaining columns in alphabetical order). Consequently, when the input stream of the DatabaseWriter target is the output of a DatabaseReader or CDC source, you will usually need to specify ColumnMap (see Mapping columns), and wildcards are not supported. You may omit ColumnMap if you have verified that the Cassandra columns are in the same order as the source columns.

  • Username: The specified user must have MODIFY permission on the tables to be written to.

  • When a Cassandra DatabaseWriter target's input stream is the output of a CDC source, the reader's Compression property must be set to True.

  • If the Cassandra host goes offline, DatabaseWriter will log a "No host reachable" error and the application will crash.

  • Cassandra does not allow primary key values to be updated, and attempts to do so will crash the application. With DatabaseWriter and CDC sources, one way to avoid this is to filter out events whose PK_UPDATE metadata field is True. For example:

    CREATE CQ FilterPKUpdateCQ
    INSERT INTO CassandraStream
    SELECT * FROM CDCStream x WHERE META(x,PK_UPDATE) != True;
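To show how the settings above might combine when the input comes from a CDC source, here is a minimal, untested TQL sketch. It assumes an Oracle source read by OracleReader; the source table MYSCHEMA.POSDATA and its column names, the Oracle connection details, and the component and stream names are hypothetical placeholders. The ColumnMap pairs assume the target column=source column form described in Mapping columns; confirm the exact syntax there before adapting this. The target is the mykeyspace.testtable table used in the sample application on this page.

```sql
-- Hypothetical Oracle CDC source (host, SID, and table are placeholders).
-- Compression must be True because this stream feeds a Cassandra DatabaseWriter.
CREATE SOURCE OracleCDCSource USING OracleReader (
  ConnectionURL: '192.0.2.10:1521:orcl',
  Username: 'striim',
  Password: '******',
  Tables: 'MYSCHEMA.POSDATA',
  Compression: true
)
OUTPUT TO CDCStream;

-- Drop events that update a primary key, which Cassandra does not allow.
CREATE CQ FilterPKUpdateCQ
INSERT INTO CassandraStream
SELECT * FROM CDCStream x WHERE META(x,PK_UPDATE) != True;

-- Cassandra target: lowercase table name, explicit ColumnMap (assumed
-- target=source order) because Cassandra's column order may differ from
-- the source's, and PreserveSourceTransactionBoundary left False.
CREATE TARGET CassandraCDCTarget USING DatabaseWriter (
  ConnectionURL: 'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username: 'striim',
  Password: '******',
  Tables: 'MYSCHEMA.POSDATA,mykeyspace.testtable ColumnMap(merchantid=MERCHANT_ID,datetime=AUTH_DATETIME,authamount=AUTH_AMOUNT,zip=ZIP_CODE)',
  PreserveSourceTransactionBoundary: false,
  -- Optional: ignore "PRIMARY KEY part ... found in SET part" errors.
  IgnorableExceptionCode: 'PRIMARY KEY'
)
INPUT FROM CassandraStream;
```

The IgnorableExceptionCode line is optional here; keep it only if you want errors containing 'PRIMARY KEY' to be ignored rather than crashing the application.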

The sample application below assumes that you have created the following table in Cassandra:

CREATE TABLE mykeyspace.testtable (
  merchantid text PRIMARY KEY,
  datetime timestamp,
  authamount decimal,
  zip text
);

The following TQL will write to that table:

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;
 
CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET CassandraTarget USING DatabaseWriter(
  ConnectionURL: 'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username:'striim',
  Password:'******',
  Tables: 'mykeyspace.testtable'
)
INPUT FROM PosSource_TransformedStream;

For an example using an input stream of type WAEvent, see Replicating Oracle data to Cassandra.