Cassandra Writer
Writes to tables in Apache Cassandra or DataStax. The Cassandra JDBC driver is included with Striim.
If writing to tables in Cosmos DB using the Cassandra API, use Cassandra Cosmos DB Writer.
Limitations:
If the Cassandra host goes offline, Cassandra Writer logs a "No host reachable" error and the application terminates.
Cassandra does not allow primary key values to be updated, so an attempt to update a primary key terminates the application. With CDC sources, one way to avoid this is to filter out events whose PK_UPDATE metadata field is True. For example:
CREATE CQ FilterPKUpdateCQ INSERT into CassandraStream SELECT * FROM CDCStream x WHERE META(x,PK_UPDATE) != True;
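The filtering done by FilterPKUpdateCQ can be modeled in a few lines of Python. This is only a sketch: the event layout (a dict with `data` and `metadata` keys) and the function name `filter_pk_updates` are hypothetical stand-ins, not Striim's event class or API, and treating a missing PK_UPDATE flag as False is an assumption.

```python
from typing import Any, Dict, Iterable, List

# Hypothetical event shape: {"data": {...}, "metadata": {...}}.
# This is NOT Striim's event class, just a stand-in for illustration.
Event = Dict[str, Any]


def filter_pk_updates(events: Iterable[Event]) -> List[Event]:
    """Keep only events whose PK_UPDATE metadata flag is not True,
    mirroring WHERE META(x,PK_UPDATE) != True in FilterPKUpdateCQ."""
    kept: List[Event] = []
    for event in events:
        # Assumption: an event without a PK_UPDATE flag is not a key update.
        if event.get("metadata", {}).get("PK_UPDATE", False) is not True:
            kept.append(event)
    return kept


cdc_stream = [
    {"data": {"merchantid": "a1"}, "metadata": {"PK_UPDATE": False}},
    {"data": {"merchantid": "b2"}, "metadata": {"PK_UPDATE": True}},
    {"data": {"merchantid": "c3"}, "metadata": {}},
]
print([e["data"]["merchantid"] for e in filter_pk_updates(cdc_stream)])  # ['a1', 'c3']
```

Only the event that changes a primary key (b2) is dropped; the other events pass through to the target unchanged.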
Cassandra Writer properties
property | type | default value | notes |
---|---|---|---|
Batch Policy | String | eventcount:1000, interval:60 | The batch policy includes eventcount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to With the default setting, events will be sent every 60 seconds or sooner if the buffer accumulates 1,000 events. |
Checkpoint Table | String | CHKPOINT | The table where DatabaseWriter will store recovery information when recovery is enabled. See Creating the checkpoint table below for DDL to create the table. Multiple instances of DatabaseWriter may share the same table. If the table is not in the Oracle or SQL/MX schema being written to, or the same MySQL or SQL Server database specified in the connection URL, specify a fully qualified name. |
Column Name Escape Sequence | String | | When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, you may use this property to specify which characters Striim will use to escape column names that are on the List of reserved keywords. You may specify two characters to be added at the start and end of the name (for example, If this value is blank, Striim will use the following escape characters for the specified target databases: |
Commit Policy | String | eventcount:1000, interval:60 | The commit policy controls how often transactions are committed in the target database. The syntax is the same as for BatchPolicy. CommitPolicy values must always be equal to or greater than BatchPolicy values. To disable CommitPolicy, set to If BatchPolicy is disabled, each event is sent to the target database immediately and the transactions are committed as specified by CommitPolicy. If BatchPolicy is enabled and CommitPolicy is disabled, each batch is committed as soon as it is received by the target database. If BatchPolicy and CommitPolicy are both disabled, each event received by Cassandra Writer is committed immediately. This may be useful in development and testing but is inappropriate for a production environment. |
Connection Retry Policy | String | retryInterval=30, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds ( |
Connection URL | String | | Use the syntax See cassandra-jdbc-wrapper for additional options and more information. |
Excluded Tables | String | | If |
Ignorable Exception Code | String | | By default, if the target DBMS returns an error, Cassandra Writer terminates the application. Use this property to specify errors to ignore, separated by commas. For example, to ignore "com.datastax.driver.core.exceptions.InvalidQueryException: PRIMARY KEY part id found in SET part," specify: IgnorableExceptionCode: 'PRIMARY KEY' When an ignorable exception occurs, Striim will write an "Ignoring VendorExceptionCode" message to the log, including the error number, and increment the "Number of exceptions ignored" value for the target. To view the number of exceptions ignored in the web UI, go to the Monitor page, click the application name, click Targets, and click More Details next to the target. Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE). When replicating from MySQL/MariaDB, Oracle 12c, PostgreSQL, and SQL Server CDC readers, the following three generic (that is, not corresponding to any database-specific error code) exceptions can be specified: These exceptions typically occur when other applications besides Striim are writing to the target database. The unwritten events will be captured to the application's exception store, if one exists (see CREATE EXCEPTIONSTORE). |
Parallel Threads | Integer | | See Creating multiple writer instances. Enabling recovery for the application disables parallel threads. |
Password | encrypted password | | The password for the specified user. See Encrypted passwords. |
Tables | String | | Specify the name(s) of the table(s) to write to. Cassandra table names must be lowercase. The tables must exist in Cassandra. Since columns in Cassandra tables are not usually created in the same order they are specified in the CREATE TABLE statement, when the input stream of the target is the output of a DatabaseReader or CDC source, the ColumnMap option is usually required (see Mapping columns). You may omit ColumnMap if you verify that the Cassandra columns are in the same order as the source columns. If a specified target table does not exist, the application will terminate with an error. To skip writes to missing tables without terminating, specify TABLE_NOT_FOUND as an Ignorable Exception Code. When the target's input stream is a user-defined event, specify a single table. When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use the source.emp,target.emp source.db1,target.db1;source.db2,target.db2 source.%,target.% source.mydatabase.emp%,target.mydb.% source1.%,target1.%;source2.%,target2.% MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as |
Username | String | | The DBMS user name the adapter will use to log in to the server specified in Connection URL. The specified user must have MODIFY permission on the tables to be written to. |
Vendor Configuration | String | | Reserved. |
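The Batch Policy behavior described in the table above (send a batch when either the event count or the interval is reached, discard the buffer on stop) can be sketched as a small Python model. This is a simplified illustration, not Striim's implementation: the class name `BatchBuffer`, the explicit `now` timestamps, and measuring the interval from the first event of the current batch are all assumptions made to keep the example deterministic.

```python
from typing import List, Optional


class BatchBuffer:
    """Simplified model of Batch Policy semantics (not Striim's implementation):
    events are buffered and sent as one batch when either `eventcount`
    events are waiting or `interval` seconds have passed since the first
    event of the current batch was buffered (assumption)."""

    def __init__(self, eventcount: int = 1000, interval: float = 60.0) -> None:
        self.eventcount = eventcount
        self.interval = interval
        self.buffer: List[object] = []
        self.batch_started: Optional[float] = None
        self.sent: List[List[object]] = []  # batches handed to the target

    def add(self, event: object, now: float) -> None:
        if not self.buffer:
            self.batch_started = now
        self.buffer.append(event)
        self._maybe_flush(now)

    def tick(self, now: float) -> None:
        # Periodic check so the interval can fire even when no new events arrive.
        self._maybe_flush(now)

    def stop(self) -> int:
        # Stopping the application discards whatever is still buffered.
        discarded = len(self.buffer)
        self.buffer = []
        self.batch_started = None
        return discarded

    def _maybe_flush(self, now: float) -> None:
        if not self.buffer or self.batch_started is None:
            return
        if len(self.buffer) >= self.eventcount or now - self.batch_started >= self.interval:
            self.sent.append(self.buffer)
            self.buffer = []
            self.batch_started = None


buf = BatchBuffer(eventcount=3, interval=60)
for seconds, event in enumerate(["e0", "e1", "e2", "e3"]):
    buf.add(event, now=seconds)   # e0..e2 fill a batch of 3; e3 starts a new one at t=3
buf.tick(now=64)                  # 61 s after e3 was buffered, so the interval fires
buf.add("e4", now=65)
print([len(b) for b in buf.sent], buf.stop())  # [3, 1] 1
```

The final `stop()` call returns 1 because e4 was still in the buffer, illustrating why data buffered at shutdown is not written to Cassandra.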
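The rule that CommitPolicy values must be equal to or greater than BatchPolicy values can be checked mechanically. The sketch below assumes both policies are written as `eventcount:N, interval:N` with integer values, as in the defaults above; the function names are hypothetical, and the "disabled" setting is not modeled because its value is not given here.

```python
from typing import Dict


def parse_policy(spec: str) -> Dict[str, int]:
    """Parse a policy string such as 'eventcount:1000, interval:60'."""
    policy: Dict[str, int] = {}
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        key, sep, value = part.partition(":")
        if not sep:
            raise ValueError(f"expected key:value, got {part!r}")
        policy[key.strip().lower()] = int(value.strip())
    return policy


def commit_policy_is_valid(batch_spec: str, commit_spec: str) -> bool:
    """True if every CommitPolicy value is >= the matching BatchPolicy value.
    Assumption: a key missing from CommitPolicy makes the pair invalid."""
    batch = parse_policy(batch_spec)
    commit = parse_policy(commit_spec)
    return all(key in commit and commit[key] >= value for key, value in batch.items())


print(commit_policy_is_valid("eventcount:1000, interval:60", "eventcount:1000, interval:60"))  # True
print(commit_policy_is_valid("eventcount:1000, interval:60", "eventcount:500, interval:120"))  # False
```

The second pair fails because its eventcount (500) is smaller than the batch eventcount (1000), even though its interval is larger.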
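The default Connection Retry Policy (`retryInterval=30, maxRetries=3`) can be read as "wait 30 seconds between attempts, give up after three retries." The sketch below illustrates that reading in Python. It is an assumption-laden model, not Striim's code: the function names are hypothetical, and counting maxRetries as retries in addition to the first attempt is a guess, since the table entry is incomplete. The `sleep` parameter is injectable so the example runs without actually waiting.

```python
import time
from typing import Callable, Dict, TypeVar

T = TypeVar("T")


def parse_retry_policy(spec: str) -> Dict[str, int]:
    """Parse 'retryInterval=30, maxRetries=3' into a dict of ints."""
    policy: Dict[str, int] = {}
    for part in spec.split(","):
        if part.strip():
            key, _, value = part.strip().partition("=")
            policy[key.strip()] = int(value.strip())
    return policy


def connect_with_retry(
    connect: Callable[[], T],
    spec: str = "retryInterval=30, maxRetries=3",
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `connect` once, then retry up to maxRetries times, waiting
    retryInterval seconds between attempts. Re-raises the last error."""
    policy = parse_retry_policy(spec)
    if policy["maxRetries"] < 0:
        raise ValueError("maxRetries must be >= 0")
    attempts = 1 + policy["maxRetries"]  # assumption: retries are in addition to the first try
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError:
            if attempt == attempts - 1:
                raise
            sleep(policy["retryInterval"])
    raise AssertionError("unreachable")


waits = []
failures = iter([True, True, False])  # fail twice, then succeed


def flaky_connect() -> str:
    if next(failures):
        raise ConnectionError("No host reachable")
    return "connected"


print(connect_with_retry(flaky_connect, sleep=waits.append), waits)  # connected [30, 30]
```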
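The 'PRIMARY KEY' example under Ignorable Exception Code suggests that a configured code is matched against the text of the error message. The Python sketch below models that reading: matching errors are counted (like "Number of exceptions ignored") and recorded in a stand-in exception store, while any other error terminates the application. The substring-matching rule and the class name `IgnorableErrorHandler` are assumptions, and the generic codes such as TABLE_NOT_FOUND are not modeled.

```python
from typing import List, Optional


class IgnorableErrorHandler:
    """Sketch of Ignorable Exception Code handling. Assumption: a configured
    code matches when it appears as a substring of the error message.
    Class and method names are hypothetical."""

    def __init__(self, ignorable_exception_code: str) -> None:
        self.codes: List[str] = [c.strip() for c in ignorable_exception_code.split(",") if c.strip()]
        self.ignored_count = 0                # models "Number of exceptions ignored"
        self.exception_store: List[str] = []  # stand-in for the exception store

    def match(self, message: str) -> Optional[str]:
        for code in self.codes:
            if code in message:
                return code
        return None

    def handle(self, message: str) -> None:
        code = self.match(message)
        if code is None:
            # Errors not listed as ignorable terminate the application.
            raise RuntimeError(f"application terminated: {message}")
        self.ignored_count += 1
        self.exception_store.append(message)


handler = IgnorableErrorHandler("PRIMARY KEY")
handler.handle("com.datastax.driver.core.exceptions.InvalidQueryException: "
               "PRIMARY KEY part id found in SET part")
print(handler.ignored_count)  # 1
```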
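The source,target pairs listed under Tables (for example `source1.%,target1.%;source2.%,target2.%`) map each source table to a target table. The sketch below shows one plausible interpretation in Python. The resolution rules are assumptions, not documented behavior: pairs are separated by semicolons, `%` in a source pattern matches any run of characters, `%` in a target pattern is replaced by the source table's last name component, and matching is case-sensitive. The function names are hypothetical, and the lowercase requirement for Cassandra table names is not enforced.

```python
import re
from typing import List, Optional, Tuple


def parse_tables(spec: str) -> List[Tuple[str, str]]:
    """Split 'src1,tgt1;src2,tgt2' into (source, target) pairs."""
    pairs: List[Tuple[str, str]] = []
    for entry in spec.split(";"):
        entry = entry.strip()
        if entry:
            source, target = (part.strip() for part in entry.split(",", 1))
            pairs.append((source, target))
    return pairs


def resolve_target(source_table: str, spec: str) -> Optional[str]:
    """Return the target table for `source_table`, or None if nothing matches.
    Assumptions: '%' in a source pattern matches any characters, and '%' in a
    target pattern is replaced by the source table's last name component."""
    for source_pattern, target_pattern in parse_tables(spec):
        # Escape literal parts (e.g. the dots) and turn each '%' into '.*'.
        regex = ".*".join(re.escape(piece) for piece in source_pattern.split("%"))
        if re.fullmatch(regex, source_table):
            if "%" in target_pattern:
                table_name = source_table.rsplit(".", 1)[-1]
                return target_pattern.replace("%", table_name)
            return target_pattern
    return None


spec = "source1.%,target1.%;source2.%,target2.%"
print(resolve_target("source2.orders", spec))  # target2.orders
print(resolve_target("source3.orders", spec))  # None
```

Pairs are tried in the order given, so the first matching pattern wins.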
Cassandra Writer sample application
The sample application below assumes that you have created the following table in Cassandra:
CREATE TABLE mykeyspace.testtable (
  merchantid text PRIMARY KEY,
  datetime timestamp,
  authamount decimal,
  zip text
);
The following TQL will write to that table:
CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET CassandraTarget USING CassandraWriter(
  connectionurl: 'jdbc:cassandra://203.0.113.50:9042/mykeyspace',
  Username:'striim',
  Password:'******',
  Tables: 'mykeyspace.testtable'
)
INPUT FROM PosSource_TransformedStream;