Kafka Reader
Reads data from Apache Kafka 0.8, 0.9, 0.10, 0.11, or 2.1.
See Supported reader-parser combinations for parsing options.
property | type | default value | notes |
---|---|---|---|
Auto Map Partition | Boolean | True | When reading from multiple partitions, if there are multiple servers in the Striim deployment group on which KafkaReader is deployed ON ALL, partitions will be distributed automatically among the servers. Partitions will be rebalanced automatically as Striim servers are added to or removed from the group. When deploying on a Forwarding Agent, set to False. |
Block Size | Integer | 10240 | for KafkaReader 0.8 only: size of the fetch buffer in KB, which must be greater than or equal to the Kafka broker's message.max.bytes size |
Broker Address | String | | |
Kafka Config | String | | Optionally, specify Kafka consumer properties, separated by semicolons. See Configuring Kafka for more information. Set value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer when each message contains one Avro record and it is not length-delimited. When using KafkaReader 0.9 or later, the default is max.partition.fetch.bytes=10485760; fetch.min.bytes=1048576; fetch.max.wait.ms=1000; receive.buffer.bytes=2000000; poll.timeout.ms=10000. When using KafkaReader 0.8, the default is blank; we recommend setting retry.backoff.ms=10000. See the example after this table. |
Kafka Config Property Separator | String | ; | Specify a different separator if one of the consumer property values specified in KafkaConfig contains a semicolon. |
Kafka Config Value Separator | String | = | Specify a different separator if one of the consumer property values specified in KafkaConfig contains an equals sign. |
Partition ID List | String | | partition numbers to read from, separated by semicolons (for example, 0;1), or leave blank to read from all partitions |
Start Offset | Long | -1 | With the default value of -1, reads from the end of the partition. Change to 0 to read from the beginning of the partition. If you specify startOffset, leave startTimestamp at its default value. |
Start Timestamp | String | | for KafkaReader 0.10 and later only: If not specified, only new transactions (based on current Kafka host system time) are read. Specify a value in the format yyyy-MM-dd hh:mm:ss:SSS (for example, 2017-10-20 13:55:55.000) to start reading from an earlier point. If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone. If you specify startTimestamp, leave startOffset at its default value. |
Topic | String | | |
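For example, here is a minimal sketch (not taken from this guide; the broker address, topic, partition list, timestamp, and consumer overrides are illustrative) combining several of the properties above: KafkaConfig overrides two consumer settings, Partition ID List limits reading to partitions 0 and 1, and Start Timestamp (KafkaReader 0.10 or later only) starts reading from an earlier point:

```
-- Illustrative only: adjust the broker, topic, partitions, and timestamp
-- to match your environment.
CREATE SOURCE KafkaPropertiesSample USING KafkaReader VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  topic: 'KafkaWriterSample',
  partitionIDList: '0;1',
  startTimestamp: '2017-10-20 13:55:55.000',
  KafkaConfig: 'retry.backoff.ms=10000;fetch.max.wait.ms=1000'
)
PARSE USING DSVParser ()
OUTPUT TO KafkaPropertiesStream;
```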
Specify the Kafka version for the broker being read using VERSION '<major>.<minor>.0': for example, to use 2.1, CREATE SOURCE <name> USING KafkaReader VERSION '2.1.0'. Striim's internal Kafka server (see Configuring Kafka) is version 0.11.
The output type is WAEvent except when using AvroParser or JSONParser.
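Because the parsed values arrive in the WAEvent data array, a downstream CQ typically selects individual fields by position. A minimal sketch, assuming the RawKafkaStream created by the sample application below and hypothetical field positions:

```
-- Hypothetical CQ: pulls the first two DSV fields out of each WAEvent.
CREATE CQ ParseKR11CQ
INSERT INTO ParsedKafkaStream
SELECT data[0] AS field0,
       data[1] AS field1
FROM RawKafkaStream;
```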
The following sample application will read the data written to Kafka by the Kafka Writer sample application and write it to striim/KR11Output.00:
```
CREATE SOURCE KR11Sample USING KafkaReader VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  topic: 'KafkaWriterSample',
  startOffset: '0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KR11Out USING FileWriter (
  filename: 'KR11Output'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;
```
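To read a topic in which each message is a single Avro record that is not length-delimited, the Kafka Config notes above call for the SingleRecordAvroRecordDeserializer. A hedged sketch (the topic name, schema file path, and the AvroParser schemaFileName property are assumptions, not taken from this page):

```
-- Assumed topic, schema path, and parser property; verify against your AvroParser setup.
CREATE SOURCE KafkaAvroSample USING KafkaReader VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  topic: 'AvroTopic',
  KafkaConfig: 'value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer'
)
PARSE USING AvroParser (
  schemaFileName: '/path/to/schema.avsc'
)
OUTPUT TO KafkaAvroStream;
```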