Tutorial

Streaming SQL on Kafka with Striim

Data integration and SQL-based processing for Kafka with Striim

Benefits

Efficient Data Processing
Process streaming data quickly and effectively between enterprise databases and Kafka

Streamlined SQL-Based Queries
Transform, filter, aggregate, enrich, and correlate your real-time data using continuous queries 

ACID-Compliant CDC
Striim and Confluent work together to ensure high-performance, ACID-compliant Change Data Capture

Overview

Apache Kafka is a powerful messaging system, renowned for its speed, scalability, and fault-tolerant capabilities. It is widely used by organizations to reliably transfer data. However, deploying and maintaining Kafka-based streaming and analytics applications can require a team of developers and engineers capable of writing and managing substantial code. Striim is designed to simplify the process, allowing users to reap the full potential of Kafka without extensive coding.

Striim and Confluent, Inc. (founded by the creators of Apache Kafka), partnered to bring real-time change data capture (CDC) to the Kafka ecosystem. By integrating Striim with Confluent Kafka, organizations can achieve a cost-effective, unobtrusive solution for moving transactional data onto Apache Kafka message queues in real time. This delivery solution is managed through a single application that offers enterprise-level security, scalability, and dependability.

The Striim platform helps Kafka users quickly and effectively process streaming data from enterprise databases to Kafka. Streamlined SQL-like queries allow for data transformations, filtering, aggregation, enrichment, and correlation. Furthermore, Striim and Confluent work together to ensure high-performance, ACID-compliant CDC and faster Streaming SQL queries on Kafka. For further insights into the strengths of the Striim and Kafka integration, visit our comparison page.

This recipe will guide you through the process of setting up Striim applications (Striim apps) with Confluent Kafka. Two applications will be set up: one with Kafka as the data source using the Kafka Reader component and another with Kafka as the destination with the Kafka Writer component. You can download the associated TQL files from our community GitHub page and deploy them into your free Striim Developer account. Please follow the steps outlined in this recipe to configure your sources and targets.

Core Striim Components

Kafka Reader: Kafka Reader reads data from a topic in Apache Kafka 0.11 or 2.1. 

Kafka Writer: Kafka Writer writes to a topic in Apache Kafka 0.11 or 2.1.

Stream: A stream passes one component's output to one or more other components. For example, in a simple flow that only writes to a file, a stream connects the source to the file target.

Snowflake Writer: Striim’s Snowflake Writer writes to one or more existing tables in Snowflake. Events are staged to local storage, Azure Storage, or AWS S3, then written to Snowflake as per the Upload Policy setting.

MongoDB Reader: Striim supports MongoDB versions 2.6 through 5.0, including self-managed MongoDB and MongoDB Atlas on AWS, Azure, and Google Cloud Platform.

Continuous Query: Striim continuous queries are continually running SQL queries that act on real-time data and may be used to filter, aggregate, join, enrich, and transform events.
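
For example, a minimal continuous query that filters one stream into another can be written in TQL as shown below; the CQ, stream, and field names here are illustrative:

CREATE CQ FilterLargeOrders
INSERT INTO filteredOrderStream
SELECT * FROM incomingOrderStream
WHERE quantity > 10;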

App 1: Kafka Source to Snowflake Target

For the first app, we have used Confluent Kafka (Version 2.1) as our source. Data is read from a Kafka topic and processed in real time before being streamed to a Snowflake target warehouse. Please follow the steps below to set up the Striim app from the Flow Designer in your Striim Developer account. If you do not have an account yet, please follow this tutorial to sign up for a free Striim Developer account in a few simple steps.

Step 1: Configure the Kafka Source adapter

In this recipe the Kafka topic is hosted on Confluent. Confluent offers a free trial for learning and exploring Kafka and Confluent Cloud. To sign up for a free trial of Confluent Cloud, please follow the Confluent documentation. You can create a topic inside your free cluster and use it as the source for your Striim app.

To configure your source adapter from the Flow Designer, click on ‘Create app’ on your homepage followed by ‘Start from scratch’. Name your app and click ‘Save’.

From the side panel, drag the Kafka source component and enter the connection details.

Add the broker address (also called the bootstrap server), which you can find under the client information on Confluent Cloud.

Enter the offset from which you want to stream data from your topic, and change the Kafka Config value and property separators as shown above. For the Kafka Config field you will need the API key and API secret for your Confluent Kafka cluster. The Kafka Config is entered in the following format:

session.timeout.ms==60000:sasl.mechanism==PLAIN:sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username="" password=""; :ssl.endpoint.identification.algorithm==https:security.protocol==SASL_SSL

You can copy the sasl.jaas.config from client information on Confluent Cloud and use the correct separators for the Kafka Config string.
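
Putting the above together, a Kafka Reader source definition in TQL might look roughly like the following sketch. The broker address, topic, and credential values are placeholders, the property names follow Striim's Kafka Reader adapter and should be verified against your Striim version, and the output stream name matches the kafkaOutputStream used by the continuous query later in this recipe.

-- Sketch only: replace the placeholder broker address, topic, API key, and API secret with your own values.
CREATE SOURCE ConfluentKafkaSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
  Topic: 'orders',
  startOffset: 0,
  KafkaConfigPropertySeparator: ':',
  KafkaConfigValueSeparator: '==',
  KafkaConfig: 'session.timeout.ms==60000:sasl.mechanism==PLAIN:sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username="<API key>" password="<API secret>"; :ssl.endpoint.identification.algorithm==https:security.protocol==SASL_SSL'
)
PARSE USING JSONParser ()
OUTPUT TO kafkaOutputStream;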

Step 2: Add a Continuous Query to process the output stream

Now the data streamed from the Kafka source will be processed in real time for various analytical applications. In this recipe the data is processed with a SQL-like query that converts the JSON values into a structured table, which is then streamed into your Snowflake warehouse, all in real time.

Drag the CQ component from the side panel and enter the following query. You can copy the SQL query from our GitHub page.

Step 3: Configure your Snowflake Target

On your target Snowflake warehouse, create a table with the same schema as the processed stream from the above Continuous Query. Enter the connection details and save. You can learn more about Snowflake Writer from this recipe.
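
The continuous query in this recipe outputs four string fields (ordertime, orderid, itemid, address), so the Snowflake table can be created with matching columns; the database, schema, and table names below are placeholders:

-- Run in Snowflake; adjust the names and column types to your environment.
CREATE TABLE MY_DATABASE.PUBLIC.KAFKA_ORDERS (
  ORDERTIME VARCHAR,
  ORDERID   VARCHAR,
  ITEMID    VARCHAR,
  ADDRESS   VARCHAR
);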

Step 4: Deploy and run the app

Once the source, target, and CQ are configured, select Deploy from the drop-down menu next to 'Created'. Choose any available node and click Deploy. After the app is deployed, select Start App from the same drop-down.

You can preview the processed data by clicking on the ‘eye’ icon next to the stream component.


App 2: MongoDB Source to Kafka Target

In this app, real-time data from MongoDB is processed with SQL-like queries and replicated to a Kafka topic on Confluent. Follow the steps below to configure a MongoDB-to-Kafka streaming app on Striim. As in app 1 above, first name your app and open the Flow Designer.

Step 1: Set up your MongoDB Source

Configure your MongoDB source by filling in the connection details. Follow this recipe for detailed steps on setting up a MongoDB source on Striim. Enter the connection URL, username, password, and the collection that you want to stream.
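
As a rough TQL sketch, the MongoDB source could be defined as follows. The connection URL, credentials, and collection are placeholders, the property names follow Striim's MongoDB Reader adapter and should be checked against your Striim version, and the output stream name matches the mongoOutputStream used by the continuous query later in this recipe.

-- Sketch only: replace the placeholder host, credentials, and collection with your own values.
CREATE SOURCE MongoDBSource USING MongoDBReader (
  ConnectionURL: '<host>:27017',
  Username: '<username>',
  Password: '<password>',
  Collections: '<database>.<collection>'
)
OUTPUT TO mongoOutputStream;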

Step 2: Add a Continuous Query to process incoming data

Once the source is configured, we will run a query on the data stream to process it. You can copy and paste the code from our GitHub page.

Step 3: Set up the Kafka target

After the data is processed, it is written to a Confluent Kafka topic. The configuration for the Kafka Writer is similar to that of the Kafka Reader shown in app 1. Enter the connection details of your Kafka cluster and click Save.
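
A corresponding Kafka Writer target in TQL might look like the sketch below. The broker address, topic, input stream name, and formatter choice are assumptions for illustration, and the Kafka Config string reuses the same SASL_SSL settings as the Kafka Reader in app 1.

-- Sketch only: replace the placeholder broker address, topic, API key, and API secret with your own values.
CREATE TARGET ConfluentKafkaTarget USING KafkaWriter VERSION '2.1.0' (
  brokerAddress: 'pkc-xxxxx.us-west-2.aws.confluent.cloud:9092',
  Topic: 'mongodb_listings',
  KafkaConfigPropertySeparator: ':',
  KafkaConfigValueSeparator: '==',
  KafkaConfig: 'session.timeout.ms==60000:sasl.mechanism==PLAIN:sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username="<API key>" password="<API secret>"; :ssl.endpoint.identification.algorithm==https:security.protocol==SASL_SSL'
)
FORMAT USING JSONFormatter ()
INPUT FROM processedMongoStream;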


Step 4: Deploy and run the app

After the source and target adapters are configured, click Deploy followed by Start App to run the data stream.

You can preview the processed data by clicking on the 'eye' icon next to the data stream.

As seen in the target Kafka messages, the data from the MongoDB source is streamed into the Kafka topic.

Setting Up the Striim Applications

App 1: Kafka Source to Snowflake Target

Step 1: Configure the Kafka Source Adapter

Kafka Config: 

session.timeout.ms==60000:sasl.mechanism==PLAIN:sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username="" password=""; :ssl.endpoint.identification.algorithm==https:security.protocol==SASL_SSL

Step 2: Add a Continuous Query to process the output stream

select TO_STRING(data.get("ordertime")) as ordertime,
  TO_STRING(data.get("orderid")) as orderid,
  TO_STRING(data.get("itemid")) as itemid,
  TO_STRING(data.get("address")) as address
from kafkaOutputStream;
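
In a TQL file, this SELECT typically sits inside a CREATE CQ that inserts its result into a named output stream feeding the Snowflake Writer; the CQ and output stream names below are illustrative:

CREATE CQ ProcessKafkaOrders
INSERT INTO processedOrderStream
SELECT TO_STRING(data.get("ordertime")) as ordertime,
  TO_STRING(data.get("orderid")) as orderid,
  TO_STRING(data.get("itemid")) as itemid,
  TO_STRING(data.get("address")) as address
FROM kafkaOutputStream;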

Step 3: Configure your Snowflake target

Step 4: Deploy and run the Striim app

App 2: MongoDB Source to Kafka target

Step 1: Set up your MongoDB Source

Step 2: Add a Continuous Query to process incoming data

SELECT TO_STRING(data.get("_id")) as id,
  TO_STRING(data.get("name")) as name,
  TO_STRING(data.get("property_type")) as property_type,
  TO_STRING(data.get("room_type")) as room_type,
  TO_STRING(data.get("bed_type")) as bed_type,
  TO_STRING(data.get("minimum_nights")) as minimum_nights,
  TO_STRING(data.get("cancellation_policy")) as cancellation_policy,
  TO_STRING(data.get("accommodates")) as accommodates,
  TO_STRING(data.get("bedrooms")) as no_of_bedrooms,
  TO_STRING(data.get("beds")) as no_of_beds,
  TO_STRING(data.get("number_of_reviews")) as no_of_reviews
FROM mongoOutputStream;

Step 3: Set up the Kafka target

Step 4: Deploy and run the app

Wrapping Up: Start your Free Trial Today

The above tutorial describes how you can use Striim with Confluent Kafka to move change data into the Kafka messaging system. Striim's pipelines are portable across multiple clouds and support hundreds of endpoint connectors, so you can create your own applications that cater to your needs. Please find the app TQL and data used in this recipe on our GitHub repository.

As always, feel free to reach out to our integration experts to schedule a demo, or try Striim for free here.

Tools you need

Striim

Striim’s unified data integration and streaming platform connects clouds, data and applications.

Snowflake

Snowflake is a cloud-native relational data warehouse that offers flexible and scalable architecture for storage, compute and cloud services.

Apache Kafka

Apache Kafka is an open-source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale.

MongoDB

MongoDB is a NoSQL database that provides JSON-like document storage with full indexing support.