Reading a Kafka stream with an external Kafka consumer
To read a Kafka stream's persisted data in an external application:
1. Include dataformat:'avro' in the stream's property set.
2. Generate an Avro schema file for the stream using the console command EXPORT <namespace>.<stream name>. This creates a schema file <namespace>_<stream name>_schema.avsc in the Striim program directory. Optionally, you may specify a path or file name using EXPORT <namespace>.<stream name> '<path>' '<file name>'. The .avsc extension will be added automatically.
3. Copy the schema file to a location accessible by the external application.
4. In the external application, use the Zookeeper and broker addresses from the stream's property set, and reference the stream using the topic name <namespace>_<stream name>.
Note
Recovery (see Recovering applications) is not supported for Avro-formatted Kafka streams.
For example, the following program would read from Samples.PosDataStream:
import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
public class SimpleKafkaConsumer
{
    /**
     * This method issues exactly one fetch request to a Kafka topic/partition and prints out
     * all the data from the response of the fetch request.
     * @param topic_name Topic to fetch messages from
     * @param schema_filename Avro schema file used for deserializing the messages
     * @param host_name Host where the Kafka broker is running
     * @param port Port on which the Kafka broker is listening
     * @param clientId Unique id of the client making the fetch request
     * @throws Exception
     */
    public void read(String topic_name, String schema_filename, String host_name, int port, String clientId) throws Exception
    {
        SimpleConsumer simpleConsumer = new SimpleConsumer(host_name, port, 100000, 64 * 1024, clientId);
        // This is just an example that reads from partition 1 of a topic.
        int partitionId = 1;
        // Find the first offset in the logs and start fetching messages from that offset.
        long offset = getOffset(simpleConsumer, topic_name, partitionId, kafka.api.OffsetRequest.EarliestTime(), clientId);
        // Build a fetch request.
        FetchRequestBuilder builder = new FetchRequestBuilder();
        builder.clientId(clientId);
        builder.addFetch(topic_name, partitionId, offset, 43264200);
        FetchRequest fetchRequest = builder.build();
        // Get the response of the fetch request.
        FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);
        // Instantiate an Avro deserializer based on the schema file.
        SpecificDatumReader datumReader = getAvroReader(schema_filename);
        if (fetchResponse.hasError())
        {
            System.out.println("Error processing fetch request: Reason -> " + fetchResponse.errorCode(topic_name, partitionId));
        }
        else
        {
            ByteBufferMessageSet bbms = fetchResponse.messageSet(topic_name, partitionId);
            int count = 0;
            for (MessageAndOffset messageAndOffset : bbms)
            {
                ByteBuffer payload = messageAndOffset.message().payload();
                // The message format is Striim-specific:
                // 1. The first 4 bytes hold an integer (k) giving the size of the actual message in the byte buffer.
                // 2. The next k bytes hold the actual message.
                // 3. This loop repeats until the byte buffer limit is reached.
                while (payload.hasRemaining())
                {
                    int size = payload.getInt();
                    // If the size is invalid, or there aren't enough bytes left to process this chunk, just bail.
                    if ((payload.position() + size > payload.limit()) || size == 0)
                    {
                        break;
                    }
                    else
                    {
                        byte[] current_bytes = new byte[size];
                        payload.get(current_bytes, 0, size);
                        BinaryDecoder recordDecoder = DecoderFactory.get().binaryDecoder(current_bytes, null);
                        GenericData.Record record = (GenericData.Record) datumReader.read(null, recordDecoder);
                        System.out.println(count++ + ":" + record);
                    }
                }
            }
        }
    }
    private SpecificDatumReader getAvroReader(String schema_filename) throws Exception
    {
        File schemaFile = new File(schema_filename);
        if (schemaFile.exists() && !schemaFile.isDirectory())
        {
            Schema schema = new Schema.Parser().parse(schemaFile);
            SpecificDatumReader avroReader = new SpecificDatumReader(schema);
            return avroReader;
        }
        return null;
    }
    public long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName)
    {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError())
        {
            System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
    public static void main(String[] args)
    {
        SimpleKafkaConsumer readKafka = new SimpleKafkaConsumer();
        String topic_name = "Samples_PosDataStream";
        String schema_filename = "./Platform/conf/Samples_PosDataStream_schema.avsc";
        try
        {
            readKafka.read(topic_name, schema_filename, "localhost", 9092, "ItsAUniqueClient");
        }
        catch (Exception e)
        {
            System.out.println(e);
        }
    }
}
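The program above uses the legacy SimpleConsumer API from the kafka.javaapi packages, which has been removed from recent Kafka client releases. If your external application is built against a newer Kafka client, the same length-prefixed Avro payloads can be read with the org.apache.kafka.clients consumer API. The following is a minimal sketch under that assumption; the bootstrap server address, consumer group id, and schema file path are illustrative placeholders, and should be replaced with the broker address from the stream's property set and the path of the exported schema file.
import java.io.File;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class NewApiKafkaConsumer
{
    public static void main(String[] args) throws Exception
    {
        // Parse the schema file created by EXPORT Samples.PosDataStream (path is illustrative).
        Schema schema = new Schema.Parser().parse(new File("Samples_PosDataStream_schema.avsc"));
        GenericDatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(schema);
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker address from the stream's property set
        props.put("group.id", "striim-external-reader");  // hypothetical consumer group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props))
        {
            // The topic name is <namespace>_<stream name>.
            consumer.subscribe(Collections.singletonList("Samples_PosDataStream"));
            while (true)
            {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records)
                {
                    ByteBuffer payload = ByteBuffer.wrap(record.value());
                    // Each Kafka message holds one or more length-prefixed Avro records:
                    // a 4-byte size followed by that many bytes of Avro-encoded data.
                    while (payload.remaining() > 4)
                    {
                        int size = payload.getInt();
                        if (size <= 0 || size > payload.remaining())
                        {
                            break;
                        }
                        byte[] bytes = new byte[size];
                        payload.get(bytes);
                        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
                        System.out.println(datumReader.read(null, decoder));
                    }
                }
            }
        }
    }
}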