How to Build a Kafka Producer in Rust with Partitioning

schultyy

Jan Schulte

Posted on March 3, 2023

How to Build a Kafka Producer in Rust with Partitioning

Hero Image by Askhat Gilyakhov

In this blog post we build an Apache Kafka producer in Rust,
showcasing how you partition data for a specific topic.

When it comes to writing producers for Kafka, writing a simple
producer

is straightforward. With a growing amount of data, however, it becomes
necessary to publish data in a way that allows consumers to work with it efficiently.

To showcase how to produce Kafka events, we will leverage Tokio's tracing crate to generate log data.

Setup a new Project

In this tutorial, our dev setup consists of VSCode, Rust, and Docker to build a Kafka Producer.

Let's get started with a new Rust project. In your terminal, run:

$ cargo new tracing_publisher
Enter fullscreen mode Exit fullscreen mode

As mentioned above, we want to use VSCode's
DevContainer feature to run our Rust code, but also to run Kafka and Zookeeper.

Dev Containers require a config file in a separate directory:

$ cd tracing_publisher
$ mkdir .devcontainer
$ cat > .devcontainer/devcontainer.json
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
  "name": "Rust",
  "service": "rust-log-processing",
  "dockerComposeFile": "../docker-compose.yml",
  "features": {
   "ghcr.io/devcontainers/features/rust:1": {}
  },
  "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}",
  "shutdownAction": "stopCompose"
}
Enter fullscreen mode Exit fullscreen mode

Additionally, we also need a docker-compose.yml file to orchestrate all Docker containers:

#docker-compose.yml
version: '3.8'
services:
rust-log-processing:
image: mcr.microsoft.com/devcontainers/rust:0-1-bullseye
volumes:
- ..:/workspaces:cached
  cap_add:
  - SYS_PTRACE
    security_opt:
    - seccomp:unconfined
      command: /bin/sh -c "while sleep 1000; do :; done"
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
ports:
# To learn about configuring Kafka for access across networks see
     # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
     - "9092:9092"
       depends_on:
       - zookeeper
         environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode

With these configuration files in place, let's open the project in VSCode:

$ code .
Enter fullscreen mode Exit fullscreen mode

*Please Note: Once VSCode has loaded the project, it should present
you with a message in the bottom right corner, asking if you'd like to Open this Project in a container:
*

VSCode prompt to open project in Dev<br>
Container

Click on Reopen in Container. This step will take a few moments to complete, as it downloads all Docker images and installs several components into our development container.

Once VSCode finished loading, click View > Terminal to open the integrated Terminal. We will use this terminal session to execute all commands inside the dev container unless noted otherwise.

Before we add the first few lines of code, let's install some
dependencies we will need (You need to run these commands in the VSCode Terminal):

$ cargo add anyhow
$ cargo add kafka
$ cargo add serde_json
$ cargo add tracing
$ cargo add tracing_subscriber
Enter fullscreen mode Exit fullscreen mode

Collecting Telemetry Data from a Rust Program

We will build a custom tracing subscriber layer to gather telemetry data produced by the tracing crate. We will then send that telemetry data to Kafka.

We won't cover the actual collection mechanism in detail. This aspect is based on the work of Bryan Burgers excellent blog post on "Custom
Logging in Rust Using tracing and
tracing-subscriber"
.

Before we add code, in the terminal, run:

$ cargo run 
Hello, World!
Enter fullscreen mode Exit fullscreen mode

to ensure everything works perfectly. Next, let's add the following code
to main.rs:

use tracing::info;
use tracing_subscriber::prelude::*;

mod custom_layer;
use custom_layer::CustomLayer;

fn main() {
    // Set up how `tracing-subscriber` will deal with tracing data.
    tracing_subscriber::registry().with(CustomLayer).init();

    // Log something simple. In `tracing` parlance, this creates an "event".
    info!(a_bool = true, answer = 42, message = "first example");
}
Enter fullscreen mode Exit fullscreen mode

Then, create a new module called custom_layer.rs:

// Credit: https://github.com/bryanburgers/tracing-blog-post/blob/main/examples/figure_3/custom_layer.rs
use std::collections::BTreeMap;
use tracing_subscriber::Layer;

pub struct CustomLayer;

