Streaming Kafka Integration

New Quick Start Tutorial for Streaming Kafka Integration

If you have adopted Apache Kafka as your high-performance, fault-tolerant messaging system, real-time integration between Kafka and your critical data sources and consumers is essential to getting the most business value from it. With real-time, streaming Kafka integration, you can build modern applications that access timely data in the right format, enabling time-sensitive operational decisions. However, because Kafka was designed for developers, organizations typically rely on developers’ manual effort and specialized skillsets to stream data into and out of Kafka and to automate data processing for its consumers.

Striim has offered SQL-query-based processing and analytics for Kafka since 2015. A drag-and-drop UI, pre-built applications and wizards for configuring Kafka integration, and custom utilities make Striim the easiest solution for building streaming integration pipelines with built-in analytics applications.

Our new tech guide, “Quick Start Tutorial for Streaming Kafka Integration,” details Striim’s capabilities for Kafka integration and stream processing. It illustrates how users can get optimal value from Kafka by simplifying the real-time ingestion, stream processing, and delivery of a wide range of data types, including transactional data from enterprise databases, without impacting their performance. The guide offers step-by-step instructions on how to build a Kafka integration solution for moving data from a MySQL database to Kafka with log-based change data capture and in-flight data processing. You can easily adapt these instructions for other data sources supported by Striim.

Some of the key areas covered in this tech guide include how to:

  • Ingest data to Kafka in a streaming fashion from enterprise databases, such as Oracle, SQL Server, MySQL, PostgreSQL, and HPE NonStop using low-impact change data capture. Other data sources, such as system logs, sensors, Hadoop, and cloud data stores, are also discussed in this section.
  • Use SQL-based stream processing to put Kafka data into a consumable format before delivering it with sub-second latency. Data formatting and the use of an in-memory data cache for in-flight data enrichment are also explained.
  • Support mission-critical applications with built-in scalability, security, exactly-once processing (E1P), and high availability.
  • Perform SQL-based in-memory analytics as the data flows through, and rapidly visualize the results without manual coding.
  • Deliver real-time data from Kafka to other systems, including cloud solutions, databases, data warehouses, other messaging systems, and files with pre-built adapters.

The Striim platform addresses the complexity of Kafka integration with an end-to-end, enterprise-grade software platform. By downloading the new tech guide, “Quick Start Tutorial for Streaming Kafka Integration,” you can get started rapidly building integration and analytics solutions without extensive coding or specialized skillsets. That capability enables your data scientists, business analysts, and other data professionals to focus on delivering fast business value to transform your operations.

For more resources on integrating high volumes of data in and out of Kafka, please visit our Kafka Integration and Stream Processing solution page. If you prefer to discuss your specific requirements, we would be happy to provide you a customized demo of streaming Kafka integration or other relevant use cases.

PostgreSQL to Kafka

Streaming Data Integration Tutorial: Adding a Kafka Target to a Real-Time Data Pipeline

This is the second post in a two-part blog series discussing how to stream database changes into Kafka. You can read part one here. We will discuss adding a Kafka target to the CDC source from the previous post. The application will ingest database changes (inserts, updates, and deletes) from the PostgreSQL source tables and deliver them to Kafka to continuously update a Kafka topic.

What is Kafka?

Apache Kafka is a popular distributed, fault-tolerant, high-performance messaging system.

Why use Striim with Kafka?

The Striim platform enables you to ingest data into Kafka, process it for different consumers, and analyze, visualize, and distribute it to a broad range of systems on-premises and in the cloud, with an intuitive UI and a SQL-based language for easy and fast development.

How to add a Kafka Target to a Striim Dataflow

From the Striim Apps page, click on the app that we created in the previous blog post and select Manage Flow.

MyPostgreSQL-CDC app.

This will open your application in the Flow Designer.

MyPostgreSQL-CDC app data flow.

To write to Kafka, we need to add a Target component to the dataflow. Click on the data stream, then on the plus (+) button, and select “Connect next Target component” from the menu.

Connecting a target component to the data flow.

Enter the Target Info

The next step is to specify how to write data to the target. In the New Target ADAPTER drop-down, select Kafka Writer Version 0.11.0, and enter a few connection properties, including the target name, topic, and broker URL.

Configuring the Kafka target.

Data Formatting 

Different Kafka consumers may have different requirements for the data format. When writing to Kafka with Striim, you can choose the data format with the FORMATTER drop-down and optional configuration properties. Striim supports JSON, delimited, XML, Avro, and free-text formats; in this case we select the JSONFormatter.

Configuring the Kafka target FORMATTER.
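
Behind the UI, the target and formatter correspond to a TQL definition roughly like the sketch below. The broker address, topic, target name, and input stream name are illustrative placeholders, and the property names are approximate rather than an exact export of the application:

-- Sketch only: a Kafka target with a JSON formatter in Striim's TQL.
-- 'PostgresCDCStream' and the property values are assumed placeholders.
CREATE TARGET WriteToKafka USING KafkaWriter VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  Topic: 'postgres-cdc'
)
FORMAT USING JSONFormatter ()
INPUT FROM PostgresCDCStream;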

Deploying and Starting the Data Flow

The resulting data flow can now be modified, deployed, and started through the UI. To run the application, it first needs to be deployed: click on the ‘Created’ dropdown and select ‘Deploy App’ to open the Deploy UI.

Deploying the app.

The application can be deployed to all nodes, any one node, or predefined groups in a Striim cluster; the default is the least-used node.

Deployment node selection.

After deployment, the application is ready to start by selecting Start App.

Starting the app.

Testing the Data Flow

You can use the PostgreSQL to Kafka sample integration application to insert, delete, and update the PostgreSQL CDC source table; you should then see data flowing in the UI, indicated by the msgs/s count. (Note that the message sending happens quickly and the count soon returns to 0.)

Testing the streaming data flow.

If you now click on the data stream in the middle and click on the eye icon, you can preview the data flowing between PostgreSQL and Kafka. Here you can see the data, metadata (these are all updates) and before values (what the data was before the update).

Previewing the data flowing from PostgreSQL to Kafka.

There are many other sources and targets that Striim supports for streaming data integration. Please request a demo with one of our lead technologists, tailored to your environment.

Change Data Capture - change log

Streaming Data Integration Tutorial: Using CDC to Stream Database Changes

This is the first in a two-part blog post discussing how to use Striim for streaming database changes to Apache Kafka. Striim offers continuous data ingestion from databases and other sources in real time; transformation and enrichment using Streaming SQL; delivery of data to multiple targets in the cloud or on-premise; and visualization of results. In this part, we will use Striim’s low-impact, real-time change data capture (CDC) feature to stream database changes (inserts, updates, and deletes) from an operational database into Striim.

What is Change Data Capture?

Databases maintain change logs that record all changes made to the database contents and metadata. These change logs can be used for database recovery in the event of a crash, and also for replication or integration.

Change data capture change log

With Striim’s log-based CDC, new database transactions – including inserts, updates, and deletes – are read from source databases’ change logs and turned into a stream of events without impacting the database workload. Striim offers CDC for Oracle, SQL Server, HPE NonStop, MySQL, PostgreSQL, MongoDB, and MariaDB.

Why use Striim’s CDC?

Businesses use Striim’s CDC capabilities to feed real-time data to their big data lakes, cloud databases, and enterprise messaging systems, such as Kafka, for timely operational decision making. They also migrate from on-premises databases to cloud environments without downtime and keep cloud-based analytics environments up-to-date with on-premises databases using CDC.

How to use Striim’s CDC?

Striim’s easy-to-use CDC template wizards automate the creation of applications that leverage change data capture to stream events, as they are created, from various source systems to various targets. Apps created with templates may be modified using the Flow Designer or by exporting TQL, editing it, and importing the modified TQL. Striim has templates for many source-target combinations.

In addition, Striim offers pre-built integration applications for bulk loading and CDC from PostgreSQL source databases to target systems including PostgreSQL database, Kafka, and files. You can start these applications in seconds by going to the Applications section of the Striim platform.

Striim pre-built sample integration applications.

In this post, we will show how to use the PostgreSQL CDC (PostgreSQL Reader) with a Striim Target using the wizards for a custom application instead of using the pre-built application mentioned above. The instructions below assume that you are using the PostgreSQL instance that comes with the Striim platform. If you are using your own PostgreSQL database instance, please review our instructions on how to set up PostgreSQL for CDC.

Using the CDC Template

To start building the CDC application, in the Striim web UI, go to the Apps page and select Add App > Start with Template. Enter PostgreSQL in the search field to narrow down the sources and select “PostgreSQL Reader to Striim”.

Wizard template selection when creating a new app.

Next, enter the name and namespace for your application (the namespace is a way of grouping applications together).

Creating a new application using Striim.

Specifying the Data Source Properties

In the SETUP POSTGRESQL READER step, specify the data source and table properties (a TQL sketch of the resulting source appears after the screenshot below):

  • the connection URL, username, and password.
  • the tables for which you want to read change data.
Configuring the data source in the wizard.
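
Under the hood, the wizard generates TQL for the application. A rough sketch of the source definition it produces might look like the following; the connection values, table, and output stream name are placeholders, and the property names are approximate:

-- Sketch only: the PostgreSQL CDC source created by the wizard.
-- Connection details, table, and stream name are assumed placeholders.
CREATE SOURCE PostgresCDCSource USING PostgreSQLReader (
  ConnectionURL: 'jdbc:postgresql://localhost:5432/demodb',
  Username: 'striim',
  Password: '********',
  Tables: 'public.my_table'
)
OUTPUT TO PostgresCDCStream;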

After you complete this step, your application will open in the Flow Designer.

The wizard generates a data flow.

In the flow designer, you can add various processors, enrichers, transformers, and targets as shown below to complete your pipeline, in some cases with zero coding.

Flow designer enrichers and processors.

Flow designer event transformers and targets.

In the next blog post, we will discuss how to add a Kafka target to this data pipeline. In the meantime, please feel free to request a demo with one of our lead technologists, tailored to your environment.

Microsoft SQL Server to Kafka

Microsoft SQL Server CDC to Kafka

By delivering high volumes of data using Microsoft SQL Server CDC to Kafka, organizations gain visibility into their business and the vital context needed for timely operational decision making. Getting maximum value from Kafka solutions requires ingesting data from a wide variety of sources – in real time – and delivering it to users and applications that need it to take informed action to support the business.

Traditional methods used to move data, such as ETL, are just not sufficient to support high-volume, high-velocity data environments. These approaches delay getting data to where it can be of real value to the organization. Moving all the data, regardless of relevance, to the target creates challenges in storing it and getting actionable data to the applications and users that need it. Microsoft SQL Server CDC to Kafka minimizes latency and prepares data so it is delivered in the correct format for different consumers to utilize.

In most cases, the data that resides in transactional databases like Microsoft SQL Server is the most valuable to the organization. The data is constantly changing, reflecting every event or transaction that occurs. Using non-intrusive, low-impact change data capture (CDC), the Striim platform moves and processes only the changed data. With Microsoft SQL Server CDC to Kafka, users manage their data integration processes more efficiently and in real time.

Using a drag-and-drop UI and pre-built wizards, Striim simplifies creating data flows for Microsoft SQL Server CDC to Kafka. Depending on the requirements of users, the data can either be delivered “as-is,” or in-flight processing can filter, transform, aggregate, mask, and enrich the data. This delivers the data in the format needed, with all the relevant context, to meet the needs of different Kafka consumers – with sub-second latency.

Striim is an end-to-end platform that delivers the security, recoverability, reliability (including exactly once processing), and scalability required by an enterprise-grade solution. Built-in monitoring also compares sources and targets and validates that all data has been delivered successfully. 

In addition to Microsoft SQL Server CDC to Kafka, Striim offers non-intrusive change data capture (CDC) solutions for a range of enterprise databases including Oracle, Microsoft SQL Server, PostgreSQL, MongoDB, HPE NonStop SQL/MX, HPE NonStop SQL/MP, HPE NonStop Enscribe, and MariaDB.

For more information about how to use Microsoft SQL Server CDC to Kafka to maintain real-time pipelines for continuous data movement, please visit our Change Data Capture solutions page.

If you would like a demo of how Microsoft SQL Server CDC to Kafka works and to talk to one of our technologists, please contact us to schedule a demo.

Kafka to MySQL

Kafka to MySQL

The scalable and reliable delivery of high volumes of Kafka data to enterprise targets via real-time Kafka integration gives organizations current and relevant information about their business. Loading data from Kafka to MySQL enables organizations to run rich custom queries on data enhanced with pub/sub messaging data and to make key operational decisions within the timeframe in which they are most effective.

To get optimal value from the rich messaging data generated by CRM, ERP, and e-commerce applications, large data sets need to be delivered from Kafka to MySQL with sub-second latency. Integrating data from Kafka to MySQL enhances transactional data – providing greater understanding of the state of operations. With access to this data, users and applications have the context to make decisions and take essential and timely action to support the business.

Using traditional batch-based approaches to the movement of data from Kafka to MySQL creates an unacceptable bottleneck – delaying the delivery of data to where it can be of real value to the organization. This latency limits the potential for this data to make critical operational decisions that enhance customer experiences, optimize processes, and drive revenue.

ETL methods move the data “as is” – without any pre-processing. However, depending on the requirements, not all of the data may be needed, and the data that is necessary may need to be augmented with other data to make it useful. Ingesting high volumes of raw data creates additional challenges when it comes to storage and getting high-value, actionable data to users and applications.

By building real-time data pipelines from Kafka to MySQL, Striim allows users to minimize latency and support their high-volume, high-velocity data environments. Striim offers real-time data ingestion with in-flight processing, including filtering, transformations, aggregations, masking, and enrichment, to deliver relevant data from Kafka to MySQL in the right format and with full context.

Striim also includes built-in security, delivery validation, and additional features essential for the scalability and reliability requirements of mission-critical applications. Real-time pipeline monitoring detects any patterns or anomalies as the data is moving from Kafka to MySQL. Interactive dashboards provide visibility into the health of the data pipelines and highlight issues with instantaneous alerts – allowing timely corrective action to be taken on the results of comprehensive pattern matching, correlation, outlier detection, and predictive analytics.

For more information about gaining timely intelligence from integrating high volumes of rich messaging data from Kafka to MySQL, please visit our Kafka integration page at: https://www.striim.com/solutions/kafka-real-time-integration-stream-processing/

If you would like a demo of real time data integration from Kafka to MySQL, and to talk to one of our experts, please contact us to schedule a demo.

What’s New in Striim 3.9.5

What’s New in Striim 3.9.5: More Cloud Integrations; Greater On-Prem Extensibility; Enhanced Manageability

Striim’s development team has been busy, and launched a new release of the platform, Striim 3.9.5, last week. The goal of the release was to enhance the platform’s manageability while boosting its extensibility, both on-premises and in the cloud.

I’d like to give you a quick overview of the new features, starting with expanded cloud integration capabilities.

  • Striim 3.9.5 now offers direct writers for both Azure Data Lake Storage Gen 1 and Gen 2. This capability allows businesses to stream real-time, pre-processed data to their Azure data lake solutions from enterprise databases, log files, messaging systems such as Kafka, Hadoop, NoSQL, and sensors, deployed on-prem or in the cloud.
  • Striim’s support for Google Pub/Sub is now improved with a direct writer. Google Pub/Sub serves as a messaging service for GCP services and applications. Rapidly building real-time data pipelines into Google Pub/Sub from existing on-prem or cloud sources allows businesses to seamlessly adopt GCP for their critical business operations and achieve the maximum benefit from their cloud solutions.
  • Striim has been providing streaming data integration to Google BigQuery since 2016. With this release, Striim supports additional BigQuery functionalities such as SQL MERGE.
  • Similarly, the new release brings enhancements to Striim’s existing Azure Event Hubs Writer and Amazon Redshift Writer to simplify development and management.

In addition to cloud targets, Striim boosted its heterogeneous sources and destinations for on-premises environments too. The 3.9.5 release includes:

  • Writing to and reading from Apache Kafka version 2.1
  • Real-time data delivery to HPE NonStop SQL/MX
  • Support for compressed data when reading from GoldenGate Trail Files
  • Support for NCLOB columns in log-based change data capture from Oracle databases

Following on to the 3.9 release, Striim 3.9.5 also added a few new features to improve Striim’s ease of use and manageability:

  • Striim’s users can now organize their applications with user-defined groups and see deployment status with color-coded indicators on the UI. This feature increases productivity, especially when there are hundreds of Striim applications running or in the process of being deployed, as many of our customers do.
  • New recovery status indicators in Striim 3.9.5 allow users to track when the application is in the replay mode for recovery versus in the forward processing mode after the recovery is completed.
  • Striim’s application management API now allows resuming a crashed application.
  • Last but not least, Striim 3.9.5 offers easier and more detailed monitoring of open transactions in Oracle database sources.

For a deeper dive into the new features in Striim 3.9.5, please request a customized demo. If you would like to check out any of these features for yourself, we invite you to download a free trial.

Kafka to HDFS

Kafka to HDFS

The real-time integration of messaging data from Kafka to HDFS augments transactional data for richer context. This allows organizations to gain optimal value from their analytics solutions and achieve a deeper understanding of operations – essential to establishing and sustaining competitive advantage.

To truly leverage the high volumes of data residing in Kafka stores, companies need to be able to move it, process it, and deliver it to a variety of on-premises and cloud systems with sub-second latency. It also needs to be integrated with operational data from a wide variety of sources.

Traditional batch-based solutions are not designed for situations where data is time-sensitive – they are simply too slow. To allow organizations to use their data to enhance operations, tailor services, and improve customer experiences, data delivery from Kafka to HDFS systems needs to be scalable and in real time.

With Striim, companies can continuously deliver data in real time from Kafka to HDFS, as well as to a wide range of targets including Hadoop and cloud environments. Depending on the requirements of the organization, all the Kafka data can be written to a number of different targets simultaneously. In use cases where not all the data is required, data can be matched to specific criteria to deliver a highly relevant subset of data to the target.

Striim can create data flows to deliver the data from Kafka to HDFS in milliseconds, “as-is.” However, depending on how the data is going to be utilized, the user may require the data to be processed, prepared, and delivered in the right format. Striim supports continuous queries to filter, transform, aggregate, enrich, and analyze the data in-flight before delivering it with sub-second latency.

By analyzing the data in-flight, Kafka users can capture time-sensitive information as the data is flowing through the data stream. Striim pushes insights and alerts to interactive dashboards highlighting real-time data and the results of pattern matching, correlation, outlier detection, predictive analytics, and further enables drill-down and in-page filtering.

To learn more about integrating and processing data from Kafka to HDFS in real time, please visit our Kafka integration page.

Our experts can show you how to get maximum value from your analytics solutions using Striim for real-time data integration from Kafka to HDFS. Please contact us to schedule a demo.

Kafka to HBase

Kafka to HBase

The real-time integration of high volumes of messaging data from Kafka to HBase gives organizations access to current and relevant information about their business that enables them to make key decisions – faster.

With rich and extensive messaging data at their disposal, organizations of all sizes can gain a deeper understanding of their business and their customers, and identify opportunities for development. For the true value of these large data sets to be realized, they need to be moved from Kafka to HBase, processed, and delivered with sub-second latency – so they are accessible and available to users and analytics applications.

Loading data from Kafka to HBase using a traditional batch-based approach presents significant limitations. The delay in getting data where it needs to be – and in a format that allows it to be immediately understood and utilized – results in missed opportunities to make decisions that optimize operations, improve customer experience, and increase revenue. Ingesting large volumes of data without any pre-processing also creates storage challenges and hinders access to actionable data analysis.

Striim makes it possible for users to build real-time data pipelines from Kafka to HBase and generate real value from the data and their investment in Big Data environments. Striim can support continuous, non-intrusive data ingestion to deliver data in real-time from Kafka to HBase in milliseconds. By processing the data in-flight without adding latency, data that is not relevant can be filtered out to help users manage storage capacity more efficiently. Striim also enriches the data with additional context to make analytics faster and easier.

For time-sensitive use cases where more urgent action is required, Striim offers streaming analytics capabilities to detect patterns and anomalies while the data is in-flight from Kafka to HBase. Interactive dashboards effectively visualize real-time data and highlight insights and alerts. Users can pinpoint issues and take immediate action on the results of pattern matching, correlation, outlier detection, and predictive analytics.

For more information about real-time data integration and processing from Kafka to HBase, please visit our Kafka integration page.

To experience for yourself how real-time data integration from Kafka to HBase works, and to talk to one of our lead technologists, please contact us to schedule a demo.

New Striim Features for Apache Kafka Integration and Processing

As Kafka Summit SF begins today, the Striim team is honored to sponsor this premier event for anyone interested in streaming data platforms. In this blog post, we would like to share with you several key new features that Striim recently introduced for Apache Kafka integration and stream processing.

Automated mapping of partitions when ingesting data from Kafka

Striim has added new capabilities to increase its performance multifold, and to simplify the setup for ingesting real-time data from Kafka message queues. A key new feature related to Kafka integration is our Kafka Reader. The Striim platform offers an automated mapping of partitions when ingesting data from Kafka. This feature both increases developer productivity and accelerates time to market.

Multi-threaded delivery with automated data distribution and thread management

The Striim platform also employs multi-threaded delivery with automated data distribution and thread management within a single Apache Kafka Writer. As a result, it scales more easily to support high-volume data environments and enables a significant increase in performance to optimize a many-core, single-node architecture.

Enhanced pipeline monitoring for the Kafka adapters

One of the key differentiators of Striim in streaming data integration is its comprehensive and real-time pipeline monitoring capabilities. In this area, we also introduced broader and deeper metrics to enhance pipeline monitoring for the Kafka adapters. This feature allows Kafka users to easily identify bottlenecks and rapidly fine-tune for even higher performance.

Schema Registry support for Apache Kafka

Last but not least, Striim introduced Schema Registry support for Apache Kafka. With this feature, users can seamlessly track and store schema evolution, and make schema changes without impacting existing applications.

Along with these new features around Apache Kafka integration, we have shared various tutorials and technical deep dives on how Striim supports Kafka integration and stream processing. I invite you to check them out.

If you are not familiar with Striim, the Striim platform is used by leading organizations that rely on Apache Kafka for high-speed and fault-tolerant messaging to continuously ingest real-time data from enterprise databases, logs, sensors, and message queues. The platform enables them to process data in-flight, without coding, using its wizards and drag-and-drop UI. With in-memory SQL stream processing capabilities, Striim delivers filtered, transformed, aggregated, masked and enriched data to Kafka within milliseconds.

In addition, Kafka users look to the Striim software to analyze and visualize their data in real time, as it streams in Kafka, and deliver data and insights to cloud-based or on-premises targets. What’s unique about Striim is that, with built-in security, scalability, reliability, exactly-once-processing, and manageability in production environments, it is well suited for those who want to use an enterprise-grade solution without spending the hours and money necessary to wire together multiple different open source products.  

Please reach out to us directly to schedule a demo to discuss Apache Kafka integration solutions for your environment. Or stop by our booth today or tomorrow at the Kafka Summit in SF.

Tutorial: SQL-Based Stream Processing for Apache Kafka with In-Memory Enrichment

In this series of blog-based tutorials, we are guiding you through the process of building data flows for streaming integration and analytics applications using the Striim platform. This tutorial focuses on SQL-based stream processing for Apache Kafka with in-memory enrichment of streaming data. For context, please check out Part One of the series, where we created a data flow to continuously collect change data from MySQL and deliver it as JSON to Apache Kafka.

In this tutorial, we are going to process and enrich data-in-motion using continuous queries written in Striim’s SQL-based stream processing language. Using a SQL-based language is intuitive for data processing tasks, and most common SQL constructs can be utilized in a streaming environment. The main differences between using SQL for stream processing and its more traditional use as a database query language are that all processing is in-memory and that data is processed continuously, so every event arriving on a query’s input stream can result in an output.
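
As a minimal illustration of the idea – the component and field names here are invented for the example and are not part of this tutorial – a continuous query that filters one stream into another looks like an ordinary SQL SELECT wrapped in a CQ definition:

-- Minimal sketch of a continuous query: every event arriving on OrderStream
-- is evaluated immediately, and matching events are written to LargeOrderStream.
-- All names are illustrative placeholders.
CREATE CQ FilterLargeOrders
INSERT INTO LargeOrderStream
SELECT orderId, amount
FROM OrderStream o
WHERE o.amount > 1000;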

The first thing we are going to do with the data is extract fields we are interested in, and turn the hierarchical input data into something we can work with more easily.

Transforming Streaming Data With SQL

You may recall the data we saw in part one looked like this:

data:     [86,466,869,1522184531000]
before:   [86,466,918,1522183459000]
metadata: {
  "PK_UPDATE":"false",
  "TableName":"test.PRODUCT_INV",
  "TxnID":"7777:000009:48657361:1522184531000",
  "OperationName":"UPDATE",
  "TimeStamp":1522184531000
}

This is the structure of our generic CDC streams. Since a single stream can contain data from multiple tables, the column values are presented as arrays which can vary in size. Information regarding the data is contained in the metadata, including the table name and operation type.

The PRODUCT_INV table in MySQL has the following structure:

LOCATION_ID int(11) PK
PRODUCT_ID int(11) PK
STOCK int(11)
LAST_UPDATED timestamp

The first step in our processing is to extract the data we want. In this case, we only want updates, and we’re going to include both the before and after images of the update for stock values.

To do the processing, we need to add a continuous query (CQ) into our dataflow. This can be achieved in a number of ways in the UI, but we will click on the datastream, then on the plus (+) button, and select “Connect next CQ component” from the menu.

Connect Next CQ Component to Add to Our First Continuous Query

As with all components in Striim, we need to give the CQ a name, so let’s call it “ExtractFields”. The processing query defaults to selecting everything from the stream we were working with.


But we want only certain data, and we want to restrict things to updates. When selecting the data we want, we can apply transformations to convert data types, access metadata, and perform many other data manipulation functions. This is the query we will use to process the incoming data stream:


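In Striim’s SQL-based language, that query is roughly along these lines; the input stream name and the exact function signatures are assumptions based on the description that follows, so treat this as a sketch rather than the exact query:

-- Approximate reconstruction of the processing query; 'MysqlCDCStream' and
-- the exact function names/signatures (TO_LONG, TO_DATE, .toString()) are assumptions.
SELECT TO_INT(data[0])           AS locationId,
       TO_INT(data[1])           AS productId,
       TO_INT(data[2])           AS stock,
       TO_INT(before[2])         AS prevStock,
       TO_DATE(TO_LONG(data[3])) AS updateTime
FROM MysqlCDCStream x
WHERE META(x, 'OperationName').toString() = 'UPDATE';
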
Notice the use of the data array (what the data looks like after the update) in most of the selected values, but the use of the before array to obtain the prevStock.

We are also using the metadata extraction function (META) to obtain the operation name from the metadata section of the stream, and a number of type conversion functions (TO_INT for example) to force data to be of the correct data types. The date is actually being converted from a LONG timestamp representing milliseconds since the EPOCH.

The final step before we can save this CQ is to choose an output stream. In this case we want a new stream, so we’ll call it “ExtractedFields”.

Data-flow with Newly Added CQ

When we click on Save, the query is created alongside the new output stream, which has a data type to match the projections (the transformed data we selected in the query).

After Clicking Save, the New CQ and Stream Are Added

The data type of the stream can be viewed by clicking on the stream icon.

Stream properties showing the generated type definition.

There are many different things you can do with streams themselves, such as partition them over a cluster, or switch them to being persistent (which utilizes our built-in Apache Kafka), but that is a subject for a later blog.

If we deploy and start the application (see the previous blog for a refresher) then we can see what the data now looks like in the stream.

Extracted Fields Viewed by Previewing Data Streams

As you can see, it looks very different from the previous view and now contains only the fields we are interested in for the remainder of the application.

At the moment, however, this new stream goes nowhere, while the original data is still being written to Kafka.

Writing Transformed Data to Kafka

To fix this, all we need to do is change the input stream for the WriteToKafka component.

Changing the Kafka Writer Input Stream

This changes the data flow, making it a continuous linear pipeline, and ensures our new simpler data structure is what is written to Kafka.

Linear Data Flow Including Our Process CQ Before Writing to Kafka

Utilizing Caches For Enrichment

Now that we have the data in a format we want, we can start to enrich it. Since the Striim platform is a high-speed, low latency, SQL-based stream processing platform, reference data also needs to be loaded into memory so that it can be joined with the streaming data without slowing things down. This is achieved through the use of the Cache component. Within the Striim platform, caches are backed by a distributed in-memory data grid that can contain millions of reference items distributed around a Striim cluster. Caches can be loaded from database queries, Hadoop, or files, and maintain data in-memory so that joining with them can be very fast.

A Variety of In-Memory Caches Are Available for Enrichment

In this example we are going to use two caches – one for product information loaded from a database, and another for location information loaded from a file.

Setting the Name and Datatype for the ProductInfo Cache

All caches need a name, a data type, and a lookup key, and they can optionally be refreshed periodically. We’ll call the product information cache “ProductInfo,” and create a data type to match the MySQL PRODUCT table, which contains details of each product in our CDC stream. This is defined in MySQL as:

PRODUCT_ID int(11) PK
DESCRIPTION varchar(255)
PRICE decimal(8,2)
BRAND varchar(45)
CATEGORY varchar(45)
WORN varchar(45)

The lookup key for this cache is the primary key of the database table, or productId in this case.

All we need to do now is define how the cache obtains the data. This is done by setting the username, password, and connection URL information for the MySQL database, then selecting a table, or a query to run to access the data.

Configuring Database Properties for the ProductInfo Cache

When the application is deployed, the cache will execute the query and load all the data returned by the query into the in-memory data grid, ready to be joined with our stream.
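
In TQL, the equivalent cache definition would look roughly like the sketch below; the type definition, property names, and connection values are approximations and placeholders rather than an exact export:

-- Sketch only: a type matching the PRODUCT table and a database-backed cache.
-- Property names, types, and connection values are approximate placeholders.
CREATE TYPE ProductInfoType (
  productId   Integer KEY,
  description String,
  price       Double,
  brand       String,
  category    String,
  worn        String
);

CREATE CACHE ProductInfo USING DatabaseReader (
  ConnectionURL: 'jdbc:mysql://localhost:3306/test',
  Username: 'striim',
  Password: '********',
  Query: 'SELECT PRODUCT_ID, DESCRIPTION, PRICE, BRAND, CATEGORY, WORN FROM PRODUCT'
)
QUERY (keytomap: 'productId')
OF ProductInfoType;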

Loading the location information from a file requires similar steps. The file in question is a comma-delimited list of locations in the following form:

Location ID, City, State, Latitude, Longitude, Population

We will create a File Cache called “LocationInfo” to read and parse this file and load it into memory, assigning the correct data type to each column.

Setting the Name and Datatype for the LocationInfo Cache

The lookup key is the location id.

We will be reading data from the “locations.csv” file present in the product install directory “.” using the DSVParser. This parser handles all kinds of delimited files. The default is to read comma-delimited files (with optional header and quoted values), so we can keep the default properties.

Configuring FileReader Properties for the LocationInfo Cache

As with the database cache, when the application is deployed, the cache will read the file and load all the data into the in-memory data grid, ready to be joined with our stream.
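
A corresponding sketch of the file-based cache, again with approximate property names and an assumed type definition, might look like this:

-- Sketch only: a file-based cache loaded from locations.csv via the DSVParser.
-- Property names and the type definition are approximate placeholders.
CREATE TYPE LocationInfoType (
  locationId Integer KEY,
  city       String,
  state      String,
  latitude   Double,
  longitude  Double,
  population Integer
);

CREATE CACHE LocationInfo USING FileReader (
  directory: '.',
  wildcard:  'locations.csv'
)
PARSE USING DSVParser ()
QUERY (keytomap: 'locationId')
OF LocationInfoType;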

Dataflow Showing Both Caches Currently Ready to be Joined

Joining Streaming and Cache Data For Enrichment With SQL

The final step is to join the data in the caches with the real-time data coming from the MySQL CDC stream. This can be achieved by modifying the ExtractFields query we wrote earlier.

Full Transformation and Enrichment Query Joining the CDC Stream with Cache Data

All we are doing here is adding the ProductInfo and LocationInfo caches into the FROM clause, using fields from the caches as part of the projection, and including joins on productId and locationId as part of the WHERE clause.
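
Roughly, the modified query now looks like the sketch below; as before, the stream name and exact function signatures are assumptions, so this is an approximation of the query shown in the Flow Designer:

-- Approximate reconstruction of the enriched ExtractFields query.
-- 'MysqlCDCStream' and function signatures are assumptions.
SELECT TO_INT(data[0])           AS locationId,
       TO_INT(data[1])           AS productId,
       TO_INT(data[2])           AS stock,
       TO_INT(before[2])         AS prevStock,
       TO_DATE(TO_LONG(data[3])) AS updateTime,
       p.description AS description,
       p.price       AS price,
       p.brand       AS brand,
       p.category    AS category,
       p.worn        AS worn,
       l.city        AS city,
       l.state       AS state,
       l.longitude   AS longitude,
       l.latitude    AS latitude
FROM MysqlCDCStream x, ProductInfo p, LocationInfo l
WHERE META(x, 'OperationName').toString() = 'UPDATE'
  AND p.productId  = TO_INT(data[1])
  AND l.locationId = TO_INT(data[0]);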

The result of this query is to continuously output enriched (denormalized) events for every CDC event that occurs for the PRODUCT_INV table. If the join were more complex – such that the IDs could be null or not match the cache entries – we could use a variety of join syntaxes on the data, such as OUTER joins. We will cover this topic in a subsequent blog.

When the query is saved, the dataflow changes in the UI to show that the caches are now being used by the continuous query.

Dataflow After Joining Streaming Data with Caches in the CQ

If we deploy and start the application, then preview the data on the stream prior to writing to Kafka, we will see the fully enriched records.

Results of Previewing Data After Transformation and Enrichment

The data delivered to Kafka as JSON looks like this:

{
  "locationId":9,
  "productId":152,
  "stock":1277,
  "prevStock":1383,
  "updateTime":"2018-03-27T17:28:45.000-07:00",
  "description":"Dorcy 230L ZX Series Flashlight",
  "price":33.74,
  "brand":"Dorcy",
  "category":"Industrial",
  "worn":"Hands",
  "city":"Dallas",
  "state":"TX",
  "longitude":-97.03,
  "latitude":32.9
}

As you can see, it is very straightforward to use the Striim platform to not only integrate streaming data sources using CDC with Apache Kafka, but also to leverage SQL-based stream processing and enrich the data-in-motion without slowing the data flow.

In the next tutorial, I will delve into delivering data in different formats to multiple targets, including cloud blob storage and Hadoop.