Persisting a stream to Kafka

For an overview of this feature, see Introducing Kafka streams.

Note

Kafka and Zookeeper must be running when you create a Kafka-persisted stream, persist an existing stream, or import an application containing one.

CREATE STREAM <name> OF <type> PERSIST [USING <property set>];

To enable replay of a stream by persisting it to Kafka (see Replaying events using Kafka streams), use the syntax CREATE STREAM <name> OF <type> PERSIST USING <namespace>.<property set>, where <property set> is the name of a set of Kafka server properties. To persist to a Striim cluster's integrated Kafka broker, use the property set Global.DefaultKafkaProperties, for example:

CREATE STREAM MyStream OF MyStreamType PERSIST USING Global.DefaultKafkaProperties;

To persist to an external Kafka broker, instead of Global.DefaultKafkaProperties specify a custom property set created as described in Configuring Kafka.
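
For example, a custom property set for an external broker might look like the following. This is a sketch only: the addresses are placeholders, and the exact properties required for your broker version are listed in Configuring Kafka.

CREATE PROPERTYSET MyKafkaProps (
  zk.address:'203.0.113.10:2181',
  bootstrap.brokers:'203.0.113.10:9092'
);

CREATE STREAM MyStream OF MyStreamType PERSIST USING MyKafkaProps;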

A persisted stream may be used in a window or CQ in the usual way, just like a memory-resident stream. Alternatively, the persisted data may be read by KafkaReader using the topic name <namespace>_<stream name> (see Reading a Kafka stream with KafkaReader). To use persisted stream data from the integrated Kafka broker outside of Striim, see Reading a Kafka stream with an external Kafka consumer.
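
For example, a persisted stream can feed a window and a downstream CQ exactly as an in-memory stream would. The following sketch assumes the MyStream example above; the window name, KEEP clause, and output stream are illustrative:

CREATE WINDOW MyStreamWindow OVER MyStream KEEP 100 ROWS;

CREATE CQ CountRecentEvents
INSERT INTO RecentCounts
SELECT COUNT(*) AS eventCount
FROM MyStreamWindow;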

If a persisted stream is created in an application or flow with encryption enabled (see CREATE APPLICATION ... END APPLICATION), it will be encrypted. It may be read by another application that does not have encryption enabled.

Limitations:

  • Kafka streams may be used only on the output of a source or the output of a CQ that parses a source.

  • Implicit streams may not be persisted to Kafka.

  • In an application or flow running in a Forwarding Agent, a source or CQ may output to a Kafka stream, but any further processing of that stream must take place on the Striim server.

  • If the Kafka broker configuration delete.topic.enable is false (the default in all Kafka releases prior to 1.0.0, including 0.11) and you drop a Striim application containing a Kafka stream after a crash, creating the stream will fail when you reload the application. To avoid this, set delete.topic.enable=true.

Because implicit streams may not be persisted, the Kafka stream must be explicitly created before the source or CQ that populates it. Using MultiLogApp as an example, to persist the raw output of the access log source:

CREATE STREAM RawAccessStream OF Global.WAEvent
  PERSIST USING Global.DefaultKafkaProperties;

CREATE SOURCE AccessLogSource USING FileReader (
  directory:'Samples/MultiLogApp/appData',
  wildcard:'access_log',
  positionByEOF:false
)
PARSE USING DSVParser (
  ignoreemptycolumn:'Yes',
  quoteset:'[]~"',
  separator:'~'
)
OUTPUT TO RawAccessStream;

Alternatively, to persist the output of the CQ that parses that raw output:

CREATE TYPE AccessLogEntry (
    srcIp String KEY ...
);
CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties;

CREATE CQ ParseAccessLog 
INSERT INTO AccessStream
SELECT data[0] ...
FROM RawAccessStream;

To distribute events among multiple Kafka partitions, use PARTITION BY <field>:

CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties
  PARTITION BY srcIp;

All events with the same value in <field> will be written to the same randomly selected Kafka partition. Striim will distribute the data evenly among the partitions to the extent allowed by the frequency of the <field> values. For example, if 80% of the events have the same <field> value, then one of the Kafka partitions will contain 80% of the events.

By default, events may be distributed among up to 200 Kafka partitions. See Configuring Kafka for more information.

Dropping a persisted stream automatically deletes the associated Kafka topics.
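
For example, dropping the stream created earlier also removes its topic from the broker (provided delete.topic.enable is true, as discussed under Limitations):

DROP STREAM MyStream;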

If recovery (see Recovering applications) is enabled for an application containing a Kafka stream, the persisted data will include "CheckPoint" events used by the recovery process.
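
Recovery is enabled in the CREATE APPLICATION statement, as described in Recovering applications. A sketch, with an illustrative 10-second checkpoint interval:

CREATE APPLICATION MyApp RECOVERY 10 SECOND INTERVAL;

CREATE STREAM MyStream OF MyStreamType PERSIST USING Global.DefaultKafkaProperties;

END APPLICATION MyApp;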