Write a Tokio 0.2 decoder for a TCP stream and push its events to Apache Kafka
Michael van Niekerk
Posted on April 4, 2020
What we'll be making
We'll be listening to a port. This port is streaming out XML events. The only caveat is that each packet is (sometimes) first padded with a 32-bit number telling you how many bytes are on their way.
Now, we'll parse this stream of events and fit it into our model. After converting these events into JSON we'll push them onto an Apache Kafka stream. As an added bonus, these Kafka messages will mimic the messages generated by Spring Cloud Stream.
We'll be using futures and async code most of the way.
Some dependencies
Head over to your Cargo.toml file and add the following:
[dependencies]
futures = "0.3.4"
# Bytes - we'll be using them
bytes = "0.5.4"
# Pull in the tokio framework
tokio = { version = "0.2.15", features = ["full"] }
# Using this to make the tokio codec
tokio-util = { version = "0.3.1", features = ["codec"] }
# Serde is a framework for serializing and deserializing Rust data structures efficiently and generically.
serde_derive = "1.0.105"
serde = "1.0.105"
# Derive JSON from your data
serde_json = "1.0.50"
# We'll be adding UUIDs to our Kafka messages
uuid = { version = "0.8.1", features = ["v4"] }
# Rust bindings for Apache Kafka
rdkafka = { version = "0.23", features = ["cmake-build"] }
A quick cargo build will show you how badly everything is going to blow up in your face. The rdkafka crate is a beast: it compiles the C client library and links it and its dependencies statically (or dynamically). That is, of course, if you've got everything installed. Best head over to https://github.com/fede1024/rust-rdkafka to check what you'll need.
I've used this crate for over a year, and I must say the build system isn't nearly as brittle as it was a year back. It is actually a work of (Makefile and other build scripts) art.
On with the decoder
But first the domain model
The port we'll be listening to sends out a report every second if it doesn't get an event. When it does get an event, it sends the packet out earlier.
In plain text (if you run nc 192.168.1.2 10000 to listen to this port) the packets can look like this:
šš<?xml version="1.0" encoding="UTF-8"?>
<ReceiverReport>
<NetAddress value="192.168.1.2:10000"/>
</ReceiverReport>
šš<?xml version="1.0" encoding="UTF-8"?>
<ReceiverReport>
<NetAddress value="192.168.1.2:10000"/>
<Transmission>
<Message value="000000010011011001001000000100" units="bits"/>
</Transmission>
</ReceiverReport>
...
Note the šš - that's the (optional) padding that tells us how many bytes follow after the šš.
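To make the framing concrete, here's a minimal sketch of what the sending side might do, assuming the padding is a little-endian 32-bit length (which is what our decoder below will expect). The frame_packet helper is made up for illustration:
use bytes::{BufMut, BytesMut};

// Hypothetical helper: prefix an XML payload with its byte length
fn frame_packet(xml: &str) -> BytesMut {
    let mut buf = BytesMut::with_capacity(4 + xml.len());
    buf.put_u32_le(xml.len() as u32); // these 4 bytes render as the "šš" in nc
    buf.put(xml.as_bytes());
    buf
}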
So let's make a model out of the above:
extern crate serde_xml_rs;
use serde_derive::{Deserialize, Serialize};
use serde_xml_rs::from_reader;

#[derive(Deserialize)] // Deserialize using serde
pub struct NetAddress {
    pub value: String
}

// Serialize + Clone as well: later on we'll clone these messages and turn them into JSON
#[derive(Deserialize, Serialize, Clone)]
pub struct Message {
    pub units: String, // (2)
    pub value: String
}

#[derive(Deserialize)]
pub struct Transmission {
    #[serde(rename = "Message", default)]
    pub messages: Vec<Message> // (1)
}

#[derive(Deserialize)]
pub struct ReceiverReport {
    #[serde(rename = "Transmission", default)]
    pub transmissions: Vec<Transmission>,
    #[serde(rename = "NetAddress", default)]
    pub net_addresses: Vec<NetAddress>
}

impl std::str::FromStr for ReceiverReport { // (3)
    type Err = serde_xml_rs::Error;
    fn from_str(xml: &str) -> Result<Self, Self::Err> {
        from_reader(xml.as_bytes())
    }
}
(1). Remember that in XML you can have more than one element of the same type as a child node, hence these are vectors. Here we also see how to map an XML element name onto a differently-named field in the deserialized struct.
(2). Note that these are attributes of the XML node and thus aren't vectors
(3). Parse an XML string and convert it to a ReceiverReport struct
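To sanity-check the model, here's a quick test sketch, fed with the second sample packet from above (minus the length padding):
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn parses_a_receiver_report() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ReceiverReport>
<NetAddress value="192.168.1.2:10000"/>
<Transmission>
<Message value="000000010011011001001000000100" units="bits"/>
</Transmission>
</ReceiverReport>"#;
        let report = ReceiverReport::from_str(xml).unwrap();
        assert_eq!(report.net_addresses[0].value, "192.168.1.2:10000");
        assert_eq!(report.transmissions[0].messages[0].units, "bits");
    }
}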
Now for the actual decoder
The Decoder trait comes from the tokio-util crate.
use std::io;
use std::str::FromStr;
use tokio_util::codec::Decoder;
use bytes::{BufMut, BytesMut};

pub struct MyCodec;

impl Decoder for MyCodec {
    type Item = Vec<Message>; // (1)
    type Error = std::io::Error; // (2)
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> { // (3)
        // <- Insert magic here ->
    }
}
(1). Item is the return type. We'll be returning a vector of Message structs
(2). We can return an io::Error
(3). The function of the Decoder trait that you need to implement.
So let's discuss the Decoder trait. We are being fed a buffer of bytes.
Scenario 1: The whole buffer constitutes exactly one ReceiverReport.
Scenario 2: The first part of the buffer constitutes one ReceiverReport.
Scenario 3: We don't have enough bytes to have something that can be one ReceiverReport.
Scenario 4: We're off our trolley and none of the data makes any sense.
Scenario 3 is an easy Ok(None) return value.
Scenarios 1 and 2 are handled in exactly the same way. We take the piece of the buffer that holds the XML and advance the buffer past the point we've read (past "</ReceiverReport>\r\n"). Then we parse the XML, fish out the Transmission vector and hand it back to Tokio as Ok(Some(messages_from_transmissions(ReceiverReport::from_str(&xml).unwrap()))).
Scenario 4 would return an io::Error.
Adding codec magic
Below is the code to pull a data frame out of the buffer.
fn messages_from_transmissions(rp: ReceiverReport) -> Vec<Message> {
    rp.transmissions.iter()
        .filter(|tx| !tx.messages.is_empty())
        .filter(|tx| tx.messages[0].units == "bits")
        // Clone the message out of the borrowed report
        .map(|tx| tx.messages[0].clone())
        .collect()
}
pub struct MyCodec;

impl Decoder for MyCodec {
    type Item = Vec<Message>;
    type Error = std::io::Error;
    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
        if buf.len() < 4 {
            // We don't even have enough data to read the length. Scenario 3
            return Ok(None);
        }
        // Read the length of the packet (a little-endian 32-bit number)
        let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
        if len == 0x6D78_3F3C { // The buffer starts with "<?xm", not with the padding
            let end_tx = "</ReceiverReport>\r\n".to_string();
            let end_tx_b: &[u8] = end_tx.as_ref();
            let end_tx_len = end_tx.len();
            if buf.len() < end_tx_len {
                return Ok(None);
            }
            // We don't have a frame length to work with.
            // Search through the buffer and try to find the end of the ReceiverReport
            let pos = buf.windows(end_tx_len).position(|window| window == end_tx_b);
            match pos {
                // We've found the end of it. Scenario 1 or 2
                Some(p) => {
                    // Split the frame off; buf now starts right after it
                    let b = buf.split_to(p + end_tx_len);
                    // Get the XML string
                    let xml = String::from_utf8(b.to_vec()).expect("invalid utf8 data");
                    Ok(Some(messages_from_transmissions(ReceiverReport::from_str(&xml).unwrap())))
                },
                // Scenario 3
                None => Ok(None)
            }
        } else {
            if len == 0 {
                return Ok(None);
            }
            let buf_len = buf.len();
            // We still need some bytes
            if buf_len < (len + 4) {
                // Scenario 3
                Ok(None)
            } else {
                // Split the frame (the 4-byte length plus the payload) off the buffer
                let b = buf.split_to(len + 4);
                // Skip the length prefix to get the XML
                let xml = String::from_utf8(b[4..].to_vec()).expect("invalid utf8 data");
                // Scenario 1 or 2
                Ok(Some(messages_from_transmissions(ReceiverReport::from_str(&xml).unwrap())))
            }
        }
    }
}
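A quick happy-path test sketch for the length-prefixed case (assuming it lives in the same module as the codec):
#[test]
fn decodes_a_length_prefixed_report() {
    let xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
        <ReceiverReport>\
        <Transmission>\
        <Message value=\"0001\" units=\"bits\"/>\
        </Transmission>\
        </ReceiverReport>\r\n";
    let mut buf = BytesMut::new();
    buf.put_u32_le(xml.len() as u32); // the padding
    buf.put(xml.as_bytes());
    let mut codec = MyCodec;
    let messages = codec.decode(&mut buf).unwrap().unwrap();
    assert_eq!(messages.len(), 1);
    assert_eq!(messages[0].units, "bits");
    assert!(buf.is_empty()); // the whole frame was consumed
}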
A framed TCP stream
So we've got our swanky new decoder. Now we want to connect to a socket and get notified, message by message, as they come in.
First we need an encoder - because Framed says so (even though we're not actually going to use it). The one below takes a vector of bytes and stuffs the buffer with them:
use tokio_util::codec::Encoder;

impl Encoder<Vec<u8>> for MyCodec {
    type Error = io::Error;
    fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
        buf.put(&data[..]);
        Ok(())
    }
}
Then we create a Framed TcpStream:
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;

pub type MyCodecConnection = Framed<TcpStream, MyCodec>; // (1)

impl MyCodec {
    pub async fn connect(addr: &SocketAddr) -> Result<MyCodecConnection, io::Error> {
        let tcp_stream = TcpStream::connect(addr).await?;
        Ok(MyCodec.framed(tcp_stream)) // (2)
    }
}
(1). MyCodecConnection type
(2). The .framed function comes from the implemented Decoder trait.
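With that in place, consuming the framed stream could look something like this (a sketch; next comes from futures::StreamExt, and print_messages is a made-up name):
use futures::StreamExt;

pub async fn print_messages(addr: &std::net::SocketAddr) -> io::Result<()> {
    let mut stream = MyCodec::connect(addr).await?;
    // Each item the stream yields is one decoded frame
    while let Some(frame) = stream.next().await {
        let messages = frame?;
        println!("received {} message(s)", messages.len());
    }
    // The stream ended, so the peer closed the connection
    Ok(())
}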
Sending something to Kafka
Base
use rdkafka::producer::FutureProducer;

pub struct Sender {
    producer: FutureProducer
}
Connect
use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaResult;
use rdkafka::producer::FutureProducer;

impl Sender {
    pub fn create_and_connect() -> Result<Sender, ()> {
        let brokers = "kafkabroker1.kaas.com,kafkabroker2.kaas.com";
        let producer: KafkaResult<FutureProducer> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create();
        match producer {
            Ok(producer) => Ok(Sender { producer }),
            Err(_) => Err(())
        }
    }
}
The above points the producer at two Kafka brokers and creates it; we'll use this producer to send messages later on.
Send
Given a topic, a key and a message, send it to the Kafka brokers - while also trying to mimic Spring Cloud Stream's packets. Note that the use declarations live above the impl block, since Rust doesn't allow them inside one.
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{DeliveryFuture, FutureRecord};
use serde_json::json;
use uuid::Uuid;
use std::time::{SystemTime, UNIX_EPOCH};

impl Sender {
    pub fn send(&self, topic: &str, key: &str, message: &str) -> DeliveryFuture {
        // Get the timestamp
        let start = SystemTime::now();
        let since_the_epoch = start.duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        let in_ms = since_the_epoch.as_secs() * 1000 +
            since_the_epoch.subsec_nanos() as u64 / 1_000_000;
        let ts = format!("{}", in_ms);
        let ts: &str = ts.as_ref();
        // Generate a UUID
        let my_uuid = Uuid::new_v4();
        let my_uuid = format!("{}", my_uuid);
        let my_uuid: &str = my_uuid.as_ref();
        // Content type - some Spring Cloud Stream extras
        let content_type: serde_json::Value = json!({
            "type": "application",
            "subtype": "json",
            "parameters": {},
            "concrete": true,
            "wildCardType": false,
            "wildCardSubtype": false,
        });
        let content_type = content_type.to_string();
        let content_type: &str = content_type.as_ref();
        // More Spring Cloud Stream copying
        let spring_json_header_types = json!({
            "contentType": "org.springframework.util.MimeType"
        });
        let spring_json_header_types = spring_json_header_types.to_string();
        let spring_json_header_types: &str = spring_json_header_types.as_ref();
        // Actually make the record that we're sending
        let record: FutureRecord<str, str> =
            FutureRecord::to(topic)
                .payload(message)
                .key(key)
                .headers(OwnedHeaders::new()
                    .add("content-type", "application/json")
                    .add("timestamp", ts)
                    .add("id", my_uuid)
                    .add("contentType", content_type)
                    .add("spring_json_header_types", spring_json_header_types)
                );
        self.producer.send(record, 0)
    }
}
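The returned DeliveryFuture resolves once the broker acknowledges (or rejects) the message. A usage sketch - the topic, key and payload names are made up, and the nested Result is rdkafka 0.23's shape (the outer one says whether the future was canceled, the inner one carries partition/offset on success):
pub async fn send_one(sender: &Sender, json: &str) {
    match sender.send("kafka_topic", "kafka_key", json).await {
        Ok(Ok((partition, offset))) =>
            println!("delivered to partition {} at offset {}", partition, offset),
        Ok(Err((err, _msg))) => println!("delivery failed: {}", err),
        Err(_) => println!("delivery future was canceled"),
    }
}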
Enter Tokio
Now to glue everything together.
Below is some very hairy stuff. In short, we'll be listening for the messages that come off our TcpStream. When we get a list of messages, we set up an MPSC (multiple producer, single consumer) channel: the sending end is fed each message's JSON, and the receiving end forwards that JSON to the Kafka producer.
The futures resulting from the Kafka sends are then mapped to a boolean, true or false, depending on their delivery state.
In between all of this, these small tasks are spawned using the Tokio runtime.
pub fn connect_to_xml_stream() {
    use futures::future;
    use futures::{FutureExt, StreamExt};
    use futures::channel::mpsc::channel;
    use tokio::prelude::*;
    use tokio::runtime::Runtime;
    use std::io;
    use std::net::SocketAddr;
    use std::time::Duration;
    // Set up a Tokio runtime
    let mut rt = Runtime::new().unwrap();
    loop {
        let result = rt.block_on(async {
            // Convert a string to a socket address (the port is required)
            let addr = "192.168.2.100:10000".parse::<SocketAddr>().unwrap();
            // Connect to the TCP stream
            let streams = MyCodec::connect(&addr).await;
            if streams.is_err() {
                let err = streams.err().unwrap();
                println!("Error opening connection {:?}, waiting 10s", err);
                // Wait for 10 seconds before retrying the loop
                tokio::time::delay_for(Duration::from_secs(10)).await;
                return Ok(false);
            }
            let (_sink, ins) = streams.unwrap().split();
            println!("Connected");
            // Map the streamed messages to futures
            let fut = ins.map(move |chunk: Result<Vec<Message>, io::Error>| {
                match chunk {
                    Ok(s) => {
                        match s.len() {
                            // Only send to Kafka if we have at least 1 message
                            x if x > 0 => {
                                let sender = Sender::create_and_connect();
                                let sender = sender.unwrap();
                                // Open up a channel for the sender
                                let (tx, rx) = channel::<String>(x);
                                // Iterate over the list of events
                                let s = s.iter().map(|e| {
                                    let tx: futures::channel::mpsc::Sender<String> = tx.clone();
                                    (e.clone(), tx)
                                }).map(|(event, mut tx)| async move {
                                    // Send the event's json to the channel
                                    let json = serde_json::to_string(&event).unwrap();
                                    let t = tx.try_send(json);
                                    t.is_ok()
                                });
                                let f = future::join_all(s);
                                tokio::task::spawn(f);
                                tokio::task::spawn(async move {
                                    // On the receiving end of the channel, pass the received json to the Kafka sender
                                    let l: bool = rx.map(move |json| {
                                        sender.send("kafka_topic", "kafka_key", &json[..])
                                            .map(|df| {
                                                match df {
                                                    Ok(_) => true,
                                                    _ => false
                                                }
                                            })
                                    })
                                    .fold(true, |acc, x| async move { acc && x.await })
                                    .await;
                                    l
                                });
                                future::ready(true)
                            },
                            _ => {
                                future::ready(false)
                            }
                        }
                    },
                    Err(_e) => {
                        future::ready(false)
                    }
                }
            })
            .then(|v| {
                // The mapped futures should be spawned
                tokio::task::spawn(v)
            })
            .fold(true, |acc, x| async move { acc && if let Ok(s) = x { s } else { false } });
            // Wait here until the connection is closed
            tokio::task::spawn(fut).await
        });
        match result {
            Ok(_) => println!("Finished with connection"),
            _ => println!("Exited connection")
        }
    }
}
Conclusion
I've written this in the hope that it helps someone. Please send me a message if it did, or buy me a beer.