Avro Formatter
Formats a writer's output for use by Apache Avro and generates an Avro schema file. Avro is a schema-based data serialization system. For more information on compatible writers, see Supported writer-formatter combinations.
A common use case for AvroFormatter is to move JSON data from a file or Kafka source (or any other source that emits JSONNodeEvents) to Kafka topics as Avro records. An Avro record is a complex data type that groups multiple named attributes.
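The following is a minimal sketch of that use case, assuming a hypothetical JSON source file, topic name, and broker address (none of which appear elsewhere on this page); the KafkaWriter version and parser properties are likewise assumptions and may need adjusting for your environment. Complete, tested sample applications appear later on this page.

CREATE SOURCE JsonFileSource USING FileReader (
    directory:'Samples/JsonData',    -- hypothetical directory
    wildcard:'orders.json',          -- hypothetical file
    positionByEOF:false
)
PARSE USING JSONParser ()            -- emits JSONNodeEvents; default parser properties assumed
OUTPUT TO JsonStream;

CREATE TARGET KafkaAvroOut USING KafkaWriter VERSION '0.11.0' (  -- adjust to your Kafka version
    brokerAddress:'localhost:9092',  -- hypothetical broker
    topic:'orders-avro'              -- hypothetical topic
)
FORMAT USING AvroFormatter (
    schemaFileName:'OrdersSchema.avsc'
)
INPUT FROM JsonStream;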
Avro Formatter properties
| property | type | default value | notes |
|---|---|---|---|
| Avro Record Namespace | String | | This property is used only with Kafka Writer. The Avro namespace logically groups related record types within an Avro schema and helps uniquely identify a record, especially when multiple records share the same name but belong to different groups. If left empty, the Avro record name is used as the namespace. Optionally, you may specify a static or dynamic value to use as the namespace. |
| Format As | String | default | Do not change the default value unless using a schema registry (see Schema registry use cases and examples). |
| Schema File Name | String | | Appears in the UI when Use Schema Registry Connection Profile is False. A string specifying the path and name of the Avro schema file Striim will create based on the type of the target's input stream. (Be sure Striim has write permission for the specified directory.) If no path is specified, the file is created in the Striim program directory. To generate the schema file, deploy the application, then compile the schema file as directed in the Avro documentation for use in your application. |
| Schema Registry Compatibility | Enum | NONE | This property is used only with Kafka Writer. See the discussion below. |
| Schema Registry Configuration | String | | When using Confluent Cloud's schema registry, specify the required authentication properties. |
| Schema Registry Connection Profile | Enum | | Appears in the UI when Use Schema Registry Connection Profile is True. Select an existing connection profile or select New Connection Profile to create one. |
| Schema Registry URL | String | | Appears in the UI when Use Schema Registry Connection Profile is False. Leave blank unless using a schema registry (see Schema registry use cases and examples). |
| Schema Registry Subject Name | String | | The name of the subject under which the formatted Avro record's schema will be registered in the schema registry. The name and namespace of the Avro records will be the same as the subject name. This property is required if you are using a message bus writer (such as Kafka or EventHub) and the Avro schema registry to record schema evolution. Two values are accepted, one of which is UseDynamicValues (see Schema Registry Subject Name Mapping). |
| Schema Registry Subject Name Mapping | String | | If the Schema Registry Subject Name property is set to UseDynamicValues, this property is mandatory. The value can be one of several types. |
| Use Schema Registry Connection Profile | Boolean | False | Set to True to select or create a connection profile instead of specifying connection properties here. For more information, see Connection profiles. |
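For orientation, the following FORMAT USING clause is a hedged sketch of how several of the schema registry properties above might be specified in TQL. The camelCase property names are assumed to mirror the labels in the table, and the broker, topic, registry URL, subject name, compatibility value, and input stream are placeholders, not values from this page; see Schema registry use cases and examples for tested configurations.

CREATE TARGET KafkaAvroRegistryOut USING KafkaWriter VERSION '0.11.0' (  -- adjust to your Kafka version
    brokerAddress:'localhost:9092',               -- placeholder broker
    topic:'orders-avro'                           -- placeholder topic
)
FORMAT USING AvroFormatter (
    formatAs:'default',
    schemaRegistryURL:'http://localhost:8081',    -- placeholder registry URL
    schemaRegistrySubjectName:'orders-value',     -- assumed TQL form of Schema Registry Subject Name
    schemaRegistryCompatibility:'BACKWARD'        -- assumed TQL form of Schema Registry Compatibility
)
INPUT FROM PosDataStream;

With a static subject name such as 'orders-value', each formatted record's schema is registered under that subject and checked against the selected compatibility rule, as described in the next section.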
Avro Formatter's Schema Registry Compatibility property
This property is used only by Kafka Writer.
In distributed systems, components upgrade independently at different times. Schema compatibility ensures that older consumers can read new data and new consumers can read older data without errors. When a Kafka message is sent with a Schema Registry ID, compatibility checks are still required. The ID is a reference that allows consumers to find the correct schema for deserialization, but it does not guarantee that the schema itself is compatible with older or newer versions. The Schema Registry is responsible for enforcing compatibility rules, which must be configured separately from simply using the ID.
In a system with multiple producers and consumers, schema evolution is a normal occurrence, but an incompatible schema change can cause serious problems:
- Data corruption: A consumer may fail to deserialize a message, potentially causing it to crash or corrupt data in a downstream system.
- Pipeline failure: If a consumer cannot process a message (often called a "poison pill"), it can get stuck in an error loop and halt processing for the entire topic.
- Uncoordinated deployments: Without compatibility rules, producers and consumers cannot be upgraded independently. This introduces tight coupling and risk, as a change by one team could break another's application.
Choosing an appropriate Schema Registry Compatibility setting avoids or minimizes these problems.
Schema Registry Compatibility setting | Description | Use case |
|---|---|---|
None | No compatibility checks are enforced. Any schema change is allowed regardless of compatibility with previous or future schemas. | Can be used when all producers and consumers are always upgraded together and no safety guarantees are needed. Example: A tightly controlled internal system where all services are deployed at the same time. This setting is generally not recommended in a production environment. |
Forward | A new schema is forward compatible if data written with the new schema can be read using the previous schema. | Useful when older consumers may still read messages after producers change the schema. Example: An e-commerce order service adds an optional giftWrap field; old inventory services can still read orders without errors. |
Forward Transitive | The new schema must be forward compatible with all previous versions (not just the latest one). | Can be used when some consumers are multiple versions behind. Example: In a banking system, the schema for transaction records evolves over time as new fields are added or modified. Some consumers, like a legacy fraud detection service, are several versions behind the latest schema. Despite the schema changes, forward transitive compatibility ensures these older consumers can still read and process all historical transactions safely. This allows the system to evolve without breaking older analytics or monitoring services that rely on past data. |
Backward | A new schema is backward compatible if data written with the old schema can be read using the new schema. | Useful when consumers are upgraded first. Example: In a smart warehouse, older temperature and humidity sensors send data with the fields sensorId, temperature, and humidity. A new warehouse monitoring system is deployed that expects additional fields such as batteryLevel and signalStrength. Backward compatibility ensures the new monitoring system can safely read the older sensor data without errors, even though the new fields are absent. |
Backward Transitive | The new schema must be backward compatible with all previous versions, not just the most recent one. | Useful in large systems where multiple producers might still send data in old formats, while new consumers are deployed. It ensures that all old producer data is readable by the new consumers safely. Example: In a retail e-commerce platform, the inventory service has been running for years with older product schemas. A new recommendation engine is deployed to provide personalized suggestions. Backward transitive compatibility ensures the recommendation engine can read product data from all past inventory service versions without errors, even if some fields are missing or different in older messages. |
Full | The new schema must be both backward and forward compatible with the previous schema. | Use when both producers and consumers may evolve independently, and you want to ensure that any schema change does not break either side. Example: A smart thermostat network produces temperature readings to a Kafka topic. New firmware adds optional fields like humidity and airQualityIndex. Existing analytics dashboards (older consumers) still process the older temperature and timestamp fields without errors, while new AI-driven services consume the enhanced schema for predictive analytics. |
Full Transitive | The new schema must be both backward and forward compatible with all previous versions. | Use in systems with multiple producers and consumers running different schema versions over a long period, where any service may produce or consume at any time. Example: A connected vehicle fleet sends telemetry data (speed, location, fuelLevel) to Kafka. Over time, new fields are added (engineTemp, tirePressure, batteryHealth). Some vehicles run older firmware, and historical data is also replayed for AI models. Full transitive compatibility ensures that all consumers—legacy dashboards, fleet management systems, and AI models—can read all historical and new events safely. |
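As a concrete illustration of the Backward setting, the following hypothetical schema pair extends the smart-warehouse example above: version 2 adds batteryLevel and signalStrength with null defaults, so a consumer using version 2 can still read data written with version 1 (the missing fields simply take their defaults).

Version 1 (original sensor schema):

{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "sensorId", "type": "string"},
    {"name": "temperature", "type": "double"},
    {"name": "humidity", "type": "double"}
  ]
}

Version 2 (backward-compatible evolution):

{
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "sensorId", "type": "string"},
    {"name": "temperature", "type": "double"},
    {"name": "humidity", "type": "double"},
    {"name": "batteryLevel", "type": ["null", "double"], "default": null},
    {"name": "signalStrength", "type": ["null", "int"], "default": null}
  ]
}

Adding the new fields without defaults, by contrast, would break backward compatibility, because a version-2 reader could not resolve version-1 data that lacks those fields.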
Avro Formatter sample applications
The following sample application filters and parses part of PosApp's data and writes it to a file using AvroFormatter:
CREATE SOURCE CsvDataSource USING FileReader (
    directory:'Samples/PosApp/appData',
    wildcard:'PosDataPreview.csv',
    positionByEOF:false
)
PARSE USING DSVParser (
    header:Yes,
    trimquote:false
)
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
    TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
    DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
    TO_DOUBLE(data[7]) as amount,
    TO_STRING(data[9]) as zip
FROM CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
    filename:'AvroTestOutput'
)
FORMAT USING AvroFormatter (
    schemaFileName:'AvroTestParsed.avsc'
)
INPUT FROM PosDataStream;
If you deploy the above application in the namespace avrons, AvroTestParsed.avsc is created with the following contents:
{"namespace": "PosDataStream_Type.avrons",
"type" : "record",
"name": "Typed_Record",
"fields": [
{"name" : "merchantId", "type" : "string"},
{"name" : "dateTime", "type" : "string"},
{"name" : "hourValue", "type" : "int"},
{"name" : "amount", "type" : "double"},
{"name" : "zip", "type" : "string"}
]
}

The following application simply writes the raw data in WAEvent format:

CREATE SOURCE CsvDataSource USING FileReader (
    directory:'Samples/PosApp/appData',
    wildcard:'PosDataPreview.csv',
    positionByEOF:false
)
PARSE USING DSVParser (
    header:Yes,
    trimquote:false
)
OUTPUT TO CsvStream;

CREATE TARGET AvroFileOut USING FileWriter(
    filename:'AvroTestOutput'
)
FORMAT USING AvroFormatter (
    schemaFileName:'AvroTestRaw.avsc'
)
INPUT FROM CsvStream;
If you deploy the above application in the namespace avrons, AvroTestRaw.avsc is created with the following contents:
{"namespace": "WAEvent.avrons",
"type" : "record",
"name": "WAEvent_Record",
"fields": [
{"name" : "data",
"type" : ["null" , { "type": "map","values":[ "null" , "string"] }]
},
{"name" : "before",
"type" : ["null" , { "type": "map","values":[ "null" , "string"] }]
},
{"name" : "metadata",
"type" : { "type": "map","values":"string" }
}
]
}

See Parsing the data field of WAEvent and Using the META() function for information about this format.