How to Use KSQL Stream Processing and Real-Time Databases to Analyze Streaming Data in Kafka
Shawn Adams
Posted on March 30, 2020
Authored by Ari Ekmekji
Intro
In recent years, Kafka has become synonymous with “streaming,” and with features like Kafka Streams, KSQL, joins, and integrations into sinks like Elasticsearch and Druid, there are more ways than ever to build a real-time analytics application around streaming data in Kafka. With all of these stream processing and real-time data store options, though, also come questions about when each should be used and what their pros and cons are. In this post, I’ll discuss some common real-time analytics use cases that we have seen with our customers here at Rockset and how different real-time analytics architectures suit each of them. I hope by the end you find yourself better informed and less confused about the real-time analytics landscape and are ready to dive into it for yourself.
First, an obligatory aside on real-time analytics.
Historically, analytics have been conducted in batch, with jobs that would run at some specified interval and process some well-defined amount of data. Over the last decade, however, the online nature of our world has given rise to a different paradigm of data generation in which there is no well-defined start or end to the data. These unbounded “streams” of data are often composed of customer events from an online application, sensor data from an IoT device, or events from an internal service. This shift in the way we think about our input data has necessitated a similar shift in how we process it. After all, what does it mean to compute the min or max of an unbounded stream? Hence the rise of real-time analytics, a discipline and methodology for how to run computation on data from real-time streams to produce useful results. And since streams also tend to have a high data velocity, real-time analytics is generally concerned not only with the correctness of its results but also with their freshness.
Kafka fits nicely into this new movement because it is designed to bridge data producers and consumers by providing a scalable, fault-tolerant backbone for event-like data to be written to and read from. Over the years, as the ecosystem has gained features like Kafka Streams, KSQL, joins, ksqlDB, and integrations with various data sources and sinks, the barrier to entry has decreased while the power of the platform has simultaneously increased. It’s important to also note that while Kafka is quite powerful, there are many things it self-admittedly is not. Namely, it is not a database, it is not transactional, it is not mutable, its query language KSQL is not fully SQL-compliant, and it is not trivial to set up and maintain.
Now that we’ve settled that, let’s consider a few common use cases for Kafka and see where stream processing or a real-time database may work. We’ll discuss what a sample architecture might look like for each.
Use Case 1: Simple Filtering and Aggregation
A very common use case for stream processing is to provide basic filtering and predetermined aggregations on top of an event stream. Let’s suppose we have clickstream data coming from a consumer web application and we want to determine the number of homepage visits per hour.
To accomplish this we can use Kafka Streams and KSQL. Our web application writes events into a Kafka topic called clickstream. We can then create a Kafka stream based on this topic that filters out all events where endpoint != '/', applies a sliding window with an interval of 1 hour over the stream, and computes a count(*). This resulting stream can then dump the emitted records into your sink of choice: S3/GCS, Elasticsearch, Redis, Postgres, etc. Finally, your internal application/dashboard can pull the metrics from this sink and display them however you like.
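To make the filter-then-window logic concrete, here is a minimal sketch in plain Python rather than KSQL. It only models what the query computes, not how Kafka executes it. The sample events and the ts field name are assumptions for illustration, and for simplicity it uses fixed, non-overlapping one-hour buckets rather than a true sliding window.

```python
from collections import Counter
from datetime import datetime, timezone

# Hypothetical sample events; the field names are assumptions for illustration.
events = [
    {"ts": "2020-03-30T10:05:00+00:00", "endpoint": "/"},
    {"ts": "2020-03-30T10:40:00+00:00", "endpoint": "/pricing"},
    {"ts": "2020-03-30T10:59:59+00:00", "endpoint": "/"},
    {"ts": "2020-03-30T11:00:00+00:00", "endpoint": "/"},
]


def homepage_visits_per_hour(events):
    """Keep only homepage hits, then count them per one-hour bucket."""
    counts = Counter()
    for event in events:
        if event["endpoint"] != "/":
            continue  # filter step: drop everything that is not the homepage
        ts = datetime.fromisoformat(event["ts"])
        # Truncate the timestamp to the start of its hour (the window key).
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        counts[window_start] += 1
    return dict(counts)


hourly = homepage_visits_per_hour(events)
```

The resulting dictionary, keyed by window start, is the kind of small, pre-aggregated result you would write to the sink for a dashboard to read.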
Note: Now with ksqlDB you can have a materialized view of a Kafka stream that is directly queryable, so you may not necessarily need to dump it into a third-party sink.
This type of setup is kind of the “hello world” of Kafka streaming analytics. It’s so simple but gets the job done, and consequently it is extremely common in real-world implementations.
Pros:
- Simple to set up
- Fast queries on the sinks for predetermined aggregations
Cons:
- You have to define a Kafka stream’s schema at stream creation time, meaning future changes in the application’s event payload could lead to schema mismatches and runtime issues
- There’s no alternate way to slice the data after the fact (e.g. views/minute)
Use Case 2: Enrichment
The next use case we’ll consider is stream enrichment: the process of denormalizing stream data to make downstream analytics simpler. This is sometimes called a “poor man’s join” because you are effectively joining the stream with a small, static dimension table (from SQL parlance). For example, let’s say the same clickstream data from before contained a field called countryId. Enrichment might involve using the countryId to look up the corresponding country name, national language, etc. and injecting those additional fields into the event. This would then enable downstream applications that look at the data to compute, for example, the number of non-native English speakers who load the English version of the website.
To accomplish this, the first step is to get our dimension table mapping countryId to name and language accessible in Kafka. Since everything in Kafka is a topic, even this data must be written to some new topic, let’s say one called countries. Then we need to create a KSQL table on top of that topic using the CREATE TABLE KSQL DDL statement. This requires that the schema and primary key be specified at creation time, and it will materialize the topic as an in-memory table where the latest record for each unique primary key value is represented. If the topic is partitioned, KSQL can be smart here and partition this in-memory table as well, which will improve performance. Under the hood, these in-memory tables are actually instances of RocksDB, an incredibly powerful, embeddable key-value store created at Facebook by the same engineers who have now built Rockset (small world!).
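The “latest record for each unique primary key” behavior is easy to picture with a small sketch. This is plain Python standing in for the table materialization, not KSQL or RocksDB itself; the function name materialize_table and the sample records are hypothetical.

```python
def materialize_table(changelog, key_field):
    """Fold a topic's records into a table holding the latest record per key."""
    table = {}
    for record in changelog:
        # Records arrive in topic order, so later ones overwrite earlier ones.
        table[record[key_field]] = record
    return table


# Hypothetical contents of the countries topic, oldest record first.
countries_topic = [
    {"countryId": 1, "countryName": "France", "language": "French"},
    {"countryId": 2, "countryName": "Brasil", "language": "Portuguese"},
    {"countryId": 2, "countryName": "Brazil", "language": "Portuguese"},  # correction
]

countries = materialize_table(countries_topic, "countryId")
```

Three records went into the topic, but the table has only two rows, and the row for countryId 2 reflects the most recent write.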
Then, like before, we need to create a Kafka stream on top of the clickstream Kafka topic. Let’s call this stream S. Then, using some SQL-like semantics, we can define another stream, let’s call it T, which will be the output of the join between that Kafka stream and our Kafka table from above. For each record in our stream S, it will look up the countryId in the Kafka table we defined, add the countryName and language fields to the record, and emit that record to stream T.
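As a rough model of that stream-table join, the sketch below enriches each record from S with fields from the countries table and emits the result as T. It is plain Python, not the KSQL join itself. The sample data is made up, and dropping events with no matching country (inner-join behavior) is an assumption on my part; a left join would keep them with empty country fields.

```python
def enrich(stream_s, countries):
    """Yield each event from S with country fields added (inner-join behavior)."""
    for event in stream_s:
        country = countries.get(event["countryId"])
        if country is None:
            continue  # assumption: events without a matching country are dropped
        yield {
            **event,
            "countryName": country["countryName"],
            "language": country["language"],
        }


# Hypothetical table (keyed by countryId) and stream contents.
countries = {
    1: {"countryName": "France", "language": "French"},
    2: {"countryName": "Brazil", "language": "Portuguese"},
}
stream_s = [
    {"userId": "u1", "endpoint": "/", "countryId": 2},
    {"userId": "u2", "endpoint": "/", "countryId": 99},  # unknown country
    {"userId": "u3", "endpoint": "/pricing", "countryId": 1},
]

stream_t = list(enrich(stream_s, countries))
```

Note that the lookup only works because the table is keyed on countryId, the same field the stream joins on.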
Pros:
- Downstream applications now have access to fields from multiple sources all in one place
Cons:
- Kafka table is only keyed on one field, so joins for another field require creating another table on the same data that is keyed differently
- Kafka table being in-memory means dimension tables need to be small-ish
- Early materialization of the join can lead to stale data. For example, if we had a userId field that we were trying to join on to enrich the record with the user’s total visits, the records in stream T would not reflect the updated value of the user’s visits after the enrichment takes place
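The staleness problem in the last point can be shown in a few lines. Again, this is a plain-Python sketch under made-up data, with hypothetical names like visits_by_user and totalVisits; it only illustrates that the joined value is copied into the record at enrichment time.

```python
# Hypothetical dimension table: userId -> total visits so far.
visits_by_user = {"u1": 3}
stream_t = []  # enriched output stream T


def enrich_event(event):
    # The join copies the table's current value into the emitted record.
    return {**event, "totalVisits": visits_by_user[event["userId"]]}


stream_t.append(enrich_event({"userId": "u1", "endpoint": "/"}))

# The table is updated after the enriched record was already emitted.
visits_by_user["u1"] = 4
```

The record in T still says 3 visits while the table now says 4, and nothing downstream can tell the difference without redoing the join.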
Use Case 3: Real-Time Databases
The next step in the maturation of streaming analytics is to start running more intricate queries that bring together data from various sources. For example, let’s say we want to analyze our clickstream data as well as data about our advertising campaigns to determine how to most effectively spend our ad dollars to generate an increase in traffic. We need access to data from Kafka, our transactional store (e.g. Postgres), and maybe even our data lake (e.g. S3) to tie together all the dimensions of our visits.
To accomplish this we need to pick an end system that can ingest, index, and query all of this data. Since we want to react in real time to trends, a data warehouse is out of the question since it would take too long to ETL the data there and then try to run this analysis. A database like Postgres also wouldn’t work since it is optimized for point queries, transactions, and relatively small data sizes, none of which are relevant/ideal for us.
You could argue that the approach in use case #2 may work here since we can set up one connector for each of our data sources, put everything in Kafka topics, create several ksqlDBs, and set up a cluster of Kafka Streams applications. While you could make that work with enough brute force, that approach won’t cut it if you want to support ad-hoc slicing of your data instead of just tracking metrics, if your dashboards and applications evolve with time, or if you want data to always be fresh and never stale. We effectively need a read-only replica of our data from its various sources that supports fast queries on large volumes of data; we need a real-time database.
Pros:
- Support ad-hoc slicing of data
- Integrate data from a variety of sources
- Avoid stale data
Cons:
- Another service in your infrastructure
- Another copy of your data
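To illustrate what ad-hoc slicing and query-time joins buy you, here is a small sketch that uses Python’s built-in sqlite3 module purely as a stand-in for a queryable store. It is not a real-time database and says nothing about how any particular product works; the table names, columns, and rows are all made up for the example.

```python
import sqlite3

# In-memory SQLite database used only as a stand-in for a queryable store.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE clickstream (ts TEXT, endpoint TEXT, campaign_id INTEGER);
    CREATE TABLE campaigns (campaign_id INTEGER PRIMARY KEY, name TEXT, spend REAL);
    INSERT INTO clickstream VALUES
        ('2020-03-30T10:05:00', '/', 1),
        ('2020-03-30T10:05:30', '/', 1),
        ('2020-03-30T10:06:10', '/', 2),
        ('2020-03-30T10:07:00', '/pricing', 2);
    INSERT INTO campaigns VALUES (1, 'spring_sale', 100.0), (2, 'newsletter', 50.0);
""")

# Ad-hoc slice: homepage views per minute, a granularity chosen at query time.
per_minute = conn.execute("""
    SELECT substr(ts, 1, 16) AS minute, COUNT(*) AS views
    FROM clickstream
    WHERE endpoint = '/'
    GROUP BY minute
    ORDER BY minute
""").fetchall()

# Query-time join: campaign data is read when the query runs, so a later
# change to campaigns.spend shows up the next time the query is executed.
spend_per_visit = conn.execute("""
    SELECT c.name, COUNT(*) AS visits, c.spend / COUNT(*) AS spend_per_visit
    FROM clickstream AS k
    JOIN campaigns AS c ON k.campaign_id = c.campaign_id
    GROUP BY c.campaign_id, c.name, c.spend
    ORDER BY c.name
""").fetchall()
```

Because the raw events are kept and the join happens at read time, switching from views per hour to views per minute is just a different query rather than a new stream definition.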
Real-Time Databases
Luckily we have a few good options for real-time database sinks that work with Kafka.
The first option is Apache Druid, an open-source columnar database. Druid is great because it can scale to petabytes of data and is highly optimized for aggregations. Unfortunately though it does not support joins, which means to make this work we will have to perform the enrichment ahead of time in some other service before dumping the data into Druid. Also, its architecture is such that spikes in new data being written can negatively affect queries being served.
The next option is Elasticsearch which has become immensely popular for log indexing and search, as well as other search-related applications. For point lookups on semi-structured or unstructured data, Elasticsearch may be the best option out there. Like Druid, you will still need to pre-join the data, and spikes in writes can negatively impact queries. Unlike Druid, Elasticsearch won’t be able to run aggregations as quickly, and it has its own visualization layer in Kibana, which is intuitive and great for exploratory point queries.
The final option is Rockset, a serverless real-time database that supports fully featured SQL, including joins, on data from a variety of sources. With Rockset you can join a Kafka stream with a CSV file in S3 with a table in DynamoDB in real-time as if they were all just regular tables in the same SQL database. No more stale, pre-joined data! However Rockset isn’t open source and won’t scale to petabytes like Druid, and it’s not designed for unstructured text search like Elastic.
Whichever option we pick, we will set up our Kafka topic as before and this time connect it using the appropriate sink connector to our real-time database. Other sources will also feed directly into the database, and we can point our dashboards and applications to this database instead of directly to Kafka. For example, with Rockset, we could use the web console to set up our other integrations with S3, DynamoDB, Redshift, etc. Then through Rockset’s online query editor, or through the SQL-over-REST protocol, we can start querying all of our data using familiar SQL. We can then go ahead and use a visualization tool like Tableau to create a dashboard on top of our Kafka stream and our other data sources to better view and share our findings.
For a deeper dive comparing these three, check out this blog.
Putting It Together
In the previous sections, we looked at stream processing and real-time databases, and when best to use them in conjunction with Kafka. Stream processing, with KSQL and Kafka Streams, should be your choice when performing filtering, cleansing, and enrichment, while using a real-time database sink, like Rockset, Elasticsearch, or Druid, makes sense if you are building data applications that require more complex analytics and ad hoc queries.
You could conceivably employ both in your analytics stack if your requirements involve both filtering/enrichment and complex analytic queries. For example, we could use KSQL to enrich our clickstreams with geospatial data and also use Rockset as a real-time database downstream, bringing in customer transaction and marketing data, to serve an application making recommendations to users on our site.
Hopefully the use cases discussed above have resonated with a real problem you are trying to solve. Like any other technology, Kafka can be extremely powerful when used correctly and extremely clumsy when not. I hope you now have some more clarity on how to approach a real-time analytics architecture and will be empowered to move your organization into the data future.