All businesses rely on data. Historically, this data resided in monolithic databases, and batch ETL processes were used to move that data to warehouses and other data stores for reporting and analytics purposes. As businesses modernize, looking to the cloud for analytics, and striving for real-time data insights, they often find that these databases are difficult to completely replace, yet the data and transactions happening within them are essential for analytics.

To resolve this, more and more companies are moving to event-driven architectures, because of the dynamic distributed scalability which makes sharing large volumes of data across systems possible.

In this post we will look at an example which replaces batch ETL by event-driven distributed stream processing: Oracle change data capture events are extracted as they are created; enriched with in-memory, SQL-based denormalization; then delivered to the Azure Cloud to provide scalable, real-time, low-cost analytics, without affecting the source database. We will also look at using the enriched events, optionally backed by Kafka, to incrementally add other event-driven applications or services.

Continuous Data Collection, Processing, Delivery, and Analytics with the Striim Platform

Continuous Data Collection, Processing, Delivery, and Analytics with the Striim Platform

Event-Driven Architecture Patterns

Most business data is produced as a sequence of events, or an event stream: for example, web or mobile app interactions, devices, sensors, bank transactions, all continuously generate events. Even the current state of a database is the outcome of a sequence of events. Treating state as the result of a sequence of events forms the core of several event-driven patterns.

Event Sourcing is an architectural pattern in which the state of the application is determined by a sequence of events. As an example, imagine that each “event” is an incremental update to an entry in a database. In this case, the state of a particular entry is simply the accumulation of events pertaining to that entry. In the example below the stream contains the queue of all deposit and withdrawal events, and the database table persists the current account balances.

Event as a Change to an Entry in a Database

Imagine Each Event as a Change to an Entry in a Database

The events in the stream can be used to reconstruct the current account balances in the database, but not the other way around. Databases can be replicated with a technology called Change Data Capture (CDC), which collects the changes being applied to a source database, as soon as they occur by monitoring its change log, turns them into a stream of events, then applies those changes to a target database. Source code version control is another well known example of this, where the current state of a file is some base version, plus the accumulation of all changes that have been made to it.

The Change Log can be used to Replicate a Database

The Change Log can be used to Replicate a Database

What if you need to have the same set of data for different databases, for different types of use? With a stream, the same message can be processed by different consumers for different purposes. As shown below, the stream can act as a distribution point, where, following the polygot persistence pattern, events can be delivered to a variety of data stores, each using the most suited technology for a particular use case or materialized view.

Streaming Events Delivered to a Variety of Data Stores

Streaming Events Delivered to a Variety of Data Stores

Event-Driven Streaming ETL Use Case Example

Below is a diagram of the Event-Driven Streaming ETL use case example:

Event-Driven Streaming ETL Use Case Diagram

Event-Driven Streaming ETL Use Case Diagram

  1. Striim’s low-impact, real-time change data capture (CDC) feature is used to stream database changes (inserts, updates and deletes) from an Operational Oracle database into Striim
  2. CDC Events are enriched and denormalized with Streaming SQL and Cached data, in order to make relevant data available together
  3. Enriched, denormalized events are streamed to CosmosDB for real-time analytics
  4. Enriched streaming events can be monitored in real time with the Striim Web UI, and are available for further Streaming SQL analysis, wizard-based dashboards, and other applications on-premise or in the cloud.

Replacing Batch Extract with Real Time Streaming of CDC Order Events

Striim’s easy-to-use CDC wizards automate the creation of applications that leverage change data capture, to stream events as they are created, from various source systems to various targets. In this example, shown below, we use Striim’s OracleReader to read the Order OLTP transactions in Oracle redo logs and stream these insert, update, delete operations, as soon as the transactions commit, into Striim, without impacting the performance of the source database.

Configuring Database Properties for the Oracle CDC Data Source

Configuring Database Properties for the Oracle CDC Data Source

Utilizing Caches For Enrichment

Relational Databases typically have a normalized schema which makes storage efficient, but causes joins for queries, and does not scale well horizontally. NoSQL databases typically have a denormalized schema which scales across a cluster because data that is read together is stored together.

Normalized Schema with Joins for Queries Does Not Scale Horizontally

Normalized Schema with Joins for Queries Does Not Scale Horizontally

With a normalized schema, a lot of the data fields will be in the form of IDs. This is very efficient for the database, but not very useful for downstream queries or analytics without any meaning or context. In this example we want to enrich the raw Orders data with reference data from the SalesRep table, correlated by the Order Sales_Rep_ID, to produce a denormalized record including the Sales Rep Name and Email information in order to make analysis easier by making this data available together.

