Abhishek Gupta
Posted on March 5, 2020
Kafka Streams is a Java library for developing stream processing applications on top of Apache Kafka. This is the first in a series of blog posts on Kafka Streams and its APIs.
This is not a "theoretical guide" to Kafka Streams (although I have covered some of those aspects in the past).
In this part, we will cover stateless operations in the Kafka Streams DSL API - specifically, the functions available on KStream, such as filter, map, groupBy, etc. The DSL API in Kafka Streams offers a powerful, functional-style programming model to define stream processing topologies. Please note that the KTable API also offers stateless functions, and what's covered in this post will be applicable in that case as well (more or less).
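For instance, a KTable supports filter in much the same way. Here is a minimal sketch (assuming a StreamsBuilder named builder, set up as shown below, and a hypothetical user-profiles topic):
KTable<String, String> userProfiles = builder.table("user-profiles");
// keep only entries whose value is longer than five characters
KTable<String, String> longValues = userProfiles.filter((key, value) -> value != null && value.length() > 5);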
The APIs (KStream etc.) referenced in this post can be found in the Kafka Streams javadocs.
The setup
To start things, you need to create a KafkaStreams instance. It needs a Topology and the related configuration (in the form of a java.util.Properties).
Set the required configuration for your Kafka Streams app:
Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
We can then build a Topology, which defines the processing pipeline (the rest of this blog post will focus on the stateless parts of a topology).
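A minimal sketch of what that looks like (the words topic name is just an example; the operations themselves are covered below):
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("words");
// ... chain stateless operations on the stream here ...
Topology topology = builder.build();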
You can then create the KafkaStreams instance and start processing:
KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountDownLatch(1).await(); // wait forever
Stateless operations using KStream
I generally like categorizing things into buckets - it helps me "divide and conquer". I have tried the same in this case by dividing the various KStream operations into filter, map, etc.
Let's dig in!
filter
You can use filter to omit or include records based on a criterion. For example, if the values sent to a topic are words and you want to include only the ones that are longer than a specified length, you can define this criterion using a Predicate and pass it to the filter method - this will create a new KStream instance with the filtered records:
KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
@Override
public boolean test(String k, String v) {
return v.length() > 5;
}
});
It is also possible to use filterNot if you want to exclude records based on a criterion. Here is a lambda-style example:
KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));
map
A commonly used stateless operation is map. In the case of Kafka Streams, it can be used to transform each record in the input KStream by applying a mapper function.
It is available in multiple flavors: map, mapValues, flatMap, and flatMapValues.
Simply use the map method if you want to alter both the key and the value. For example, to convert the key and value to uppercase:
stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String k, String v) {
return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
}
});
Use mapValues if all you want to alter is the value:
stream.mapValues(value -> value.toUpperCase());
flatMap is similar to map, but it allows you to return multiple records (KeyValues):
stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
@Override
public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
String[] values = csv.split(",");
return Arrays.asList(values)
.stream()
.map(value -> new KeyValue<>(k, value))
.collect(Collectors.toList());
}
});
In the above example, each record in the stream gets flatMapped such that each CSV (comma separated) value is first split into its constituents, and a KeyValue pair is created for each part of the CSV string. For example, if you have the records (foo <-> a,b,c) and (bar <-> d,e) (where foo and bar are keys), the resulting stream will have five entries: (foo,a), (foo,b), (foo,c), (bar,d), (bar,e).
Use flatMapValues if you only want to accept a value from the stream and return a collection of values.
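For example, here is a sketch that splits each comma separated value into multiple records while keeping the key unchanged:
stream.flatMapValues(value -> Arrays.asList(value.split(",")));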
group
If you want to perform stateful aggregations on the contents of a KStream, you will first need to group its records by their key to create a KGroupedStream.
We will cover stateful operations on KGroupedStream in subsequent blog posts in this series.
Here is an example of how you can do this using groupByKey:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KGroupedStream<String,String> kgs = stream.groupByKey();
A generalized version of groupByKey is groupBy, which gives you the ability to group based on a different key using a KeyValueMapper:
stream.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String k, String v) {
return k.toUpperCase();
}
});
In both cases (groupByKey and groupBy), if you need to use a different Serde (Serializer and Deserializer) instead of the default ones, use the overloaded version which accepts a Grouped object:
stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));
Terminal operations
A terminal operation in Kafka Streams is a method that returns void instead of an intermediate representation such as another KStream or KTable.
You can use the to method to store the records of a KStream to a topic in Kafka:
KStream<String, String> stream = builder.stream("words");
stream.mapValues(value -> value.toUpperCase())
.to("uppercase-words");
An overloaded version of to allows you to specify a Produced object to customize the Serdes and the partitioner:
stream.mapValues(value -> value.toUpperCase())
.to("output-topic",Produced.with(Serdes.Bytes(), Serdes.Long()));
Instead of specifying a static topic name, you can make use of a TopicNameExtractor and include any custom logic to choose a specific topic dynamically:
stream.mapValues(value -> value.toUpperCase())
.to(new TopicNameExtractor<String, String>() {
@Override
public String extract(String k, String v, RecordContext rc) {
return rc.topic()+"_uppercase";
}
});
In this example, we make use of the RecordContext, which contains the metadata of the record, to get the source topic and append _uppercase to it.
In all of the above cases, the sink topic should pre-exist in Kafka.
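If you prefer to create it programmatically, a sketch along these lines using the Kafka AdminClient should work (the topic name, partition count, and replication factor are just examples):
Properties adminConfig = new Properties();
adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(adminConfig)) {
    // create the sink topic before starting the Kafka Streams app
    // note: get() throws InterruptedException/ExecutionException - handle or declare as needed
    admin.createTopics(Collections.singleton(new NewTopic("uppercase-words", 1, (short) 1)))
         .all()
         .get();
}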
If you want to log the KStream records (for debugging purposes), use the print method. It accepts an instance of Printed to configure the behavior.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());
This will print out the records. For example, if you pass in (foo, bar) and (john, doe) to the input topic, they will get converted to uppercase and logged as follows:
[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE
You can also use Printed.toFile (instead of toSysOut) to target a specific file.
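For example (the file path is just a placeholder):
stream.mapValues(v -> v.toUpperCase()).print(Printed.toFile("/tmp/uppercase-words.txt"));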
The foreach method is similar to print and peek, i.e., it is also a terminal operation (like print), and it accepts a ForeachAction (like peek).
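For example, here is a sketch that logs each record (the same effect as the peek example below, but as a terminal operation):
stream.foreach((k, v) -> System.out.println("key=" + k + ", value=" + v));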
Miscellaneous operations
Since the print method is a terminal operation, you have the option of using peek, which returns the same KStream instance! It accepts a ForeachAction which you can use to specify what you want to do for each record, e.g., log the key and value:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.peek((k,v) -> System.out.println("key="+k+", value="+v))
.to(OUTPUT_TOPIC);
In the above example, you will be able to see the keys and values being logged, and they will also be materialized to the output topic (unlike with the print operation).
branch is a method which I have not used (to be honest!), but it looks quite interesting. It gives you the ability to evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and output multiple KStreams (as an array). The key here is that you can use multiple Predicates instead of a single one, as is the case with filter and filterNot.
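Here is a sketch of how that might look, splitting a stream into long and short words (the predicates and topic names are just examples). Note that each record is routed to the first predicate it matches, and records matching none of the predicates are dropped:
KStream<String, String>[] branches = stream.branch(
    (key, value) -> value.length() > 5,   // long words
    (key, value) -> value.length() <= 5   // everything else
);
branches[0].to("long-words");
branches[1].to("short-words");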
You can merge two KStreams together into a single one:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("output-topic");
Please note that the resulting stream may not have all the records in order.
If you want to derive a new key (it can have a different type as well) for each record in your KStream, use the selectKey method, which accepts a KeyValueMapper. selectKey is similar to map, but the difference is that map restricts the return type to a KeyValue object:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.selectKey(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String k, String v) {
return k.toUpperCase();
}
});
While developing your processing pipelines with the Kafka Streams DSL, you will often find yourself pushing resulting stream records to an output topic using to and then creating a new stream from that (output) topic, i.e.:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);
//output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);
//continue processing with stream2
stream2.filter((k,v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);
This can be simplified by using the through method, so you can rewrite the above as follows:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase())
.through(OUTPUT_TOPIC)
.filter((k,v) -> v.length() > 5)
.to(LENGTHY_WORDS_TOPIC);
Here, we materialize the records (with uppercase values) to an intermediate topic, continue processing (using filter in this case), and finally store the post-filtration results in another topic.
That's it for now. Stay tuned for upcoming posts in this series!
References
Please don't forget to check out the following resources for Kafka Streams