PostgreSQL
Striim supports PostgreSQL 9.4.x and later versions, Amazon RDS for PostgreSQL, Amazon Aurora with PostgreSQL compatibility, Azure Database for PostgreSQL, Azure Database for PostgreSQL - Flexible Server, Google AlloyDB for PostgreSQL, and Google Cloud SQL for PostgreSQL.
PostgreSQL Reader uses the wal2json plugin to read PostgreSQL change data. 1.x releases of wal2json cannot read transactions larger than 1 GB. If you have transactions that large, we recommend using a 2.x release of wal2json, which does not have that limitation. When you use a 2.x release, we recommend setting the Postgres Config property to automatically use format 2 for transactions larger than 1 GB and format 1 for other transactions (see PostgreSQL Reader properties).
Striim provides templates for creating applications that read from PostgreSQL and write to various targets. See Creating an application using a template for details.
PostgreSQL setup
Striim reads change data from PostgreSQL.
Note
PostgreSQL Reader requires logical replication. For general information about PostgreSQL logical replication, see https://www.postgresql.org/docs/current/logical-replication.html and select your PostgreSQL version.
Before Striim applications can use the PostgreSQL Reader adapter, a PostgreSQL administrator with the necessary privileges must set up your database as described for your platform.
PostgreSQL setup in Amazon Aurora with PostgreSQL compatibility
You must set up replication at the cluster level. This requires a reboot, so perform it during a maintenance window.
Amazon Aurora supports logical replication for PostgreSQL compatibility options 10.6 and later. Automated backups must be enabled. To set up logical replication, your AWS user account must have the rds_superuser role.
For additional information, see Using PostgreSQL logical replication with Aurora, Replication with Amazon Aurora PostgreSQL, and Using logical replication to replicate managed Amazon RDS for PostgreSQL and Amazon Aurora to self-managed PostgreSQL.
Go to your RDS dashboard, select Parameter groups > Create parameter group.
For the Parameter group family, select the aurora-postgresql item that matches your PostgreSQL compatibility option (for example, for PostgreSQL 11, select aurora-postgresql11).
For Type, select DB Cluster Parameter Group.
For Group Name and Description, enter aurora-logical-decoding, then click Create.
Click aurora-logical-decoding.
Enter logical_ in the Parameters field to filter the list, click Modify, set rds.logical_replication to 1, and click Continue > Apply changes.
In the left column, click Databases, then click the name of your Aurora cluster, click Modify, scroll down to Database options (you may have to expand the Additional configuration section), change DB cluster parameter group to aurora-logical-decoding, then scroll down to the bottom and click Continue.
Select Apply immediately > Modify DB instance. Wait for the cluster's status to change from Modifying to Available, then stop it, wait for the status to change from Stopping to Stopped, then start it.
In PSQL, enter the following command to create the replication slot:
SELECT pg_create_logical_replication_slot('striim_slot', 'wal2json');
Create a role with the REPLICATION attribute for use by PostgreSQLReader and give it select permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and (if necessary) public with the name of your schema.
CREATE ROLE striim WITH LOGIN PASSWORD '******';
GRANT rds_replication TO striim;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO striim;
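Before configuring PostgreSQL Reader, it can help to confirm that the steps above took effect. The following is a minimal sketch, assuming the slot is named striim_slot as in the command above and that you are connected as a user allowed to read these views. It is a sanity check, not part of Striim's documented procedure.

```sql
-- After the cluster restart, wal_level is expected to report 'logical'
-- once rds.logical_replication has been set to 1.
SHOW wal_level;

-- Confirm that the slot exists and uses the wal2json plugin.
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots
WHERE slot_name = 'striim_slot';
```

If the second query returns no rows, no slot with that name exists on the server you are connected to.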
PostgreSQL setup in Amazon RDS for PostgreSQL
You must set up replication in the master instance. This requires a reboot, so perform it during a maintenance window.
Amazon RDS supports logical replication only for PostgreSQL version 9.4.9, higher versions of 9.4, and versions 9.5.4 and higher. Thus PostgreSQLReader cannot be used with PostgreSQL 9.4 - 9.4.8 or 9.5 - 9.5.3 on Amazon RDS.
For additional information, see Best practices for Amazon RDS PostgreSQL replication and Using logical replication to replicate managed Amazon RDS for PostgreSQL and Amazon Aurora to self-managed PostgreSQL.
Go to your RDS dashboard, select Parameter groups > Create parameter group, enter postgres-logical-decoding as the Group name and Description, then click Create.
Click postgres-logical-decoding.
Enter logical_ in the Parameters field to filter the list, click Modify, set rds.logical_replication to 1, and click Continue > Apply changes.
In the left column, click Databases, then click the name of your database, click Modify, scroll down to Database options (you may have to expand the Additional configuration section), change DB parameter group to postgres-logical-decoding, then scroll down to the bottom and click Continue.
Select Apply immediately > Modify DB instance. Wait for the database's status to change from Modifying to Available, then reboot it and wait for the status to change from Rebooting to Available.
In PSQL, enter the following command to create the replication slot:
SELECT pg_create_logical_replication_slot('striim_slot', 'wal2json');
Create a role with the REPLICATION attribute for use by PostgreSQLReader and give it select permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and (if necessary) public with the name of your schema.
CREATE ROLE striim WITH LOGIN PASSWORD '******';
GRANT rds_replication TO striim;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO striim;
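To check that the grants above landed as intended, something like the following may be useful. This is a sketch under assumptions: the role is named striim, the schema is public, and the queries are run by an administrator. It uses only standard PostgreSQL catalog functions and is not taken from Striim's documentation.

```sql
-- Is striim a member of rds_replication? Expected to return true.
SELECT pg_has_role('striim', 'rds_replication', 'MEMBER');

-- List tables in the schema on which striim does NOT have SELECT.
-- An empty result means every existing table is readable by the role.
SELECT schemaname, tablename
FROM pg_tables
WHERE schemaname = 'public'
  AND NOT has_table_privilege(
        'striim',
        format('%I.%I', schemaname, tablename),
        'SELECT');
```

Note that GRANT SELECT ON ALL TABLES applies only to tables that exist when it is run, so tables created later will show up in the second query until they are granted separately.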
PostgreSQL setup in Azure
Azure Database for PostgreSQL - Hyperscale is not supported because it does not support logical replication.
Set up logical decoding using wal2json:
for Azure Database for PostgreSQL, see Logical decoding
for Azure Database for PostgreSQL Flexible Server, see Logical replication and logical decoding in Azure Database for PostgreSQL - Flexible Server
Specify PostgreSQL Reader's properties as follows:
Postgres Config: if using wal2json version 2, specify that as described in PostgreSQL Reader properties
Replication slot name: see Logical decoding
Username: see Quickstart: Create an Azure Database for PostgreSQL server by using the Azure portal
Password: the login password for that user
PostgreSQL setup in Google Cloud SQL for PostgreSQL or Google AlloyDB for PostgreSQL
Set up logical replication as described in Setting up logical replication and decoding.
Specify PostgreSQL Reader's properties as follows:
Postgres Config: do not change the default; Google Cloud SQL does not support wal2json version 2
Replication slot name: the name of the slot created in the "Create replication slot" section of Receiving decoded WAL changes for change data capture (CDC)
Username: the name of the user created in Create a replication user
Password: the login password for that user
PostgreSQL setup in Linux or Windows
This requires a restart of PostgreSQL, so perform it during a maintenance window.
Install the wal2json plugin for the operating system of your PostgreSQL host as described in https://github.com/eulerto/wal2json.
Edit postgresql.conf, set the following options, and save the file. The values for max_replication_slots and max_wal_senders may be higher, but there must be one of each available for each instance of PostgreSQL Reader. max_wal_senders cannot exceed the value of max_connections.
wal_level = logical
max_replication_slots = 1
max_wal_senders = 1
Edit pg_hba.conf and add the following records, replacing <IP address> with the Striim server's IP address. If you have a multi-node cluster, add a record for each server that will run PostgreSQLReader. Then save the file.
host replication striim <IP address>/0 trust
local replication striim trust
Restart PostgreSQL.
Enter the following command to create the replication slot (the location of the command may vary but typically is /usr/local/bin in Linux or C:\Program Files\PostgreSQL\<version>\bin\ in Windows).
pg_recvlogical -d mydb --slot striim_slot --create-slot -P wal2json
If you plan to use multiple instances of PostgreSQL Reader, create a separate slot for each.
Create a role with the REPLICATION attribute for use by Striim and give it select permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and myschema with the name of your schema.
CREATE ROLE striim WITH LOGIN PASSWORD '******' REPLICATION;
GRANT SELECT ON ALL TABLES IN SCHEMA myschema TO striim;
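Once PostgreSQL has restarted, the settings and role from the steps above can be checked from any SQL session. The following is a minimal sketch, assuming the role is named striim and using myschema as a placeholder schema name. The two GRANT-style statements at the end are optional additions based on general PostgreSQL behavior, not steps from Striim's procedure.

```sql
-- Settings edited in postgresql.conf; wal_level should read 'logical'.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots',
               'max_wal_senders', 'max_connections');

-- The role should have both the LOGIN and REPLICATION attributes.
SELECT rolname, rolcanlogin, rolreplication
FROM pg_roles
WHERE rolname = 'striim';

-- Optional (assumption): reading tables in a schema other than public
-- also requires USAGE on that schema, and ALTER DEFAULT PRIVILEGES
-- covers tables created later by the role that runs this statement.
GRANT USAGE ON SCHEMA myschema TO striim;
ALTER DEFAULT PRIVILEGES IN SCHEMA myschema
    GRANT SELECT ON TABLES TO striim;
```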
PostgreSQL setup for schema evolution
Using schema evolution with PostgreSQL Reader requires a tracking table in the source database. To create this table, run pg_ddl_setup.sql, which you can find in Striim/conf/DDLCaptureScripts or download from https://github.com/striim/doc-downloads.
PostgreSQL Reader properties
Before you can use this adapter, PostgreSQL must be configured as described in PostgreSQL setup.
If this reader will be deployed to a Forwarding Agent, install the driver as described in Install the PostgreSQL JDBC driver.
property | type | default value | notes |
---|---|---|---|
Bidirectional Marker Table | String | | When performing bidirectional replication, the fully qualified name of the marker table (see Bidirectional replication). This setting is case-sensitive. |
CDDL Action | enum | Process | |
CDDL Capture | Boolean | False | |
CDDL Tracking Table | String | | |
Connection Retry Policy | String | retryInterval=30, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds. |
Connection URL | String | | PostgreSQL Reader cannot read from a replica (standby) server since the replication slot is in the primary server. |
Excluded Tables | String | If | |
Filter Transaction Boundaries | Boolean | True | With the default value of True, begin and commit transactions are filtered out. Set to False to include begin and commit transactions. |
Password | encrypted password | | the password specified for the username (see Encrypted passwords) |
Postgres Config | String | {"ReplicationPluginConfig": {"Name": "WAL2JSON", "Format": "1"}} | Change If you are running an older version of Amazon RDS for PostgreSQL that supports only version 1, you may contact AWS technical support to have the wal2json plugin updated. |
Replication Slot Name | String | striim_slot | The name of the replication slot created as described in PostgreSQL setup. If you have multiple instances of PostgreSQLReader, each must have its own slot. |
Start LSN | String | | By default, only new transactions are read. Optionally, specify a log sequence number to start reading from that point. If you are using schema evolution (see Handling schema evolution), set a Start LSN only if you are sure that there have been no DDL changes after that point. |
Tables | String | | The table(s) for which to return change data. All tables specified must have primary keys (required for logical replication); tables without primary keys are not included in output. Names are case-sensitive. Specify source table names as <schema>.<table>. You may specify multiple tables as a list separated by semicolons or by using wildcards in the schema and/or table names only (not in the database name). For example, %.% reads all tables in all schemas. If any specified tables are missing, PostgreSQL Reader will issue a warning. If none of the specified tables exists, start will fail with a "found no tables" error. If you have multiple instances of PostgreSQLReader, each should read a separate set of tables. |
Username | String | | the login name for the user created as described in PostgreSQL setup |
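Because tables without primary keys are not included in the reader's output, it may be worth listing them before choosing a value for Tables. The following is a sketch using standard PostgreSQL catalogs; it assumes you want to check every user schema in the current database and that ordinary (non-partitioned) tables are what you intend to read.

```sql
-- Ordinary tables in user schemas that have no PRIMARY KEY constraint.
SELECT n.nspname AS schema_name,
       c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'                      -- ordinary tables only
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND NOT EXISTS (
        SELECT 1
        FROM pg_constraint con
        WHERE con.conrelid = c.oid
          AND con.contype = 'p')           -- 'p' = primary key
ORDER BY schema_name, table_name;
```

Any table returned here would need a primary key added before its changes can appear in the reader's output.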
PostgreSQL Reader WAEvent fields
The output data type for PostgreSQLReader is WAEvent. The elements are:
metadata: a map including:
LSN: log sequence number of the transaction's commit
NEXT_LSN: next log sequence number (used for reconnecting to the replication slot after a non-fatal network interruption)
OperationName: INSERT, UPDATE, or DELETE
When schema evolution is enabled, OperationName for DDL events will be Alter, AlterColumns, Create, or Drop. This metadata is reserved for internal use by Striim and subject to change, so should not be used in CQs, open processors, or custom Java functions.
PK_UPDATE: included only when an UPDATE changes the primary key
Sequence: incremented for each operation within a transaction
TableName: the name of the table including its schema
TimeStamp: timestamp from the replication subscription
TxnID: transaction identifier
To retrieve the values for these fields, use the META() function. See Parsing the fields of WAEvent for CDC readers.
data: an array of fields, numbered from 0, containing:
for an INSERT operation, the values that were inserted
for an UPDATE, the values after the operation was completed
for a DELETE, the value of the primary key and nulls for the other fields
To retrieve the values for these fields, use SELECT ... (DATA[]). See Parsing the fields of WAEvent for CDC readers.
before: for UPDATE operations, contains the primary key value from before the update. When an update changes the primary key value, you may retrieve the previous value using the BEFORE() function.
dataPresenceBitMap, beforePresenceBitMap, and typeUUID are reserved and should be ignored.
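The LSN and NEXT_LSN values in the metadata map are positions in the PostgreSQL write-ahead log, so they can be compared by eye with the positions PostgreSQL records for the replication slot. The following is a sketch, assuming PostgreSQL 10 or later (earlier releases use different function names, such as pg_current_xlog_location) and a slot named striim_slot. It is a monitoring aid, not something Striim's documentation prescribes.

```sql
-- Where the slot currently stands, and how much WAL the server is
-- retaining on its behalf.
SELECT slot_name,
       restart_lsn,
       confirmed_flush_lsn,
       pg_size_pretty(
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
       ) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'striim_slot';
```

A steadily growing retained_wal value suggests the slot is not being consumed, for example because the application reading from it is stopped.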
PostgreSQL Reader simple application
The following application will write change data for all tables in all schemas in database mydb to SysOut. Replace striim and ****** with the user name and password for the PostgreSQL account you created for use by PostgreSQLReader (see PostgreSQL setup) and mydb and %.% with the names of the database and tables to be read. If the replication slot name is not striim_slot, specify it using the ReplicationSlotName property.
CREATE APPLICATION PostgreSQLTest;
CREATE SOURCE PostgreSQLCDCIn USING PostgreSQLReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'jdbc:postgresql://192.0.2.10:5432/mydb',
  ReplicationSlotName: 'striim_slot',
  Tables:'%.%'
)
OUTPUT TO PostgreSQLCDCStream;
CREATE TARGET PostgreSQLCDCOut
USING SysOut(name:PostgreSQLCDC)
INPUT FROM PostgreSQLCDCStream;
END APPLICATION PostgreSQLTest;
PostgreSQL Reader example output
PostgreSQLReader's output type is WAEvent. See WAEvent contents for change data and PostgreSQL Reader WAEvent fields for more information.
The following are examples of WAEvents emitted by PostgreSQLReader for various operation types. They all use the following table:
CREATE TABLE posauthorizations (
  business_name varchar(30),
  merchant_id character varying(35) PRIMARY KEY,
  primary_account bigint,
  pos bigint,
  code character varying(20),
  exp character(4),
  currency_code character(3),
  auth_amount numeric(10,3),
  terminal_id bigint,
  zip bigint,
  city character varying(20));
INSERT
If you performed the following INSERT on the table:
INSERT INTO posauthorizations VALUES(
  'COMPANY 1',
  'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu',
  6705362103919221351,
  0,
  '20130309113025',
  '0916',
  'USD',
  2.20,
  5150279519809946,
  41363,
  'Quicksand');
The WAEvent for that INSERT would be similar to:
data: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",6705362103919221351,0,"20130309113025",
"0916","USD",2.200,5150279519809946,41363,"Quicksand"]
metadata: {"TableName":"public.posauthorizations","TxnID":556,"OperationName":"INSERT",
"LSN":"0/152CD58","NEXT_LSN":"0/152D1C8","Sequence":1,"Timestamp":"2019-01-11 16:29:54.628403-08"}
UPDATE
If you performed the following UPDATE on the table:
UPDATE posauthorizations SET BUSINESS_NAME = 'COMPANY 5A' where pos=0;
The WAEvent for that UPDATE would be similar to:
data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",6705362103919221351,0,"20130309113025",
"0916","USD",2.200,5150279519809946,41363,"Quicksand"]
metadata: {"TableName":"public.posauthorizations","TxnID":557,"OperationName":"UPDATE",
"LSN":"0/152D2E0","NEXT_LSN":"0/152D6F8","Sequence":1,"Timestamp":"2019-01-11 16:31:54.271525-08"}
before: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]
When an UPDATE changes the primary key, you may retrieve the old primary key value from the before array.
DELETE
If you performed the following DELETE on the table:
DELETE from posauthorizations where pos=0;
The WAEvent for that DELETE would be similar to:
data: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]
metadata: {"TableName":"public.posauthorizations","TxnID":558,"OperationName":"DELETE",
"LSN":"0/152D730","NEXT_LSN":"0/152D7C8","Sequence":1,"Timestamp":"2019-01-11 16:33:09.065951-08"}
Only the primary key value is included.
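As background, with PostgreSQL's default replica identity only the primary key columns of the old row are written to the write-ahead log for UPDATE and DELETE, which is consistent with the before and data arrays shown above. The following sketch shows how to inspect that setting for the example table. It assumes the table lives in schema public, is illustrative only, and changes no configuration.

```sql
-- Report the replica identity of the example table.
SELECT c.relname,
       CASE c.relreplident
            WHEN 'd' THEN 'default (primary key)'
            WHEN 'n' THEN 'nothing'
            WHEN 'f' THEN 'full'
            WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
  AND c.relname = 'posauthorizations';
```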
PostgreSQL Reader data type support and correspondence
PostgreSQL type | Striim type |
---|---|
bigint | long |
bigserial | long |
bit | string |
bit varying | string |
boolean | short |
bytea | string |
character | string |
character varying | string |
cidr | string |
circle | unsupported |
composite type | string |
date | DateTime |
daterange | string |
double precision | double |
inet | string |
integer | integer |
int2 | short |
int4 | integer |
int4range | string |
int8 | long |
int8range | string |
interval | string |
json | string |
jsonb | string |
line | unsupported |
lseg | unsupported |
macaddr | string |
macaddr8 | string |
money | string |
name (system identifier) | string |
numeric | string |
numrange | string |
path | unsupported |
pg_lsn | string |
point | unsupported |
polygon | unsupported |
real | float |
smallint | short |
smallserial | short |
serial | integer |
text | string |
time | string |
time with time zone | string |
timestamp | DateTime |
tsrange | string |
timestamp with time zone | DateTime |
tstzrange | string |
tsquery | unsupported |
tsvector | unsupported |
txid_snapshot | string |
uuid | string |
xml | string |
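To find columns whose types are marked unsupported in the table above, a query along these lines may help. It is a sketch that assumes you run it as a user who can see the relevant tables in information_schema, and it does not detect arrays of these types (whose udt_name begins with an underscore).

```sql
-- Columns in user schemas whose type is listed as unsupported above.
SELECT table_schema, table_name, column_name, udt_name
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
  AND udt_name IN ('circle', 'line', 'lseg', 'path',
                   'point', 'polygon', 'tsquery', 'tsvector')
ORDER BY table_schema, table_name, ordinal_position;
```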