Write a Tokio 0.2 decoder for a TCP stream and push its events to Apache Kafka

mvniekerk

Michael van Niekerk

Posted on April 4, 2020

What we'll be making

We'll be listening to a port. This port is streaming out some XML events. The only caveat is that each event is (sometimes) first padded with a 32-bit number that tells you how many bytes are on their way.

Now, we'll be parsing this stream of events and making it fit into our model. After converting these events into JSON we'll push them onto an Apache Kafka topic. As an added bonus these Kafka messages will be mimicking the messages generated by Spring Cloud Stream.

We'll be using futures and async code most of the time.

Some dependencies

Over to your Cargo.toml file, and add the following:

[dependencies]
futures = "0.3.4"
# Bytes - we'll be using them
bytes = "0.5.4"
# Pull in the tokio framework
tokio = { version = "0.2.15", features = ["full"] }
# Using this to make the tokio codec
tokio-util = { version = "0.3.1", features = ["codec"] }
# Serde is a framework for serializing and deserializing Rust data structures efficiently and generically.
serde_derive = "1.0.105"
serde = "1.0.105"
# Derive JSON from your data 
serde_json = "1.0.50"
# We'll be adding UUIDs to our Kafka messages
uuid = { version = "0.8.1", features = ["v4"] }
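# Rust bindings for Apache Kafka
rdkafka = { version = "0.23", features = ["cmake-build"] }
# XML deserialization for the domain model (imported as serde_xml_rs in the code)
serde-xml-rs = "0.4"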


A quick cargo build will show you how badly everything is going to blow up in your face. The rdkafka crate is a beast. It compiles the C client library and links to it (statically or dynamically) together with its dependencies. That is, of course, if you've got everything installed. Best head over to https://github.com/fede1024/rust-rdkafka to check what you'll need.
I've used this crate for about a year, and I must say the build system is nowhere near as brittle as it was a year back. Its build system is actually a work of (Makefile and other build scripts) art.

On with the decoder

But first the domain model

The port we'll be listening to sends out a report every second if it doesn't get an event. When it does get an event, it sends the packet out earlier.

In plain text (if you would do a nc 192.168.1.2 10000 to listen to this port) the packets can look like this:

🎃🎃<?xml version="1.0" encoding="UTF-8"?>
<ReceiverReport>
    <NetAddress value="192.168.1.2:10000"/>
</ReceiverReport>
🎃🎃<?xml version="1.0" encoding="UTF-8"?>
<ReceiverReport>
    <NetAddress value="192.168.1.2:10000"/>
    <Transmission>
        <Message value="000000010011011001001000000100" units="bits"/>
    </Transmission>
</ReceiverReport>
...

Note the 🎃🎃 in front of each packet. That is the (optional) padding, which tells us how big the packet that follows it is.
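To make the padding a bit less mysterious, here is a minimal sketch of reading it. It assumes the padding is a little-endian 32-bit length, which is how the decoder further down reads it. The helper names `read_prefix` and `starts_with_xml` are made up for illustration. It also shows why an unpadded packet is easy to spot: the first four bytes of `<?xml` read as the number 0x6D783F3C.

```rust
/// Interpret the first four bytes of `buf` as a little-endian u32.
/// Returns None when fewer than four bytes have arrived so far.
/// (Hypothetical helper, not part of any crate.)
fn read_prefix(buf: &[u8]) -> Option<u32> {
    if buf.len() < 4 {
        return None;
    }
    Some(u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]))
}

/// True when the buffer starts with "<?xm", i.e. the packet has no padding.
/// '<' = 0x3C, '?' = 0x3F, 'x' = 0x78, 'm' = 0x6D, read least significant byte first.
fn starts_with_xml(buf: &[u8]) -> bool {
    read_prefix(buf) == Some(0x6D78_3F3C)
}
```

A buffer that starts with the bytes 5, 0, 0, 0 announces a 5 byte packet, while a buffer that starts with `<?xml` is recognised as XML without padding.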

So let's make a model out of the above:

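extern crate serde_xml_rs;
use serde_derive::{Deserialize, Serialize};
use serde_xml_rs::from_reader;

#[derive(Deserialize)] // Deserialize using serde
pub struct NetAddress {
  pub value: String
}

// Clone and Serialize are needed later on, when the messages are cloned and turned into JSON
#[derive(Clone, Deserialize, Serialize)]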
pub struct Message {
  pub units: String, // (2)
  pub value: String
}

#[derive(Deserialize)]
pub struct Transmission {
  #[serde(rename = "Message", default)]
  pub messages: Vec<Message> // (1)
}

#[derive(Deserialize)]
pub struct ReceiverReport {
  #[serde(rename = "Transmission", default)]
  pub transmissions: Vec<Transmission>,

  #[serde(rename = "NetAddress", default)]
  pub net_addresses: Vec<NetAddress>
}

impl std::str::FromStr for ReceiverReport { // (3)
  type Err = serde_xml_rs::Error;

  fn from_str(xml: &str) -> Result<Self, Self::Err> {
    from_reader(xml.as_bytes())
  }
}

(1). Remember that in XML you can have more than one element of the same type as a child node, hence these are vectors. Here we also see how to map an element name in the XML to a differently named field in the deserialized struct.
(2). Note that these are attributes of the XML node and thus aren't vectors.
(3). Parse an XML string and convert it to a ReceiverReport struct.
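To see what the deserializer is expected to produce, here is a sketch that builds the second sample packet by hand. It uses plain copies of the structs, without the serde derives, so that it runs without any dependencies. The function `sample_report` is made up for illustration.

```rust
// Plain mirrors of the model above (no serde derives, illustration only).
pub struct NetAddress { pub value: String }
pub struct Message { pub units: String, pub value: String }
pub struct Transmission { pub messages: Vec<Message> }
pub struct ReceiverReport {
    pub transmissions: Vec<Transmission>,
    pub net_addresses: Vec<NetAddress>,
}

/// What the second sample packet should look like once deserialized:
/// one NetAddress element and one Transmission holding one Message.
fn sample_report() -> ReceiverReport {
    ReceiverReport {
        net_addresses: vec![NetAddress { value: "192.168.1.2:10000".to_string() }],
        transmissions: vec![Transmission {
            messages: vec![Message {
                units: "bits".to_string(),
                value: "000000010011011001001000000100".to_string(),
            }],
        }],
    }
}
```

The first sample packet has no Transmission element at all. Because of the `default` in the serde attribute, that one should end up with an empty `transmissions` vector rather than an error.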

Now for the actual decoder

The Decoder trait comes from the tokio-util crate.

use std::io;
use tokio_util::codec::Decoder;
use bytes::{BufMut, BytesMut};

pub struct MyCodec;
impl Decoder for MyCodec {
    type Item = Vec<Message>;  // (1)
    type Error = std::io::Error; // (2)
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> { // (3)
      // <- Insert magic here ->
    }
}

(1). Item is the type of a decoded frame. We'll be returning a vector of Message structs
(2). We can return an io::Error
(3). The function of the Decoder trait you need to implement.

So let's discuss the Decoder trait. We are being fed a buffer of bytes, and one of four things can be true:
Scenario 1: The whole buffer constitutes exactly one ReceiverReport.
Scenario 2: The first part of the buffer constitutes one ReceiverReport.
Scenario 3: We don't have enough bytes yet to make up one ReceiverReport.
Scenario 4: We're off our trolley and none of the data makes any sense.

Scenario 3 is an easy Ok(None) return value. Tokio will call us again once more bytes have arrived.
Scenarios 1 and 2 are handled in exactly the same way. We take the piece of the buffer that is the XML and advance the buffer past the point we've read (after "</ReceiverReport>\r\n"). Then we parse the XML, fish the messages out of the Transmission vector and return them to Tokio as Ok(Some(messages)).
Scenario 4 should return an Err holding an io::Error.
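The scenarios can be sketched on a plain byte slice, without Tokio. This is only an illustration of the unpadded case: the function `next_report` is made up, and it treats "makes no sense" as "is not valid UTF-8", which is narrower than what a real decoder has to check.

```rust
use std::str;

const END_TAG: &[u8] = b"</ReceiverReport>\r\n";

/// Try to cut one unpadded report off the front of `buf`.
/// Ok(Some((xml, consumed))) -> scenarios 1 and 2
/// Ok(None)                  -> scenario 3, wait for more bytes
/// Err(_)                    -> scenario 4, the bytes are not valid UTF-8
fn next_report(buf: &[u8]) -> Result<Option<(&str, usize)>, str::Utf8Error> {
    // Look for the closing tag anywhere in the buffer
    let pos = buf.windows(END_TAG.len()).position(|w| w == END_TAG);
    match pos {
        Some(p) => {
            // Everything up to and including the closing tag is one report
            let consumed = p + END_TAG.len();
            let xml = str::from_utf8(&buf[..consumed])?;
            Ok(Some((xml, consumed)))
        }
        None => Ok(None),
    }
}
```

The `consumed` count is what tells the caller how far to advance the buffer. In scenario 2 it is smaller than the buffer length, and the leftover bytes are the start of the next report.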

Adding codec magic

Below is the code that pulls a data frame out of the buffer.


use std::io;
use std::str::FromStr;
use bytes::BytesMut;
use tokio_util::codec::Decoder;

// Keep the first message of every transmission, as long as it is measured in bits
fn messages_from_transmissions(rp: ReceiverReport) -> Vec<Message> {
    rp.transmissions
        .into_iter()
        .filter_map(|tx| tx.messages.into_iter().next())
        .filter(|msg| msg.units == "bits")
        .collect()
}

// Turn the bytes of one frame into messages.
// Invalid UTF-8 or XML is scenario 4 and becomes an io::Error instead of a panic
fn parse_report(xml: &[u8]) -> io::Result<Vec<Message>> {
    let invalid = |reason: String| io::Error::new(io::ErrorKind::InvalidData, reason);
    let xml = std::str::from_utf8(xml).map_err(|e| invalid(e.to_string()))?;
    let report = ReceiverReport::from_str(xml).map_err(|e| invalid(e.to_string()))?;
    Ok(messages_from_transmissions(report))
}

pub struct MyCodec;

impl Decoder for MyCodec {
    type Item = Vec<Message>;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
        if buf.len() < 4 {
            // We don't even have enough data to get the length. Scenario 3
            return Ok(None);
        }

        // Read the first four bytes as a little-endian 32-bit number
        let prefix = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);

        if prefix == 0x6D78_3F3C {
            // The four bytes spell "<?xm": we are starting with the XML and not the padding,
            // so we don't have a frame length to work with
            let end_tx_b: &[u8] = b"</ReceiverReport>\r\n";
            let end_tx_len = end_tx_b.len();

            // Search through the buffer and try to find the end of the ReceiverReport
            let pos = buf.windows(end_tx_len).position(|window| window == end_tx_b);
            match pos {
                // We've found the end of it. Scenario 1 or 2
                Some(p) => {
                    // Split the report off the front; buf keeps whatever follows it
                    let b = buf.split_to(p + end_tx_len);
                    parse_report(&b).map(Some)
                }
                // Scenario 3
                None => Ok(None),
            }
        } else {
            let len = prefix as usize;

            if len == 0 {
                return Ok(None);
            }

            if buf.len() < len + 4 {
                // We still need some bytes. Scenario 3
                Ok(None)
            } else {
                // Split the padding and the report off the front of the buffer
                let b = buf.split_to(len + 4);
                // Scenario 1 or 2
                parse_report(&b[4..]).map(Some)
            }
        }
    }
}
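To get a feel for how a decoder like this is driven, here is a small std-only sketch of the padded case. It is not how Tokio does it internally. It only mimics the idea that bytes arrive in arbitrary chunks and that decode is called again and again on a growing buffer. The names `decode_frame` and `feed` are made up, and the prefix is assumed to be a little-endian u32 as in the decoder above.

```rust
/// Cut one length-prefixed frame off the front of `buf`, if it is complete.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // not even a full prefix yet
    }
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // the payload is still on its way
    }
    // Remove prefix and payload from the buffer, keep only the payload
    let frame: Vec<u8> = buf.drain(..4 + len).skip(4).collect();
    Some(frame)
}

/// Feed chunks in the order they arrive and collect every complete frame.
fn feed(chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    let mut buf = Vec::new();
    let mut frames = Vec::new();
    for chunk in chunks {
        buf.extend_from_slice(chunk);
        // One chunk can complete zero, one or several frames
        while let Some(frame) = decode_frame(&mut buf) {
            frames.push(frame);
        }
    }
    frames
}
```

A frame that is split over three chunks only comes out once the last chunk has arrived, and a chunk that carries the tail of one frame plus a whole second frame produces both. That is why returning "nothing yet" for an incomplete buffer matters so much.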

A framed TCP stream

So we've got our swanky new decoder. Now we want to connect to a socket and get notified one by one as these messages are sent.

First we need an encoder, because a Framed connection is both a stream and a sink (even though we're not going to use the sink half). The one below gets a vector of bytes and stuffs a buffer with them:

use tokio_util::codec::Encoder;

impl Encoder<Vec<u8>> for MyCodec {
    type Error = io::Error;

    fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
        buf.put(&data[..]);
        Ok(())
    }
}

Then we create a Framed TcpStream:

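use std::io;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_util::codec::{Decoder, Framed};

pub type MyCodecConnection = Framed<TcpStream, MyCodec>; // (1)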

impl MyCodec {
    pub async fn connect(addr: &SocketAddr) -> Result<MyCodecConnection, io::Error> {
        let tcp_stream = TcpStream::connect(addr).await?;
        Ok(MyCodec.framed(tcp_stream)) // (2)
    }
}

(1). MyCodecConnection type
(2). The .framed function comes from the implemented Decoder trait.

Sending something to Kafka

Base


use rdkafka::producer::FutureProducer;

pub struct Sender {
    producer: FutureProducer
}

Connect

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaResult;
use rdkafka::producer::FutureProducer;

impl Sender {
    pub fn create_and_connect() -> Result<Sender, ()> {
        let brokers = "kafkabroker1.kaas.com,kafkabroker2.kaas.com";
        let producer: KafkaResult<FutureProducer> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create();

        match producer {
            Ok(producer) => Ok(Sender { producer }),
            Err(_) => Err(())
        }
    }
}

The above points the client at two Kafka brokers and creates the producer (that'll be used to send messages later on).

Send

Given a topic, a key and a message, send it to a list of Kafka brokers. But also try to mimic Spring Cloud Stream's packets.

use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{DeliveryFuture, FutureRecord};
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;

impl Sender {

    pub fn send(&self, topic: &str, key: &str, message: &str) -> DeliveryFuture {
        // Get the timestamp
        let start = SystemTime::now();
        let since_the_epoch = start.duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let in_ms = since_the_epoch.as_secs() * 1000 +
            since_the_epoch.subsec_nanos() as u64 / 1_000_000;
        let ts = format!("{}", in_ms);
        let ts: &str = ts.as_ref();

        // Generate a UUID
        let my_uuid = Uuid::new_v4();
        let my_uuid = format!("{}", my_uuid);
        let my_uuid: &str = my_uuid.as_ref();

        // Content type - some Spring Cloud Stream extras
        let content_type: serde_json::Value = json!({
            "type":"application",
            "subtype":"json",
            "parameters":{},
            "concrete":true,
            "wildCardType":false,
            "wildCardSubtype":false,
        });

        let content_type = content_type.to_string();
        let content_type: &str = content_type.as_ref();

        // More Spring Cloud Stream copying
        let spring_json_header_types = json!({
            "contentType": "org.springframework.util.MimeType"
        });
        let spring_json_header_types = spring_json_header_types.to_string();
        let spring_json_header_types: &str = spring_json_header_types.as_ref();

        // Actually make the record that we're sending
        let record: FutureRecord<str, str> =
            FutureRecord::to(topic)
                .payload(message)
                .key(key)
                .headers(OwnedHeaders::new()
                    .add("content-type", "application/json")
                    .add("timestamp", ts)
                    .add("id", my_uuid)
                    .add("contentType", content_type)
                    .add("spring_json_header_types", spring_json_header_types)
                );
        self.producer.send(record, 0)
    }
}
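As a side note, the timestamp arithmetic in send can be written a bit shorter with Duration::as_millis from the standard library. The sketch below puts the long-hand version next to it. The function names are made up for illustration.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// The long-hand conversion: whole seconds plus the sub-second part, in ms.
fn millis_by_hand(d: Duration) -> u64 {
    d.as_secs() * 1000 + d.subsec_nanos() as u64 / 1_000_000
}

/// Milliseconds since the Unix epoch as a string, ready to use as a header value.
fn timestamp_header() -> String {
    let since_the_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    // as_millis returns a u128 and does the same arithmetic for us
    since_the_epoch.as_millis().to_string()
}
```

For a duration of 1.5 seconds both versions give 1500. The only difference is the integer type that comes back.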

Enter Tokio

Now to glue everything together.
Below is some very hairy stuff. In short, we'll be listening for the messages that come from our TcpStream. When we do get a list of messages, we set up an MPSC (multiple producer, single consumer) channel. The events are sent into the channel as JSON, and the receiving end of the channel forwards that JSON to a Kafka topic.
The futures that result from sending the Kafka messages are then mapped to a boolean, true or false, depending on their delivery state.
In between all of this, these small tasks are spawned on the Tokio runtime.

pub fn connect_to_xml_stream() {
    use futures::future;
    use futures::{FutureExt, StreamExt};
    use tokio::prelude::*;
    use tokio::runtime::Runtime;
    use std::net::SocketAddr;
    use std::time::Duration;
    use futures::channel::mpsc::channel;

    // Setup a Tokio runtime
    let mut rt = Runtime::new().unwrap();
    loop {
        let result = rt.block_on(async {
            // Convert a string to a socket address (a SocketAddr needs a port; 10000 as in the nc example)
            let addr = "192.168.2.100:10000".parse::<SocketAddr>().unwrap();
            // Connect to the TCP stream
            let streams = MyCodec::connect(&addr).await;

            if streams.is_err() {
                let err = streams.err().unwrap();
                println!("Error with opening up connection {:?}, waiting 10s", err);
                // Wait for 10 seconds before retrying the loop
                tokio::time::delay_for(Duration::from_secs(10)).await;
                return Ok(false);
            }
            let (_sink, ins) = streams.unwrap().split();
            println!("Connected");

            // Map the streamed messages to futures
            let fut = ins.map(move |chunk: Result<Vec<Message>, io::Error>| {
                match chunk {
                    Ok(s) => {
                        match s.len() {
                            // Only send to Kafka if we have at least 1 message
                            x if x > 0 => {
                                let sender = Sender::create_and_connect();
                                let sender = sender.unwrap();
                                // Open up a channel for the sender
                                let (tx, rx) = channel::<String>(x);
                                // Iterate over the list of events
                                let s = s.iter().map(|e| {
                                    let tx: futures::channel::mpsc::Sender<String> = tx.clone();

                                    (e.clone(), tx)
                                }).map(|(event, mut tx)| async move {
                                    // Send the event's json to the channel
                                    let json = serde_json::to_string(&event).unwrap();
                                    let t = tx.try_send(json);
                                    t.is_ok()
                                });

                                let f = future::join_all(s);
                                tokio::task::spawn(f);

                                tokio::task::spawn(async move {
                                    // On the receiving end of the channel, send the received json to the Kafka sender
                                    let l: bool = rx.map(move |json| {
                                        sender.send("kafka_topic", "kafka_key", &json[..])
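                                            .map(|df| {
                                                // The outer Result tells us whether the future was cancelled,
                                                // the inner one whether Kafka accepted the message
                                                match df {
                                                    Ok(Ok(_)) => true,
                                                    _ => false
                                                }
                                            })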
                                    })
                                        .fold(true, |acc, x| async move { acc && x.await })
                                        .await;
                                    l
                                });
                                future::ready(true)
                            },
                            _ => {
                                future::ready(false)
                            }
                        }
                    },
                    Err(_e) => {
                        future::ready(false)
                    }
                }
            })
            .then(|v| {
                // The mapped futures should be spawned
                tokio::task::spawn(v)
            }).fold(true, |acc, x| async move { acc && if let Ok(s) = x { s } else { false } });
            // Await until the connection is closed
            tokio::task::spawn(fut).await
        });
        match result {
            Ok(_) => println!("Finished with connection"),
            _ => println!("Exited connection")
        }
    }
}
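If the channel part of the above is hard to follow, here is the same fan-in idea stripped down to the standard library. It is only a sketch: it uses threads and std::sync::mpsc instead of Tokio tasks and the futures channel, and `fake_send` is a made-up stand-in for the Kafka sender that pretends delivery fails for an empty payload.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Stand-in for the Kafka send: pretend delivery fails for empty payloads.
fn fake_send(json: &str) -> bool {
    !json.is_empty()
}

/// Fan the events in over one channel and report whether all were delivered.
fn deliver_all(events: Vec<String>) -> bool {
    let (tx, rx) = channel::<String>();

    // One producer per event, each with its own clone of the sender
    let producers: Vec<_> = events
        .into_iter()
        .map(|event| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(event).is_ok())
        })
        .collect();
    // Drop the original sender so the receiver stops once all producers are done
    drop(tx);

    // Single consumer: fold every delivery result into one boolean
    let delivered = rx.iter().fold(true, |acc, json| acc && fake_send(&json));

    let all_sent = producers.into_iter().all(|p| p.join().unwrap_or(false));
    delivered && all_sent
}
```

The shape is the same as in the Tokio version: many cheap producers, one consumer that owns the sender to the outside world, and a fold that turns a stream of delivery results into a single true or false.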

Conclusion

I've written this in the hope that it helps someone. Please send me a message if this was you, or buy me a beer.

Donate using Liberapay

Photo by
Torsten Muller
