PostgreSQL

Striim supports PostgreSQL 9.4.x and later versions, Amazon RDS for PostgreSQL, Amazon Aurora with PostgreSQL compatibility, Azure Database for PostgreSQL, Azure Database for PostgreSQL - Flexible Server, Google AlloyDB for PostgreSQL, and Google Cloud SQL for PostgreSQL.

PostgreSQL Reader uses the wal2json plugin to read PostgreSQL change data. 1.x releases of wal2json cannot read transactions larger than 1 GB. If you have transactions that large, we recommend using a 2.x release of wal2json, which does not have that limitation. When you use a 2.x release, we recommend setting the Postgres Config property to automatically use format 2 for transactions larger than 1 GB and format 1 for other transactions (see PostgreSQL Reader properties).
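
Once logical replication has been set up as described in PostgreSQL setup, one way to check whether the installed wal2json release accepts format 2 is to request it from a throwaway slot. The following is a minimal sketch, assuming PostgreSQL 10 or later (for temporary slots), one free replication slot, and a session with replication privileges. The slot name wal2json_format_check is hypothetical. A 1.x release should reject the format-version option with an error; on a fresh slot with no pending changes, a 2.x release simply returns no rows.

```sql
-- Create a temporary logical slot; PostgreSQL drops it when the session ends.
-- 'wal2json_format_check' is a hypothetical name used only for this check.
SELECT pg_create_logical_replication_slot('wal2json_format_check', 'wal2json', true);

-- Ask wal2json for format 2 without consuming any changes.
-- An error here suggests the plugin does not support format-version 2.
SELECT data
FROM pg_logical_slot_peek_changes(
       'wal2json_format_check', NULL, 1,
       'format-version', '2');
```

Both statements must run in the same session, since the temporary slot disappears when the session closes.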

Striim provides templates for creating applications that read from PostgreSQL and write to various targets. See Creating an application using a template for details.

PostgreSQL setup

Striim reads change data from PostgreSQL.

Note

PostgreSQL Reader requires logical replication. For general information about PostgreSQL logical replication, see https://www.postgresql.org/docs/current/logical-replication.html and select your PostgreSQL version.

Before Striim applications can use the PostgreSQL Reader adapter, a PostgreSQL administrator with the necessary privileges must set up your database as described for your platform.

PostgreSQL setup in Amazon Aurora with PostgreSQL compatibility

You must set up replication at the cluster level. This requires a reboot, so perform it during a maintenance window.

Amazon Aurora supports logical replication for PostgreSQL compatibility options 10.6 and later. Automated backups must be enabled. To set up logical replication, your AWS user account must have the rds_superuser role.

For additional information, see Using PostgreSQL logical replication with Aurora, Replication with Amazon Aurora PostgreSQL, and Using logical replication to replicate managed Amazon RDS for PostgreSQL and Amazon Aurora to self-managed PostgreSQL.

  1. Go to your RDS dashboard, select Parameter groups > Create parameter group.

  2. For the Parameter group family, select the aurora-postgresql item that matches your PostgreSQL compatibility option (for example, for PostgreSQL 11, select aurora-postgresql11).

  3. For Type, select DB Cluster Parameter Group.

  4. For Group Name and Description, enter aurora-logical-decoding, then click Create.

  5. Click aurora-logical-decoding.

  6. Enter logical_ in the Parameters field to filter the list, click Modify, set rds.logical_replication to 1, and click Continue > Apply changes.

  7. In the left column, click Databases, then click the name of your Aurora cluster, click Modify, scroll down to Database options (you may have to expand the Additional configuration section), change DB cluster parameter group to aurora-logical-decoding, then scroll down to the bottom and click Continue.

  8. Select Apply immediately > Modify DB instance. Wait for the cluster's status to change from Modifying to Available, then stop it, wait for the status to change from Stopping to Stopped, then start it.

  9. In psql, enter the following command to create the replication slot:

    SELECT pg_create_logical_replication_slot('striim_slot', 'wal2json');

  10. Create a role with the REPLICATION attribute for use by PostgreSQLReader and give it SELECT permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and (if necessary) public with the name of your schema.

    CREATE ROLE striim WITH LOGIN PASSWORD '******';
    GRANT rds_replication TO striim;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO striim;
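
After completing these steps, it can be worth confirming that the slot and role exist before configuring PostgreSQL Reader. The queries below are a minimal sketch that uses standard PostgreSQL catalogs and functions with the slot and role names from the steps above. Keep in mind that a logical slot retains WAL until a consumer reads from it, so a slot that is created but never read can cause storage use to grow.

```sql
-- The slot should be listed with plugin = wal2json and slot_type = logical.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'striim_slot';

-- Should return true if striim is a member of rds_replication.
SELECT pg_has_role('striim', 'rds_replication', 'MEMBER');

-- Approximate amount of WAL retained for the slot (PostgreSQL 10 and later).
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'striim_slot';
```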
    

PostgreSQL setup in Amazon RDS for PostgreSQL

You must set up replication in the master instance. This requires a reboot, so perform it during a maintenance window.

Amazon RDS supports logical replication only for PostgreSQL version 9.4.9, higher versions of 9.4, and versions 9.5.4 and higher. Thus PostgreSQLReader cannot be used with PostgreSQL 9.4 - 9.4.8 or 9.5 - 9.5.3 on Amazon RDS.

For additional information, see Best practices for Amazon RDS PostgreSQL replication and Using logical replication to replicate managed Amazon RDS for PostgreSQL and Amazon Aurora to self-managed PostgreSQL.

  1. Go to your RDS dashboard, select Parameter groups > Create parameter group, enter postgres-logical-decoding as the Group name and Description, then click Create.

  2. Click postgres-logical-decoding.

  3. Enter logical_ in the Parameters field to filter the list, click Modify, set rds.logical_replication to 1, and click Continue > Apply changes.

  4. In the left column, click Databases, then click the name of your database, click Modify, scroll down to Database options (you may have to expand the Additional configuration section), change DB parameter group to postgres-logical-decoding, then scroll down to the bottom and click Continue.

  5. Select Apply immediately > Modify DB instance. Wait for the database's status to change from Modifying to Available, then reboot it and wait for the status to change from Rebooting to Available.

  6. In psql, enter the following command to create the replication slot:

    SELECT pg_create_logical_replication_slot('striim_slot', 'wal2json');

  7. Create a role with the REPLICATION attribute for use by PostgreSQLReader and give it SELECT permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and (if necessary) public with the name of your schema.

    CREATE ROLE striim WITH LOGIN PASSWORD '******';
    GRANT rds_replication TO striim;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO striim;
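
GRANT SELECT ON ALL TABLES covers only the tables that exist when the statement runs. If tables will be added later, or the tables are in a schema other than public, additional grants along the following lines may be needed. This is a sketch based on standard PostgreSQL privileges rather than a requirement stated in the steps above, and myschema is a placeholder.

```sql
-- Allow the role to resolve objects in a non-public schema.
GRANT USAGE ON SCHEMA myschema TO striim;
GRANT SELECT ON ALL TABLES IN SCHEMA myschema TO striim;

-- Grant SELECT automatically on tables created later in this schema.
-- This applies only to tables created by the role that runs this statement;
-- add FOR ROLE <owner> to cover tables created by another role.
ALTER DEFAULT PRIVILEGES IN SCHEMA myschema
  GRANT SELECT ON TABLES TO striim;
```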
    

PostgreSQL setup in Azure

Azure Database for PostgreSQL - Hyperscale is not supported because it does not support logical replication.

  1. Set up logical decoding using wal2json:

  2. Specify PostgreSQL Reader's properties as follows:

PostgreSQL setup in Google Cloud SQL for PostgreSQL or Google AlloyDB for PostgreSQL

  1. Set up logical replication as described in Setting up logical replication and decoding.

  2. Specify PostgreSQL Reader's properties as follows:

PostgreSQL setup in Linux or Windows

This requires restarting PostgreSQL, so perform it during a maintenance window.

  1. Install the wal2json plugin for the operating system of your PostgreSQL host as described in https://github.com/eulerto/wal2json.

  2. Edit postgresql.conf, set the following options, and save the file. The values for max_replication_slots and max_wal_senders may be higher, but there must be one of each available for each instance of PostgreSQL Reader. max_wal_senders cannot exceed the value of max_connections.

    wal_level = logical
    max_replication_slots = 1
    max_wal_senders = 1
  3. Edit pg_hba.conf and add the following records, replacing <IP address> with the Striim server's IP address. If you have a multi-node cluster, add a record for each server that will run PostgreSQLReader. Then save the file.

    host   replication  striim  <IP address>/0  trust
    local  replication  striim                  trust

  4. Restart PostgreSQL.

  5. Enter the following command to create the replication slot (the location of the command may vary but typically is /usr/local/bin in Linux or C:\Program Files\PostgreSQL\<version>\bin\ in Windows):

    pg_recvlogical -d mydb --slot striim_slot --create-slot -P wal2json

    If you plan to use multiple instances of PostgreSQL Reader, create a separate slot for each.

  6. Create a role with the REPLICATION attribute for use by Striim and give it SELECT permission on the schema(s) containing the tables to be read. Replace ****** with a strong password and (if necessary) public with the name of your schema.

    CREATE ROLE striim WITH LOGIN PASSWORD '******' REPLICATION;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO striim;
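
After the restart, a quick way to confirm that the settings from step 2 and the role from step 6 are in effect is to query the standard PostgreSQL catalog views. This is a minimal sketch that assumes the role name striim used above.

```sql
-- Values in effect after the restart; wal_level should be 'logical'.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots',
               'max_wal_senders', 'max_connections');

-- The role should have rolreplication = true and rolcanlogin = true.
SELECT rolname, rolreplication, rolcanlogin
FROM pg_roles
WHERE rolname = 'striim';
```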
    

PostgreSQL setup for schema evolution

Using Schema evolution with PostgreSQL Reader requires a tracking table in the source database. To create this table, run pg_ddl_setup.sql, which you can find in Striim/conf/DDLCaptureScripts or download from https://github.com/striim/doc-downloads.

PostgreSQL Reader properties

Before you can use this adapter, PostgreSQL must be configured as described in PostgreSQL setup.

If this reader will be deployed to a Forwarding Agent, install the driver as described in Install the PostgreSQL JDBC driver.

property

type

default value

notes

Bidirectional Marker Table

String

When performing bidirectional replication, the fully qualified name of the marker table (see Bidirectional replication). This setting is case-sensitive.

CDDL Action

enum

Process

See Handling schema evolution.

CDDL Capture

Boolean

False

See Handling schema evolution.

CDDL Tracking Table

String

See PostgreSQL setup for schema evolution.

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL

String

jdbc:postgresql:// followed by the primary server's IP address or network name, a colon, the port number, and a slash followed by the database name. If the database name is omitted, the Username value is used as the database name.

When connecting through an SSH tunnel (see Using an SSH tunnel to connect to a source or target), specify the IP address of the tunnel.

PostgreSQL Reader cannot read from a replica (standby) server since the replication slot is in the primary server.

Excluded Tables

String

Data for any tables specified here will not be returned. For example, if Tables uses a wildcard, data from any tables specified here will be omitted. Multiple table names (separated by semicolons) and wildcards may be used exactly as for Tables.

Filter Transaction Boundaries

Boolean

True

With the default value of True, begin and commit transactions are filtered out. Set to False to include begin and commit transactions.

Password

encrypted password

the password specified for the username (see Encrypted passwords)

Postgres Config

String

{"ReplicationPluginConfig": {"Name": "WAL2JSON", "Format": "1"}}

Change 1 to 2 to use wal2json format 2 (see the wal2json readme for more information).

If you are running an older version of Amazon RDS for PostgreSQL that supports only version 1, you may contact AWS technical support to have the wal2json plugin updated.

Replication Slot Name

String

striim_slot

The name of the replication slot created as described in PostgreSQL setup. If you have multiple instances of PostgreSQLReader, each must have its own slot.

Start LSN

String

By default, only new transactions are read. Optionally, specify a log sequence number to start reading from that point.

If you are using schema evolution (see Handling schema evolution), set a Start LSN only if you are sure that there have been no DDL changes after that point.

Tables

String

The table(s) for which to return change data. Tables must have primary keys (required for logical replication).

Names are case-sensitive. Specify source table names as <schema>.<table>. (The database is specified in the connection URL.)

You may specify multiple tables as a list separated by semicolons or using the following wildcards in the schema and/or table names only (not in the database name):

  • %: any series of characters

  • _: any single character

For example, %.% would include all tables in all schemas in the database specified in the connection URL.

The % wildcard is allowed only at the end of the string. For example, mydb.prefix% is valid, but mydb.%suffix is not.

All tables specified must have primary keys. Tables without primary keys are not included in output.

If any specified tables are missing, PostgreSQLReader will issue a warning. If none of the specified tables exist, start will fail with a "found no tables" error.

If you have multiple instances of PostgreSQLReader, each should read a separate set of tables.

Username

String

the login name for the user created as described in PostgreSQL setup
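
The following queries may help when choosing values for several of the properties above (Connection URL, Start LSN, and Tables). They are a sketch using standard PostgreSQL functions and catalogs, run against the source database with the slot name striim_slot assumed. They are not part of Striim itself.

```sql
-- Connection URL: returns false on a primary, true on a standby.
SELECT pg_is_in_recovery();

-- Start LSN: current WAL write position (PostgreSQL 10 and later;
-- earlier releases use pg_current_xlog_location() instead).
SELECT pg_current_wal_lsn();

-- Start LSN: positions recorded for the slot
-- (confirmed_flush_lsn is available in PostgreSQL 9.6 and later).
SELECT slot_name, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'striim_slot';

-- Tables: list ordinary tables that have no primary key.
SELECT n.nspname AS schema_name, c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname NOT IN ('pg_catalog', 'information_schema')
  AND NOT EXISTS (
        SELECT 1
        FROM pg_constraint con
        WHERE con.conrelid = c.oid
          AND con.contype = 'p')
ORDER BY 1, 2;
```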

PostgreSQL Reader WAEvent fields

The output data type for PostgreSQLReader is WAEvent. The elements are:

metadata: a map including:

  • LSN: log sequence number of the transaction's commit

  • NEXT_LSN: next log sequence number (used for reconnecting to the replication slot after a non-fatal network interruption)

  • OperationName: INSERT, UPDATE, or DELETE

    When schema evolution is enabled, OperationName for DDL events will be Alter, AlterColumns, Create, or Drop. This metadata is reserved for internal use by Striim and subject to change, so should not be used in CQs, open processors, or custom Java functions.

  • PK_UPDATE: included only when an UPDATE changes the primary key

  • Sequence: incremented for each operation within a transaction

  • TableName: the name of the table including its schema

  • Timestamp: timestamp from the replication subscription

  • TxnID: transaction identifier

To retrieve the values for these fields, use the META() function. See Parsing the fields of WAEvent for CDC readers.

data: an array of fields, numbered from 0, containing:

  • for an INSERT operation, the values that were inserted

  • for an UPDATE, the values after the operation was completed

  • for a DELETE, the value of the primary key and nulls for the other fields

To retrieve the values for these fields, use SELECT ... (DATA[]). See Parsing the fields of WAEvent for CDC readers.

before: for UPDATE operations, contains the primary key value from before the update. When an update changes the primary key value, you may retrieve the previous value using the BEFORE() function.

dataPresenceBitMap, beforePresenceBitMap, and typeUUID are reserved and should be ignored.

PostgreSQL Reader simple application

The following application will write change data for all tables in all schemas in database mydb to SysOut. Replace striim and ****** with the user name and password for the PostgreSQL account you created for use by PostgreSQLReader (see PostgreSQL setup) and mydb and %.% with the names of the database and tables to be read. If the replication slot name is not striim_slot, specify it using the ReplicationSlotName property.

CREATE APPLICATION PostgreSQLTest;

CREATE SOURCE PostgreSQLCDCIn USING PostgreSQLReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'jdbc:postgresql://192.0.2.10:5432/mydb',
  ReplicationSlotName: 'striim_slot',
  Tables:'%.%'
) 
OUTPUT TO PostgreSQLCDCStream;

CREATE TARGET PostgreSQLCDCOut
USING SysOut(name:PostgreSQLCDC)
INPUT FROM PostgreSQLCDCStream;

END APPLICATION PostgreSQLTest;

PostgreSQL Reader example output

PostgreSQLReader's output type is WAEvent. See WAEvent contents for change data and PostgreSQL Reader WAEvent fields for more information.

The following are examples of WAEvents emitted by PostgreSQLReader for various operation types. They all use the following table:

CREATE TABLE posauthorizations (
  business_name varchar(30),
  merchant_id character varying(35) PRIMARY KEY,
  primary_account bigint,
  pos bigint,
  code character varying(20),
  exp character(4),
  currency_code character(3),
  auth_amount numeric(10,3),
  terminal_id bigint,
  zip bigint,
  city character varying(20));

INSERT

If you performed the following INSERT on the table:

INSERT INTO posauthorizations VALUES(
  'COMPANY 1',
  'D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu',
  6705362103919221351,
  0,
  '20130309113025',
  '0916',
  'USD',
  2.20,
  5150279519809946,
  41363,
  'Quicksand');

The WAEvent for that INSERT would be similar to:

data: ["COMPANY 1","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",6705362103919221351,0,"20130309113025",
"0916","USD",2.200,5150279519809946,41363,"Quicksand"]
metadata: {"TableName":"public.posauthorizations","TxnID":556,"OperationName":"INSERT",
"LSN":"0/152CD58","NEXT_LSN":"0/152D1C8","Sequence":1,"Timestamp":"2019-01-11 16:29:54.628403-08"}

UPDATE

If you performed the following UPDATE on the table:

UPDATE posauthorizations SET BUSINESS_NAME = 'COMPANY 5A' where pos=0;

The WAEvent for that UPDATE would be similar to:

data: ["COMPANY 5A","D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",6705362103919221351,0,"20130309113025",
"0916","USD",2.200,5150279519809946,41363,"Quicksand"]
metadata: {"TableName":"public.posauthorizations","TxnID":557,"OperationName":"UPDATE",
"LSN":"0/152D2E0","NEXT_LSN":"0/152D6F8","Sequence":1,"Timestamp":"2019-01-11 16:31:54.271525-08"}
before: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]

When an UPDATE changes the primary key, you may retrieve the old primary key value from the before array.

DELETE

If you performed the following DELETE on the table:

DELETE from posauthorizations where pos=0;

The WAEvent for that DELETE would be similar to:

data: [null,"D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu",null,null,null,null,null,null,null,null,null]
metadata: {"TableName":"public.posauthorizations","TxnID":558,"OperationName":"DELETE",
"LSN":"0/152D730","NEXT_LSN":"0/152D7C8","Sequence":1,"Timestamp":"2019-01-11 16:33:09.065951-08"}

Only the primary key value is included.
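
The primary-key-only content of the before array and of the DELETE data array is consistent with PostgreSQL's default replica identity, under which only the primary key columns of the old row version are written to the WAL. As a sketch, the following catalog query shows a table's current setting. It is informational only; this document does not state how PostgreSQL Reader behaves with other settings.

```sql
-- relreplident: d = default (primary key), f = full row,
-- i = a specific unique index, n = nothing.
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.posauthorizations'::regclass;
```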

PostgreSQL Reader data type support and correspondence

PostgreSQL type              Striim type

bigint                       long
bigserial                    long
bit                          string
bit varying                  string
boolean                      short
bytea                        string
character                    string
character varying            string
cidr                         string
circle                       unsupported
composite type               string
date                         DateTime
daterange                    string
double precision             double
inet                         string
int2                         short
int4                         integer
int4range                    string
int8                         long
int8range                    string
integer                      integer
interval                     string
json                         string
jsonb                        string
line                         unsupported
lseg                         unsupported
macaddr                      string
macaddr8                     string
money                        string
name (system identifier)     string
numeric                      string (Infinity, -Infinity, and NaN values will be converted to null)
numrange                     string
path                         unsupported
pg_lsn                       string
point                        unsupported
polygon                      unsupported
real                         float
smallint                     short
smallserial                  short
serial                       integer
text                         string
time                         string
time with time zone          string
timestamp                    DateTime
tsrange                      string
timestamp with time zone     DateTime
tstzrange                    string
tsquery                      unsupported
tsvector                     unsupported
txid_snapshot                string
uuid                         string
xml                          string