Avro Parser
Parses input in Avro format. See Supported reader-parser combinations for compatible readers.
When reading .avro files, no properties are required. For non-file sources, specify either Schema File Name or Schema Registry URI.
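A schema is always required to interpret Avro data because the wire format itself carries no field names or types. As a minimal sketch of that format, the following shows how Avro serializes a long: zigzag encoding followed by a little-endian base-128 varint, per the Avro specification's binary encoding rules.

```python
def zigzag(n: int) -> int:
    """Map a signed 64-bit long to an unsigned int (Avro zigzag encoding)."""
    return (n << 1) ^ (n >> 63)

def encode_long(n: int) -> bytes:
    """Serialize a long as Avro does: zigzag, then base-128 varint."""
    v = zigzag(n)
    out = bytearray()
    while True:
        b = v & 0x7F
        v >>= 7
        if v:
            out.append(b | 0x80)  # high bit set: more bytes follow
        else:
            out.append(b)
            return bytes(out)

# Values from the encoding table in the Avro specification:
print(encode_long(1).hex())   # 02
print(encode_long(-1).hex())  # 01
print(encode_long(64).hex())  # 8001
```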
Avro Parser properties
| property | type | default value | notes |
|---|---|---|---|
| Schema File Name | String | | the path and name of the Avro schema file |
| Schema Registry Configuration | String | | when using Confluent Cloud's schema registry, specify the required authentication properties in the format |
| Schema Registry URI | String | | the URI for a Confluent or Hortonworks schema registry, for example, |
For detailed discussion of the schema registry, see Using the Confluent schema registry.
The output type of a source using AvroParser is AvroEvent, which contains the elements of the metadata map (see Using the META() function) and the data array, which is of type org.apache.avro.generic.GenericRecord.
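To make that shape concrete, here is a plain-Python sketch of what an AvroEvent carries. This is illustrative only: the real event wraps a GenericRecord, and the metadata key names shown are assumed examples for a FileReader source, not a guaranteed set.

```python
# Illustrative stand-in for an AvroEvent; a plain dict replaces the
# org.apache.avro.generic.GenericRecord used by the real runtime.
avro_event = {
    # metadata map, queried in TQL via META(); the key names below are
    # assumed examples for a FileReader source, not a guaranteed set
    "metadata": {"FileName": "PosDataPreview.avro", "FileOffset": 0},
    # data: the deserialized Avro record; field values are placeholders
    "data": {"merchantId": "example-merchant", "authAmount": 2.2},
}

# Field access mirrors data.get("name") on a GenericRecord:
print(avro_event["data"].get("merchantId"))  # example-merchant
```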
Avro Parser examples
You can download the following example TQL files as AvroParser.zip from https://github.com/striim/doc-downloads.
The following application will generate an Avro schema file PosDataPreview.avsc and convert Samples/PosApp/appData/PosDataPreview.csv to an Avro data file PosDataPreview.avro.
CREATE APPLICATION WritePosData2Avro;

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[0]) as businessName,
  TO_STRING(data[1]) as merchantId,
  TO_STRING(data[2]) as primaryAccountNumber,
  TO_STRING(data[3]) as posDataCode,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_STRING(data[5]) as expDate,
  TO_STRING(data[6]) as currencyCode,
  TO_DOUBLE(data[7]) as authAmount,
  TO_STRING(data[8]) as terminalId,
  TO_STRING(data[9]) as zip,
  TO_STRING(data[10]) as city
FROM CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
  filename:'Samples/PosDataPreview.avro'
)
FORMAT USING AvroFormatter (
  schemaFileName:'Samples/PosDataPreview.avsc'
)
INPUT FROM PosDataStream;

END APPLICATION WritePosData2Avro;
See AVROFormatter for more information.
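The generated schema file pairs each field of PosDataStream with an Avro type. As a rough sketch only: the exact schema AvroFormatter emits depends on its configuration and on how Striim types (especially the DateTime field) map to Avro types; here every field is assumed to map to string except authAmount (double).

```python
import json

# Hypothetical sketch of a PosDataPreview.avsc that AvroFormatter might
# generate for PosDataStream; actual output may differ, particularly for
# the dateTime field.
field_names = [
    "businessName", "merchantId", "primaryAccountNumber", "posDataCode",
    "dateTime", "expDate", "currencyCode", "authAmount", "terminalId",
    "zip", "city",
]
schema = {
    "type": "record",
    "name": "PosDataStream",
    "fields": [
        {"name": f, "type": "double" if f == "authAmount" else "string"}
        for f in field_names
    ],
}
print(json.dumps(schema, indent=2))
```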
The following sample application uses the files created by WritePosData2Avro and writes a subset of the fields from PosDataPreview.avro to SysOut:
CREATE APPLICATION AvroParserTest;
CREATE SOURCE AvroSource USING FileReader (
directory:'Samples',
wildcard:'PosDataPreview.avro',
positionByEOF:false
)
PARSE USING AvroParser (
schemaFileName:'Samples/PosDataPreview.avsc'
)
OUTPUT TO AvroStream;
CREATE CQ parseAvroStream
INSERT INTO ParsedAvroStream
SELECT
-- conversion from org.apache.avro.util.Utf8 to String is required here
data.get("merchantId").toString() as merchantId,
TO_DATE(data.get("dateTime").toString()) as dateTime,
TO_DOUBLE (data.get("authAmount")) as amount,
data.get("zip").toString() as zip
FROM AvroStream;
CREATE TARGET AvroOut
USING SysOut (name:Avro)
INPUT FROM ParsedAvroStream;
END APPLICATION AvroParserTest;

The following sample application reads from the Kafka topic created by the Oracle2Kafka sample application from Using the Confluent schema registry.
CREATE APPLICATION ReadFromKafka RECOVERY 1 SECOND INTERVAL;
CREATE SOURCE Kafkasource USING KafkaReader VERSION '0.11.0'(
brokerAddress:'localhost:9092',
Topic:'test',
startOffset:0
)
PARSE USING AvroParser()
OUTPUT TO DataStream;
CREATE TYPE CompleteRecord(
completedata com.fasterxml.jackson.databind.JsonNode
);
CREATE STREAM CompleteRecordInJSONStream OF CompleteRecord;
CREATE CQ AvroTOJSONCQ
INSERT INTO CompleteRecordInJSONStream
SELECT AvroToJson(y.data) FROM DataStream y;
CREATE TYPE ElementsOfNativeRecord(
datarecord com.fasterxml.jackson.databind.JsonNode,
before com.fasterxml.jackson.databind.JsonNode,
metadata com.fasterxml.jackson.databind.JsonNode,
userdata com.fasterxml.jackson.databind.JsonNode,
datapresenceinfo com.fasterxml.jackson.databind.JsonNode,
beforepresenceinfo com.fasterxml.jackson.databind.JsonNode
);
CREATE STREAM NativeRecordStream OF ElementsOfNativeRecord;
CREATE CQ GetNativeRecordInJSONCQ
INSERT INTO NativeRecordStream
SELECT
completedata.get("data"),
completedata.get("before"),
completedata.get("metadata"),
completedata.get("userdata"),
completedata.get("datapresenceinfo"),
completedata.get("beforepresenceinfo")
FROM CompleteRecordInJSONStream;
CREATE TARGET bar USING SysOut(name:'complete_record') INPUT FROM CompleteRecordInJSONStream;
END APPLICATION ReadFromKafka;

For additional Avro Parser examples, see Reading from and writing to Kafka using Avro.
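The two CQs above first convert the whole Avro record to a single JsonNode, then split out its top-level elements. In plain Python terms, assuming a WAEvent-style record with the keys the second CQ extracts (the field values below are invented placeholders), the split amounts to key lookups on the parsed JSON:

```python
import json

# Assumed stand-in for the JsonNode produced by AvroToJson: a WAEvent-style
# record with the top-level keys GetNativeRecordInJSONCQ extracts.
complete_record = json.loads("""
{
  "data": {"ID": 1, "NAME": "example"},
  "before": null,
  "metadata": {"OperationName": "INSERT"},
  "userdata": null,
  "datapresenceinfo": null,
  "beforepresenceinfo": null
}
""")

# completedata.get("data") etc. in the CQ correspond to these lookups:
parts = {key: complete_record.get(key)
         for key in ("data", "before", "metadata", "userdata",
                     "datapresenceinfo", "beforepresenceinfo")}
print(parts["data"])  # {'ID': 1, 'NAME': 'example'}
```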