Kafka Internally - A brief story of how Kafka works

caueferreira

Cauê Ferreira

Posted on November 13, 2023

I’ve been using Kafka for the past few years and, while I managed to get it to work, there was something I always wondered about:

How the hell does Kafka work?

I published this article years ago describing my experience with Kafka when I first used it. It's a high-level overview, more focused on architecture decisions and the experience of migrating to it. Now I want to talk about how Kafka works, and how it stores and retrieves data.

The idea is that after you read this article, you will be able to understand the picture below a bit better.

Image description

Preparing the environment

Let's start by cloning the confluentinc/cp-all-in-one GitHub repository.

git clone git@github.com:confluentinc/cp-all-in-one.git

Once cloned, navigate to the cp-all-in-one/cp-all-in-one directory and start the containers; this may take a while.

docker-compose up -d

When this is done, let's check that everything is running by executing the following command, also from cp-all-in-one/cp-all-in-one:

docker-compose ps

The result should be similar to this:

     Name                    Command               State                       Ports
---------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp
ksql-datagen      bash -c echo Waiting for K ...   Up
ksqldb-cli        /bin/sh                          Up
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

Now let's start working.

First we'll need to create a topic. We could create it using the UI at http://localhost:9021, but I feel it is best to use the command line so we can learn a bit more about how it works.

Run the following command to create a topic named transactions with 3 partitions.

docker exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic transactions

You should see the following result: Created topic transactions

Now, let's check our topic!

docker exec -t broker kafka-topics --bootstrap-server localhost:9092 --list | grep transactions

You should see the transactions topic as the response from the command.

Now, let's delve into how a topic works!

Topic

Let's understand what a topic is and how it's created. To begin with, let's look at the transactions topic inside the broker container. You can either open a shell in the broker container, go to /var/lib/kafka/data/ and run ls transactions-*, or you can run:

docker exec -ti broker sh -c "(cd /var/lib/kafka/data; ls transactions-*)"

Regardless, the result should be similar to the following:

transactions-0:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

transactions-1:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

transactions-2:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

As we can see, we have three directories named transactions-*. These are the three partitions of the transactions topic that we defined when creating it. If we check the size of the files by executing ls -lh transactions-0, we should get the following:

total 4.0K
-rw-r--r-- 1 appuser appuser 10M Oct 27 20:22 00000000000000000000.index
-rw-r--r-- 1 appuser appuser   0 Oct 27 20:22 00000000000000000000.log
-rw-r--r-- 1 appuser appuser 10M Oct 27 20:22 00000000000000000000.timeindex
-rw-r--r-- 1 appuser appuser   0 Oct 27 20:22 leader-epoch-checkpoint
-rw-r--r-- 1 appuser appuser  43 Oct 27 20:22 partition.metadata

Kafka stores its data in files on disk, which are divided as follows:

  • 00000000000000000000.index: This file maps offsets to the position of the corresponding event in the *.log file. We could search for a specific event directly in the *.log file, but since it stores the events themselves it keeps growing, to the point where finding an event there could take a long time. The .index file contains only offsets and positions, so finding the offset we're looking for is much faster.

  • 00000000000000000000.log: This file is where every event is located; in our case, this is where we'll find our transactions.

  • 00000000000000000000.timeindex: This file is similar to *.index, the difference being that it's used to find events by timestamp.
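
To make the role of the .index file more concrete, here is a minimal Python sketch of an offset lookup. It assumes a simplified in-memory model: the index is a sorted list of (offset, byte position) pairs that covers only some of the offsets, and the log is a list of (offset, byte position, payload) records. The data and function names are invented for illustration; Kafka's real files are binary and more involved.

```python
from bisect import bisect_right

# Hypothetical index entries: (offset, byte position in the .log file).
# In this model the index is sparse: not every offset gets an entry.
INDEX = [(0, 0), (3, 210), (6, 420)]

# Hypothetical log records: (offset, byte position, payload).
LOG = [(i, i * 70, f"New message {i}") for i in range(8)]


def find_start_position(index, target_offset):
    """Return the byte position of the closest indexed offset <= target."""
    offsets = [offset for offset, _ in index]
    i = bisect_right(offsets, target_offset) - 1
    if i < 0:
        raise KeyError(f"offset {target_offset} is before this segment")
    return index[i][1]


def read_event(index, log, target_offset):
    """Jump to the indexed position, then scan forward to the target."""
    start = find_start_position(index, target_offset)
    for offset, position, payload in log:
        if position >= start and offset == target_offset:
            return payload
    raise KeyError(f"offset {target_offset} not found")


print(read_event(INDEX, LOG, 4))
```

The binary search over the small index narrows the scan of the log to a short stretch, which is the whole point of keeping offsets and positions in a separate file.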

The sequence of numbers 00000000000000000000 in the index, log and timeindex file names identifies the segment: it is the offset of the first event stored in that segment. A segment is, to put it simply, a file that we use to store events. Kafka divides the events of a partition into multiple files, each of which is referred to as a segment. Each segment has a default maximum size of 1 GB, meaning that once a segment reaches 1 GB of logs, new files are created for that partition.
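
As a rough illustration of how those file names relate to offsets, the sketch below builds a segment file name from the offset of its first event and picks the segment that would hold a given offset. The helper names and the example offsets are hypothetical; this only models the naming convention, not Kafka's actual code.

```python
from bisect import bisect_right


def segment_file_name(base_offset, extension="log"):
    """Build a segment file name: the base offset zero-padded to 20 digits."""
    return f"{base_offset:020d}.{extension}"


def segment_for_offset(base_offsets, target_offset):
    """Pick the segment whose base offset is the largest one <= target."""
    ordered = sorted(base_offsets)
    i = bisect_right(ordered, target_offset) - 1
    if i < 0:
        raise KeyError(f"offset {target_offset} precedes every segment")
    return segment_file_name(ordered[i])


print(segment_file_name(0))
# Hypothetical partition with segments starting at offsets 0, 1000 and 2500.
print(segment_for_offset([0, 1000, 2500], 1200))
```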

Now, let's observe the creation of new log files. We won't generate 1 GB worth of messages because I don't want this small project to consume too much space on your machine. Instead, we shall reduce the segment size so that any message will produce another log file.

The following command will reduce the segment size from 1 GB to 100 bytes.

docker exec -it broker kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name transactions --alter --add-config segment.bytes=100

Now let's run the following command. It will let us post a few messages to Kafka, and we will be able to see new log files being created.

docker exec -it broker kafka-console-producer --broker-list localhost:9092 --topic transactions

You will be presented with a prompt; type the following messages, pressing Enter after each one.

New message
New message 1
New message 2
New message 3
New message 4
New message 5

After that, if we run the command where we list the partition directories again, we should be presented with this:

transactions-0:
00000000000000000000.index  00000000000000000003.index
00000000000000000000.log    00000000000000000003.log
00000000000000000000.timeindex  00000000000000000003.snapshot
00000000000000000001.index  00000000000000000003.timeindex
00000000000000000001.log    00000000000000000004.index
00000000000000000001.snapshot   00000000000000000004.log
00000000000000000001.timeindex  00000000000000000004.snapshot
00000000000000000002.index  00000000000000000004.timeindex
00000000000000000002.log    leader-epoch-checkpoint
00000000000000000002.snapshot   partition.metadata
00000000000000000002.timeindex

transactions-1:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

transactions-2:
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

Notice that it is the producer that chooses which partition each message goes to, so your messages might have landed on another partition, meaning that any of the 3 partitions could be the one that was updated.
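
To give an idea of how a partition gets picked, here is a small sketch. It is an approximation under stated assumptions: crc32 stands in for the hash that real Kafka clients apply to the message key, and a random pick stands in for the way keyless messages (like the ones typed into the console producer) are spread across partitions. The function name is hypothetical.

```python
import random
import zlib


def choose_partition(key, num_partitions, rng=random):
    """Pick a partition for a message.

    Assumption: crc32 stands in for the hash used by real Kafka clients,
    and a random pick stands in for how keyless messages are spread.
    """
    if key is None:
        return rng.randrange(num_partitions)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always lands on the same partition.
first = choose_partition("GBP", 3)
second = choose_partition("GBP", 3)
print(first == second)
```

The useful property is the one checked at the end: messages sharing a key end up in the same partition, while messages without a key can go anywhere.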

Now you can play around with reading the logs; you will see that each one of them holds a different message!

docker exec -ti broker sh -c "(cd /var/lib/kafka/data/transactions-0; cat 00000000000000000000.log)"

Segment

Segments are the log files: the physical files that contain the sequence of events of each partition. Each of the *.log files is a segment, so when the segment 00000000000000000000.log reached the 100 bytes limit, Kafka created another segment file named 00000000000000000001.log. When dealing with multiple Kafka brokers, we can have the segments replicated across them to ensure fault tolerance and high availability, but you won't see the same segment file written twice on the same broker.
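
The rolling behaviour can be sketched with a tiny simulation. This is a simplified model, not Kafka's implementation: it assumes a new segment is started whenever the next record would push a non-empty segment past the size limit, and the 80-byte record size is a made-up figure.

```python
def append_with_rolling(record_sizes, segment_bytes):
    """Simulate appends to one partition and return its segments.

    Each segment is a dict holding the base offset (offset of its first
    record) and its current size in bytes. A new segment is rolled when the
    next record would push a non-empty segment past segment_bytes.
    """
    segments = [{"base_offset": 0, "size": 0}]
    for offset, size in enumerate(record_sizes):
        active = segments[-1]
        if active["size"] > 0 and active["size"] + size > segment_bytes:
            active = {"base_offset": offset, "size": 0}
            segments.append(active)
        active["size"] += size
    return segments


# Hypothetical record sizes of 80 bytes each, with a 100-byte limit.
segments = append_with_rolling([80, 80, 80], segment_bytes=100)
print([f"{s['base_offset']:020d}.log" for s in segments])
```

With a 100-byte limit every record ends up in its own segment, which matches the one-file-per-message effect seen in the directory listing.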

We have been talking a lot about the broker. Did you also notice that every command we used to interact with the topic was run against the broker? So what is it?

Broker

It is the broker's responsibility to store and manage the Kafka topics. It receives messages and ensures they are stored in the right topic, and it is also responsible for serving the messages that the consumers read.

When combined, the brokers form a cluster, and they work together in various ways. The brokers elect a leader for each partition of a topic, which is responsible for handling read and write requests. This means that each broker can be the leader of different topic partitions, so you could have two brokers being leaders for the same topic, each for a different partition. As the leader of a partition, it is the broker's responsibility to handle both write and read requests; since each broker is responsible for different partitions, the load is spread across the brokers. The replication process is handled by the followers: when the leader writes an event to its partition, all the followers copy the event to their replicas.

Thanks to the fact that the partitions are replicated across the brokers, when the leader of a partition stops working for any reason, the brokers elect another broker to be the leader of that partition and it starts to handle all write and read requests. The other brokers keep their role as followers, but start to replicate information from the new leader.
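
The failover described above can be pictured with a toy model. It assumes the new leader is simply the first replica that is both caught up with the old leader and still alive; the broker ids and the function name are hypothetical, and a real cluster involves considerably more coordination.

```python
def elect_leader(replicas, in_sync, failed):
    """Return the first in-sync replica that has not failed.

    Toy model: `replicas` is the ordered list of broker ids holding a copy of
    the partition, `in_sync` the ids that are caught up with the leader, and
    `failed` the ids that are currently down.
    """
    for broker_id in replicas:
        if broker_id in in_sync and broker_id not in failed:
            return broker_id
    return None  # no eligible replica: the partition is unavailable


# Hypothetical partition replicated on brokers 1, 2 and 3, led by broker 1.
replicas = [1, 2, 3]
print(elect_leader(replicas, in_sync={1, 2, 3}, failed=set()))
print(elect_leader(replicas, in_sync={1, 2, 3}, failed={1}))
```

When broker 1 goes down, broker 2 takes over, and broker 3 keeps following, now replicating from the new leader.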

We can observe this process in action by following the broker logs with docker logs -f broker and then executing the following.

docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic transactions --from-beginning

You should see something like:

[2023-10-28 17:48:16,014] INFO [GroupCoordinator 1]: Dynamic member with unknown member id joins group console-consumer-70904 in Empty state. Created a new member id console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,015] INFO [GroupCoordinator 1]: Preparing to rebalance group console-consumer-70904 in state PreparingRebalance with old generation 0 (__consumer_offsets-41) (reason: Adding new member console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,016] INFO [GroupCoordinator 1]: Stabilized group console-consumer-70904 generation 1 (__consumer_offsets-41) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2023-10-28 17:48:16,026] INFO [GroupCoordinator 1]: Assignment received from leader console-consumer-9928193a-cb91-4b0c-b542-1844b6b89085 for group console-consumer-70904 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)

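Notice how a leader is elected here: these lines come from the group coordinator, so the leader they mention is the leader of the consumer group, the member that decides the partition assignment. As for partition leadership, since we are dealing with a single broker, it is the sole leader of every partition.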

If we create a new producer and start writing messages, as we did before, you will see the messages show up on the consumer, but more importantly, if you write enough messages you should see the following.

[2023-10-28 17:49:30,763] INFO [ProducerStateManager partition=transactions-1]Wrote producer snapshot at offset 1 with 1 producer ids in 1 ms. (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2023-10-28 17:49:30,764] INFO [MergedLog partition=transactions-1, dir=/var/lib/kafka/data] Rolled new log segment at offset 1 in 4 ms. (kafka.log.MergedLog)

This is the creation of a new segment. In our case, since the maximum size of each segment was 100 bytes, it quickly generates another file after a single message.

ZooKeeper

ZooKeeper is the cluster manager, responsible for managing and coordinating the cluster. This includes the leader election for each topic partition, the health monitoring of the cluster, and managing information about the whole ecosystem, such as which topics exist, their number of partitions, their replicas and their configurations.

Though ZooKeeper does its job, people have been working on replacing it for a while. Recently we gained the option of using KRaft, an approach that doesn't use ZooKeeper and relies on a consensus protocol instead. I'm not diving into it in this article, but I might in the next one.

Schema Registry

Although not a part of Kafka, the schema registry is often used in conjunction with it and I want to give a quick overview of it.

In a few words, the schema registry ensures the compatibility of the producers and consumers of a topic. When you add a schema to a topic, the schema registry will validate that the message you are producing matches the schema, and the consumers will read the messages based on the schema as well. This ensures that the producer will always create a message validated by the schema and the consumer will always be able to read the message.

The advantage is that the consumers will always be able to process messages from the producers, ensuring contract compatibility.

You might eventually need to update the schema. I'm not saying you always need to ensure backward compatibility, but you should do it more often than not. I'll go as far as to say that if you want to break compatibility, you need a very good reason.

Now, let's experiment with the schema registry: we will create another topic, register a schema for it, produce a message and then consume it. Starting with the topic, let's create one named currencies.

docker exec broker kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic currencies

Open a consumer:

docker exec -it broker kafka-console-consumer --bootstrap-server localhost:9092 --topic currencies --from-beginning

Then we will register a schema in the schema registry.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema" : "{\"type\": \"record\", \"name\": \"currency\", \"fields\":[{ \"name\":\"code\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/currencies/versions

The response should be the id of your schema; if it's the first schema, it should be {"id":1}.
After the command runs, you should be able to see the schema at the http://localhost:8081/subjects/currencies/versions/1 URL.
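
The escaped quotes in the curl command are easy to get wrong by hand, because the request body is a JSON object whose schema value is itself a JSON document stored as a string. As a small sketch, the same body can be built in Python with the standard json module; nothing is sent over the network here, the snippet only prints the payload.

```python
import json

# The same Avro schema used in the curl command above.
schema = {
    "type": "record",
    "name": "currency",
    "fields": [
        {"name": "code", "type": "string"},
        {"name": "name", "type": "string"},
    ],
}

# The request body is a JSON object whose "schema" value is itself a JSON
# document serialized to a string, hence the double json.dumps.
body = json.dumps({"schema": json.dumps(schema)})
print(body)
```

The printed text could be saved to a file and passed to curl with --data @file, which avoids escaping the quotes manually.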

Now let's try to send some messages to this topic.

docker exec -it schema-registry kafka-avro-console-producer \
  --broker-list broker:29092 \
  --topic currencies \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema.id=1

After you run this command you will be able to write the message. Notice that you won't get a proper prompt, nor will you be able to break lines. If you press Enter you will send an empty message and get an error showing that it doesn't match the schema. Instead, once the console stops printing, write the following {"code":"GBP","name":"Pound sterling"} and then press Enter.

You should see the message GBPPound sterling on the consumer. If you try to send a message that does not comply with the schema, you should get an exception saying so.

We briefly discussed compatibility earlier. Now, let's delve into it. Dealing with the schema registry may be a bit tricky, because you need to put some thought in ahead of time to ensure that the schemas are backward compatible. But why?

Migrating services is always complicated and can open several windows for errors. Making our services backward compatible helps to mitigate possible problems, because ensuring that every service is updated simultaneously is not only really hard to accomplish, it also renders our deployments slow, huge and hard to test. One of the solutions is to make sure our contracts are backward compatible; once this is done, we can deploy the producers first without breaking the consumers and then deploy the consumers as we need.

Now that we went into the details of why we want to do this, let's go to the next step and update the version of our schema.

Currently our schema is the following:

{
  "type": "record",
  "name": "currency",
  "fields": [
    {
      "name": "code",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

Which translates to this JSON object:

{
  "code": "GBP",
  "name": "Pound sterling"
}

Ok, now let's say we want to add the symbol, change the attribute name to description, and also record how the currency is backed by adding its type. Our end goal would be to produce this:

{
  "code": "GBP",
  "description": "Pound sterling",
  "symbol": "£",
  "type": "FIAT"
}

Let's start with symbol. This is an easy task: we will just add it to the next version, and if the client doesn't use the latest version yet it will simply not pick up the symbol. We won't break anything and we are happy with it.

With the change from name to description we get something interesting. We can't update the schema and remove name, because this would break compatibility. In this scenario, the producer would create events with description and no name while the client would be expecting name. It wouldn't break because of the new field, but because of the missing expected field. In order to make this change and ensure compatibility, we will need to produce both fields: name and description.
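
The reasoning above can be sketched in a few lines of Python. This is a simplified model of how a reader projects a record onto its own schema, using plain dicts rather than a real Avro library: fields the reader does not know are dropped, and a field the reader expects but cannot find must have a default, otherwise reading fails. The function and variable names are hypothetical.

```python
def resolve_record(record, reader_fields):
    """Project a decoded record onto the fields the reader's schema declares.

    `reader_fields` is a list of dicts like {"name": ..., "default": ...};
    the "default" key is optional. Fields the reader does not know are
    dropped; missing fields without a default raise an error.
    """
    resolved = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]
        else:
            raise ValueError(f"missing field without default: {name}")
    return resolved


# Reader still on version 1 of the schema: it expects code and name.
reader_v1 = [{"name": "code"}, {"name": "name"}]

# Producer that kept name and added description: the old reader is fine.
print(resolve_record(
    {"code": "GBP", "name": "Pound sterling", "description": "Pound sterling"},
    reader_v1,
))

# Producer that dropped name: the old reader breaks.
try:
    resolve_record({"code": "GBP", "description": "Pound sterling"}, reader_v1)
except ValueError as error:
    print(error)
```

The first call shows why adding a field is harmless, and the second why removing name is not.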

Let's jump to the type. This one can be quite easy, depending on how we handle it. If we decide that the type will be a plain string, we can treat it similarly to the symbol. However, if we say it is an enum, things change a bit.

If we only have the FIAT type at the moment, one could say the expected Avro schema would be:

{
  "type": {
      "type": "enum",
      "name": "Type", 
      "symbols": ["FIAT"]
  }
}

While this would work, it poses a problem: when we add a new type it will break compatibility and the client will no longer be able to process the events. To remediate that, we can introduce an UNKNOWN type that the client will default to, and then we can have the client handle such events as we want.
By having this UNKNOWN placeholder, when a client consumes an event that was produced with a newer version of the Avro schema, and this event has an enum symbol that the client's version does not have, it will fall back to the enum default, in this case UNKNOWN. With this, the client won't break and you can process the event accordingly.

It should then look like this:

{
  "type": {
    "type": "enum",
    "name": "Type",
    "symbols": [
      "UNKNOWN",
      "FIAT"
    ],
    "default": "UNKNOWN"
  }
}
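
The fallback to the enum default can be illustrated with a short sketch. It is a simplified stand-in for what an Avro reader does, written with plain Python values; the function name and the CRYPTO symbol are hypothetical.

```python
def resolve_enum(symbol, reader_symbols, default=None):
    """Map a written enum symbol onto the reader's enum.

    Simplified from Avro's resolution rule: a symbol the reader does not
    know falls back to the reader's default, or fails if there is none.
    """
    if symbol in reader_symbols:
        return symbol
    if default is not None:
        return default
    raise ValueError(f"unknown enum symbol: {symbol}")


# Reader enum as in the schema above: UNKNOWN and FIAT, default UNKNOWN.
reader_symbols = ["UNKNOWN", "FIAT"]

print(resolve_enum("FIAT", reader_symbols, default="UNKNOWN"))
# CRYPTO is a hypothetical symbol added by a newer producer schema.
print(resolve_enum("CRYPTO", reader_symbols, default="UNKNOWN"))
```

Without the default, the second call would raise instead of returning UNKNOWN, which is the breakage the placeholder is meant to avoid.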

Now that we know what our new schema should look like, let's create it:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema" : "{\"type\": \"record\", \"name\": \"currency\", \"fields\":[{ \"name\":\"code\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"symbol\",\"type\":[\"null\", \"string\"],\"default\":null},{\"name\":\"description\",\"type\":[\"null\", \"string\"],\"default\":null},{\"name\":\"currencyType\",\"type\":{\"type\":\"enum\",\"name\":\"CurrencyType\",\"symbols\":[\"UNKNOWN\",\"FIAT\"],\"default\":\"UNKNOWN\"},\"default\":\"UNKNOWN\"}]}"}' \
http://localhost:8081/subjects/currencies/versions

Alright, so we have created the new version with default values for all new fields. Let's now post a new message and see what happens.

docker exec -it schema-registry kafka-avro-console-producer \
  --broker-list broker:29092 \
  --topic currencies \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema.id=2

By using the new version we can provide the new fields. Our new event should look like this:

{
  "code": "GBP",
  "name": "Pound sterling",
  "description": {
    "string": "Pound sterling"
  },
  "symbol": {
    "string": "£"
  },
  "currencyType": "FIAT"
}

We duplicate Pound sterling to ensure that clients using the older version won't experience issues. You probably also noticed that description and symbol look slightly different from name and code. Why? That's because to define a field as nullable we need to provide this array of types [ "null", "string" ], and when using Avro with a union of types we need to provide a wrapper stating which type the value is.

After executing the command and sending the new message, we should see the new entry on the consumer: GBPPound sterling£Pound sterling. And that is it!

I hope this was a cool short dive into Kafka and that it made you more interested in reading more about it.
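
As a small sketch of that wrapper, the helper below encodes a value for a [ "null", "string" ] union the way Avro's JSON encoding expects: null stays as is, and a string is wrapped in an object keyed by its type. The helper name is hypothetical, and the symbol is deliberately left empty here to show the null case.

```python
import json


def encode_nullable_string(value):
    """Encode a value for a ["null", "string"] union in Avro's JSON encoding.

    null stays a bare null; any other branch is wrapped in an object whose
    single key names the branch type.
    """
    if value is None:
        return None
    return {"string": value}


event = {
    "code": "GBP",
    "name": "Pound sterling",
    "description": encode_nullable_string("Pound sterling"),
    "symbol": encode_nullable_string(None),
    "currencyType": "FIAT",
}
# The console producer expects one JSON document per line.
print(json.dumps(event, ensure_ascii=False))
```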

Stay awesome :)
