Parquet Formatter
Formats a writer's output for use by Apache Parquet and generates one or more schema files. See Supported writer-formatter combinations.
Notes:
Encryption Policy cannot be set for the associated writer.
Data written using Parquet Formatter cannot be consumed until the target file is closed (rolls over).
Configuring Microsoft Windows to run Parquet Formatter
When Striim or the Forwarding Agent is running on Microsoft Windows, you must perform the following steps before using Parquet Formatter:
Download winutils.exe from https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin to a directory on the Windows system running Striim or the Forwarding Agent.
Set the HADOOP_HOME environment variable to that directory.
Add that directory to the PATH.
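The result of the steps above can be checked before starting Striim. The following is a minimal sketch in Python and is not part of Striim: the function name check_winutils is hypothetical. It assumes winutils.exe sits either directly in the HADOOP_HOME directory, as the steps describe, or in a bin subdirectory of it (an assumption about common Hadoop layouts).

```python
import os
from pathlib import Path


def check_winutils(env=None):
    """Return a list of problems found with the winutils setup (empty if none)."""
    env = os.environ if env is None else env
    problems = []

    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        return ["HADOOP_HOME is not set"]

    home = Path(hadoop_home)
    # The steps above place winutils.exe directly in HADOOP_HOME; a bin
    # subdirectory is also accepted here as an assumption about common layouts.
    candidates = [home / "winutils.exe", home / "bin" / "winutils.exe"]
    if not any(c.is_file() for c in candidates):
        problems.append(f"winutils.exe not found under {home}")

    # PATH entries are separated by os.pathsep (';' on Windows).
    path_dirs = {os.path.normcase(os.path.normpath(p))
                 for p in env.get("PATH", "").split(os.pathsep) if p}
    if os.path.normcase(os.path.normpath(hadoop_home)) not in path_dirs:
        problems.append(f"{home} is not on PATH")

    return problems


if __name__ == "__main__":
    for problem in check_winutils():
        print("WARNING:", problem)
```

Run on the Windows system as the same user that runs Striim or the Forwarding Agent, the script prints nothing when all three checks pass.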
Parquet Formatter properties
property | type | default value | notes |
---|---|---|---|
Block Size | Long | 128000000 | Sets the |
Compression Type | String | UNCOMPRESSED | Optionally, specify the target's compression format. Supported types are GZIP, LZO, and SNAPPY. |
Format As | String | Default | With the Default setting, a single schema file is created.
The other two settings, Native and Table, are supported only with an input stream of type WAEvent from a Database Reader, Incremental Batch Reader, or SQL CDC reader source. A dynamic directory, folder, or bucket name must be specified in the writer (see Setting output names and rollover / upload policies). A schema file with a timestamp appended to its name is created in each directory, folder, or bucket. With S3 Writer, if both the bucket name and folder name are dynamic, each combination of bucket and folder has its own schema file. If there is a DDL change in the source, a new schema file is created and the output files roll over.
See Parquet Formatter examples for sample schema files and output for each setting. |
Members | String | Optionally:
| |
Schema File Name | String | The fully qualified name of the Parquet schema file Striim will create when the application runs. When a dynamic directory is specified in the writer, Striim in some cases writes the files in the target directories and/or appends a timestamp to the file names. See the notes for Format As for more details. |
Parquet Formatter data type support and correspondence
The following apply when the input stream is of a user-defined type.
Striim type | Parquet type | Notes |
---|---|---|
Byte | INT32 | |
DateTime | INT32 | Unix epoch (number of days from 1 January 1970) |
Double | DOUBLE | IEEE 64-bit |
Float | FLOAT | IEEE 32-bit |
Integer | INT32 | 32-bit signed |
Long | INT64 | 64-bit signed |
Short | INT32 | |
String | BYTE_ARRAY | UTF-8 |
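The DateTime row above stores a date as an INT32 count of days since 1 January 1970. As an illustration of that encoding only (this is not Striim code, and the helper names are hypothetical), a minimal sketch in Python:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def date_to_epoch_days(d):
    """Encode a date as the number of days since 1 January 1970."""
    return (d - EPOCH).days


def epoch_days_to_date(days):
    """Decode a days-since-epoch integer back into a date."""
    return EPOCH + timedelta(days=days)
```

For example, date_to_epoch_days(date(2020, 4, 22)) returns 18374, and epoch_days_to_date(18374) returns the same date. Note that this encoding carries no time-of-day information.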
The following apply when the input stream is the output of a Database Reader or Incremental Batch Reader source.
JDBC column type | Parquet type | Notes |
---|---|---|
Types.BIGINT | INT64 | 64-bit signed |
Types.BIT | INT32 | |
Types.CHAR | BYTE_ARRAY | UTF-8 |
Types.DATE | INT32 | Unix epoch (number of days from 1 January 1970) |
Types.DECIMAL | BYTE_ARRAY | UTF-8 |
Types.DOUBLE | DOUBLE | IEEE 64-bit |
Types.FLOAT | FLOAT | IEEE 32-bit |
Types.INTEGER | INT32 | 32-bit signed |
Types.NUMERIC | BYTE_ARRAY | UTF-8 |
Types.REAL | FLOAT | IEEE 32-bit |
Types.SMALLINT | INT32 | |
Types.TIMESTAMP | INT32 | Unix epoch (number of days from 1 January 1970) |
Types.TINYINT | INT32 | |
Types.VARCHAR | BYTE_ARRAY | UTF-8 |
other types | BYTE_ARRAY | UTF-8 |
The following apply when the input stream is the output of an Oracle Reader source.
Oracle type | Parquet type |
---|---|
ADT | unsupported |
ARRAY | unsupported |
BFILE | unsupported |
BINARY_DOUBLE | DOUBLE |
BINARY_FLOAT | FLOAT |
BLOB | BYTE_ARRAY |
CHAR | BYTE_ARRAY |
CLOB | BYTE_ARRAY |
DATE | BYTE_ARRAY |
FLOAT | BYTE_ARRAY |
INTERVAL DAY TO SECOND | unsupported |
INTERVAL YEAR TO MONTH | unsupported |
LONG | unsupported |
LONG RAW | unsupported |
NCHAR | BYTE_ARRAY |
NCLOB | BYTE_ARRAY |
NESTED TABLE | unsupported |
NUMBER | BYTE_ARRAY |
NVARCHAR2 | unsupported |
RAW | unsupported |
REF | unsupported |
ROWID | unsupported |
TIMESTAMP | BYTE_ARRAY |
TIMESTAMP WITH LOCAL TIME ZONE | BYTE_ARRAY |
TIMESTAMP WITH TIME ZONE | BYTE_ARRAY |
UDT | unsupported |
UROWID | unsupported |
VARCHAR2 | BYTE_ARRAY |
VARRAY | supported for primitive data types (see Oracle Reader and OJet WAEvent fields) |
XMLTYPE | unsupported |
Parquet Formatter examples
The output of Parquet Formatter varies depending on the type of the input stream and the Format As setting.
Input stream of user-defined type, Format As = Default
Input stream type:
Create Type PERSON ( ID Integer, City String, Code String, Name String);
Schema:
message Person.Person { optional int32 ID; optional binary city (UTF8); optional binary code (UTF8); optional binary name (UTF8); }
Sample output:
{"ID":1216,"city":"South Kent","code":"USD","name":"COMPANY 4999992"}
Input stream of Oracle Reader WAEvent, Format As = Default
Source table:
CREATE TABLE TABLES1 ( "EMPNO" NUMBER(4), "ENAME" VARCHAR2(10), "JOB" VARCHAR(20), "HIREDATE" TIMESTAMP(6), "SAL" NUMBER DEFAULT 0 );
Schema:
message WAEvent.avro.WAEvent { optional group metadata (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); optional binary value (UTF8); } } optional group data (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); optional binary value (UTF8); } } optional group before (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); optional binary value (UTF8); } } optional group userdata (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); optional binary value (UTF8); } } }
Sample output, insert:
{ "metadata": { "map": [ { "key": "RbaSqn", "value": "29" }, { "key": "AuditSessionId", "value": "174320" }, { "key": "TableSpace", "value": "USERS" }, { "key": "CURRENTSCN", "value": "1215123" }, { "key": "SQLRedoLength", "value": "152" }, { "key": "BytesProcessed" }, { "key": "ParentTxnID", "value": "1.20.571" }, { "key": "SessionInfo", "value": "UNKNOWN" }, { "key": "RecordSetID", "value": " 0010 " }, { "key": "DBCommitTimestamp", "value": "1587561609000" }, { "key": "COMMITSCN", "value": "1215124" }, { "key": "SEQUENCE", "value": "1" }, { "key": "Rollback", "value": "0" }, { "key": "STARTSCN", "value": "1215123" }, { "key": "SegmentName", "value": "TABLES1" }, { "key": "OperationName", "value": "INSERT" }, { "key": "TimeStamp", "value": "2020-04-22T13:20:09.000-07:00" }, { "key": "TxnUserID", "value": "QATEST" }, { "key": "RbaBlk", "value": "2808" }, { "key": "SegmentType", "value": "TABLE" }, { "key": "TableName", "value": "QATEST.TABLES1" }, { "key": "TxnID", "value": "1.20.571" }, { "key": "Serial", "value": "12481" }, { "key": "ThreadID", "value": "1" }, { "key": "COMMIT_TIMESTAMP", "value": "2020-04-22T13:20:09.000-07:00" }, { "key": "OperationType", "value": "DML" }, { "key": "ROWID", "value": "AAAFSWAAEAAAApcAAB" }, { "key": "DBTimeStamp", "value": "1587561609000" }, { "key": "TransactionName", "value": "" }, { "key": "SCN", "value": "121512300000081627745086341280001" }, { "key": "Session", "value": "142" } ] }, "data": { "map": [ { "key": "ENAME", "value": "tanuja" }, { "key": "EMPNO", "value": "1" }, { "key": "JOB", "value": "So" }, { "key": "HIREDATE", "value": "2020-04-22T13:20:09.074-07:00" }, { "key": "SAL", "value": "60000" } ] } }
Sample output, update:
{ "metadata": { "map": [ { "key": "RbaSqn", "value": "59" }, { "key": "AuditSessionId", "value": "174782" }, { "key": "TableSpace", "value": "USERS" }, { "key": "CURRENTSCN", "value": "1397647" }, { "key": "SQLRedoLength", "value": "189" }, { "key": "BytesProcessed" }, { "key": "ParentTxnID", "value": "8.10.809" }, { "key": "SessionInfo", "value": "UNKNOWN" }, { "key": "RecordSetID", "value": " 0x00003b.00013dfb.0010 " }, { "key": "DBCommitTimestamp", "value": "1587636418000" }, { "key": "COMMITSCN", "value": "1397648" }, { "key": "SEQUENCE", "value": "1" }, { "key": "Rollback", "value": "0" }, { "key": "STARTSCN", "value": "1397647" }, { "key": "SegmentName", "value": "TABLES4" }, { "key": "OperationName", "value": "UPDATE" }, { "key": "TimeStamp", "value": "2020-04-23T10:06:58.000-07:00" }, { "key": "TxnUserID", "value": "QATEST" }, { "key": "RbaBlk", "value": "81403" }, { "key": "SegmentType", "value": "TABLE" }, { "key": "TableName", "value": "QATEST.TABLES4" }, { "key": "TxnID", "value": "8.10.809" }, { "key": "Serial", "value": "9289" }, { "key": "ThreadID", "value": "1" }, { "key": "COMMIT_TIMESTAMP", "value": "2020-04-23T10:06:58.000-07:00" }, { "key": "OperationType", "value": "DML" }, { "key": "ROWID", "value": "AAAFprAAEAAAAqlAAA" }, { "key": "DBTimeStamp", "value": "1587636418000" }, { "key": "TransactionName", "value": "" }, { "key": "SCN", "value": "139764700000166070289607557280000" }, { "key": "Session", "value": "9" } ] }, "data": { "map": [ { "key": "ENAME", "value": "tanuja" }, { "key": "EMPNO", "value": "6" }, { "key": "JOB", "value": "So" }, { "key": "HIREDATE", "value": "2020-04-23T10:05:03.381-07:00" }, { "key": "SAL", "value": "70000" } ] }, "before": { "map": [ { "key": "ENAME", "value": "tanuja" }, { "key": "EMPNO", "value": "6" }, { "key": "JOB", "value": "So" }, { "key": "HIREDATE", "value": "2020-04-23T10:05:03.381-07:00" }, { "key": "SAL", "value": "60000" } ] } }
Sample output, delete:
{ "metadata": { "map": [ { "key": "RbaSqn", "value": "59" }, { "key": "AuditSessionId", "value": "174782" }, { "key": "TableSpace", "value": "USERS" }, { "key": "CURRENTSCN", "value": "1397672" }, { "key": "SQLRedoLength", "value": "174" }, { "key": "BytesProcessed" }, { "key": "ParentTxnID", "value": "3.33.844" }, { "key": "SessionInfo", "value": "UNKNOWN" }, { "key": "RecordSetID", "value": " 0x00003b.00013dfe.0010 " }, { "key": "DBCommitTimestamp", "value": "1587636481000" }, { "key": "COMMITSCN", "value": "1397673" }, { "key": "SEQUENCE", "value": "1" }, { "key": "Rollback", "value": "0" }, { "key": "STARTSCN", "value": "1397672" }, { "key": "SegmentName", "value": "TABLES4" }, { "key": "OperationName", "value": "DELETE" }, { "key": "TimeStamp", "value": "2020-04-23T10:08:01.000-07:00" }, { "key": "TxnUserID", "value": "QATEST" }, { "key": "RbaBlk", "value": "81406" }, { "key": "SegmentType", "value": "TABLE" }, { "key": "TableName", "value": "QATEST.TABLES4" }, { "key": "TxnID", "value": "3.33.844" }, { "key": "Serial", "value": "9289" }, { "key": "ThreadID", "value": "1" }, { "key": "COMMIT_TIMESTAMP", "value": "2020-04-23T10:08:01.000-07:00" }, { "key": "OperationType", "value": "DML" }, { "key": "ROWID", "value": "AAAFprAAEAAAAqlAAA" }, { "key": "DBTimeStamp", "value": "1587636481000" }, { "key": "TransactionName", "value": "" }, { "key": "SCN", "value": "139767200000166070289609523360000" }, { "key": "Session", "value": "9" } ] }, "data": { "map": [ { "key": "ENAME", "value": "tanuja" }, { "key": "EMPNO", "value": "6" }, { "key": "JOB", "value": "So" }, { "key": "HIREDATE", "value": "2020-04-23T10:05:03.381-07:00" }, { "key": "SAL", "value": "70000" } ] } }
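With Format As = Default, each section of the record is a list of key/value groups rather than a set of named columns, so a consumer typically has to flatten it before use. The following minimal sketch in Python shows one way to do that. It assumes the record has already been read into Python dictionaries with the same shape as the samples above (how the Parquet file is read is outside this sketch), and the function names are hypothetical.

```python
import json


def flatten_map_group(group):
    """Turn {"map": [{"key": k, "value": v}, ...]} into a plain dict.

    Entries with no "value" (such as "BytesProcessed" in the samples above)
    are mapped to None.
    """
    if not group:
        return {}
    return {entry["key"]: entry.get("value") for entry in group.get("map", [])}


def flatten_record(record):
    """Flatten every map-style section present in a Default-format record."""
    return {name: flatten_map_group(record.get(name))
            for name in ("metadata", "data", "before", "userdata")
            if name in record}


# A shortened record in the same shape as the update sample above.
sample = json.loads("""
{ "metadata": { "map": [ { "key": "OperationName", "value": "UPDATE" },
                         { "key": "BytesProcessed" } ] },
  "data":     { "map": [ { "key": "EMPNO", "value": "6" },
                         { "key": "SAL", "value": "70000" } ] },
  "before":   { "map": [ { "key": "EMPNO", "value": "6" },
                         { "key": "SAL", "value": "60000" } ] } }
""")

flat = flatten_record(sample)
```

After flattening, flat["data"]["SAL"] is "70000" and flat["before"]["SAL"] is "60000". All values remain strings, matching the UTF8 binary fields in the schema.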
Input stream of Oracle Reader WAEvent, Format As = Native
Source table:
CREATE TABLE TABLES1 ( "EMPNO" NUMBER(4), "ENAME" VARCHAR2(10), "JOB" VARCHAR(20), "HIREDATE" TIMESTAMP(6), "SAL" NUMBER DEFAULT 0 );
Schema:
message QATEST.TABLES3 { optional group data { optional binary EMPNO (UTF8); optional binary ENAME (UTF8); optional binary JOB (UTF8); optional binary HIREDATE (UTF8); optional binary SAL (UTF8); } optional group before { optional binary EMPNO (UTF8); optional binary ENAME (UTF8); optional binary JOB (UTF8); optional binary HIREDATE (UTF8); optional binary SAL (UTF8); } optional group metadata (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); optional binary value (UTF8); } } optional group userdata (MAP) { repeated group map (MAP_KEY_VALUE) { required binary key (UTF8); optional binary value (UTF8); } } optional group datapresenceinfo { required boolean EMPNO; required boolean ENAME; required boolean JOB; required boolean HIREDATE; required boolean SAL; } optional group beforepresenceinfo { required boolean EMPNO; required boolean ENAME; required boolean JOB; required boolean HIREDATE; required boolean SAL; } }
Sample output, insert:
{ "data": { "EMPNO": "5", "ENAME": "tanuja", "JOB": "So", "HIREDATE": "2020-04-23T09:01:46.172-07:00", "SAL": "60000" }, "metadata": { "map": [ { "key": "RbaSqn", "value": "52" }, { "key": "AuditSessionId", "value": "174782" }, { "key": "TableSpace", "value": "USERS" }, { "key": "CURRENTSCN", "value": "1353271" }, { "key": "SQLRedoLength", "value": "152" }, { "key": "BytesProcessed" }, { "key": "ParentTxnID", "value": "2.31.778" }, { "key": "SessionInfo", "value": "UNKNOWN" }, { "key": "RecordSetID", "value": " 0x000034.00006bf0.0010 " }, { "key": "DBCommitTimestamp", "value": "1587632506000" }, { "key": "COMMITSCN", "value": "1353272" }, { "key": "SEQUENCE", "value": "1" }, { "key": "Rollback", "value": "0" }, { "key": "STARTSCN", "value": "1353271" }, { "key": "SegmentName", "value": "TABLES3" }, { "key": "OperationName", "value": "INSERT" }, { "key": "TimeStamp", "value": "2020-04-23T09:01:46.000-07:00" }, { "key": "TxnUserID", "value": "QATEST" }, { "key": "RbaBlk", "value": "27632" }, { "key": "SegmentType", "value": "TABLE" }, { "key": "TableName", "value": "QATEST.TABLES3" }, { "key": "TxnID", "value": "2.31.778" }, { "key": "Serial", "value": "9289" }, { "key": "ThreadID", "value": "1" }, { "key": "COMMIT_TIMESTAMP", "value": "2020-04-23T09:01:46.000-07:00" }, { "key": "OperationType", "value": "DML" }, { "key": "ROWID", "value": "AAAFpNAAEAAAAqdAAG" }, { "key": "DBTimeStamp", "value": "1587632506000" }, { "key": "TransactionName", "value": "" }, { "key": "SCN", "value": "135327100000146367005998448800006" }, { "key": "Session", "value": "9" } ] }, "datapresenceinfo": { "EMPNO": true, "ENAME": true, "JOB": true, "HIREDATE": true, "SAL": true }, "beforepresenceinfo": { "EMPNO": false, "ENAME": false, "JOB": false, "HIREDATE": false, "SAL": false } }
Sample output, update:
{ "data": { "EMPNO": "3", "ENAME": "tanuja", "JOB": "So", "HIREDATE": "2020-04-23T09:53:17.459-07:00", "SAL": "70000" }, "before": { "EMPNO": "3", "ENAME": "tanuja", "JOB": "So", "HIREDATE": "2020-04-23T09:53:17.459-07:00", "SAL": "60000" }, "metadata": { "map": [ { "key": "RbaSqn", "value": "55" }, { "key": "AuditSessionId", "value": "174782" }, { "key": "TableSpace", "value": "USERS" }, { "key": "CURRENTSCN", "value": "1366172" }, { "key": "SQLRedoLength", "value": "189" }, { "key": "BytesProcessed" }, { "key": "ParentTxnID", "value": "5.30.788" }, { "key": "SessionInfo", "value": "UNKNOWN" }, { "key": "RecordSetID", "value": " 0x000037.000128c2.0010 " }, { "key": "DBCommitTimestamp", "value": "1587635716000" }, { "key": "COMMITSCN", "value": "1366173" }, { "key": "SEQUENCE", "value": "1" }, { "key": "Rollback", "value": "0" }, { "key": "STARTSCN", "value": "1366172" }, { "key": "SegmentName", "value": "TABLES4" }, { "key": "OperationName", "value": "UPDATE" }, { "key": "TimeStamp", "value": "2020-04-23T09:55:16.000-07:00" }, { "key": "TxnUserID", "value": "QATEST" }, { "key": "RbaBlk", "value": "75970" }, { "key": "SegmentType", "value": "TABLE" }, { "key": "TableName", "value": "QATEST.TABLES4" }, { "key": "TxnID", "value": "5.30.788" }, { "key": "Serial", "value": "9289" }, { "key": "ThreadID", "value": "1" }, { "key": "COMMIT_TIMESTAMP", "value": "2020-04-23T09:55:16.000-07:00" }, { "key": "OperationType", "value": "DML" }, { "key": "ROWID", "value": "AAAFprAAEAAAAqlAAA" }, { "key": "DBTimeStamp", "value": "1587635716000" }, { "key": "TransactionName", "value": "" }, { "key": "SCN", "value": "136617200000154811286978560160000" }, { "key": "Session", "value": "9" } ] }, "datapresenceinfo": { "EMPNO": true, "ENAME": true, "JOB": true, "HIREDATE": true, "SAL": true }, "beforepresenceinfo": { "EMPNO": true, "ENAME": true, "JOB": true, "HIREDATE": true, "SAL": true } }
Sample output, delete:
{ "data": { "EMPNO": "3", "ENAME": "tanuja", "JOB": "So", "HIREDATE": "2020-04-23T09:53:17.459-07:00", "SAL": "70000" }, "metadata": { "map": [ { "key": "RbaSqn", "value": "55" }, { "key": "AuditSessionId", "value": "174782" }, { "key": "TableSpace", "value": "USERS" }, { "key": "CURRENTSCN", "value": "1366251" }, { "key": "SQLRedoLength", "value": "174" }, { "key": "BytesProcessed" }, { "key": "ParentTxnID", "value": "7.31.697" }, { "key": "SessionInfo", "value": "UNKNOWN" }, { "key": "RecordSetID", "value": " 0x000037.000128df.0010 " }, { "key": "DBCommitTimestamp", "value": "1587635881000" }, { "key": "COMMITSCN", "value": "1366252" }, { "key": "SEQUENCE", "value": "1" }, { "key": "Rollback", "value": "0" }, { "key": "STARTSCN", "value": "1366251" }, { "key": "SegmentName", "value": "TABLES4" }, { "key": "OperationName", "value": "DELETE" }, { "key": "TimeStamp", "value": "2020-04-23T09:58:01.000-07:00" }, { "key": "TxnUserID", "value": "QATEST" }, { "key": "RbaBlk", "value": "75999" }, { "key": "SegmentType", "value": "TABLE" }, { "key": "TableName", "value": "QATEST.TABLES4" }, { "key": "TxnID", "value": "7.31.697" }, { "key": "Serial", "value": "9289" }, { "key": "ThreadID", "value": "1" }, { "key": "COMMIT_TIMESTAMP", "value": "2020-04-23T09:58:01.000-07:00" }, { "key": "OperationType", "value": "DML" }, { "key": "ROWID", "value": "AAAFprAAEAAAAqlAAA" }, { "key": "DBTimeStamp", "value": "1587635881000" }, { "key": "TransactionName", "value": "" }, { "key": "SCN", "value": "136625100000154811286997565600000" }, { "key": "Session", "value": "9" } ] }, "datapresenceinfo": { "EMPNO": true, "ENAME": true, "JOB": true, "HIREDATE": true, "SAL": true }, "beforepresenceinfo": { "EMPNO": false, "ENAME": false, "JOB": false, "HIREDATE": false, "SAL": false } }
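In the Native samples above, the before section appears only for updates, and beforepresenceinfo is all false for inserts and deletes. A consumer can use these fields to work out which columns an update changed. The sketch below is a hedged illustration in Python, not Striim code: the function name is hypothetical, and reading a presence flag as "this column is populated in that image" is an inference from the samples rather than a documented guarantee.

```python
def changed_columns(record):
    """Return {column: (before, after)} for columns whose value changed.

    Only columns flagged true in both datapresenceinfo and
    beforepresenceinfo are compared.
    """
    data = record.get("data") or {}
    before = record.get("before") or {}
    data_present = record.get("datapresenceinfo") or {}
    before_present = record.get("beforepresenceinfo") or {}

    changes = {}
    for column, value in data.items():
        if not (data_present.get(column) and before_present.get(column)):
            continue
        if before.get(column) != value:
            changes[column] = (before.get(column), value)
    return changes


# A shortened record in the same shape as the update sample above.
update = {
    "data": {"EMPNO": "3", "ENAME": "tanuja", "SAL": "70000"},
    "before": {"EMPNO": "3", "ENAME": "tanuja", "SAL": "60000"},
    "datapresenceinfo": {"EMPNO": True, "ENAME": True, "SAL": True},
    "beforepresenceinfo": {"EMPNO": True, "ENAME": True, "SAL": True},
}
print(changed_columns(update))  # {'SAL': ('60000', '70000')}
```

For an insert or delete record, where beforepresenceinfo is all false, the function returns an empty dictionary.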
Input stream of Oracle Reader WAEvent, Format As = Table
Source table:
CREATE TABLE TABLES1 ( "EMPNO" NUMBER(4), "ENAME" VARCHAR2(10), "JOB" VARCHAR(20), "HIREDATE" TIMESTAMP(6), "SAL" NUMBER DEFAULT 0 );
Schema:
message QATEST.TABLES1 { optional binary EMPNO (UTF8); optional binary ENAME (UTF8); optional binary JOB (UTF8); optional binary HIREDATE (UTF8); optional binary SAL (UTF8); }
Sample output, insert:
{"EMPNO":"1","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-22T13:30:25.432-07:00","SAL":"60000"}
Sample output, update:
{"EMPNO":"5","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-23T10:01:10.614-07:00","SAL":"70000"}
Sample output, delete:
{"EMPNO":"5","ENAME":"tanuja","JOB":"So","HIREDATE":"2020-04-23T10:01:10.614-07:00","SAL":"70000"}
Input stream of Database Reader WAEvent, Format As = Table, using Members
Source table:
CREATE TABLE ORACLETOPARQUET1 ( ID INTEGER, NAME VARCHAR(30), COST BINARY_FLOAT, CREATED_TS TIMESTAMP WITH TIME ZONE, LARGE_DATA CLOB );
Members property value:
Table=@metadata(TableName),OpName=@metadata(OperationName)
Schema:
message QATEST.ORACLETOPARQUET1 { optional binary ID (UTF8); optional binary NAME (UTF8); optional float COST; optional binary CREATED_TS (UTF8); optional binary LARGE_DATA (UTF8); optional binary Table (UTF8); optional binary OpName (UTF8); }
Sample output:
{"ID":"79","NAME":"abc","COST":39.5,"CREATED_TS":"2020-05-18T05:54:30.000Z", "LARGE_DATA":"This is a character literal 1","Table":"QATEST.ORACLETOPARQUET1", "OpName":"INSERT"}
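The Members value above pairs each output field name with a metadata key, which is why the sample output carries Table and OpName alongside the table columns. To make that mapping concrete, here is a minimal sketch in Python that mimics the effect on a single row. It is not Striim's implementation: the function names are hypothetical, and it handles only the Name=@metadata(Key) form shown in this example.

```python
import re

# Matches one member of the form Name=@metadata(Key).
MEMBER_PATTERN = re.compile(r"^\s*(\w+)\s*=\s*@metadata\((\w+)\)\s*$")


def parse_members(spec):
    """Parse 'Name=@metadata(Key),...' into a list of (name, key) pairs."""
    pairs = []
    for item in spec.split(","):
        match = MEMBER_PATTERN.match(item)
        if match is None:
            raise ValueError(f"unsupported member expression: {item!r}")
        pairs.append((match.group(1), match.group(2)))
    return pairs


def add_members(row, metadata, spec):
    """Return a copy of row with one extra field per parsed member."""
    extended = dict(row)
    for name, key in parse_members(spec):
        extended[name] = metadata.get(key)
    return extended


spec = "Table=@metadata(TableName),OpName=@metadata(OperationName)"
metadata = {"TableName": "QATEST.ORACLETOPARQUET1", "OperationName": "INSERT"}
row = add_members({"ID": "79", "NAME": "abc"}, metadata, spec)
```

Here row gains "Table": "QATEST.ORACLETOPARQUET1" and "OpName": "INSERT", matching the two extra fields in the sample output.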