impl<S> Layer<S> for CustomLayer
where
    S: tracing::Subscriber,
{
    fn on_event(
        &self,
        event: &tracing::Event<'_>,
        _ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        // Covert the values into a JSON object
        let mut fields = BTreeMap::new();
        let mut visitor = JsonVisitor(&mut fields);
        event.record(&mut visitor);

        // Output the event in JSON
        let output = serde_json::json!({
            "target": event.metadata().target(),
            "name": event.metadata().name(),
            "level": format!("{:?}", event.metadata().level()),
            "fields": fields,
        });
        println!("{}", serde_json::to_string_pretty(&output).unwrap());
    }
}

struct JsonVisitor<'a>(&'a mut BTreeMap<String, serde_json::Value>);

impl<'a> tracing::field::Visit for JsonVisitor<'a> {
    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
        self.0
            .insert(field.name().to_string(), serde_json::json!(value));
    }

    fn record_error(
        &mut self,
        field: &tracing::field::Field,
        value: &(dyn std::error::Error + 'static),
    ) {
        self.0.insert(
            field.name().to_string(),
            serde_json::json!(value.to_string()),
        );
    }

    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
        self.0.insert(
            field.name().to_string(),
            serde_json::json!(format!("{:?}", value)),
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

With this code in place, let's run it:

$ cargo run 
{
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
}
Enter fullscreen mode Exit fullscreen mode

Now, every time we use info!, warn!, trace!, or error!, our new extension turns this event into json.

In the next step, we will use the generated JSON and produce Kafka messages from it.

Implement a Kafka Publisher with a Partitioning Strategy

We will implement the publishing capability in a separate impl block custom_layer.rs. Before we get started with that, let's enhance the CustomLayer struct. Change this:

//src/custom_layer.rs
pub struct CustomLayer;
Enter fullscreen mode Exit fullscreen mode

to:

//src/custom_layer.rs
use std::collections::BTreeMap;
use tracing::Level;
use tracing_subscriber::Layer;

pub struct CustomLayer {
   kafka_broker: String,
}
Enter fullscreen mode Exit fullscreen mode

Next, in src/custom_layer.rs, create a new impl in which we will add a new method, as well as our Kafka functionality:

impl CustomLayer {
    pub fn new(kafka_broker: &str) -> Self {
         Self {
             kafka_broker: kafka_broker.into(),
         }
    }
} 
Enter fullscreen mode Exit fullscreen mode

The initializer accepts a Kafka broker address (e.g. broker:9092).
Since we're using Docker Compose, we can refer to the broker by its container name (broker).

send_event contains the most important functionality:

use std::{collections::BTreeMap, time::Duration};
use kafka::producer::{Record, Producer, RequiredAcks};

impl CustomLayer {
   //...
   fn send_event(&self, topic: &str, log_level: &Level, serialized_event: &str) {
       let mut producer = self.create_producer();
       let partition_key = log_level.as_str();

       let record = Record {
           topic: &self.to_kafka_topic_name(topic),
           key: partition_key.to_lowercase(),
           partition: -1,
           value: serialized_event.as_bytes()
       };

       producer
           .send(&record)
           .unwrap();
   }
}

 fn to_kafka_topic_name(&self, input: &str) -> String {
     input.replace('_', "").replace("::", "-")
 }

 fn create_producer(&self) -> Producer {
     Producer::from_hosts(vec![self.kafka_broker.to_owned()])
         .with_ack_timeout(Duration::from_secs(1))
         .with_required_acks(RequiredAcks::One)
         .create()
         .unwrap()
}
Enter fullscreen mode Exit fullscreen mode

send_event accepts a Kafka topic, the event's log level (which we will use to determine the partition) and a payload (serialized_event).

It creates a Record, a Kafka data structure, and sends it off to the broker.

A Primer on Kafka Partitions

Kafka organizes different types of messages in topics. A topic is a logical separation between different types of messages. It contains a certain type of event, for instance log messages, while another might hold customer events.

Often, with a high amount of data, it makes sense to split a topic into several partitions. We do this for several reasons:

  • It's easier to replicate a partition across different Kafka brokers
  • It's easier to distribute the load on the consumer side

If you're curious to learn more about the details, check out this Post.
In our case, we're distributing log messages into separate partitions based on their severity.

Let's examine send_event in a bit more detail:

 fn send_event(&self, topic: &str, log_level: &Level, serialized_event: &str) {
       let mut producer = self.create_producer();
       let partition_key = log_level.as_str();

       let record = Record {
           topic: &self.to_kafka_topic_name(topic),
           key: partition_key.to_lowercase(),
           partition: -1,
           value: serialized_event.as_bytes()
       };
       //...
   }
}
Enter fullscreen mode Exit fullscreen mode

When we construct a new Record, besides providing the topic and the payload (value), we also specify a key, as well as partition.

Both, key and partition allow us to influence in which partition this record should end up in.

partition: -1 tells the Kafka broker to determine the partition on its own. Since we provided a key, it will use the key to decide. key is an optional value. If not provided, Kafka will use a round-Robbin approach to distribute events into partitions. If we choose to provide a
key, it should be related to the application's domain, such as a user id, or, in our case, the application's name.

Publish Log Events

With all Kafka-related code in place, let's start publishing events.
Find the on_event method in the Layer impl block and add a call to send_event:

fn on_event(
     &self,
     event: &tracing::Event<'_>,
     _ctx: tracing_subscriber::layer::Context<'_, S>,
 ) {
     let mut fields = BTreeMap::new();
     let mut visitor = JsonVisitor(&mut fields);
     event.record(&mut visitor);

     // Output the event in JSON
     let output = serde_json::json!({
         "target": event.metadata().target(),
         "name": event.metadata().name(),
         "level": format!("{:?}", event.metadata().level()),
         "fields": fields,
         "timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
     });
     //add this line
     let serialized_event = serde_json::to_string_pretty(&output).unwrap();
     println!("{}", serialized_event);
     //add this line
     self.send_event(event.metadata().target(), event.metadata().level(), &serialized_event);
 }
Enter fullscreen mode Exit fullscreen mode

From now on, every time we intercept a tracing event, it will get sent to Kafka.

Before we can start running the program, we need a little bit of configuration in main.rs:

use tracing::info;
use tracing_subscriber::prelude::*;

mod custom_layer;
use custom_layer::CustomLayer;

fn main() {
    // Set up how `tracing-subscriber` will deal with tracing data.
    //change this line
    tracing_subscriber::registry().with(CustomLayer::new("broker:9092")).init();

    // Log something simple. In `tracing` parlance, this creates an "event".
    info!(a_bool = true, answer = 42, message = "first example");
}
Enter fullscreen mode Exit fullscreen mode

If we run our code now, we will see the following output:

$ cargo run
{
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
 thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Kafka(UnknownTopicOrPartition)', src/custom_layer.rs:35:32
 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Enter fullscreen mode Exit fullscreen mode

Our program fails because we haven't created the Kafka topic yet. While Kafka can be configured to create topics on an ad-hoc basis, we will create the topic manually.

Configure Kafka

Before we create the topic, let's quickly discuss how our code determines topic names.

fn on_event(
    &self,
    event: &tracing::Event<'_>,
    _ctx: tracing_subscriber::layer::Context<'_, S>,
) {
    // Covert the values into a JSON object
    let mut fields = BTreeMap::new();
    let mut visitor = JsonVisitor(&mut fields);
    event.record(&mut visitor);

    // Output the event in JSON
    let output = serde_json::json!({
        "target": event.metadata().target(),
        "name": event.metadata().name(),
        "level": format!("{:?}", event.metadata().level()),
        "fields": fields,
    });
    let serialized_event = serde_json::to_string_pretty(&output).unwrap();
    println!("{}", serialized_event);
    self.send_event(
        event.metadata().target(), //This is our topic name
        event.metadata().level(),
        &serialized_event,
    );
}
Enter fullscreen mode Exit fullscreen mode

The send_event method accepts a topic name as argument. In on_event, we provide event.metadata().target() as our topic name. target contains the application's name: tracing_publisher. After processing the name via to_kafka_topic, our topic name will be tracingpublisher.

If we had more applications sending log data, we would want to separate data per application, therefore we create a topic dedicated to a single application.

Let's create the topic. In a terminal on your host machine, run the following command to create a topic and two partitions:

$ docker exec broker \
   kafka-topics --bootstrap-server broker:9092 \
             --create \
             --topic tracingpublisher --partitions 2
 Created topic tracingpublisher.
Enter fullscreen mode Exit fullscreen mode

With that out of the way, let's try running the code again (Run this command in the VSCode terminal):

$ cargo run
{
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
Enter fullscreen mode Exit fullscreen mode

Now we don't see any additional output. To verify it worked, let's use kafkacat to consume the topic's events. (We install kafkacat in the Dev Container. Please run the following command in VSCode's terminal)

$ sudo apt-get update && sudo apt-get install -y kafkacat
Enter fullscreen mode Exit fullscreen mode

(Note: kafkacat got renamed to kcat. We refer to the old name because the dev container's package sources haven't updated to the new name yet).

Let's run kafkacat in Consumer mode and listen to the
tracingpublisher topic:

$ kafkacat -C -b broker:9092 -t tracingpublisher 
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
 % Reached end of topic tracingpublisher [0] at offset 1
 % Reached end of topic tracingpublisher [1] at offset 0   
Enter fullscreen mode Exit fullscreen mode

Now we see our event. But what about the partitions? When we created the topic, we created two partitions.

Let's update our main function and re-run everything again.

use tracing::{info, warn}; //update imports
use tracing_subscriber::prelude::*;

mod custom_layer;
use custom_layer::CustomLayer;

fn main() {
    // Set up how `tracing-subscriber` will deal with tracing data.
    tracing_subscriber::registry().with(CustomLayer::new("broker:9092")).init();

    // Log something simple. In `tracing` parlance, this creates an "event".
    info!(a_bool = true, answer = 42, message = "first example");
    //add this line 
    warn!(a_bool = true, answer = 42, message = "first example"); 
}
Enter fullscreen mode Exit fullscreen mode

Let's run the program again:

$ cargo run
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Info)",
   "name": "event src/main.rs:12",
   "target": "tracing_publisher"
 }
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Warn)",
   "name": "event src/main.rs:13",
   "target": "tracing_publisher"
 }
