Real-Time Database Integration with Apache Kafka via Change Data Capture
In this series of blog-based tutorials, we are going to step you through the process of building data flows for streaming integration and analytics applications using the Striim platform. This blog series focuses on the continuous collection of streaming data from a database; integration with Apache Kafka; Kafka stream processing, including enrichment, using continual SQL-based queries; delivery of data to multiple targets; Kafka data analytics and visualization of results.
The first step in any streaming integration is sourcing data. You are probably aware that Striim can continuously collect data from many sources. In this tutorial we are going to be using change data capture (CDC) to stream database DML activity (inserts, updates and deletes) from a MySQL database. The same process would be used to read from Oracle, SQL Server, and MariaDB, but MySQL was chosen so you can easily try this at home.
To start building the CDC application, you need to navigate to Apps in either the drop-down menu, or home screen. Once there, click on the Apps link to view all previously created data flow applications. This may be empty if you are first getting started with Striim.
To create the application click “Add App” in the top right-hand corner. This will give you three options for creating a new application:
- Start with Template
- Start from Scratch
- Import Existing App
In this case we are going to start with a template. There are a lot of templates for many combinations of sources and targets. You can narrow the search by entering terms into the Search Templates box.
Since we are sourcing from MySQL, you can narrow the search using MySQL.
Our first dataflow will write into Kafka, so select the appropriate Kafka version as a target. You are now able to name your application. Within Striim applications have fully qualified names that enable the same name (for different departments, or users) to exist in different namespaces. A namespace is really just a way to keep related things together.
This application is MySQLCDCToKafka, and we’ll put it in the ‘cdc’ namespace.
When you click “Save,” the application is created and the wizard will walk you through the steps of the process:
- 1Enter the Data Source info
- 2Test the connection and CDC capabilities
- 3Select the required tables. (Do optional mapping and filtering of data)
- 4Enter the Target info
You can move forwards and backwards through these steps as necessary if you need to make any corrections.
The first step is to enter the required information to connect to the data source. This will vary by source, but you will always need to choose a unique (within the namespace) data source name. In the case of MySQL you need to enter a connection URL, username, password and database name.
Note: If you are using Striim in the cloud, and have configured an on-premise agent to collect data to send to the cloud, you will need to click on ‘configure an on-premise source’. Don’t worry about the password, it is automatically encrypted by our server.
To validate the information and move to the next step, click on the “Next” button at the bottom right.
The Striim platform not only verifies that the database connection information you entered is correct, but also checks that the database user has the correct privileges, and that CDC is setup correctly. This step varies from data source to data source. If anything is wrong, you will be notified and told how to fix the problem.
For example, when Next was clicked previously, the ‘striim’ user did not have the correct privileges to receive CDC information from the MySQL database. And the results of step two were as follows.
As you can see, the explanation includes the steps required to fix the issue – an additional two permissions were needed by the ‘striim’ user to obtain the CDC information. Once ‘Replication Client’ and ‘Replication Slave’ privileges were granted, the UI permits you to move to the next step.
The Striim platform enables you to selectively choose which database schemas and tables to use CDC for. These can be chosen from a list, or through wildcard selection.
In this case we are just going to select the PRODUCT_INV table in the test database schema. Clicking Next takes us to an optional screen where you can do more complex data selection.
The final step is to specify how we want to write data to the target. When we selected the wizard we chose to write data to Apache Kafka. To complete this step, we simply need to enter a few connection properties including the target name, topic and broker URL.
The final step, before clicking “Next” one last time, is to choose the format of the output data. The Striim platform supports JSON, Delimited, XML, Avro and free text formats for Kafka topics. In this case we are selecting the JSONFormatter. This has a number of optional properties, but we are just going to go with the defaults.
Using the wizard we quickly built an integration application that collects data continuously from MySQL usingchange data capture, and writes that in real-time to Apache Kafka in JSON format. After clicking “Next” you are shown the data flow, and can now edit, run and test the data flow as necessary.
To simplify your deployments for cloud-based Kafka services, Striim is available on AWS, Azure, and Google Cloud Marketplace.
The resulting data flow can now be modified, deployed and started through the UI.
This is achieved through the state control button directly above the data flow. The initial state of the data flow is ‘created’. This means it is just a definition of what the application is supposed to do. In order to turn this into runtime objects, the application needs to be deployed.
Click on the ‘Created’ dropdown and select ‘Deploy App’ to show the Deploy UI.
Striim data flows can be deployed very flexibly. The default is to deploy the data flow to the least used node in a Striim cluster. However, the application can be deployed to all, or just some of the nodes (in predefined groups) as necessary. You can even split applications into sub-flows and deploy pieces on different parts of the cluster if required. In this case it’s easy, we’ll just deploy this whole application to one node. After deployment the application is ready to start, by selecting Start App.
Assuming you have activity on the MySQL table selected in during table selection, you should see data flowing in the UI, indicated by a number of msgs/s.
If you now click on the data stream in the middle and click on the eye icon, you can preview the data flowing between MySQL and Kafka.
Here you can see the data, metadata (these are all updates) and before values (what the data was before the update).
You can write a Kafka Consumer to see what this looks like in JSON format, or use a simple Striim application to read from Kafka, then use the built-in preview mechanism to see what it looks like.
With the Striim Platform you are not forced to use the UI, even though it is an incredibly fast and convenient way of doing things). You can instead use our scripting language.
Any streaming data flow application can be exported as a TQL script, and any TQL script can be imported then worked with in the UI.
If we go back to the original MySQLCDCToKafka application, and click on the configuration dropdown on the top right of the toolbar, you can export it.
The script for the application looks like this:
CREATE APPLICATION MySQLCDCToKafka;
CREATE SOURCE MySQLCDC USING MysqlReader (
OUTPUT TO MySQLCDC_ChangeDataStream ;
CREATE TARGET WriteToKafka USING KafkaWriter VERSION '0.8.0' (
FORMAT USING JSONFormatter (
INPUT FROM MySQLCDC_ChangeDataStream;
END APPLICATION MySQLCDCToKafka;
In our next tutorial, we will take the raw CDC data and use some in-memory SQL-based processing to transform and enrich before we write the data out to Apache Kafka. In the meantime, please feel free to request a demo with one of our lead technologists, tailored to your environment.