Parquet Formatter

Formats a writer's output for use by Apache Parquet and generates one or more schema files. See Supported writer-formatter combinations.

Notes:

  • Encryption Policy cannot be set for the associated writer.

  • Data written using Parquet Formatter cannot be consumed until the target file is closed (rolls over).

Configuring Microsoft Windows to run Parquet Formatter

When Striim or the Forwarding Agent is running on Microsoft Windows, you must perform the following steps before using Parquet Formatter:

  1. Download winutils.exe from https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin to a directory on the Windows system running Striim or the Forwarding Agent.

  2. Set the HADOOP_HOME environment variable to that directory.

  3. Add that directory to the PATH.
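
The result of these steps can be checked before starting Striim. The following is a minimal Python sketch, not part of Striim: the function name check_parquet_env is hypothetical, and it assumes only what the steps above state, namely that HADOOP_HOME is set and that the same directory appears on the PATH. It takes the environment as a mapping so it can be run against os.environ or against a test dictionary.

```python
import ntpath


def check_parquet_env(env, pathsep=";"):
    """Return a list of problems found in a Windows-style environment mapping.

    Hypothetical helper: it only checks that HADOOP_HOME is set (step 2)
    and that the same directory is listed on PATH (step 3).
    """
    hadoop_home = env.get("HADOOP_HOME", "").strip()
    if not hadoop_home:
        return ["HADOOP_HOME is not set"]

    def norm(path):
        # Windows paths are case-insensitive; also drop trailing separators.
        return ntpath.normcase(ntpath.normpath(path.strip()))

    on_path = {norm(p) for p in env.get("PATH", "").split(pathsep) if p.strip()}
    if norm(hadoop_home) not in on_path:
        return ["HADOOP_HOME directory is not on PATH"]
    return []
```

On the Windows host, calling check_parquet_env(os.environ) after importing os should return an empty list once steps 2 and 3 are complete. The sketch does not verify that winutils.exe itself is present.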

Parquet Formatter properties

Property: Block Size
Type: Long
Default value: 128000000
Notes: Sets the parquet.block.size property in Parquet.

Property: Compression Type
Type: String
Default value: UNCOMPRESSED
Notes: Optionally, specify the target's compression format. Supported types are GZIP, LZO, and SNAPPY.

Property: Format As
Type: String
Default value: Default
Notes:

With the Default setting, a single schema file is created.

  • With an input stream of a user-defined type, the output includes the name and value of each field.

  • With an input stream of type WAEvent (from any source), the output includes all contents of the event excluding dataPresenceBitMap, beforePresenceBitMap, and typeUUID.

The two other settings are supported only with an input stream of type WAEvent from a Database Reader, Incremental Batch Reader, or SQL CDC reader source. A dynamic directory, folder, or bucket name must be specified in the writer (see Setting output names and rollover / upload policies).

A schema file with a timestamp appended to its name is created in each directory, folder, or bucket. With S3 Writer, if both the bucket name and folder name are dynamic, each combination of bucket and folder will have its own schema file. If there is a DDL change in the source, a new schema file is created and the output files roll over.

  • When Format As is set to Native, the output includes all contents of the WAEvent except for typeUUID.

  • When Format As is set to Table, the output includes only the column names and values.

See Parquet Formatter examples for sample schema files and output for each setting.

Property: Members
Type: String
Default value: (none)
Notes:

Optionally:

  • With an input stream of type WAEvent, specify a comma-separated list of elements to include in the output.

  • With an input stream of type WAEvent from a Database Reader, Incremental Batch Reader, or SQL CDC reader source, specify additional elements (for example, from the METADATA or USERDATA maps) to include in addition to the data array values. See Parquet Formatter examples for a sample schema file and output.

Property: Schema File Name
Type: String
Default value: (none)
Notes: The fully qualified name of the Parquet schema file Striim will create when the application runs. When a dynamic directory is specified in the writer, Striim in some cases writes the files in the target directories and/or appends a timestamp to the file names. See the notes for Format As for more details.
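
The constraints on the property values listed above can be expressed as a small pre-flight check. This is a sketch only: validate_formatter_properties is a hypothetical helper, not a Striim API, and the dictionary keys simply reuse the property names as they are displayed in this section. Whether Striim accepts other spellings or letter cases is not stated here, so the comparison is exact.

```python
# Values taken from the property notes above.
COMPRESSION_TYPES = {"UNCOMPRESSED", "GZIP", "LZO", "SNAPPY"}
FORMAT_AS_VALUES = {"Default", "Native", "Table"}


def validate_formatter_properties(props):
    """Return a list of problems; an empty list means the values look valid."""
    problems = []

    # Fall back to the documented defaults when a property is omitted.
    block_size = props.get("Block Size", 128000000)
    if isinstance(block_size, bool) or not isinstance(block_size, int) or block_size <= 0:
        problems.append("Block Size must be a positive integer")

    compression = props.get("Compression Type", "UNCOMPRESSED")
    if compression not in COMPRESSION_TYPES:
        problems.append(f"unsupported Compression Type: {compression}")

    format_as = props.get("Format As", "Default")
    if format_as not in FORMAT_AS_VALUES:
        problems.append(f"unsupported Format As: {format_as}")

    return problems
```

The check does not cover the requirement that Native and Table need a WAEvent input stream and a dynamic directory, folder, or bucket name in the writer.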

Parquet Formatter data type support and correspondence

The following apply when the input stream is of a user-defined type.

Striim type   Parquet type   Notes
Byte          INT32
DateTime      INT32          Unix epoch (number of days from 1 January 1970)
Double        DOUBLE         IEEE 64-bit
Float         FLOAT          IEEE 32-bit
Integer       INT32          32-bit signed
Long          INT64          64-bit signed
Short         INT32
String        BYTE_ARRAY     UTF-8
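
The "number of days from 1 January 1970" encoding noted for DateTime is plain date arithmetic. The sketch below shows the conversion in both directions; the function names are illustrative, and how Striim treats the time-of-day portion of a DateTime value is not stated here, so only calendar dates are handled.

```python
from datetime import date

UNIX_EPOCH = date(1970, 1, 1)


def to_epoch_days(d):
    """Number of whole days between 1 January 1970 and the given date."""
    return (d - UNIX_EPOCH).days


def from_epoch_days(days):
    """Inverse of to_epoch_days: recover the calendar date from a day count."""
    return date.fromordinal(UNIX_EPOCH.toordinal() + days)
```

For example, to_epoch_days(date(2020, 4, 22)) is 18374, and from_epoch_days(18374) returns the same date.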

The following apply when the input stream is the output of a Database Reader or Incremental Batch Reader source.

JDBC column type   Parquet type   Notes
Types.BIGINT       INT64          64-bit signed
Types.BIT          INT32
Types.CHAR         BYTE_ARRAY     UTF-8
Types.DATE         INT32          Unix epoch (number of days from 1 January 1970)
Types.DECIMAL      BYTE_ARRAY     UTF-8
Types.DOUBLE       DOUBLE         IEEE 64-bit
Types.FLOAT        FLOAT          IEEE 32-bit
Types.INTEGER      INT32          32-bit signed
Types.NUMERIC      BYTE_ARRAY     UTF-8
Types.REAL         FLOAT          IEEE 32-bit
Types.SMALLINT     INT32
Types.TIMESTAMP    INT32          Unix epoch (number of days from 1 January 1970)
Types.TINYINT      INT32
Types.VARCHAR      BYTE_ARRAY     UTF-8
other types        BYTE_ARRAY     UTF-8

The following apply when the input stream is the output of an Oracle Reader source.

Oracle type                      Parquet type
ADT                              unsupported
ARRAY                            unsupported
BFILE                            unsupported
BINARY_DOUBLE                    DOUBLE
BINARY_FLOAT                     FLOAT
BLOB                             BYTE_ARRAY
CHAR                             BYTE_ARRAY
CLOB                             BYTE_ARRAY
DATE                             BYTE_ARRAY
FLOAT                            BYTE_ARRAY
INTERVAL DAY TO SECOND           unsupported
INTERVAL YEAR TO MONTH           unsupported
LONG                             unsupported
LONG RAW                         unsupported
NCHAR                            BYTE_ARRAY
NCLOB                            BYTE_ARRAY
NESTED TABLE                     unsupported
NUMBER                           BYTE_ARRAY
NVARCHAR2                        unsupported
RAW                              unsupported
REF                              unsupported
ROWID                            unsupported
TIMESTAMP                        BYTE_ARRAY
TIMESTAMP WITH LOCAL TIME ZONE   BYTE_ARRAY
TIMESTAMP WITH TIME ZONE         BYTE_ARRAY
UDT                              unsupported
UROWID                           unsupported
VARCHAR2                         BYTE_ARRAY
VARRAY                           supported for primitive data types (see Oracle Reader and OJet WAEvent fields)
XMLTYPE                          unsupported

Parquet Formatter examples

The output of Parquet Formatter varies depending on the type of the input stream and the Format As setting.

Input stream of user-defined type, Format As = Default

Input stream type:

Create Type PERSON (
  ID Integer,
  City String,
  Code String,
  Name String);

Schema:

message Person.Person {
  optional int32 ID;  
  optional binary city (UTF8);
  optional binary code (UTF8);
  optional binary name (UTF8);
}

Sample output:

{"ID":1216,"city":"South Kent","code":"USD","name":"COMPANY 4999992"}
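
The relationship between the type definition and the schema in this example can be sketched as follows. This is an illustration of the type correspondence only, not Striim's schema generator: schema_text and PRIMITIVES are hypothetical names, the message name and field names are passed in exactly as they appear in the schema above (the rule Striim uses to derive them is not stated here), and DateTime is omitted because its schema annotation is not shown in this section.

```python
# Striim type -> (Parquet primitive as written in a schema file, annotation).
PRIMITIVES = {
    "Byte": ("int32", None),
    "Short": ("int32", None),
    "Integer": ("int32", None),
    "Long": ("int64", None),
    "Float": ("float", None),
    "Double": ("double", None),
    "String": ("binary", "UTF8"),
}


def schema_text(message_name, fields):
    """Render a Parquet message schema with one optional field per entry."""
    lines = [f"message {message_name} {{"]
    for name, striim_type in fields:
        primitive, annotation = PRIMITIVES[striim_type]
        suffix = f" ({annotation})" if annotation else ""
        lines.append(f"  optional {primitive} {name}{suffix};")
    lines.append("}")
    return "\n".join(lines)
```

Calling schema_text("Person.Person", [("ID", "Integer"), ("city", "String"), ("code", "String"), ("name", "String")]) yields the same field declarations as the schema shown for this example.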

Input stream of Oracle Reader WAEvent, Format As = Default

Source table:

CREATE TABLE TABLES1 (
  "EMPNO" NUMBER(4),
  "ENAME" VARCHAR2(10),
  "JOB" VARCHAR(20),
  "HIREDATE" TIMESTAMP(6),
  "SAL" NUMBER DEFAULT 0
);

Schema:

message WAEvent.avro.WAEvent {
  optional group metadata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group data (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group before (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group userdata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
}

Sample output, insert:

{
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "29"
      },
      {
        "key": "AuditSessionId",
        "value": "174320"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1215123"
      },
      {
        "key": "SQLRedoLength",
        "value": "152"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "1.20.571"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587561609000"
      },
      {
        "key": "COMMITSCN",
        "value": "1215124"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1215123"
      },
      {
        "key": "SegmentName",
        "value": "TABLES1"
      },
      {
        "key": "OperationName",
        "value": "INSERT"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-22T13:20:09.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "2808"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES1"
      },
      {
        "key": "TxnID",
        "value": "1.20.571"
      },
      {
        "key": "Serial",
        "value": "12481"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-22T13:20:09.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFSWAAEAAAApcAAB"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587561609000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "121512300000081627745086341280001"
      },
      {
        "key": "Session",
        "value": "142"
      }
    ]
  },
  "data": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "1"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-22T13:20:09.074-07:00"
      },
      {
        "key": "SAL",
        "value": "60000"
      }
    ]
  }
}

Sample output, update:

{
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "59"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1397647"
      },
      {
        "key": "SQLRedoLength",
        "value": "189"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "8.10.809"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x00003b.00013dfb.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587636418000"
      },
      {
        "key": "COMMITSCN",
        "value": "1397648"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1397647"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "UPDATE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T10:06:58.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "81403"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "8.10.809"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T10:06:58.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587636418000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "139764700000166070289607557280000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "data": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "6"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-23T10:05:03.381-07:00"
      },
      {
        "key": "SAL",
        "value": "70000"
      }
    ]
  },
  "before": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "6"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-23T10:05:03.381-07:00"
      },
      {
        "key": "SAL",
        "value": "60000"
      }
    ]
  }
}

Sample output, delete:

{
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "59"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1397672"
      },
      {
        "key": "SQLRedoLength",
        "value": "174"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "3.33.844"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x00003b.00013dfe.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587636481000"
      },
      {
        "key": "COMMITSCN",
        "value": "1397673"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1397672"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "DELETE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T10:08:01.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "81406"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "3.33.844"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T10:08:01.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587636481000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "139767200000166070289609523360000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "data": {
    "map": [
      {
        "key": "ENAME",
        "value": "tanuja"
      },
      {
        "key": "EMPNO",
        "value": "6"
      },
      {
        "key": "JOB",
        "value": "So"
      },
      {
        "key": "HIREDATE",
        "value": "2020-04-23T10:05:03.381-07:00"
      },
      {
        "key": "SAL",
        "value": "70000"
      }
    ]
  }
}
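
With Format As = Default, each WAEvent map arrives as a list of key/value entries, and an entry may have no value (BytesProcessed in the samples above). A consumer will usually want ordinary dictionaries. The following sketch assumes a record has already been read into Python structures shaped like the samples above; the function names are hypothetical.

```python
def map_group_to_dict(group):
    """Turn {"map": [{"key": k, "value": v}, ...]} into {k: v}.

    Entries without a "value" are kept with None; a missing group gives {}.
    """
    if not group:
        return {}
    return {entry["key"]: entry.get("value") for entry in group.get("map", [])}


def summarize(record):
    """Pull out the operation, the table name, and the row images of a record."""
    metadata = map_group_to_dict(record.get("metadata"))
    return {
        "operation": metadata.get("OperationName"),
        "table": metadata.get("TableName"),
        "data": map_group_to_dict(record.get("data")),
        "before": map_group_to_dict(record.get("before")),
    }
```

Applied to the update sample, summarize returns UPDATE as the operation, QATEST.TABLES4 as the table, and the data and before images as flat dictionaries whose SAL values are "70000" and "60000".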

Input stream of Oracle Reader WAEvent, Format As = Native

Source table:

CREATE TABLE TABLES1 (
  "EMPNO" NUMBER(4),
  "ENAME" VARCHAR2(10),
  "JOB" VARCHAR(20),
  "HIREDATE" TIMESTAMP(6),
  "SAL" NUMBER DEFAULT 0
);

Schema:

message QATEST.TABLES3 {
  optional group data {
    optional binary EMPNO (UTF8);
    optional binary ENAME (UTF8);
    optional binary JOB (UTF8);
    optional binary HIREDATE (UTF8);
    optional binary SAL (UTF8);
  }
  optional group before {
    optional binary EMPNO (UTF8);
    optional binary ENAME (UTF8);
    optional binary JOB (UTF8);
    optional binary HIREDATE (UTF8);
    optional binary SAL (UTF8);
  }
  optional group metadata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group userdata (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
  optional group datapresenceinfo {
    required boolean EMPNO;
    required boolean ENAME;
    required boolean JOB;
    required boolean HIREDATE;
    required boolean SAL;
  }
  optional group beforepresenceinfo {
    required boolean EMPNO;
    required boolean ENAME;
    required boolean JOB;
    required boolean HIREDATE;
    required boolean SAL;
  }
}

Sample output, insert:

{
  "data": {
    "EMPNO": "5",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:01:46.172-07:00",
    "SAL": "60000"
  },
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "52"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1353271"
      },
      {
        "key": "SQLRedoLength",
        "value": "152"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "2.31.778"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x000034.00006bf0.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587632506000"
      },
      {
        "key": "COMMITSCN",
        "value": "1353272"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1353271"
      },
      {
        "key": "SegmentName",
        "value": "TABLES3"
      },
      {
        "key": "OperationName",
        "value": "INSERT"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T09:01:46.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "27632"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES3"
      },
      {
        "key": "TxnID",
        "value": "2.31.778"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T09:01:46.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFpNAAEAAAAqdAAG"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587632506000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "135327100000146367005998448800006"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "datapresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  },
  "beforepresenceinfo": {
    "EMPNO": false,
    "ENAME": false,
    "JOB": false,
    "HIREDATE": false,
    "SAL": false
  }
}

Sample output, update:

{
  "data": {
    "EMPNO": "3",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:53:17.459-07:00",
    "SAL": "70000"
  },
  "before": {
    "EMPNO": "3",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:53:17.459-07:00",
    "SAL": "60000"
  },
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "55"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1366172"
      },
      {
        "key": "SQLRedoLength",
        "value": "189"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "5.30.788"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x000037.000128c2.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587635716000"
      },
      {
        "key": "COMMITSCN",
        "value": "1366173"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1366172"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "UPDATE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T09:55:16.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "75970"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "5.30.788"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T09:55:16.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587635716000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "136617200000154811286978560160000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "datapresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  },
  "beforepresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  }
}

Sample output, delete:

{
  "data": {
    "EMPNO": "3",
    "ENAME": "tanuja",
    "JOB": "So",
    "HIREDATE": "2020-04-23T09:53:17.459-07:00",
    "SAL": "70000"
  },
  "metadata": {
    "map": [
      {
        "key": "RbaSqn",
        "value": "55"
      },
      {
        "key": "AuditSessionId",
        "value": "174782"
      },
      {
        "key": "TableSpace",
        "value": "USERS"
      },
      {
        "key": "CURRENTSCN",
        "value": "1366251"
      },
      {
        "key": "SQLRedoLength",
        "value": "174"
      },
      {
        "key": "BytesProcessed"
      },
      {
        "key": "ParentTxnID",
        "value": "7.31.697"
      },
      {
        "key": "SessionInfo",
        "value": "UNKNOWN"
      },
      {
        "key": "RecordSetID",
        "value": " 0x000037.000128df.0010 "
      },
      {
        "key": "DBCommitTimestamp",
        "value": "1587635881000"
      },
      {
        "key": "COMMITSCN",
        "value": "1366252"
      },
      {
        "key": "SEQUENCE",
        "value": "1"
      },
      {
        "key": "Rollback",
        "value": "0"
      },
      {
        "key": "STARTSCN",
        "value": "1366251"
      },
      {
        "key": "SegmentName",
        "value": "TABLES4"
      },
      {
        "key": "OperationName",
        "value": "DELETE"
      },
      {
        "key": "TimeStamp",
        "value": "2020-04-23T09:58:01.000-07:00"
      },
      {
        "key": "TxnUserID",
        "value": "QATEST"
      },
      {
        "key": "RbaBlk",
        "value": "75999"
      },
      {
        "key": "SegmentType",
        "value": "TABLE"
      },
      {
        "key": "TableName",
        "value": "QATEST.TABLES4"
      },
      {
        "key": "TxnID",
        "value": "7.31.697"
      },
      {
        "key": "Serial",
        "value": "9289"
      },
      {
        "key": "ThreadID",
        "value": "1"
      },
      {
        "key": "COMMIT_TIMESTAMP",
        "value": "2020-04-23T09:58:01.000-07:00"
      },
      {
        "key": "OperationType",
        "value": "DML"
      },
      {
        "key": "ROWID",
        "value": "AAAFprAAEAAAAqlAAA"
      },
      {
        "key": "DBTimeStamp",
        "value": "1587635881000"
      },
      {
        "key": "TransactionName",
        "value": ""
      },
      {
        "key": "SCN",
        "value": "136625100000154811286997565600000"
      },
      {
        "key": "Session",
        "value": "9"
      }
    ]
  },
  "datapresenceinfo": {
    "EMPNO": true,
    "ENAME": true,
    "JOB": true,
    "HIREDATE": true,
    "SAL": true
  },
  "beforepresenceinfo": {
    "EMPNO": false,
    "ENAME": false,
    "JOB": false,
    "HIREDATE": false,
    "SAL": false
  }
}
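
With Format As = Native, the data and before groups plus the two presence groups are enough to work out which columns an update changed. The sketch below assumes, based only on the samples above, that a true flag means the column is populated in the corresponding image; changed_columns is a hypothetical helper operating on a record already read into Python dictionaries.

```python
def changed_columns(record):
    """Return {column: (old_value, new_value)} for columns that differ.

    Only columns flagged as present in both the data image and the before
    image are compared, so inserts and deletes yield an empty result.
    """
    data = record.get("data") or {}
    before = record.get("before") or {}
    data_present = record.get("datapresenceinfo") or {}
    before_present = record.get("beforepresenceinfo") or {}

    changes = {}
    for column, new_value in data.items():
        if data_present.get(column) and before_present.get(column):
            old_value = before.get(column)
            if old_value != new_value:
                changes[column] = (old_value, new_value)
    return changes
```

For the update sample above, the only differing column is SAL, which goes from "60000" to "70000".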

Input stream of Oracle Reader WAEvent, Format As = Table

Source table:

CREATE TABLE TABLES1 (
  "EMPNO" NUMBER(4),
  "ENAME" VARCHAR2(10),
  "JOB" VARCHAR(20),
  "HIREDATE" TIMESTAMP(6),
  "SAL" NUMBER DEFAULT 0
);

Schema:

message QATEST.TABLES1 {
  optional binary EMPNO (UTF8);
  optional binary ENAME (UTF8);
  optional binary JOB (UTF8);
  optional binary HIREDATE (UTF8);
  optional binary SAL (UTF8);
}

Sample output, insert:

{"EMPNO":"1","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-22T13:30:25.432-07:00","SAL":"60000"}

Sample output, update:

{"EMPNO":"5","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-23T10:01:10.614-07:00","SAL":"70000"}

Sample output, delete:

{"EMPNO":"5","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-23T10:01:10.614-07:00","SAL":"70000"}
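
Because every column in this schema is a UTF-8 binary, a consumer has to convert the strings back to typed values itself. The following sketch does that for the sample table; the CONVERTERS mapping is an assumption chosen to match the source column types shown above (NUMBER(4) as int, NUMBER as Decimal, TIMESTAMP as datetime) and is not something Striim provides.

```python
from datetime import datetime
from decimal import Decimal

# Assumed per-column converters for the sample table; unknown columns stay strings.
CONVERTERS = {
    "EMPNO": int,
    "ENAME": str,
    "JOB": str,
    "HIREDATE": datetime.fromisoformat,
    "SAL": Decimal,
}


def typed_row(row):
    """Convert a Table-format row of strings into typed Python values."""
    return {
        column: None if value is None else CONVERTERS.get(column, str)(value)
        for column, value in row.items()
    }
```

Note that the insert, update, and delete samples above carry no operation type, so a consumer that needs it would have to add it through the Members property.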

Input stream of Database Reader WAEvent, Format As = Table, using Members

Source table:

CREATE TABLE ORACLETOPARQUET1 (
  ID INTEGER,
  NAME VARCHAR(30),
  COST BINARY_FLOAT,
  CREATED_TS TIMESTAMP WITH TIME ZONE,
  LARGE_DATA CLOB
);

Members property value:

Table=@metadata(TableName),OpName=@metadata(OperationName)
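
The Members value above pairs an output column name with an element of one of the event's maps. The sketch below parses that one form and looks the values up; the grammar is inferred from this single example, so other forms the property may accept (such as a plain comma-separated list of elements) are not handled, and parse_members and extra_columns are hypothetical names.

```python
import re

# One entry of the form  alias=@map(key)  as in the example above.
MEMBER_PATTERN = re.compile(r"^\s*(\w+)\s*=\s*@(\w+)\((\w+)\)\s*$")


def parse_members(value):
    """Parse 'Alias=@map(Key),...' into {alias: (map_name, key)}."""
    members = {}
    for item in value.split(","):
        match = MEMBER_PATTERN.match(item)
        if match is None:
            raise ValueError(f"unrecognized Members entry: {item!r}")
        alias, source_map, key = match.groups()
        members[alias] = (source_map.lower(), key)
    return members


def extra_columns(members, event_maps):
    """Look up each member in the event's maps, e.g. {"metadata": {...}}."""
    return {
        alias: event_maps.get(source_map, {}).get(key)
        for alias, (source_map, key) in members.items()
    }
```

Given metadata containing TableName and OperationName, extra_columns produces the Table and OpName values that appear as the two additional columns in this example.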

Schema:

message QATEST.ORACLETOPARQUET1 {
  optional binary ID (UTF8);
  optional binary NAME (UTF8);
  optional float COST;
  optional binary CREATED_TS (UTF8);
  optional binary LARGE_DATA (UTF8);
  optional binary Table (UTF8);
  optional binary OpName (UTF8);
}

Sample output:

{"ID":"79","NAME":"abc","COST":39.5,"CREATED_TS":"2020-05-18T05:54:30.000Z",
"LARGE_DATA":"This is a character literal 1","Table":"QATEST.ORACLETOPARQUET1",
"OpName":"INSERT"}