Striim 3.9.6 documentation

Avro Parser

Parses input in Avro format. See Supported reader-parser combinations for compatible readers.

When reading .avro files, no properties are required. For non-file sources, specify one of the following two properties:

| property | type | default value | notes |
|---|---|---|---|
| schemaFileName | java.lang.String | | the path and name of the Avro schema file |
| schemaRegistryURI | java.lang.String | | the URI for a Confluent or Hortonworks schema registry, for example, http://198.51.100.55:8081 |

For detailed discussion of the schema registry, see Using the Confluent or Hortonworks schema registry.

The output type of a source using AvroParser is AvroEvent, which contains the elements of the metadata map (see Using the META() function) and the data array, and is of the type org.apache.avro.generic.GenericRecord.

The following application will generate an Avro schema file PosDataPreview.avsc and convert Samples/PosApp/appData/PosDataPreview.csv to an Avro data file PosDataPreview.avro:

CREATE APPLICATION WritePosData2Avro;

-- Pipeline: CSV file -> CsvStream -> typed PosDataStream -> Avro file.

-- Source: FileReader over Samples/PosApp/appData/PosDataPreview.csv,
-- parsed with DSVParser; each parsed row is emitted to CsvStream.
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

-- Map the positional elements of each row's data array to named, typed
-- fields. dateTime is parsed with the pattern yyyyMMddHHmmss and authAmount
-- is converted to a double; every other field is kept as a string.
CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[0])                  AS businessName,
       TO_STRING(data[1])                  AS merchantId,
       TO_STRING(data[2])                  AS primaryAccountNumber,
       TO_STRING(data[3])                  AS posDataCode,
       TO_DATEF(data[4],'yyyyMMddHHmmss')  AS dateTime,
       TO_STRING(data[5])                  AS expDate,
       TO_STRING(data[6])                  AS currencyCode,
       TO_DOUBLE(data[7])                  AS authAmount,
       TO_STRING(data[8])                  AS terminalId,
       TO_STRING(data[9])                  AS zip,
       TO_STRING(data[10])                 AS city
FROM CsvStream;

-- Target: write PosDataStream through AvroFormatter, which is also given the
-- name of the schema file.
-- NOTE(review): the AvroParserTest sample reads Samples/PosDataPreview.avro
-- and Samples/PosDataPreview.avsc; confirm that the dotted names used here
-- resolve to those paths.
CREATE TARGET AvroFileOut
USING FileWriter (
  filename:'Samples.PosDataPreview.avro'
)
FORMAT USING AvroFormatter (
  schemaFileName:'Samples.PosDataPreview.avsc'
)
INPUT FROM PosDataStream;

END APPLICATION WritePosData2Avro;

See AVROFormatter for more information.

The following sample application uses the files created by WritePosData2Avro and writes a subset of the fields from PosDataPreview.avro to SysOut:

CREATE APPLICATION AvroParserTest;

-- Pipeline: Avro file -> AvroStream -> ParsedAvroStream -> SysOut.

-- Source: FileReader over Samples/PosDataPreview.avro, parsed by AvroParser
-- using the schema file Samples/PosDataPreview.avsc.
CREATE SOURCE AvroSource USING FileReader (
  directory:'Samples',
  WildCard:'PosDataPreview.avro',
  positionByEOF:false
)
PARSE USING AvroParser (
  schemaFileName:"Samples/PosDataPreview.avsc"
)
OUTPUT TO AvroStream;

-- Select four fields from each Avro record by field name.
-- Values read with toString() are org.apache.avro.util.Utf8 in the record and
-- must be converted to String explicitly.
CREATE CQ parseAvroStream
INSERT INTO ParsedAvroStream
SELECT data.get("merchantId").toString()         AS merchantId,
       TO_DATE(data.get("dateTime").toString())  AS dateTime,
       TO_DOUBLE(data.get("authAmount"))         AS amount,
       data.get("zip").toString()                AS zip
FROM AvroStream;

-- Target: print the selected fields via SysOut.
CREATE TARGET AvroOut
USING SysOut (name:Avro)
INPUT FROM ParsedAvroStream;

END APPLICATION AvroParserTest;

The following sample application reads from the Kafka topic created by the Oracle2Kafka sample application described in Using the Confluent or Hortonworks schema registry:

CREATE APPLICATION ReadFromKafka RECOVERY 1 SECOND INTERVAL;

-- Pipeline: Kafka topic -> DataStream -> CompleteRecordInJSONStream -> SysOut,
-- with a second CQ that splits each JSON record into NativeRecordStream.

-- Source: KafkaReader on topic 'test' at localhost:9092, parsed by AvroParser.
-- NOTE(review): AvroParser is given neither schemaFileName nor
-- schemaRegistryURI here although this is a non-file source; confirm whether
-- schemaRegistryURI must be set for your environment.
CREATE SOURCE Kafkasource USING KafkaReader VERSION '0.11.0' (
  brokerAddress:'localhost:9092',
  Topic:'test',
  startOffset:0
)
PARSE USING AvroParser()
OUTPUT TO DataStream;

-- One-field type holding a whole record as a JSON tree.
CREATE TYPE CompleteRecord(
    completedata com.fasterxml.jackson.databind.JsonNode
);

CREATE STREAM CompleteRecordInJSONStream OF CompleteRecord;

-- Convert the data element of each incoming event to JSON with AvroToJson.
CREATE CQ AvroTOJSONCQ
INSERT INTO CompleteRecordInJSONStream
SELECT AvroToJson(y.data)
FROM DataStream y;

-- One JsonNode field per element extracted from the complete record.
CREATE TYPE ElementsOfNativeRecord(
    datarecord com.fasterxml.jackson.databind.JsonNode,
    before com.fasterxml.jackson.databind.JsonNode,
    metadata com.fasterxml.jackson.databind.JsonNode,
    userdata com.fasterxml.jackson.databind.JsonNode,
    datapresenceinfo com.fasterxml.jackson.databind.JsonNode,
    beforepresenceinfo com.fasterxml.jackson.databind.JsonNode
);

CREATE STREAM NativeRecordStream OF ElementsOfNativeRecord;

-- Pull six named elements out of the JSON record; the select list order
-- matches the field order of ElementsOfNativeRecord.
CREATE CQ GetNativeRecordInJSONCQ
INSERT INTO NativeRecordStream
SELECT
    completedata.get("data"),
    completedata.get("before"),
    completedata.get("metadata"),
    completedata.get("userdata"),
    completedata.get("datapresenceinfo"),
    completedata.get("beforepresenceinfo")
FROM CompleteRecordInJSONStream;

-- Target: print the complete JSON records via SysOut. NativeRecordStream has
-- no target in this sample.
-- Fix: the keyword was misspelled as TAREGT, which is not a valid statement.
CREATE TARGET bar
USING SysOut (name:'complete_record')
INPUT FROM CompleteRecordInJSONStream;

END APPLICATION ReadFromKafka;