Kafka Writer
Writes to a topic in Apache Kafka.
There are five versions of KafkaWriter: 0.8.0, 0.9.0, 0.10.0, 0.11.0, and 2.1.0. Use the one that corresponds to the target Kafka broker. For example, to use 0.9.0, the syntax is CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. If writing to the internal Kafka instance, use 0.11.0.
Known issue DEV-13039: an application with KafkaWriter 0.9 or 0.10 crashes if the Kafka broker goes offline.
property | type | default value | notes |
---|---|---|---|
Broker Address | String | ||
Kafka Config | String | | Optionally, specify Kafka producer properties, separated by semicolons. See the table below for details. |
Kafka Config Property Separator | String | ; | Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon. |
Kafka Config Value Separator | String | = | Specify a different separator if one of the producer property values specified in KafkaConfig contains an equal symbol. |
Message Header | String | | Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig, specify one or more custom headers to be added to each Kafka message. To specify multiple custom headers, separate them with semicolons. |
Message Key | String | | Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig, specify one or more keys to be added to each message, for example: MessageKey : CName="Striim" or MessageKey : Table=@metadata(TableName);Operation=@metadata(OperationName);key1=@userdata(key1) or MessageKey : CityName=City;Zipcode=zip or MessageKey : CName="Striim";Table=@metadata(TableName);Operation=@metadata(OperationName). Among other possibilities, you may use this property to support log compaction or to allow downstream applications to use queries based on the message payload. |
Mode | String | Sync | |
Parallel Threads | Integer | ||
Partition Key | String | | The name of a field in the input stream whose values determine how events are distributed among multiple partitions. Events with the same partition key field value will be written to the same partition. If the input stream is of any type except WAEvent, specify the name of one of its fields. If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MS SQL Reader WAEvent fields) using the syntax @metadata(<field name>). |
Topic | String | | The existing Kafka topic to write to (will not be created if it does not exist). If more than one KafkaWriter writes to the same topic, recovery is not supported (see Recovering applications). (Recovery is supported when using Parallel Threads.) |
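For illustration, here is a minimal sketch of a target that combines several of these properties. The target name, topic, broker address, and input stream are placeholders, and DSVFormatter is only one of the supported formatter choices:

```
-- Hypothetical target; adjust names and values for your environment.
CREATE TARGET OrdersKafkaTarget USING KafkaWriter VERSION '0.11.0' (
  brokeraddress: 'localhost:9092',
  topic: 'Orders',                -- must already exist on the broker
  mode: 'Sync',
  PartitionKey: 'merchantId',     -- events with the same merchantId go to the same partition
  MessageKey: 'CName="Striim"'
)
FORMAT USING DSVFormatter ()
INPUT FROM OrdersStream;
```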
This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.
Notes on the KafkaConfig property
With the exceptions noted in the following table, you may specify any Kafka producer property in KafkaConfig.
Kafka producer property | notes |
---|---|
acks | |
batch.size, linger.ms, retries | |
enable.idempotence | When using version 2.1.0 and async mode, set to true to write events in order (see Setting KafkaWriter's mode property: sync versus async). |
key.deserializer | Value is always org.apache.kafka.common.serialization.ByteArrayDeserializer and cannot be overridden by KafkaConfig. |
Internally, KafkaWriter invokes KafkaConsumer for various purposes, so the WARNING from the consumer API about the KafkaConfig properties passed to it can be safely ignored. See Configuring Kafka for more information about Kafka producer properties.
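For instance, the following hypothetical target passes two producer properties through KafkaConfig using the default semicolon and equals separators (the property values are illustrative, not recommendations):

```
-- Sketch only; acks and compression.type are standard Kafka producer properties.
CREATE TARGET TunedKafkaTarget USING KafkaWriter VERSION '0.11.0' (
  brokeraddress: 'localhost:9092',
  topic: 'KafkaWriterSample',
  KafkaConfig: 'acks=all;compression.type=gzip'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;
```

If a producer property value itself contained a semicolon or an equal symbol, you would set Kafka Config Property Separator or Kafka Config Value Separator to a different character, as described in the property table above.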
KafkaWriter sample application
The following sample code writes data from PosDataPreview.csv to the Kafka topic KafkaWriterSample. This topic already exists in Striim's internal Kafka instance. If you are using an external Kafka instance, you must create the topic before running the application.
```
CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
       TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
       TO_DOUBLE(data[7]) as amount,
       TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0' (
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;
```
You can verify that data was written to Kafka by running the Kafka Reader sample application.
The first field in the output (position) stores information required to avoid lost or duplicate events after recovery (see Recovering applications). If recovery is not enabled, its value is NULL.
The mon output (see Using the MON command) for targets using KafkaWriter includes:

- In async mode only, Sent Bytes Rate: how many megabytes per second were sent to the brokers.
- In both sync and async mode, Write Bytes Rate: how many megabytes per second were written by the brokers and acknowledgments received by Striim.
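For example, assuming the KW11Sample target from the sample application above is deployed and running, you could view these metrics from the Striim console:

```
mon KW11Sample;
```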
Enabling compression
When you enable compression in KafkaWriter, the broker and consumer should handle the compressed batches automatically. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

```
KafkaConfig:'compression.codec=snappy'
```
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gzip, lz4, and snappy. For example:

```
KafkaConfig:'compression.type=snappy'
```
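In context, a hypothetical 0.11.0 target with snappy compression enabled (reusing the sample application's stream and topic as placeholders) might look like this:

```
-- Sketch only; the broker, topic, and input stream come from the sample above.
CREATE TARGET CompressedKafkaSample USING KafkaWriter VERSION '0.11.0' (
  brokeraddress: 'localhost:9092',
  topic: 'KafkaWriterSample',
  KafkaConfig: 'compression.type=snappy'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;
```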