Striim 3.10.1 documentation

Kafka Reader

Reads data from Apache Kafka 0.8, 0.9, 0.10, 0.11, or 2.1.

See Supported reader-parser combinations) for parsing options.

property

type

default value

notes

Auto Map Partition

Boolean

True

When reading from multiple partitions, if there are multiple servers in the Striim deployment group on which KafkaReader is deployed ON ALL, partitions will be distributed automatically among the servers. Partitions will be rebalanced automatically as Striim servers are added to or removed from the group.

When deploying on a Forwarding Agent, set to False.

Block Size

Integer

10240

for KafkaReader 0.8 only: size of the fetch buffer in KB, which must be greater than or equal to the Kafka broker's message.max.bytes size

Broker Address

String

Kafka Config

String

Optionally, specify Kafka producer properties, separated by semicolons. See Configuring Kafka for more information.

Set value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer when each message contains one Avro record and it is not length-delimited.

When using KafkaReader 0.9 or later, the default is:

max.partition.fetch.bytes=10485760;
fetch.min.bytes=1048576;
fetch.max.wait.ms=1000;
receive.buffer.bytes=2000000;
poll.timeout.ms=10000

When using KafkaReader 0.8, the default is blank. We recommend setting:

retry.backoff.ms=10000

Kafka Config Property Separator

String

;

Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon.

Kafka Config Value Separator

String

=

Specify a different separator if one of the producer property values specified in KafkaConfig contains an equal symbol.

Partitiion ID List

String

partition numbers to read from, separated by semicolons (for example, 0;1), or leave blank to read from all partitions

Start Offset

Long

-1

With default value of -1, reads from the end of the partition. Change to 0 to read from the beginning of the partition.

If you specify startOffset, leave startTimestamp at its default value.

Start Timestamp

String

for KafkaReader 0.10 and later only:

If not specified, only new transactions (based on current Kafka host system time) are read. Specify a value in the format yyyy-MM-dd hh:mm:ss:SSS (for example, 2017-10-20 13:55:55.000) to start reading from an earlier point.

If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone.

If you specify startTimestamp, leave startOffset at its default value.

Topic

String

Specify the Kafka version for the broker being read using VERSION '0.#.0': for example, to use 2.1, CREATE SOURCE <name> USING KafkaReader VERSION '2.1.0'. Striim's internal Kafka server (see Configuring Kafka) is version 0.11.

The output type is WAevent except when using Avro Parser  or JSONParser.

The following sample application will read the data written to Kafka by the Kafka Writer sample application and write it to striim/KR11Output.00:

CREATE SOURCE KR11Sample USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KR11Out USING FileWriter (
  filename:'KR11Output'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;