Creating an app using the Flow Designer
This tutorial uses the sample applications described in Running the CDC demo apps. The Docker PostgreSQL instance must be running. Kafka is not required.
On the Apps page, if ValidatePostgres is not running, deploy and start it.
Click Create App > Start from scratch, name the app PG2File, select your personal namespace or enter a name for a new namespace, and click Save.
Click the Metadata Browser icon, search for ReadCDCFromPostgresDB (located in Source), and click Copy to App.
For Name, enter ReadPostgreSQLCDC; for Password, enter striim; for New Output, enter rawCDCstream; and click Save. Except for the name and output stream, this new source will have the same properties as SamplesDB.ReadCDCfromPostgresDB.
Click the stream icon, click +, and select Connect next Target component.
In the New Target dialog, set the properties as follows (leave other properties at their defaults), then click Save.
Name: WriteRawData
Adapter: FileWriter
File Name: RawData
Directory: MyDirectory (click Show Advanced Settings)
Rollover Policy: leave as is
Flush Policy: EventCount 1 (delete Interval)
Formatter: JSONFormatter
Deploy and start the application, then return to the View All Apps page and start SamplesDB.Execute250Inserts.
Open Striim\MyDirectory\RawData.00. This contains the raw WAEvent output of PostgreSQLReader formatted as JSON. It should look similar to the following:
[
 {
  "metadata":{"TableName":"public.customer","TxnID":"2175","OperationName":"INSERT","LSN":"0\/2F9803D8","NEXT_LSN":"0\/2F9942C8","Sequence":1,"Timestamp":"2022-02-25 14:51:31.919978-06"},
  "data":{
   "c_custkey":150001,
   "c_name":"Customer#150001",
   "c_address":"IVhzIApeRb ot,c,E",
   "c_nationkey":15,
   "c_phone":"25-989-741-2988",
   "c_acctbal":"711.56",
   "c_mktsegment":"BUILDING ",
   "c_comment":"to the even, regular platelets. regular, ironic epitaphs nag e"
  },
  "before":null,
  "userdata":null
 },
...
This includes the column names and values for the row inserted into the public.customer table by transaction ID (TxnID).
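As a quick way to inspect this output outside Striim, the following Python sketch loads one record and lists the table, operation, transaction ID, and column names. It is only an illustration: the record is copied from the sample above, and the function name summarize_events is hypothetical. A real RawData.00 file holds an array of such records and may be incomplete while the application is still writing to it.

```python
import json

# One record copied from the sample output above (assumption: the file is a JSON array of such records).
SAMPLE_RAW = r"""[
  {
    "metadata": {"TableName": "public.customer", "TxnID": "2175", "OperationName": "INSERT",
                 "LSN": "0\/2F9803D8", "NEXT_LSN": "0\/2F9942C8", "Sequence": 1,
                 "Timestamp": "2022-02-25 14:51:31.919978-06"},
    "data": {"c_custkey": 150001, "c_name": "Customer#150001",
             "c_address": "IVhzIApeRb ot,c,E", "c_nationkey": 15,
             "c_phone": "25-989-741-2988", "c_acctbal": "711.56",
             "c_mktsegment": "BUILDING ",
             "c_comment": "to the even, regular platelets. regular, ironic epitaphs nag e"},
    "before": null,
    "userdata": null
  }
]"""


def summarize_events(events):
    """Return (table, operation, transaction ID, column names) for each event."""
    summaries = []
    for event in events:
        meta = event["metadata"]
        summaries.append(
            (meta["TableName"], meta["OperationName"], meta["TxnID"], list(event["data"]))
        )
    return summaries


events = json.loads(SAMPLE_RAW)
for table, op, txn, columns in summarize_events(events):
    print(f"{op} on {table} in transaction {txn}: {', '.join(columns)}")
```

Note that c_acctbal arrives as a string ("711.56") in the raw output, which is one reason the values need explicit type conversion when parsed.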
Modifying an app using the Flow Designer
To parse this raw data, you must write a CQ as described in Parsing the data field of WAEvent. Reopen the PG2File app in Flow Designer, stop and undeploy it, click the stream icon, click +, and select Connect next CQ component.
Name the CQ ParseData, name the new output stream ParsedDataStream, and copy and paste the following into the Query field:
SELECT META(rawCDCstream,"OperationName").toString() AS OpType,
  TO_INT(data[0]) AS CustomerKey,
  TO_STRING(data[1]) AS CustomerName,
  TO_STRING(DATA[2]) AS CustomerAddress,
  TO_INT(DATA[3]) AS NationKey,
  TO_STRING(DATA[4]) AS CustomerPhone,
  TO_DOUBLE(data[5]) AS CustomerAccountBalance,
  TO_STRING(DATA[6]) AS MarketSegment,
  TO_STRING(data[7]) AS CustomerComment
FROM rawCDCstream;
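To make the query's effect concrete, here is a rough Python analogue of the projection it performs on a single event. This is a sketch, not how Striim evaluates the CQ: the function name parse_event is hypothetical, and it assumes that positions data[0] through data[7] follow the column order shown in the raw JSON output.

```python
def parse_event(event):
    """Mimic the ParseData CQ for one WAEvent-like dict (illustrative only)."""
    # Assumption: data[0]..data[7] in the CQ follow the column order of the "data" object.
    values = list(event["data"].values())
    return {
        "OpType": str(event["metadata"]["OperationName"]),  # META(...).toString()
        "CustomerKey": int(values[0]),                       # TO_INT(data[0])
        "CustomerName": str(values[1]),                      # TO_STRING(data[1])
        "CustomerAddress": str(values[2]),                   # TO_STRING(data[2])
        "NationKey": int(values[3]),                         # TO_INT(data[3])
        "CustomerPhone": str(values[4]),                     # TO_STRING(data[4])
        "CustomerAccountBalance": float(values[5]),          # TO_DOUBLE(data[5])
        "MarketSegment": str(values[6]),                     # TO_STRING(data[6])
        "CustomerComment": str(values[7]),                   # TO_STRING(data[7])
    }


# Record taken from the raw sample output earlier in this tutorial.
raw_event = {
    "metadata": {"TableName": "public.customer", "TxnID": "2175", "OperationName": "INSERT"},
    "data": {
        "c_custkey": 150001,
        "c_name": "Customer#150001",
        "c_address": "IVhzIApeRb ot,c,E",
        "c_nationkey": 15,
        "c_phone": "25-989-741-2988",
        "c_acctbal": "711.56",
        "c_mktsegment": "BUILDING ",
        "c_comment": "to the even, regular platelets. regular, ironic epitaphs nag e",
    },
}

parsed = parse_event(raw_event)
for name, value in parsed.items():
    print(f"{name}: {value!r}")
```

The point of the sketch is the mapping itself: each positional value gets a descriptive name and an explicit type, and the operation type is pulled from the metadata rather than from the row data.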
Make sure the name, output stream, and query match the values given above, then click Save.
Click the ParseData stream icon, click +, and select Connect next Target component.
Set the properties as shown below and click Save.
Name: WriteParsedData
Adapter: FileWriter
File Name: ParsedData
Directory: MyDirectory (click Show Advanced Settings)
Rollover Policy: leave as is
Flush Policy: EventCount 1 (delete Interval)
Formatter: JSONFormatter
Deploy and run the application, then return to the Apps page and start SamplesDB.Execute200Inserts.
Open Striim\MyDirectory\ParsedData.00. This contains the parsed data in the custom format defined by the CQ as well as the operation type from the metadata.
"OpType":"INSERT",
"CustomerKey":151501,
"CustomerName":"Customer#151501",
"CustomerAddress":"IVhzIApeRb ot,c,E",
"NationKey":15,
"CustomerPhone":"25-989-741-2988",
"CustomerAccountBalance":711.56,
"MarketSegment":"BUILDING ",
"CustomerComment":"to the even, regular platelets. regular, ironic epitaphs nag e"
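One way to confirm that the CQ produced the intended fields and types is to check a parsed record against the aliases used in the query. The sketch below is an assumption-laden illustration: the names EXPECTED_TYPES and check_record are hypothetical, and the sample fields from above are wrapped in braces here so that they form a complete JSON object.

```python
import json

# Field names and types implied by the aliases and conversion functions in the ParseData query.
EXPECTED_TYPES = {
    "OpType": str,
    "CustomerKey": int,
    "CustomerName": str,
    "CustomerAddress": str,
    "NationKey": int,
    "CustomerPhone": str,
    "CustomerAccountBalance": float,
    "MarketSegment": str,
    "CustomerComment": str,
}


def check_record(record):
    """Return a list of problems found in one parsed record (empty if none)."""
    problems = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], expected):
            actual = type(record[field]).__name__
            problems.append(f"{field}: expected {expected.__name__}, got {actual}")
    return problems


# Sample fields from above, wrapped in braces (assumption) to form one JSON object.
SAMPLE_PARSED = """{
  "OpType": "INSERT",
  "CustomerKey": 151501,
  "CustomerName": "Customer#151501",
  "CustomerAddress": "IVhzIApeRb ot,c,E",
  "NationKey": 15,
  "CustomerPhone": "25-989-741-2988",
  "CustomerAccountBalance": 711.56,
  "MarketSegment": "BUILDING ",
  "CustomerComment": "to the even, regular platelets. regular, ironic epitaphs nag e"
}"""

issues = check_record(json.loads(SAMPLE_PARSED))
print(issues if issues else "record matches the fields defined by the CQ")
```

Unlike the raw output, CustomerAccountBalance is now a JSON number rather than a string, reflecting the TO_DOUBLE conversion in the query.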