KafkaWriter output with AvroFormatter

In async mode, each Kafka message will contain a single event. The event will start with four bytes containing its length.

In sync mode, one message can contain multiple events. The number of events depends on the batch.size setting (see Setting KafkaWriter's mode property: sync versus async). Each event in the message will start with four bytes containing its length.
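The length-prefixed framing described above can be unpacked on the consumer side roughly as follows. This is a minimal sketch, not Striim code: the function name split_events is hypothetical, and it assumes the four-byte length is an unsigned big-endian integer that counts only the event bytes that follow it (not the prefix itself). Neither assumption is stated in this section, so verify both against real messages before relying on it.

```python
import struct


def split_events(message_value: bytes, byteorder: str = ">") -> list[bytes]:
    """Split one Kafka message value into its length-prefixed event payloads.

    Assumption: each event is preceded by a 4-byte unsigned length
    (big-endian by default) that excludes the prefix itself.
    """
    events = []
    offset = 0
    while offset < len(message_value):
        if offset + 4 > len(message_value):
            raise ValueError("truncated length prefix")
        # Read the 4-byte length that precedes the event.
        (length,) = struct.unpack_from(byteorder + "I", message_value, offset)
        offset += 4
        end = offset + length
        if end > len(message_value):
            raise ValueError("truncated event payload")
        events.append(message_value[offset:end])
        offset = end
    return events
```

In async mode the returned list would hold a single payload; in sync mode it would hold one payload per event in the batch. Each payload is the Avro-encoded event body.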

In sync mode, output will contain a nested record named __striimmetadata with a field position. With recovery on, this field will contain information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications). With recovery off, the value of this field will be null.

Schema example for input events of the user-defined Striim type

For input events of this user-defined Striim type:

Create Type PERSON (
  ID int,
  City String,
  Code String,
  Name String);

the schema would be:

{
    "type": "record",
    "name": "PERSON",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "ID",
        "type": ["null", "int"]
    }, {
        "name": "CITY",
        "type": ["null", "string"]
    }, {
        "name": "CODE",
        "type": ["null", "string"]
    }, {
        "name": "NAME",
        "type": ["null", "string"]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}
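To illustrate how a payload written with this schema maps to field values, here is a small hand-written decoder that follows the Avro binary encoding rules: ints and longs are zigzag varints, strings are a length followed by UTF-8 bytes, a union is a branch index followed by the value, and record fields appear in schema order. It is a sketch under assumptions: it presumes the event body (after the four-byte length) is raw Avro binary with no extra header, and the function names are hypothetical. In practice an Avro library would normally do this work.

```python
def read_long(buf: bytes, pos: int):
    """Decode one Avro int/long (zigzag varint); return (value, new_pos)."""
    raw = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos  # undo zigzag encoding


def read_string(buf: bytes, pos: int):
    """Decode an Avro string: a long byte count, then UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def read_nullable(buf: bytes, pos: int, read_value):
    """Decode a ["null", T] union: branch 0 is null, branch 1 is T."""
    index, pos = read_long(buf, pos)
    if index == 0:
        return None, pos
    return read_value(buf, pos)


def decode_person(payload: bytes) -> dict:
    """Decode one PERSON record; fields are read in schema order."""
    pos = 0
    record = {}
    record["ID"], pos = read_nullable(payload, pos, read_long)
    for name in ("CITY", "CODE", "NAME"):
        record[name], pos = read_nullable(payload, pos, read_string)
    # Nested __striimmetadata record with its single nullable field.
    position, pos = read_nullable(payload, pos, read_string)
    record["__striimmetadata"] = {"position": position}
    return record
```

Because every field is a union with "null", each value costs at least one extra byte for the branch index, and a null position (recovery off) is encoded as a single zero byte.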

Schema example for input events of type WAEvent

For input events of type WAEvent, the schema would be similar to:

{
  "namespace": "WAEvent.avro",
  "type": "record",
  "name": "WAEvent",
  "fields": [{
      "name": "metadata",
      "type": ["null",
        {"type": "map", "values": ["null", "string"] }
      ]
    },
    {
      "name": "data",
      "type": ["null",
        {"type": "map","values": ["null", "string"] }
      ]
    },
    {
      "name": "before",
      "type": ["null",
        {"type": "map", "values": ["null", "string"] }
      ]
    },
    {
      "name": "userdata",
      "type": ["null",
        {"type": "map", "values": ["null", "string"] }
      ]
    },
    {
      "name": "__striimmetadata",
      "type": {
        "type": "record",
        "name": "StriimMeta_Record",
        "fields": [
          {"name": "position", "type": ["null", "string"] }
        ]
      }
    }
  ]
}

and output would be similar to:

{
  "data": { "ID": "1", "NAME": "User One" },
  "before": null,
  "metadata": {
    "TABLENAME": "Employee",
    "CommitTimestamp": "12-Dec-2016 19:13:01",
    "OperationName": "UPDATE"
  },
  "userdata": null,
  "__striimmetadata": { "position": "SCN:1234002356" }
}