Avro Parser
Parses input in Avro format. See Supported reader-parser combinations for compatible readers.
When reading .avro files, no properties are required. For non-file sources, specify one of the two schema registry properties.
Avro Parser properties
property | type | default value | notes |
---|---|---|---|
Schema File Name | String | | the path and name of the Avro schema file |
Schema Registry Configuration | String | | when using Confluent Cloud's schema registry, specify the required authentication properties |
Schema Registry URI | String | | the URI for a Confluent or Hortonworks schema registry |
For detailed discussion of the schema registry, see Using the Confluent or Hortonworks schema registry.
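As a hypothetical sketch only (the TQL property name and URI below are assumptions inferred from the table above, not confirmed by this page), a source reading registry-encoded messages might reference the registry like this:

```
-- hypothetical sketch: property name and URI are illustrative
CREATE SOURCE RegistrySource USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  Topic:'test',
  startOffset:0
)
PARSE USING AvroParser (
  schemaRegistryURI:'http://localhost:8081'
)
OUTPUT TO DataStream;
```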
The output type of a source using AvroParser is AvroEvent, which contains the elements of the metadata map (see Using the META() function) and the data array, which is of type org.apache.avro.generic.GenericRecord.
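As an illustrative sketch (the stream names and the FileName metadata key are assumptions, not taken from this page), a CQ can read both parts of an AvroEvent: the metadata map via META() and the fields of the GenericRecord via data.get():

```
-- hypothetical sketch: stream names and metadata key are illustrative
CREATE CQ InspectAvroEvents
INSERT INTO InspectedStream
SELECT META(a, 'FileName') as sourceFile,              -- value from the metadata map
  a.data.get("merchantId").toString() as merchantId    -- field from the GenericRecord
FROM AvroStream a;
```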
Avro Parser examples
You can download the following example TQL files as AvroParser.zip from https://github.com/striim/doc-downloads.
The following application generates an Avro schema file PosDataPreview.avsc and converts Samples/PosApp/appData/PosDataPreview.csv to an Avro data file PosDataPreview.avro.
```
CREATE APPLICATION WritePosData2Avro;

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[0]) as businessName,
  TO_STRING(data[1]) as merchantId,
  TO_STRING(data[2]) as primaryAccountNumber,
  TO_STRING(data[3]) as posDataCode,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_STRING(data[5]) as expDate,
  TO_STRING(data[6]) as currencyCode,
  TO_DOUBLE(data[7]) as authAmount,
  TO_STRING(data[8]) as terminalId,
  TO_STRING(data[9]) as zip,
  TO_STRING(data[10]) as city
FROM CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
  filename:'Samples.PosDataPreview.avro'
)
FORMAT USING AvroFormatter (
  schemaFileName:'Samples.PosDataPreview.avsc'
)
INPUT FROM PosDataStream;

END APPLICATION WritePosData2Avro;
```
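The generated schema file is an ordinary Avro schema in JSON. As an abbreviated illustration only (the field names follow the CQ above, but the exact record name and type mappings AvroFormatter emits may differ), PosDataPreview.avsc would look something like:

```
{
  "type": "record",
  "name": "PosDataStream",
  "fields": [
    {"name": "businessName", "type": "string"},
    {"name": "merchantId", "type": "string"},
    {"name": "authAmount", "type": "double"},
    {"name": "zip", "type": "string"}
  ]
}
```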
See AVROFormatter for more information.
The following sample application uses the files created by WritePosData2Avro and writes a subset of the fields from PosDataPreview.avro to SysOut:
```
CREATE APPLICATION AvroParserTest;

CREATE SOURCE AvroSource USING FileReader (
  directory:'Samples',
  WildCard:'PosDataPreview.avro',
  positionByEOF:false
)
PARSE USING AvroParser (
  schemaFileName:"Samples/PosDataPreview.avsc"
)
OUTPUT TO AvroStream;

CREATE CQ parseAvroStream
INSERT INTO ParsedAvroStream
SELECT
  -- conversion from org.apache.avro.util.Utf8 to String is required here
  data.get("merchantId").toString() as merchantId,
  TO_DATE(data.get("dateTime").toString()) as dateTime,
  TO_DOUBLE(data.get("authAmount")) as amount,
  data.get("zip").toString() as zip
FROM AvroStream;

CREATE TARGET AvroOut USING SysOut (name:Avro)
INPUT FROM ParsedAvroStream;

END APPLICATION AvroParserTest;
```
The following sample application reads from the Kafka topic created by the Oracle2Kafka sample application from Using the Confluent or Hortonworks schema registry.
```
CREATE APPLICATION ReadFromKafka RECOVERY 1 SECOND INTERVAL;

CREATE SOURCE Kafkasource USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  Topic:'test',
  startOffset:0
)
PARSE USING AvroParser()
OUTPUT TO DataStream;

CREATE TYPE CompleteRecord(
  completedata com.fasterxml.jackson.databind.JsonNode
);
CREATE STREAM CompleteRecordInJSONStream OF CompleteRecord;

CREATE CQ AvroTOJSONCQ
INSERT INTO CompleteRecordInJSONStream
SELECT AvroToJson(y.data)
FROM DataStream y;

CREATE TYPE ElementsOfNativeRecord(
  datarecord com.fasterxml.jackson.databind.JsonNode,
  before com.fasterxml.jackson.databind.JsonNode,
  metadata com.fasterxml.jackson.databind.JsonNode,
  userdata com.fasterxml.jackson.databind.JsonNode,
  datapresenceinfo com.fasterxml.jackson.databind.JsonNode,
  beforepresenceinfo com.fasterxml.jackson.databind.JsonNode
);
CREATE STREAM NativeRecordStream OF ElementsOfNativeRecord;

CREATE CQ GetNativeRecordInJSONCQ
INSERT INTO NativeRecordStream
SELECT completedata.get("data"),
  completedata.get("before"),
  completedata.get("metadata"),
  completedata.get("userdata"),
  completedata.get("datapresenceinfo"),
  completedata.get("beforepresenceinfo")
FROM CompleteRecordInJSONStream;

CREATE TARGET bar USING SysOut(name:'complete_record')
INPUT FROM CompleteRecordInJSONStream;

END APPLICATION ReadFromKafka;
```
For additional Avro Parser examples, see Reading from and writing to Kafka using Avro.