Striim 3.9.6 documentation

Kafka Reader

Reads data from Apache Kafka 0.8, 0.9, 0.10, 0.11, or 2.1.

See Supported reader-parser combinations for parsing options.

property

type

default value

notes

AutoMapPartition

java.lang.Boolean

True

When KafkaReader reads from multiple partitions and is deployed ON ALL in a deployment group that contains multiple Striim servers, the partitions are distributed automatically among the servers. Partitions are rebalanced automatically as Striim servers are added to or removed from the group.

When deploying on a Forwarding Agent, set to False.

blocksize

java.lang.Integer

10240

for KafkaReader 0.8 only: size of the fetch buffer in KB, which must be greater than or equal to the Kafka broker's message.max.bytes setting (note that message.max.bytes is expressed in bytes, while blocksize is in KB)

brokerAddress

java.lang.String

KafkaConfig

java.lang.String

Optionally, specify Kafka consumer properties, separated by semicolons. See Configuring Kafka for more information.

When using KafkaReader 0.9 or later, the default is:

max.partition.fetch.bytes=10485760;
fetch.min.bytes=1048576;
fetch.max.wait.ms=1000;
receive.buffer.bytes=2000000;
poll.timeout.ms=10000

When using KafkaReader 0.8, the default is blank. We recommend setting:

retry.backoff.ms=10000

KafkaConfigPropertySeparator

java.lang.String

;

Specify a different separator if one of the consumer property values specified in KafkaConfig contains a semicolon.

KafkaConfigValueSeparator

java.lang.String

=

Specify a different separator if one of the consumer property values specified in KafkaConfig contains an equals sign.

PartitionIDList

java.lang.String

partition numbers to read from, separated by semicolons (for example, 0;1), or leave blank to read from all partitions

startOffset

java.lang.Long

-1

With the default value of -1, KafkaReader reads from the end of the partition. Set to 0 to read from the beginning of the partition.

If you specify startOffset, leave startTimestamp at its default value.

startTimestamp

java.lang.String

for KafkaReader 0.10 only:

If not specified, only new messages (based on the Kafka host's current system time) are read. To start reading from an earlier point, specify a value in the format yyyy-MM-dd HH:mm:ss.SSS (for example, 2017-10-20 13:55:55.000).

If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone.

If you specify startTimestamp, leave startOffset at its default value.

Topic

java.lang.String
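The KafkaConfig and separator properties described above can be combined as in the following sketch. It assumes the broker address and topic used by the sample application on this page; the source and stream names (KRConfigSample, KRConfigStream) are hypothetical. It switches the property separator to a vertical bar, which you would only need if a consumer property value you want to set contains a semicolon. This page does not say whether setting KafkaConfig replaces or merges with the 0.9-and-later defaults, so the sketch restates all of those defaults explicitly.

```
CREATE SOURCE KRConfigSample USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0',
  KafkaConfig:'max.partition.fetch.bytes=10485760|fetch.min.bytes=1048576|fetch.max.wait.ms=1000|receive.buffer.bytes=2000000|poll.timeout.ms=10000',
  KafkaConfigPropertySeparator:'|'
)
PARSE USING DSVParser ()
OUTPUT TO KRConfigStream;
```

With the default separators, the same KafkaConfig value would use semicolons between properties and KafkaConfigPropertySeparator could be omitted.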

There are five versions of KafkaReader: 0.8.0, 0.9.0, 0.10.0, 0.11.0, and 2.1.0. Specify the one that corresponds to the Kafka broker being read using VERSION '<version>': for example, to read from a 2.1 broker, use CREATE SOURCE <name> USING KafkaReader VERSION '2.1.0'. Striim's internal Kafka server (see Configuring Kafka) is version 0.11.
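For example, a complete source definition for a 0.10 broker that starts reading from a point in time with startTimestamp (listed in the table above as a KafkaReader 0.10 property) might look like the following sketch. The broker address, topic, and timestamp are assumptions borrowed from elsewhere on this page, the source and stream names (KR10TimestampSample, RawKafka10Stream) are hypothetical, and startOffset is left at its default, as the table requires when startTimestamp is set.

```
CREATE SOURCE KR10TimestampSample USING KafkaReader VERSION '0.10.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startTimestamp:'2017-10-20 13:55:55.000'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafka10Stream;
```

If the Kafka host and Striim host are in different time zones, the timestamp would be given in the Striim host's time zone.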

The output type is WAEvent except when using AvroParser or JSONParser.

The following sample application will read the data written to Kafka by the Kafka Writer sample application and write it to striim/KR11Output.00:

CREATE SOURCE KR11Sample USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KR11Out USING FileWriter (
  filename:'KR11Output'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;
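To read only some partitions of the topic, the sample source could be varied with PartitionIDList, as in this sketch. It reads partition 0 of the same topic from its beginning; the source and stream names (KR11PartitionSample, RawKafkaPartitionStream) are hypothetical. To read partitions 0 and 1 instead, the value would be '0;1'.

```
CREATE SOURCE KR11PartitionSample USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  PartitionIDList:'0',
  startOffset:'0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaPartitionStream;
```

To write this output to a file, a target like KR11Out above would need its INPUT FROM changed to RawKafkaPartitionStream.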