Since the Striim platform is a high-speed, low latency, SQL-based stream processing platform, reference data also needs to be loaded into memory so that it can be joined with the streaming data without slowing things down. This is achieved through the use of the Cache component. Within the Striim platform, caches are backed by a distributed in-memory data grid that can contain millions of reference items distributed around a Striim cluster. Caches can be loaded from database queries, Hadoop, or files, and maintain data in-memory so that joining with them can be very fast. In this example, shown below, the cache is loaded with a query on the SalesRep table using the Striim DatabaseReader.

Configuring Database Properties for the Sales Rep Cache

Configuring Database Properties for the Sales Rep Cache

Joining Streaming and Cache Data For Real Time Transforming and Enrichment With SQL

We can process and enrich data-in-motion using continuous queries written in Striim’s SQL-based stream processing language. Using a SQL-based language is intuitive for data processing tasks, and most common SQL constructs can be utilized in a streaming environment. The main differences between using SQL for stream processing, and its more traditional use as a database query language, are that all processing is in-memory, and data is processed continuously, such that every event on an input data stream to a query can result in an output.

Dataflow Showing Joining and Enrichment of CDC data with Cache

Dataflow Showing Joining and Enrichment of CDC data with Cache

This is the query we will use to process and enrich the incoming data stream:

Full Transformation and Enrichment Query Joining the CDC Stream with Cache Data

Full Transformation and Enrichment Query Joining the CDC Stream with Cache Data

In this query we select the Order stream and SalesRep cache fields that we want, apply transformations to convert data types, put the Order stream and SalesRep cache in the FROM clause, and include a join on SALES_REP_ID as part of the WHERE clause. The result of this query is to continuously output enriched (denormalized) events, shown below, for every CDC event that occurs for the Orders table.

Events After Transformation and Enrichment

Events After Transformation and Enrichment

Loading the Enriched Data to the Cloud for Real Time Analytics

Now the Oracle CDC data, streamed and enriched through Striim, can be stored simultaneously in Azure Cloud blob storage and Azure Cosmos DB, for elastic storage with advanced big data analytics, using the Striim AzureBlobWriter and the CosmosDBWriter shown below.

The image below shows the Striim flow web UI for our streaming ETL application. Flows define what data an application receives, how it processes the data, and what it does with the results.

End-to-End Data Flow

End-to-End Data Flow

Using Kafka for Streaming Replay and Application Decoupling

The enriched stream of order events can be backed by or published to Kafka for stream persistence, laying the foundation for streaming replay and application decoupling. Striim’s native Integration with Apache Kafka makes it quick and easy to leverage Kafka to make every data source re-playable, enabling recovery even for streaming sources that cannot be rewound. This also acts to decouple applications, enabling multiple applications to be powered by the same data source, and for new applications, caches or views to be added later.

Streaming SQL for Aggregates

We can further use Striims Streaming SQL on the denormalized data to make a real time stream of summary metrics about the events being processed available to Striim Real-Time Dashboards and other applications. For example, to create a running count and sum of orders per SalesRep in the last hour, from the stream of enriched orders, you would use a window, and the familiar group by clause.

CREATE WINDOW OrderWindow
OVER EnrichCQ
KEEP WITHIN 1 HOUR
PARTITION BY sales_rep_id

SELECT sales_rep_id, sales_rep_Name,
COUNT(*) as orderCount,
SUM(order_total) as totalAmount
FROM OrderWindow
GROUP BY sales_rep_id

Monitoring

With the Striim Monitoring Web UI we can now monitor our data pipeline with real-time information for the cluster, application components, servers, and agents. The Main monitor page allows to visualize summary statistics for Events Processed, App CPU%, Server Memory, or Server CPU%. Below the Monitor App page displays our App Resources, Performance and Components.

Striim Monitoring Web UI Monitor App Page

Striim Monitoring Web UI Monitor App Page

Clicking on an app component more details button, displays more detailed performance information such as CPU and Event rate as shown below:

Striim Monitoring Web UI Monitor App Component Details Page

Striim Monitoring Web UI Monitor App Component Details Page

Summary

In this blog post, we discussed how we can use Striim to:

  1. Stream database changes in real-time
  2. Use streaming SQL and caches to easily denormalize data in order to make relevant data available together
  3. Load streaming enriched data to the cloud for real-time analytics
  4. Use Kafka for persistent streams
  5. Create rolling aggregates with streaming SQL
  6. Continuously monitor data pipelines

Additional Resources:

To read more about real-time data ingestion, please visit our Real-Time Data Integration solutions page.

To learn more about the power of streaming SQL, visit Striim Platform Overview product page, schedule a demo with a Striim technologist, or download a free trial of the platform and try it for yourself!

To learn more about Striim’s capabilities to support the data integration requirements for an Azure hybrid cloud architecture check out all of Striim’s solutions for Azure.