
Avro Formatter

Formats a writer's output for use by Apache Avro and generates an Avro schema file. Avro is a schema-based data serialization system: it requires a schema as input to read or write data. For more information on compatible writers, see Supported writer-formatter combinations.

A common use case for AvroFormatter is moving JSON data from a file or Kafka source (or any other source that emits JSONNodeEvents) to Kafka topics as Avro records. In Avro, a record is a complex data type that groups multiple named attributes.
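For illustration only (this is not Striim's inference logic, and the sample event values are assumed), the following Python sketch shows how a flat JSON event maps onto an Avro record schema of the kind AvroFormatter generates:

```python
import json

# Illustrative mapping from Python/JSON value types to Avro primitive types.
# This is NOT Striim's actual inference logic, just a sketch of the idea.
AVRO_TYPES = {str: "string", bool: "boolean", int: "long", float: "double"}

def infer_record_schema(name, namespace, event):
    """Build an Avro record schema for a flat JSON object."""
    fields = [{"name": k, "type": AVRO_TYPES[type(v)]} for k, v in event.items()]
    return {"namespace": namespace, "type": "record", "name": name, "fields": fields}

# Assumed sample event for illustration.
event = {"merchantId": "M100", "amount": 20.2, "zip": "41363"}
schema = infer_record_schema("Typed_Record", "PosDataStream_Type.avrons", event)
print(json.dumps(schema, indent=2))
```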

Avro Formatter properties

Each property is listed below with its name, type, and default value (if any), followed by notes.

Format As (type: String; default value: default)

Do not change the default value unless you are using the Confluent or Hortonworks schema registry (see Using the Confluent or Hortonworks schema registry).

Schema File Name (type: String)

The path and name of the Avro schema file Striim will create based on the type of the target's input stream. (Be sure Striim has write permission for the specified directory.) If no path is specified, the file is created in the Striim program directory. To generate the schema file, deploy the application, then compile the schema file as directed in the Avro documentation for use in your application.

Schema Registry Configuration (type: String)

When using Confluent Cloud's schema registry, specify the required authentication properties in the format basic.auth.user.info=<value>,basic.auth.credentials.source=<value>. Otherwise, leave blank.

Schema Registry URL (type: String)

Leave blank unless you are using the Confluent or Hortonworks schema registry (see Using the Confluent or Hortonworks schema registry).

Schema Registry Subject Name (type: String)

The name of the subject under which the formatted Avro record's schema will be registered in the schema registry. The name and namespace of the Avro records will be the same as the subject name.

This property is required if you are using a message bus writer (such as Kafka or EventHub) and you are using the Avro schema registry to record schema evolution.

Two values are accepted:

  • UseTopicName: all Avro records are registered under the topic name (configured in the Kafka writer). If the topic contains more than one type of Avro record, the same subject will have multiple versions pointing to different record types.

  • UseDynamicValues: each record type has its own subject name, which clearly shows the evolution of that type. The subject name is taken from the field specified in the Schema Registry Subject Name Mapping property.

Schema Registry Subject Name Mapping (type: String)

This property is mandatory when Schema Registry Subject Name is set to UseDynamicValues. The value can be one of the following:

  • @metadata(<meta field name>)

    Picks the subject name from the metadata map. For example: @metadata(Directory) or @metadata(TableName).

  • @userdata(<userdata field name>)

    Picks the subject name from the userdata map. For example: @userdata(key1).

  • A static name enclosed in quotes. For example: "TestSubjectName".

  • An incoming field name, if the incoming stream is a typed event. For example: EmpId.
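To make the mapping options concrete, here is a sketch of how the four forms resolve against an event. This is not Striim's implementation, and the event shape (metadata/userdata/data dictionaries with assumed values) is an illustration only:

```python
# Illustrative resolver for Schema Registry Subject Name Mapping values.
# The event structure and values below are assumptions for this sketch;
# this is not Striim's actual implementation.
def resolve_subject(mapping, event):
    if mapping.startswith("@metadata(") and mapping.endswith(")"):
        return event["metadata"][mapping[len("@metadata("):-1]]
    if mapping.startswith("@userdata(") and mapping.endswith(")"):
        return event["userdata"][mapping[len("@userdata("):-1]]
    if mapping.startswith('"') and mapping.endswith('"'):
        return mapping.strip('"')          # static subject name
    return event["data"][mapping]          # incoming field of a typed event

event = {
    "metadata": {"TableName": "HR.EMPLOYEE"},
    "userdata": {"key1": "OrdersSubject"},
    "data": {"EmpId": "E1001"},
}
print(resolve_subject("@metadata(TableName)", event))  # HR.EMPLOYEE
print(resolve_subject('"TestSubjectName"', event))     # TestSubjectName
```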

Avro Formatter sample applications

The following sample application filters and parses part of PosApp's data and writes it to a file using AvroFormatter:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
  filename:'AvroTestOutput'
)
FORMAT USING AvroFormatter (
  schemaFileName:'AvroTestParsed.avsc'
)
INPUT FROM PosDataStream;

If you deploy the above application in the namespace avrons, AvroTestParsed.avsc is created with the following contents:

{"namespace": "PosDataStream_Type.avrons",
  "type" : "record",
  "name": "Typed_Record",
  "fields": [
{"name" : "merchantId", "type" : "string"},
{"name" : "dateTime", "type" : "string"},
{"name" : "hourValue", "type" : "int"},
{"name" : "amount", "type" : "double"},
{"name" : "zip", "type" : "string"}
 ]
}
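The generated .avsc file is plain JSON, so it can be sanity-checked with any JSON tool before compiling it. A minimal Python check (the schema contents are inlined here rather than read from disk):

```python
import json

# The contents of AvroTestParsed.avsc, inlined for the example.
schema = json.loads("""
{"namespace": "PosDataStream_Type.avrons",
 "type": "record",
 "name": "Typed_Record",
 "fields": [
   {"name": "merchantId", "type": "string"},
   {"name": "dateTime", "type": "string"},
   {"name": "hourValue", "type": "int"},
   {"name": "amount", "type": "double"},
   {"name": "zip", "type": "string"}
 ]}
""")

# Map each field name to its Avro type.
field_types = {f["name"]: f["type"] for f in schema["fields"]}
print(field_types)
```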

The following application simply writes the raw data in WAEvent format:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
  filename:'AvroTestOutput'
)
FORMAT USING AvroFormatter (
  schemaFileName:'AvroTestRaw.avsc'
)
INPUT FROM CsvStream;

If you deploy the above application in the namespace avrons, AvroTestRaw.avsc is created with the following contents:

{"namespace": "WAEvent.avrons",
  "type" : "record",
  "name": "WAEvent_Record",
  "fields": [
    {"name" : "data",
    "type" : ["null" , { "type": "map","values":[ "null" , "string"] }]
     },
    {"name" : "before",
    "type" : ["null" , { "type": "map","values":[ "null" , "string"] }]
    },
    {"name" : "metadata",
     "type" : { "type": "map","values":"string" }
    }
  ]
}
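To make the union types concrete, here is a sketch of one WAEvent from the CSV source as it would be structured under this schema (the field values and metadata keys are assumed for illustration): data and before are each either null or a map of nullable strings, while metadata is always a map of strings.

```python
# One WAEvent structured per the schema above. Values and metadata keys
# are assumed for illustration; they are not actual PosApp output.
wa_event = {
    "data": {"0": "COMPANY 1", "1": "M100", "7": "20.2"},
    "before": None,  # no prior image for an insert-style event
    "metadata": {"FileName": "PosDataPreview.csv", "FileOffset": "0"},
}

# Check the event against the schema's union/map constraints.
for key in ("data", "before"):
    value = wa_event[key]
    assert value is None or all(
        v is None or isinstance(v, str) for v in value.values())
assert all(isinstance(v, str) for v in wa_event["metadata"].values())
print("conforms to WAEvent_Record")
```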

See Parsing the data field of WAEvent and Using the META() function for information about this format.

For additional Avro Formatter examples, see Reading from and writing to Kafka using Avro.