Chasing Messages and Offsets in the Land of librdkafka
Memgraph
Posted on March 10, 2023
Today I would like to share with you a personal endeavor with librdkafka. One of the features we wanted to push for streams was offset management for Kafka consumers, i.e., the logical offset of the {topic, partition} that a consumer starts consuming messages from.
I jumped on this with the core idea of implementing a query procedure parametrized by the stream name and an offset. The procedure simply sets the offset of the consumer associated with the stream, and when the stream is started, the consumer starts receiving messages from the value specified in the offset argument. I think it's reasonable to claim that this should be fairly straightforward to implement, given that the current codebase facilitates this change easily. Besides, what can go wrong? To answer this, we first need to take a quick glimpse at how streams are implemented in Memgraph. Note that, to avoid confusion in the rest of this article, the implementation goal is to set the offsets of all topics/partitions of the consumer associated with a given stream (for those of you wondering, more fine-grained control over offsets will be discussed in the future).
Stream Architecture in Memgraph
So, what is the high-level architecture of streams in Memgraph? Our main abstraction is the Stream class, which represents a collection of Consumers, each associated with a stream name and metadata (consumer group, topics/partitions to consume from, etc.). Under the hood, the communication between Memgraph and the Kafka brokers is handled by librdkafka, and the Consumer class wraps an RdKafka::Consumer object, which is used to consume messages. To summarize: the Stream manages Consumer objects, and Consumer objects use librdkafka to communicate with a broker. Simple enough. For our implementation goal, it suffices to break the task down into:
1. Implement a Memgraph procedure that, when called, searches the collection for the stream name argument and finds its associated consumer.
2. For the found consumer, use its RdKafka::Consumer object to set the offset.
In reality, there were a few more steps involved in the task, but they are related to the query modules and are omitted here. Moreover, (1) is straightforward to implement, and it's not particularly interesting for this article. Our main focus is (2) and the offset management idiosyncrasies of librdkafka.
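The two steps above can be sketched roughly as follows. This is a minimal illustration and not Memgraph's actual code: the `Streams` registry, the simplified `Consumer`, and the names `Create`, `SetStreamOffset`, `Find`, and `PendingOffset` are all hypothetical, and the sketch only records the requested offset instead of talking to librdkafka.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical stand-in for Memgraph's Consumer; the real class wraps a
// librdkafka consumer object.
class Consumer {
 public:
  // Remember the offset to apply to every assigned topic/partition.
  // Assumption of this sketch: only absolute (non-negative) offsets.
  void SetOffset(int64_t offset) {
    assert(offset >= 0);
    pending_offset_ = offset;
  }

  std::optional<int64_t> PendingOffset() const { return pending_offset_; }

 private:
  std::optional<int64_t> pending_offset_;
};

// Hypothetical registry that maps stream names to their consumers.
class Streams {
 public:
  void Create(const std::string &name) { consumers_.try_emplace(name); }

  // Step (1): look the consumer up by stream name.
  // Step (2): set its offset. Returns false if the stream does not exist.
  bool SetStreamOffset(const std::string &name, int64_t offset) {
    auto it = consumers_.find(name);
    if (it == consumers_.end()) return false;
    it->second.SetOffset(offset);
    return true;
  }

  // Returns nullptr if the stream does not exist.
  const Consumer *Find(const std::string &name) const {
    auto it = consumers_.find(name);
    return it == consumers_.end() ? nullptr : &it->second;
  }

 private:
  std::map<std::string, Consumer> consumers_;
};
```

Returning a plain `bool` keeps the sketch short; a real query procedure would more likely report an error naming the missing stream.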
What is librdkafka?
So what is librdkafka, and how does it handle offsets? It's a C client with C++ wrappers that implements the Kafka protocol. Offset management for consumers is done automatically unless configured otherwise. With manual offset management, each time Memgraph processes a Kafka message successfully, it must instruct librdkafka to commit that offset. Committing an offset simply registers with the group coordinator (or with ZooKeeper) that a message was processed. Therefore, if a consumer goes down, it will restart from the last committed offset (remember, only one consumer in a consumer group can consume from a given topic partition, abbreviated tp, i.e., a tp is the unit of parallelism). The last piece to understand is what happens when a consumer joins a consumer group. For simplicity, it suffices to say that the consumer group leader will start assigning partitions to the newly joined consumer. If the consumer is the first one in the group, it will be assigned all the available topics/partitions and will become the group leader.
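To make the commit-and-restart idea concrete, here is a toy model of the bookkeeping, with no Kafka involved. `CommittedOffsets`, `Commit`, and `ResumeFrom` are hypothetical names for this sketch. It follows the Kafka convention that the committed value is the offset of the next message to read, and it assumes a partition with no commit on record starts from offset 0, whereas a real client decides that through configuration.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// A topic partition ("tp") identified by topic name and partition number.
using TopicPartitionKey = std::pair<std::string, int32_t>;

// Toy stand-in for the offsets a group coordinator stores per consumer group.
class CommittedOffsets {
 public:
  // Record that the message at `processed_offset` was handled. Kafka stores
  // the offset of the next message to read, hence the +1.
  void Commit(const TopicPartitionKey &tp, int64_t processed_offset) {
    assert(processed_offset >= 0);
    committed_[tp] = processed_offset + 1;
  }

  // Where a restarted consumer resumes. Assumption of this sketch: with no
  // commit on record we start from offset 0.
  int64_t ResumeFrom(const TopicPartitionKey &tp) const {
    auto it = committed_.find(tp);
    return it == committed_.end() ? 0 : it->second;
  }

 private:
  std::map<TopicPartitionKey, int64_t> committed_;
};
```

If a consumer commits after processing offset 4 and then crashes, the replacement consumer for that tp resumes at offset 5.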
Let's start implementing
Now back to my implementation. The first attempt was to use the following two functions from RdKafka::Consumer: assignment(std::vector<RdKafka::TopicPartition*> &partitions) which, according to the docs, "Returns the current partition assignment as set by assign()", and assign() itself. Lit, the TopicPartition contains the offset field that I need to set, so all I have to do is retrieve the partitions assigned to the consumer, set the offset, and call assign(). Smiley face. I jumped on a quick prototype, implemented a test case, fired up Kafka, and ran the test. BOOM. I received fewer messages than I should have (based on the offset set in the test case). Fine, my logic is probably wrong somewhere. I quickly ran gdb and the test case suddenly worked 💥. I started scratching my head, and after some time investigating the docs and looking at the librdkafka source code, it turned out that assignment() is not guaranteed to work: by the time the set-offset procedure is called, librdkafka does not guarantee that the group coordinator has assigned the topic partitions to the consumer, which introduces a race. Moreover, a subtle detail worth mentioning, caught by our memory sanitizer, is that assignment() breaks RAII because the argument vector contains pointers that are now owned by the caller and need to be deallocated.
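One way to contain the ownership problem is to take ownership of the raw pointers immediately after the call. The sketch below shows the pattern with stand-ins only: `TopicPartition` is a simplified struct, not the librdkafka class, and `FillAssignment` is a hypothetical function that merely imitates an API handing caller-owned pointers back through an output vector.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for a topic partition with an offset field.
struct TopicPartition {
  std::string topic;
  int32_t partition;
  int64_t offset;
};

// Hypothetical API in the style described above: it fills `out` with
// heap-allocated objects that the caller is responsible for freeing.
void FillAssignment(std::vector<TopicPartition *> &out) {
  out.push_back(new TopicPartition{"orders", 0, 0});
  out.push_back(new TopicPartition{"orders", 1, 0});
}

using OwnedPartitions = std::vector<std::unique_ptr<TopicPartition>>;

// Wrap every raw pointer in a unique_ptr right away so that nothing leaks
// once the returned vector goes out of scope.
OwnedPartitions OwnedAssignment() {
  std::vector<TopicPartition *> raw;
  FillAssignment(raw);

  OwnedPartitions owned;
  owned.reserve(raw.size());
  for (TopicPartition *tp : raw) {
    assert(tp != nullptr);
    owned.emplace_back(tp);
  }
  return owned;
}

// Set the same offset on every partition, as the stream procedure intends.
void SetAllOffsets(OwnedPartitions &partitions, int64_t offset) {
  for (auto &tp : partitions) tp->offset = offset;
}
```

With the real library, the deleter would have to match however the library expects its objects to be released, so treat this purely as an illustration of the ownership pattern.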
Attempt number two: how do I fix the race? Librdkafka does not provide a synchronization mechanism for that. I started investigating again, and it turned out that each time a partition set is assigned to a consumer, it internally triggers something called a partition rebalance, which calls a user-defined callback that accepts the partitions and performs some action. Moreover, it's guaranteed that by the time you consume() a message, the rebalance callback will have been called, which effectively solves the problem. Remember, I mentioned earlier that after processing a message successfully Memgraph must commit the offset. Logically, each time we consume, we must call assignment() to get the partitions (which contain the currently consumed offset) and then call commit. I quickly built, reran the setup and the test, expecting to see everything green aaaaaand I got bloody red :/ What is happening? Again, I ran gdb and my logic seemed to be correct. Something else was happening. After more debugging, this time I printed the offsets returned by the assignment function and got a different offset than the one expected.
It's Friday morning and I am scratching my head, desperate to figure out what is happening and wrap up my patch. Attempt number three: I ask on Slack if anybody has any idea why assignment() does not return the current offsets but the default ones. Nobody knows, but the team jumps in and quickly starts searching. After a few hours of discussing on Slack, Antonio spotted a function called position() (this is, unfortunately, missing from the librdkafka docs) with a beautiful description: “Retrieve current positions (offsets) for topics+partitions.”
💥 We quickly use that and we finally end up with the correct offsets 🎉
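The difference that bit me can be illustrated with a toy model. It is not librdkafka: `ToyConsumer`, `Rebalance`, `Consume`, and `StartAllFrom` are hypothetical names, and the model captures only two ideas from the story above. Offsets are adjusted inside the rebalance callback, and the assignment keeps reporting what was assigned while the position advances as messages are consumed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for a topic partition with an offset field.
struct TopicPartition {
  std::string topic;
  int32_t partition;
  int64_t offset;
};

// Toy consumer: a model of the behavior described above, not the real API.
class ToyConsumer {
 public:
  using RebalanceCallback = std::function<void(std::vector<TopicPartition> &)>;

  explicit ToyConsumer(RebalanceCallback cb) : on_rebalance_(std::move(cb)) {
    assert(on_rebalance_ != nullptr);
  }

  // Models the coordinator handing partitions to this consumer. The user
  // callback runs first and may change the starting offsets.
  void Rebalance(std::vector<TopicPartition> partitions) {
    on_rebalance_(partitions);
    assigned_ = partitions;
    positions_ = std::move(partitions);
  }

  // Models consuming one message: returns its offset and advances the
  // position of that partition by one.
  int64_t Consume(std::size_t index) { return positions_.at(index).offset++; }

  // What was assigned; does not move as messages are consumed.
  const std::vector<TopicPartition> &Assignment() const { return assigned_; }

  // Where the consumer currently is; this is what must be committed.
  const std::vector<TopicPartition> &Position() const { return positions_; }

 private:
  RebalanceCallback on_rebalance_;
  std::vector<TopicPartition> assigned_;
  std::vector<TopicPartition> positions_;
};

// Builds a callback that starts every assigned partition at `offset`.
ToyConsumer::RebalanceCallback StartAllFrom(int64_t offset) {
  return [offset](std::vector<TopicPartition> &partitions) {
    for (auto &tp : partitions) tp.offset = offset;
  };
}
```

In this model, committing the values from `Assignment()` after consuming would keep recording the starting offset, which mirrors the symptom I saw, while `Position()` reflects the actual progress.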
Shoutout to Redpanda
It's worth mentioning that at the time, we were also experimenting with different stream sources, and I was playing around with Redpanda, a Kafka replacement and Kafka-API-compatible streaming engine. Therefore, during my debugging sessions, I played around with both, and there were a few things that I particularly liked about Redpanda:
It worked out of the box. Literally, I downloaded the binary and used rpk for instrumentation and that was it.
rpk is the only tool that you need for quickly managing/configuring Redpanda. You can even produce or consume messages with it.
Less than 1s startup time. This is not significant compared to Kafka, but still, it's nice to have a broker available the moment I type rpk redpanda start in my terminal.
Conclusion
All in all, it was a fun experience to dive deep into librdkafka land, and I definitely learned a lot about its C/C++ APIs. It also worked well, even though it certainly has its own way of doing things. However, as with all software, I believe there is room for improvement, and I would encourage some reasonable contributions/discussions for the project:
Use std::unique_ptr<> to cleanly express ownership. Maybe assignment() should return std::unique_ptr instead of accepting a topic_partition vector as an argument?
Discuss adding a synchronization mechanism for assignment() to avoid relying on the callbacks being called at the right time by the protocol (as this requires understanding the protocol itself, and not all use cases necessarily need that).
Maybe merge assignment() and position() such that it's easy to retrieve the topic partitions and offsets with one call. It's not clear why these two are separated.