Striim 3.10.1 documentation

Kafka Writer

Writes to a topic in Apache Kafka.

There are five versions of KafkaWriter: 0.8.0, 0.9.0, 0.10.0, 0.11.0, and 2.1.0. Use the one that corresponds to the target Kafka broker. For example, to use 0.9.0, the syntax is CREATE TARGET <name> USING KafkaWriter VERSION '0.9.0'. If writing to the internal Kafka instance, use 0.11.0.

Known issue DEV-13039: an application with KafkaWriter 0.9 or 0.10 crashes if the Kafka broker goes offline.





Broker Address


Kafka Config


Optionally, specify Kafka producer properties, separated by semicolons. See the table below for details.
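For instance, a target that tunes a few standard producer properties might look like the following sketch (the stream name and property values are illustrative, not recommendations):

```sql
CREATE TARGET KafkaTuned USING KafkaWriter VERSION '0.11.0' (
  brokeraddress: 'localhost:9092',
  topic: 'KafkaWriterSample',
  KafkaConfig: 'batch.size=200000;linger.ms=100;max.request.size=4194304'
)
FORMAT USING JSONFormatter ()
INPUT FROM MyStream;
```

Producer properties not listed in KafkaConfig keep their Kafka defaults.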

Kafka Config Property Separator



Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon.

Kafka Config Value Separator



Specify a different separator if one of the producer property values specified in KafkaConfig contains an equal symbol.
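As a sketch, if a value such as a JAAS configuration contains both semicolons and equal signs, you might choose different separators (the separator characters here are arbitrary):

```sql
KafkaConfigPropertySeparator: '|',
KafkaConfigValueSeparator: '->',
KafkaConfig: 'sasl.mechanism->PLAIN|sasl.jaas.config->org.apache.kafka.common.security.plain.PlainLoginModule required username="striim" password="passwd";'
```

With these settings, the trailing semicolon required by the JAAS syntax no longer conflicts with the property separator.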

Message Header


Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig batch.size=-1, specify one or more custom headers to be added to messages as key-value pairs. Values may be:

  • a field name from an input stream of a user-defined type: for example, MerchantID=merchantID

  • a static string: for example, Company="My Company"

  • a function: for example, to get the source table name from a WAEvent input stream that is the output of a CDC reader, TableName=@metadata(TableName)

To specify multiple custom headers, separate them with semicolons.
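Putting the three kinds of values together, a MessageHeader setting might look like the following sketch (the field and header names are illustrative):

```sql
MessageHeader: 'MerchantID=merchantID;Company="My Company";TableName=@metadata(TableName)'
```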

Message Key


Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig batch.size=-1, specify one or more keys to be added to messages as key-value pairs. The property value may be a static string, one or more fields from the input stream, or a combination of both. Examples:

MessageKey : CName="Striim"

MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName);key1=@userdata(key1)

MessageKey : CityName=City; Zipcode=zip

MessageKey : CName="Striim";Table=@metadata(TableName); Operation=@metadata(OperationName)

Among other possibilities, you may use this property to support log compaction or to allow downstream applications to run queries based on the message payload.




Mode

See Setting KafkaWriter's mode property: sync versus async.

Parallel Threads


See Creating multiple writer instances.

Partition Key


The name of a field in the input stream whose values determine how events are distributed among multiple partitions. Events with the same partition key field value will be written to the same partition.

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see HP NonStop reader WAEvent fields, MySQLReader WAEvent fields, OracleReader WAEvent fields, or MS SQL Reader WAEvent fields) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams) using the syntax @USERDATA(<field name>).
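For example (a sketch, with illustrative field names):

```sql
-- input stream of a user-defined type: partition by the merchantId field
PartitionKey: 'merchantId'

-- WAEvent input stream from a CDC reader: partition by source table name
PartitionKey: '@METADATA(TableName)'
```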



Topic

The existing Kafka topic to write to (will not be created if it does not exist).

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Notes on the KafkaConfig property

With the exceptions noted in the following table, you may specify any Kafka producer property in KafkaConfig. 

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all



batch.size, linger.ms, retries

  • In sync mode, to prevent out-of-order events, the producer properties set in KafkaConfig will be ignored, and Striim will handle these internally.

  • In async mode, Striim will update the Kafka producer properties and these will be handled by Kafka.

  • In sync mode, you may set batch.size=-1 to write one event per Kafka message. This will seriously degrade performance so is not recommended in a production environment. With this setting, messages will be similar to those in async mode.
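For example, the one-event-per-message setting described above would be configured roughly as follows (a sketch; suitable for testing, not production):

```sql
mode: 'sync',
KafkaConfig: 'batch.size=-1'
```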


enable.idempotence

When using version 2.1.0 and async mode, set to true to write events in order.


key.deserializer

The value is always org.apache.kafka.common.serialization.ByteArrayDeserializer and cannot be overridden by KafkaConfig.


value.deserializer

The default value is com.striim.avro.deserializer.LengthDelimitedAvroRecordDeserializer. Set to com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer when each message contains a single Avro record that is not length-delimited.

Internally, KafkaWriter invokes KafkaConsumer for various purposes, so the WARNING from the consumer API due to passing KafkaConfig properties can be safely ignored. See Configuring Kafka for more information about Kafka producer properties.

KafkaWriter sample application

The following sample code writes data from PosDataPreview.csv to the Kafka topic KafkaWriterSample. This topic already exists in Striim's internal Kafka instance. If you are using an external Kafka instance, you must create the topic before running the application.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (header:yes)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0' (
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

You can verify that data was written to Kafka by running the Kafka Reader sample application.

The first field in the output (position) stores information required to avoid lost or duplicate events after recovery (see Recovering applications). If recovery is not enabled, its value is NULL.

The MON output (see Using the MON command) for targets using KafkaWriter includes:

  • in async mode only, Sent Bytes Rate: how many megabytes per second were sent to the brokers

  • in both sync and async mode, Write Bytes Rate: how many megabytes per second were written by the brokers, with acknowledgements received by Striim

Enabling compression

When you enable compression in KafkaWriter, the broker and consumer should handle the compressed batches automatically. No additional configuration should be required in Kafka.

To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfig:'compression.codec=gzip'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gzip, lz4, and snappy. For example:

KafkaConfig:'compression.type=lz4'