In this video, we demonstrate real-time streaming integration to and from Apache Kafka:

  1. Change Data Capture from MySQL and deliver to Kafka as JSON
  2. Enrich the CDC data using a cache and deliver to Kafka
  3. Read from Kafka and write to a CSV file
  4. Also write to Hadoop and Azure Blob Storage

Other resources:

 

Unedited Transcript:

How to build change data capture into Kafka and do some processing on that, and then do some delivery into other things. So this is pure integration play. You start off by doing change data capture from MySQL. In this case, MySQL would build the initial application and then configure how you get data from the source so we can figure the information to connect into MySQL. When you do this, we’ll check and make sure everything is going to work right, that you already have change data capture configured properly. And if it wasn’t, how you have to fix it and how to do it. You don’t select the tables that you’re interested in. We’ve got to collect the change data, and this is going to create a data stream, but then go to two different to Kafka.

So we’re going to configure how we want to write into Kafka, and that’s basically setting up what the broker configuration is, what the topic is and how we want to format the data. In this case, we’ve got to write to add as JSON, when we save this, this is gonna create a data flow. And the data flow is very simple. In this case, it’s two components. We go in from MySQL CDC source into a Kafka writer. We can test this by deploying the application. And it’s a two stage process. You deploy it first, which will put all the components out over the cluster and then you run it. And now we can see the data that’s flowing in between. So if I click on this, I can actually see the real time data and you see there’s a data and there was it before.

That’s basically the before updates. You get the before image as well, so you can see what’s actually changed. So this is real time data flowing through the MySQL application, the raw data may not be that useful. And one of the pieces of data in here is a product ID. And that probably doesn’t contain enough information, so what we’re going to do first is we’re going to extract the various fields from this and those various fields include the location ID products, id, how much stock there is, et cetera. This is an inventory monitoring table and we just turned that from kind of a raw array format into a set of name fields. So it’ll make it easier to work with later on. And you can see the structure is very different. Now what we’re actually seeing in that data stream, if we then add additional context to this, what we’ll be able to do is join that data with something else. So first of all, we’ll just configure this so that instead of writing the raw data add to Kafka, we’ll write that process state ad. And you can see all we have to do is change the input stream. So that will change the data flow. Now are write process data into Kafka.

But now we go into add a cache and this is a distributed in memory data grid that’s going to contain additional information that we want to join with our raw data. And so this is product information. So every product ID is a description and price and some other stuff. So first of all, we just create a data type that corresponds to our database table. Configure what the keys and the key in this case is the product Id. Then we specify how are we going to get the data. And it could be from files, it could be from HDFS. We’re going to use a database reader to load it from MySQL table. So especially specify all the connections and the query we’re going to use. And we now have a cache of products information to use this, we modify as SQL to just join in the cache. So anyone that’s ever written any SQL before knows what a join looks like. We’re just joining on the product Id. So now instead of just the raw data, we now have these additional fields that we’re pulling in in real time from the product information. So if we start this and look at the data again, you will actually be able to see the additional fields like description and brand and category and price that came from that other type that’s all joined in memory. There’s no database lookups going on is actually really, really fast.

If you already have data on Kafka or another message bus or anywhere else for that matter is new files, you may want to kind of read it and push at some of the targets. So what we’re going to do now is going to take that data we just wrote to Kafka. We’re going to use Kafka reader in this case. So it will just search for that and track the source and then we can configure that with the properties connect to the broker that we just used. So because we noticed JSON data, we’re going to use a Jason parser. I was going to break it up into a adjacent object structure. And then create this data stream. Okay, when we deploy this and start this application, it’ll start reading from that Kafka a topic.

Well, we can look at that data and we can see this is the data that we were writing that previously with all the information in it and it’s adjacent full Max. You can see the adjacent structure though. So the other targets that we go into right to the JSON structure might not work. So what were you going to do now is build a query that’s going to pull the various fields, edit that JSON structure and creates a well-defined data stream that has various individual fields in it. So a variety crew to do that. It’s directly accessing the JSON data and saves that. And now instead of the original data stream that we had with the JSON in it, when we deploy this, start it up and looked at the data, and this is incidentally how you would build applications, looking at the data all the time, as you’re building on adding additional components into it. If we’re looking at the data stream now, then you’d be able to see that we have those individual fields, which is what we had before on the other side of Kafka, but don’t forget that it may not be stream to Kafka. It could be anything else. And if you were doing something like we just did with CDC into Kafka than Kafka into additional targets, you don’t have to have Kaftan in between. You can just like take the CDC and push it out to the targets directly.

So, uh, what are we going to do now is going to add a simple target, which is going to write to a file. And we do this by choosing the file adopt. So the fall reuter and especially finding the format we want. So we are gonna write this. I’ve seen the CSV format. We actually call it DSV because it’s delimiter separated. And the limits could be anything. It doesn’t have to be a coma and save that. And now we have something that’s going to right out to the file. So if we deploy this and start this up, then we’ll be creating a file with real-time data.

And after a while it’s got some data in it and then we can use something like a Microsoft Excel to actually view the data to check that it’s kind of what we wanted. So let’s take a look in Excel and we can see the data that we initially collected from MySQL be written to capita being slightly from Kafka and then being and back out into the CSV file. They just have one target and a single data flow, or you can, it’s multiple targets if you want. We’re going to add to, in rising into Hadoop and into Azure Blob Storage. So what we do is in the case of Hadoop, we don’t want all the data to go to a dud. So as a simple CQ to restrict the data and do this by location id. So when location 10 is going to be written to do, that’s so some filtering going on there.

And now we will add in the Hadoop target. So you’re gonna write to HDFS as a target, drag that into the data flow and see there’s many ways of working the platform. We also have a scripting language by the way, that enables you to do all of this from vi or emacs or whatever your favorite attack status or is. And we’re going to write to HDFS. I see an Avro format, so it will specify the scheme of file. And then when this is started up, we’ll be writing into HDFS as well as to this local file system. And similarly, if we want to write into Azure Blob Storage, we can take the adaptive for that and just search for that and drag that in from the targets. And we’ve got to do that on the original source data, not that query. So we’ll drag it into a, that original data stream.

Okay. And now we just configure this with information from Azure. So you need to find out what is the URL, and you should know what your key is and the username and password and things like that. You go into collect that information if you don’t have it already. And then add that into the target definition for your Azure Blog Storage. I’m gonna write that out in JSON format. So that’s kind of very quickly how you can do real time streaming data integration with our platform. And all of that data was streaming. It was being created by doing changes to MySQL.