CREATE EXTERNAL SOURCE

Kafka Connect is a component of Apache Kafka that serves as a centralized hub for data integration between databases, key-value stores, search indexes and file systems. The Connect framework consists of source connector components to ingest and stream data from external sources into Kafka topics and sink connector components to deliver data from the Kafka topics to the external targets.

Striim has a persisted stream feature that allows data captured from external sources by reader adapters to be stored in Kafka topics by persisted stream senders. Data from these Kafka topics can then be retrieved by persisted stream pullers and delivered to external targets using writer adapters.

Striim Connect is a new solution from Striim that combines the first half of the Kafka Connect pipeline with the second half of the Striim persisted stream pipeline. This allows external sources for which Striim does not have a reader adapter to be integrated with the Striim data pipeline through a Kafka source connector.

Features of Striim Connect

The following are features of Striim Connect:

  • Infer/create Striim types for incoming table data from Debezium SourceRecords or Avro data/schema definitions.

  • Support Initial Loads along with CDC events.

  • Publish appropriate monitoring metrics in Striim’s monitoring app for the source connectors.

    Note

    Striim Connect is not a source adapter, but a persisted stream that reads data from Kafka Connect topics originating from various sources such as databases, key-value stores, and document stores. When reviewing metrics, keep in mind that you are not using a specific source adapter but a generic data integration technology: when reading from a source database, Striim does not read from the database directly, but from the Kafka Connect deployment you have configured for that database in your environment. Therefore, when reviewing metrics, consider Kafka Connect and what it provides to Striim.

[Figure: Striim Connect architecture diagram (striim-connect-diagram.png)]

Support for reading data from Debezium or Debezium-compliant source connectors

Through persisted streams, Striim Connect can read data change event records containing row-level database changes from Kafka topics written by any Kafka source connector. Striim has certified support for the Debezium-compliant YugabyteDB connector.

The following are requirements for reading the data:

  • The records are serialized in Avro format using the Confluent Schema Registry.

  • The schema of the Avro records conforms to that of the Debezium connectors, where the schema name for the change event value record is Envelope, and the payload of the Envelope record contains the following fields (see the example after this list):

    • A source field containing a schema and a table field, a combination of which provides a qualified table name in the database

    • An op field with c (insert), u (update), d (delete), r (initial load), or t (truncate, currently ignored) as possible values

    • A before field containing the full before image of data when op is u or d

    • An after field containing the full after image of data when op is c, u or r
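
For illustration only, the payload of an Envelope record for an update (op is u) has roughly the following logical structure once deserialized from Avro. The field names inside source, before, and after are hypothetical, and real change events carry additional connector-specific metadata:

{
    "source" : { "db" : "exampledb", "schema" : "public", "table" : "customers" },
    "op" : "u",
    "before" : { "id" : 1001, "email" : "old@example.com" },
    "after" : { "id" : 1001, "email" : "new@example.com" }
}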

Configuring the Kafka source connector

The following configurations are required for the Kafka source connector.

Note

DDL propagation is not supported in Striim.

Converter configuration

Both the key and the value of the Kafka messages written to the Kafka topics by the source connectors should use the Confluent Avro Converter with a Schema Registry for serialization. The subject naming strategy in the Schema Registry should be defined as TopicRecordNameStrategy, because the persisted stream's Kafka topic will receive data from multiple tables. The connector should define the following in its configuration:

"key.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "<schema_registry_url>",
"value.converter.schema.registry.url" : "<schema_registry_url>",
"key.converter.key.subject.name.strategy" : "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"

Initial load configuration

Use the following configuration in the source connector configuration to extract the existing data from the tables/collections of interest in the source system when the connector starts for the first time, before it begins capturing CDC data:

"snapshot.mode" : "initial"

Topic configuration

By default, a Kafka source connector automatically creates one Kafka topic for each table or collection it reads from the source, with topic names following the pattern server.schema.table. To integrate with Striim's persisted streams, however, the source connector must write data from all the tables to the Kafka topic created by the persisted stream. To achieve this, the source connector configuration needs to define a topic rerouting transformation:

"transforms" : "Reroute",
"transforms.Reroute.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex" : ".*",
"transforms.Reroute.replacement" : "<persisted_stream_topic_name>"

E1P configuration

Kafka Connect provides the E1P (exactly-once processing) feature starting with version 3.3. The feature is available only in the distributed mode of Connect, not in standalone mode. It is disabled by default and must be enabled in the distributed Connect configuration using this property:

exactly.once.source.support=enabled

Not all source connectors support E1P, even if the Kafka Connect framework is capable of it and configured for it. To request E1P from the source connector, use the following in the source connector configuration:

"exactly.once.support" : "requested"

If the source connector supports E1P, it will provide it; otherwise, it will ignore the configuration. If the source connector should run only when it can guarantee E1P, use this configuration instead:

"exactly.once.support" : "required"

Datatype conversion

For proper datatype mapping, the default values for the following connector configurations should remain unchanged:

"interval.handling.mode" : "numeric",
"time.precision.mode" : "adaptive",
"decimal.handling.mode" : "double", 
"binary.handling.mode" : "bytes"

Configuring Striim

The following sections describe the configuration for Striim.

Configuring a Kafka PropertySet

When creating a Kafka PropertySet for persisted streams that will receive data from Kafka source connectors, the following settings are required:

  • You must define the dataformat property with one of the following values:

    • avro-debezium to read data from Debezium connectors, or

    • avro-striim to read data from third-party source connectors publishing data in the Striim-prescribed format.

  • You must define the new schemaregistry property with the URL of the Schema Registry for the Avro data published to the Kafka topics used by the persisted stream. A minimal sketch follows this list.
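
The following is a minimal sketch of such a PropertySet, using placeholder addresses; it mirrors the complete TQL example later in this section:

W (admin) > create propertySet KConnPropSet (
    zk.address: '<zookeeper_ip>:2181',
    bootstrap.brokers: '<kafka_broker_ip>:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://<schema_registry_ip>:8085'
);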

Configuring the persisted stream

Note the following requirements for the persisted stream:

  • The persisted streams that will receive data from Kafka source connectors must be created before starting the connectors. You must provide the name of the Kafka topic created by the persisted stream in the corresponding source connector configuration as its target (see the example after this list).

  • No Striim component can write to persisted streams with the data format avro-debezium or avro-striim; Striim components can only read from them, while Kafka source connectors write to them.

  • The type of such persisted streams must be Global.WAEvent. They cannot be of any other event type or a user-defined type.
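
For example, following the naming used later in this section, a persisted stream admin.example_data creates a Kafka topic named admin_example_data, and that topic name is what the source connector's rerouting transformation must target (verify the actual topic name created in your environment):

"transforms.Reroute.replacement" : "admin_example_data"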

Configuring target mapping in the Striim app

For the source database, the ordering of columns in the WAEvents coming out of the persisted stream will match the ordering of columns in the source table only if the primary key columns are defined at the beginning of the table's column list. Otherwise, a single-column primary key will appear first in the WAEvents; for a composite key, there is no guarantee on the ordering of the key columns, and there is also no guarantee on the ordering of the remaining non-key columns.

For these reasons, rather than assuming the source table column ordering, you should use an explicit column mapping by name in the targets, as in the sketch below.
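
For example, with Global.DatabaseWriter you can map source columns to target columns by name through the ColumnMap option of the Tables property. The table and column names below are hypothetical and this is a sketch only; confirm the exact ColumnMap syntax against the DatabaseWriter documentation for your Striim version:

Tables: 'public.customers,public.customers ColumnMap(id=id, full_name=full_name, email=email)'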

Database mapping

Any source database data will be written to the Kafka topic as one of the Avro datatypes. This may cause a loss of type information, because Avro types are limited compared to the datatypes of source databases. To mitigate this issue, Debezium provides an optional semantic type with every literal type (Avro schema type) definition in the data records. The source connectors must provide the semantic types for data where required for correct datatype handling by Striim Connect.
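
When the records are serialized with the Confluent Avro Converter, the semantic type typically appears as a connect.name attribute on the field's Avro schema. The following fragment is illustrative only (the field name is hypothetical and the exact schema depends on the connector and converter versions); it shows a date column carried as an Avro int with the io.debezium.time.Date semantic type:

{
    "name" : "hire_date",
    "type" : {
        "type" : "int",
        "connect.name" : "io.debezium.time.Date"
    }
}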

If a semantic type is present, the following is the datatype mapping from Avro data to Striim events:

| Semantic type | Description | Literal type | Java type |
| --- | --- | --- | --- |
| io.debezium.time.Date | # of days since the epoch | int | java.time.LocalDate |
| io.debezium.time.Time | # of milliseconds past midnight | int | java.time.LocalTime |
| io.debezium.time.MicroTime | # of microseconds past midnight | long | java.time.LocalTime |
| io.debezium.time.NanoTime | # of nanoseconds past midnight | long | java.time.LocalTime |
| io.debezium.time.Timestamp | # of milliseconds past the epoch | long | java.time.LocalDateTime |
| io.debezium.time.MicroTimestamp | # of microseconds past the epoch | long | java.time.LocalDateTime |
| io.debezium.time.NanoTimestamp | # of nanoseconds past the epoch | long | java.time.LocalDateTime |
| io.debezium.time.ZonedTimestamp | timestamp in ISO 8601 format | String | java.time.LocalDateTime |
| org.apache.kafka.connect.data.Date | # of days since the epoch | int | java.time.LocalDate |
| org.apache.kafka.connect.data.Time | # of microseconds past midnight | long | java.time.LocalTime |
| org.apache.kafka.connect.data.Timestamp | # of milliseconds since epoch | long | java.time.LocalDateTime |

If there is no semantic type, the following is the datatype mapping from Avro data to Striim events:

| Literal type | Java type | Notes |
| --- | --- | --- |
| boolean | java.lang.Boolean | |
| int | java.lang.Integer | |
| long | java.lang.Long | |
| float | java.lang.Float | |
| double | java.lang.Double | |
| string | java.lang.String | |
| enum | java.lang.String | |
| fixed | java.lang.String | |
| bytes | java.lang.String | Hex encoded |
| array | java.lang.String | Comma separated, curly bracket enclosed |

Creating an EXTERNAL SOURCE using TQL

A new Striim EntityType named EXTERNAL SOURCE allows you to indicate the original source that the Kafka source connector reads from. It exists only as metadata and is not deployed as part of a Striim application. This entity has an optional set of properties to describe it, such as the connector name and the location of the connector's configuration file; these properties are for descriptive purposes only. In contrast, the stream an external source writes to is part of Striim and is deployed with its application. The stream must be a persisted stream of type Global.WAEvent with proper values for dataformat and schemaregistry defined in its property set.

W (admin) > create propertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);
W (admin) > create stream example_data of Global.WAEvent 
   persist using KConnPropSet;
W (admin) > create external source example_source (
    connector: 'exampleDB',
    configFile: './config/source-connector.json'
) output to example_data;

Or create the stream implicitly along with the external source:

W (admin) > create propertySet KConnPropSet (
    zk.address: '10.0.0.92:2181',
    bootstrap.brokers: '10.0.0.92:9092',
    kafkaversion: '2.1',
    partitions: '1',
    replication.factor: '1',
    dataformat: 'avro-debezium',
    schemaregistry: 'http://10.0.0.92:8085'
);
W (admin) > create external source example_source (
    connector: 'exampleDB',
    configFile: './config/source-connector.json'
) output to example_data persist using KConnPropSet;

You can use the list, describe and drop TQL commands on this new EntityType:

W (admin) > list external sources;
EXTERNALSOURCE 1 =>  admin.example_source
W (admin) > describe external source example_source;
EXTERNALSOURCE admin.example_source CREATED 2024-02-27 21:05:49
 WITH PROPERTIES (
   configFile: ./config/source-connector.json,
   connector: exampleDB
)
OUTPUTS TO STREAM example_data
W (admin) > drop external source example_source;
EXTERNALSOURCE example_source dropped successfully

Setting up the environment for Striim Connect

The following steps show an example of setting up the environment for Striim Connect. These steps may vary depending on your source and target.

  1. Set up a Kafka broker. Download Kafka 3.3.2 (or later). Start ZooKeeper and a Kafka broker.

  2. Set up the Confluent Schema Registry. For example:

    docker run -it --rm --name csr -p 8085:8085 \
      -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=<kafka_broker_ip>:9092 \
      -e SCHEMA_REGISTRY_HOST_NAME=localhost \
      -e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8085 \
      -e SCHEMA_REGISTRY_KAFKASTORE_TIMEOUT_MS=5000 confluentinc/cp-schema-registry:latest
  3. Set up the Kafka Connect service with the connector plugin. For example:

    docker run -it --rm --name connect -p 8083:8083 \
      -e BOOTSTRAP_SERVERS=<kafka_broker_ip>:9092 \
      -e GROUP_ID=connect_cluster \
      -e CONFIG_STORAGE_TOPIC=connect_configs \
      -e OFFSET_STORAGE_TOPIC=connect_offsets \
      -e STATUS_STORAGE_TOPIC=connect_status quay.io/yugabyte/debezium-connector:latest
  4. Set up the database instance. For example:

    docker run -d -p 7000:7000 -p 7100:7100 -p 9000:9000 -p 9100:9100 -p 15433:15433 \
      -p 5433:5433 -p 9042:9042 --name yugabyte yugabytedb/yugabyte:latest bin/yugabyted \
      start --daemon=false
  5. In Docker, perform the following steps:

    1. Check the host and port of the instance.

    2. Connect to Yugabyte SQL (YSQL), create a few tables as the data sources, and insert data in the tables.

    3. Create a change data stream.

    docker exec -it yugabyte /bin/bash
    (a) yugabyted status
    (b) bin/ysqlsh -h <docker_host_ip> -U yugabyte -d yugabyte
    (c) bin/yb-admin --master_addresses <docker_host_ip>:7100 \
      create_change_data_stream ysql.yugabyte IMPLICIT ALL

    Note the CDC Stream ID; it will be required in the connector configuration.

  6. Set up PostgreSQL as the target database. Create the target tables in the database.

    docker run -d -p 5432:5432 --name postgres12 \
      123456789012.dkr.ecr.us-west-1.amazonaws.com/postgres-12.2:latest
  7. Start the Striim server. The following sample TQL creates an external source and writes to the PostgreSQL target:

    create application demo recovery 30 second interval;
    create PropertySet KConnPropSet (
        zk.address: '<ip-address>:<port>',
        bootstrap.brokers: '<ip-address>:<port>',
        kafkaversion: '2.1',
        partitions: '1',
        replication.factor: '1',
        dataformat: 'avro-debezium',
        schemaregistry: 'http://<ip-address>:<port>'
    );
    create stream yb_data of Global.WAEvent persist using KConnPropSet;
    create external source yb_source (
        connector: 'yugabyteDB',
        configFile: './config/debezium-yugabyte-source-connector.json'
    ) output to yb_data;
    create target pg_replica using Global.DatabaseWriter (
        ConnectionURL:'jdbc:postgresql://<postgresql_ip>:<port>/<database_name>',
        ConnectionRetryPolicy: 'retryInterval=5, maxRetries=3',
        CDDLAction: 'Process',
        Username:'<example>',
        Password:'<example>',
        BatchPolicy:'Eventcount:10,Interval:10',
        CommitPolicy:'Interval:10,Eventcount:10',
        Tables: '<list_of_table_mappings>'
    ) input from yb_data;
    create target con_disp using SysOut(name:Disp) input from yb_data;
    end application demo;
    deploy application demo;
    start demo;
  8. Start the source connector.

    curl -H "Content-Type: application/json" 
      -X POST http://<kafka_connect_ip>:8083/connectors 
      -d @"debezium-yugabyte-source-connector.json"

    Configure the source connector as follows:

    {
        "name" : "debezium-yugabyte-source-connector",
        "config": {
    		"connector.class" : "io.debezium.connector.yugabytedb.YugabyteDBConnector",
    		"tasks.max" : 1,
    		"snapshot.mode" : "initial",
    		"exactly.once.support" : "requested",
    		"database.hostname" : "<yugabytedb_ip>",
    		"database.port" : 5433,
    		"database.master.addresses" : "<yugabytedb_ip>:7100",
    		"database.user" : "yugabyte",
    		"database.password" : "yugabyte",
    		"database.dbname" : "yugabyte",
    		"database.server.name" : "dbserver1",
    		"database.streamid" : "<cdc_stream_id>",
    		"table.include.list" : "<list_of_tables>",
    		"tombstones.on.delete" : false,
    		"topic.creation.enable" : true,
    		"topic.prefix" : "debezium",
    		"topic.creation.default.partitions" : 1,
    		"topic.creation.default.replication.factor" : 1,
    		"include.schema.changes" : true,
    		"schema.history.internal.kafka.bootstrap.servers" : "<kafka_broker_ip>:9092", 
    		"schema.history.internal.kafka.topic" : "debezium_schemahistory",
    		"key.converter" : "io.confluent.connect.avro.AvroConverter",
    		"value.converter" : "io.confluent.connect.avro.AvroConverter",
    		"key.converter.schema.registry.url" : "http://<schema_registry_ip>:8085",
    		"value.converter.schema.registry.url" : "http://<schema_registry_ip>:8085",
    		"key.converter.key.subject.name.strategy" :
                      "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    		"value.converter.value.subject.name.strategy":
                      "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    		"transforms" : "Reroute",
    		"transforms.Reroute.type" : "org.apache.kafka.connect.transforms.RegexRouter",
    		"transforms.Reroute.regex" : ".*",
    		"transforms.Reroute.replacement" : "admin_yb_data"
        }
    }
  9. The existing records from the source database tables will appear in the PostgreSQL tables. Perform some DML operations on the source database tables and verify that the changes appear in the PostgreSQL tables (see the example after these steps).

    Note

    Another option for testing Striim Connect is to configure and test the Kafka Connect components separately from the Striim components before combining them. First set up the infrastructure and test the data flow from the source database to the Kafka topic. Then test the actual Striim Connect functionality and verify that Striim can read from the Kafka topic and deliver data to the targets.
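
    As a concrete illustration of step 9, assuming a table named public.customers exists in both databases and is included in the connector's table.include.list (the names and credentials here are hypothetical), you could run a DML statement in YugabyteDB and then confirm that it arrives in PostgreSQL:

    # Insert a row through YSQL in the YugabyteDB container
    docker exec -it yugabyte bin/ysqlsh -h <docker_host_ip> -U yugabyte -d yugabyte \
      -c "INSERT INTO public.customers (id, email) VALUES (1002, 'new.user@example.com');"

    # After the pipeline has processed the change, query the PostgreSQL target
    docker exec -it postgres12 psql -U <postgres_user> -d <postgres_db> \
      -c "SELECT * FROM public.customers WHERE id = 1002;"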