
Parsing the fields of WAEvent for CDC readers

Use the following functions to parse the output stream of a CDC reader.

DATA[#] function

DATA [field_number]

For each event, returns the value of the specified field from the data array.

  • The first field in the array is 0.

  • The order of the DATA functions in the SELECT clause determines the order of the fields in the output. These may be specified in any order: for example, data[1] could precede data[0].
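As a minimal sketch of the ordering rule above, the following CQ emits the second source field before the first. It assumes a CDC reader output stream named OracleRawStream that reads the POSAUTHORIZATIONS table used in the examples later in this section, and that both fields are non-null. The type, stream, and CQ names (MerchantNameType, MerchantNameStream, SelectMerchantFields) are hypothetical.

```sql
-- Hypothetical output type: one String field per selected source field
CREATE TYPE MerchantNameType (
  MerchantId String,
  BusinessName String
);
CREATE STREAM MerchantNameStream OF MerchantNameType;

-- data[1] (MERCHANT_ID) is listed before data[0] (BUSINESS_NAME),
-- so MerchantId is the first field of each output event
CREATE CQ SelectMerchantFields
  INSERT INTO MerchantNameStream
  SELECT data[1].toString(),
    data[0].toString()
  FROM OracleRawStream;
```

The output field order follows the SELECT list, not the column order of the source table.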

DATA(x) and DATAORDERED(x) functions

For each event, returns the values in the WAEvent data array as a java.util.HashMap, with column names as the keys. DATAORDERED(x) returns the column values in the same order as in the source table. When using DATA(x), the order is not guaranteed.

The following example shows how to use the DATA() function to include database column names from a CDC source in JSONFormatter output. (You could do the same thing with AVROFormatter.)

CREATE SOURCE DBROracleIn USING DatabaseReader (
  Username:'striidbr',
  Password:'passwd',
  ConnectionURL:'jdbc:oracle:thin:@192.0.2.49:1521/orcl',
  Tables:'ROBERT.POSAUTHORIZATIONS',
  FetchSize:1
) 
OUTPUT TO OracleRawStream;

CREATE TYPE OpTableDataType(
  TableName String,
  data java.util.HashMap
);
CREATE STREAM OracleTypedStream OF OpTableDataType;
CREATE CQ ParseOracleRawStream
  INSERT INTO OracleTypedStream
  SELECT META(OracleRawStream, "TableName").toString(),
    DATA(OracleRawStream)
  FROM OracleRawStream;
 
CREATE TARGET DBR2JSONOut USING FileWriter(
  filename:'DBR2JSON.json'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleTypedStream;

The CQ will be easier to read if you use an alias for the stream name. For example:

CREATE CQ ParseOracleRawStream
  INSERT INTO OracleTypedStream
  SELECT META(x, "TableName").toString(),
    DATA(x)
  FROM OracleRawStream x;

Assuming the following Oracle table and data:

CREATE TABLE POSAUTHORIZATIONS (
  BUSINESS_NAME varchar2(30),
  MERCHANT_ID varchar2(100),
  PRIMARY_ACCOUNT NUMBER,
  POS NUMBER,
  CODE varchar2(20),
  EXP char(4),
  CURRENCY_CODE char(3),
  AUTH_AMOUNT number(10,3),
  TERMINAL_ID NUMBER,
  ZIP number,
  CITY varchar2(20));
commit;
INSERT INTO POSAUTHORIZATIONS VALUES(
  'COMPANY 1',
  'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu',
  6705362103919221351,
  0,
  '20130309113025',
  '0916',
  'USD',
  2.20,
  5150279519809946,
  41363,
  'Quicksand');
commit;

Output for this application would be:

{
 "TableName":"ROBERT.POSAUTHORIZATIONS",
 "data":{"AUTH_AMOUNT":"2.2", "BUSINESS_NAME":"COMPANY 1", "ZIP":"41363", "EXP":"0916", 
"POS":"0", "CITY":"Quicksand", "CURRENCY_CODE":"USD", 
"PRIMARY_ACCOUNT":"6705362103919221351", 
"MERCHANT_ID":"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "TERMINAL_ID":"5150279519809946",
"CODE":"20130309113025"}
}
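If the keys need to appear in source column order, a variant of the same CQ could call DATAORDERED() instead. This is a sketch only: the CQ name ParseOracleRawStreamOrdered is hypothetical, and it assumes that the value returned by DATAORDERED(x) can be stored in the java.util.HashMap field of OpTableDataType. If that assumption does not hold, the field type would need to change.

```sql
-- Hypothetical variant of ParseOracleRawStream that keeps source column order
CREATE CQ ParseOracleRawStreamOrdered
  INSERT INTO OracleTypedStream
  SELECT META(x, "TableName").toString(),
    DATAORDERED(x)   -- same call shape as DATA(x), but ordered
  FROM OracleRawStream x;
```

With this variant, the keys under "data" would be expected to follow the column order of the CREATE TABLE statement, starting with BUSINESS_NAME.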

IS_PRESENT() function

IS_PRESENT (<stream name>, [ before | data ], <field number>)

For each event, returns true or false depending on whether the before or data array has a value for the specified field. For example, if you performed the following update on an Oracle table:

UPDATE POSAUTHORIZATIONS SET BUSINESS_NAME = 'COMPANY 5A' WHERE POS = 0;

The WAEvent for that update would look something like this:

data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu","6705362103919221351",
 "0","20130309113025","0916","USD","2.2","5150279519809946","41363","Quicksand"]
before: ["COMPANY 1",null,null,null,null,null,null,null,null,null,null]

You could use the following code to return values for the updated data fields and NOT_UPDATED for the other fields:

SELECT
  CASE WHEN IS_PRESENT(OracleCDCStream,before,0)==true THEN data[0].toString()
    ELSE "NOT_UPDATED"
  END,
  CASE WHEN IS_PRESENT(OracleCDCStream,before,1)==true THEN data[1].toString()
    ELSE "NOT_UPDATED"
  END ...
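For reference, here is a sketch of a complete CQ that applies this pattern to the first two fields only. The type, stream, and CQ names (UpdatedFieldsType, UpdatedFieldsStream, FlagUpdatedFields) are hypothetical, and OracleCDCStream is assumed to be the output stream of an Oracle CDC reader on the POSAUTHORIZATIONS table.

```sql
-- Hypothetical type with one String field per column being checked
CREATE TYPE UpdatedFieldsType (
  BusinessName String,
  MerchantId String
);
CREATE STREAM UpdatedFieldsStream OF UpdatedFieldsType;

-- A field counts as updated when the before array holds a value for it
CREATE CQ FlagUpdatedFields
  INSERT INTO UpdatedFieldsStream
  SELECT
    CASE WHEN IS_PRESENT(OracleCDCStream,before,0)==true THEN data[0].toString()
      ELSE "NOT_UPDATED"
    END,
    CASE WHEN IS_PRESENT(OracleCDCStream,before,1)==true THEN data[1].toString()
      ELSE "NOT_UPDATED"
    END
  FROM OracleCDCStream;
```

For the sample update shown earlier, where only BUSINESS_NAME changed, this would be expected to produce COMPANY 5A for the first field and NOT_UPDATED for the second.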

META() function

META(stream_name, metadata_key)

For each event, returns the value for the specified metadata key. Metadata keys are specific to each adapter.

For example, META(OracleStream, "TableName") would return the name of the source table for the event.