Enter fullscreen mode Exit fullscreen mode

Now, we get two log outputs, one for info! and one for warning!. Let's run kafkacat again to verify the output:

$ kafkacat -C -b broker:9092 -t tracingpublisher 
{
  "fields": {
    "a_bool": true,
    "answer": 42,
    "message": "first example"
  },
  "level": "Level(Info)",
  "name": "event src/main.rs:12",
  "target": "tracing_publisher"
}
{
  "fields": {
    "a_bool": true,
    "answer": 42,
    "message": "first example"
  },
  "level": "Level(Info)",
  "name": "event src/main.rs:12",
  "target": "tracing_publisher"
}
% Reached end of topic tracingpublisher [0] at offset 2
{
  "fields": {
    "a_bool": true,
    "answer": 42,
    "message": "first example"
  },
  "level": "Level(Warn)",
  "name": "event src/main.rs:13",
  "target": "tracing_publisher"
}
% Reached end of topic tracingpublisher [1] at offset 1
Enter fullscreen mode Exit fullscreen mode

Without additional parameters, we consume both partitions at the same
time. But if we're only interested in, let's say warnings, we could do the following:

$ kafkacat -C -b broker:9092 -t tracingpublisher -p 1
 {
   "fields": {
     "a_bool": true,
     "answer": 42,
     "message": "first example"
   },
   "level": "Level(Warn)",
   "name": "event src/main.rs:13",
   "target": "tracing_publisher"
 }
 % Reached end of topic tracingpublisher [1] at offset 1
Enter fullscreen mode Exit fullscreen mode

Partitions don't have a name, but are identified by an index. In our case, index 1 contains all warnings.

Summary

In this tutorial, we used tracing and a custom tracing_subscriber Layer
to capture telemetry data and sent it to Apache Kafka for further processing.

Instead of sending all data into a single partition, we divide the telemetry events based on severity. A consumer can choose to selectively work with certain subsets of data, based on event severity.

Running Apache Kafka on a local machine as part of your development setup is straightforward. Deploying Kafka into a production environment, however, is a different story. We know sometimes deadlines are approaching quickly and it's challenging to deliver all features and also become a Kafka operations expert at the same time. With Calisti,
you can run Apache Kafka and Zookeeper as a Kubernetes workload without having to become an expert first. Calisti takes care of deployment and configuration, so you can focus on running your application.

Ready to deploy? Check out Calisti.app to get started with our free
tier.

Find the project's source code on GitHub.

💖 💪 🙅 🚩
schultyy
Jan Schulte

Posted on March 3, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related