Cap'n Proto - RPC at the speed of Rust - Part 2 of 2

Kushal Joshi (kushalj)

Posted on June 12, 2022

This is Part 2 of a two-part article on Cap'n Proto (capnp). I'd strongly suggest reading Part 1 first so that Part 2 makes sense.

The code is available here:

https://github.com/kushalj/capnproto-demo

Title photo by Anders Jildén on Unsplash


As a reminder, this is our example capnp schema:

@0xb068ff5fb1c4f77e;

using Rust = import "rust.capnp";
$Rust.parentModule("server");

struct Point {
    x @0 :Float32;
    y @1 :Float32;
}

interface PointTracker {
    addPoint @0 (p :Point) -> (totalPoints :UInt64);
}

Setting up the client and server

The objective here is to use the Cap'n Proto RPC features, not to explore server setup, so the Tokio code below is copied from the capnp-rpc examples and slightly simplified. That lets us focus on the capnp-rpc features while still seeing them in the context of real Tokio code.

Starting with the code at the end of Part 1, we have this as our current server.rs:

#![allow(dead_code)]

#[path = "./schema/point_capnp.rs"]
mod point_capnp;

pub mod point_demo {
    use crate::server::point_capnp::point;
    use capnp::serialize;
    use std::fs::File;

    pub fn write_to_stream() -> std::io::Result<()> {
        let mut message = ::capnp::message::Builder::new_default();

        let mut demo_point = message.init_root::<point::Builder>();

        demo_point.set_x(5_f32);
        demo_point.set_y(10_f32);

        // This Result should be consumed properly in an actual app
        let _ = serialize::write_message(&mut ::std::io::stdout(), &message);

        // Save the point
        {
            let file = File::create("point.txt")?;
            let _ = serialize::write_message(file, &message);
        }

        // Read the point from file
        {
            let point_file = File::open("point.txt")?;

            // We want this to panic in our demo in case there is an issue
            let point_reader =
                serialize::read_message(point_file, ::capnp::message::ReaderOptions::new())
                    .unwrap();

            let demo_point: point::Reader = point_reader.get_root().unwrap();
            println!("\n(x = {}, y = {})", demo_point.get_x(), demo_point.get_y());
        }

        Ok(())
    }
}

The code is suppressing Results with unwraps and is dirtier than a cloth seat on UK public transport but that's ok - we are just using this to gain intuition. When the time comes to use capnp in production, you will not code like this. Please don't code like this.
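For contrast, here is roughly what the same round-trip looks like with every Result propagated via ? instead of unwrapped or discarded - a minimal sketch, assuming the same generated point module is in scope and a caller that can handle a boxed error:

use crate::server::point_capnp::point;
use capnp::serialize;
use std::fs::File;

// Same round-trip as above, but every fallible call propagates its error.
pub fn write_to_stream() -> Result<(), Box<dyn std::error::Error>> {
    let mut message = ::capnp::message::Builder::new_default();
    let mut demo_point = message.init_root::<point::Builder>();
    demo_point.set_x(5_f32);
    demo_point.set_y(10_f32);

    serialize::write_message(&mut ::std::io::stdout(), &message)?;
    serialize::write_message(File::create("point.txt")?, &message)?;

    let point_file = File::open("point.txt")?;
    let point_reader =
        serialize::read_message(point_file, ::capnp::message::ReaderOptions::new())?;
    let demo_point: point::Reader = point_reader.get_root()?;
    println!("\n(x = {}, y = {})", demo_point.get_x(), demo_point.get_y());

    Ok(())
}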

Moving on: Cap'n Proto (capnp) facilitates RPC via a tiny built-in concurrency event loop called KJ, very similar to JavaScript's. An odd discovery for me, as these are my initials - but the docs say they don't stand for anything and were chosen because they are easy to type. 😕 Why not give it some kind of useful name? Names are strongly related to learning and knowledge transfer. This is mildly irritating - but not really that important because, fortunately, the Rust version of capnp uses the sensible name RpcSystem, which makes the code a little more readable.

Within the capnproto-rust repo, there is a capnp-rpc folder with some examples of how to use the RPC system. The docs refer to something called EzRpcServer, which no longer exists in the code, but there are some examples of Tokio-based async servers. That is a good thing, as that is how we should build this anyway (with Tokio), rather than abstracting it all away behind some magical server code.

We can start by changing main.rs to be a simplified version of the capnp-rpc hello-world example:

// client.rs and server.rs need declaring as modules here
mod client;
mod server;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() >= 2 {
        match &args[1][..] {
            "client" => return client::main().await,
            "server" => return server::main().await,
            _ => ()
        }
    }

    println!("usage: {} [client | server] ADDRESS", args[0]);
    Ok(())
}

So now we need to adjust Cargo.toml:

...

[dependencies]
capnp = "0.14"
capnp-rpc = "0.14"
tokio = { version = "1.0.0", features = ["net", "rt", "macros"]}
tokio-util = { version = "0.6.0", features = ["compat"] }
futures = "0.3.0"

...

...and add a skeleton main() to server.rs:

use std::net::ToSocketAddrs;

...

pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() != 3 {
        println!("usage: {} server ADDRESS:PORT", args[0]);
        return Ok(());
    }

    let addr = args[2]
        .to_socket_addrs()
        .unwrap()
        .next()
        .expect("could not parse address");

    tokio::task::LocalSet::new()
        .run_until(async move {
            let _listener = tokio::net::TcpListener::bind(&addr).await?;

            println!("Server running");
            loop {} // empty placeholder - we'll replace this with a real accept loop below
        })
        .await
}

(_listener has an underscore to remove warnings. We will remove this underscore shortly)

...and finally we need to add a skeleton client.rs:

#![allow(dead_code)]

use futures::AsyncReadExt;
use std::net::ToSocketAddrs;

#[path = "./schema/point_capnp.rs"]
pub mod point_capnp;

pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() != 3 {
        println!("usage: {} client HOST:PORT", args[0]);
        return Ok(());
    }

    let addr = args[2]
        .to_socket_addrs()
        .unwrap()
        .next()
        .expect("could not parse address");

    tokio::task::LocalSet::new()
        .run_until(async move {
            let stream = tokio::net::TcpStream::connect(&addr).await?;

            println!("Connected to TCP Stream");

            stream.set_nodelay(true)?;
            let (_reader, _writer) =
                tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();

            // RPC code

            Ok(())
        })
        .await
}

(_reader and _writer have underscores to remove warnings. We will remove these underscores shortly)

These are the basics (from the capnp example code) to create a network connection and pass a message. I've excluded the capnp code so we can build it up gradually, and so you can get an idea of what is boiler-plate and what is specific to our capnp client/server example.

Let's test the client:

❯ cargo run client 0.0.0.0:8080
   Compiling capnproto-demo v0.1.0 (/Users/kushaljoshi/code/rust/capnproto/capnproto-demo)
    Finished dev [unoptimized + debuginfo] target(s) in 1.46s
     Running `target/debug/capnproto-demo client '0.0.0.0:8080'`
Error: Os { code: 61, kind: ConnectionRefused, message: "Connection refused" }

That Connection refused error makes sense as the Server is not running - and functions as an adequate failing test to let us know the client tried to connect. We can open another terminal and run the server from the same folder:

❯ cargo run server 0.0.0.0:8080
    Finished dev [unoptimized + debuginfo] target(s) in 0.09s
     Running `target/debug/capnproto-demo server '0.0.0.0:8080'`
Server running


...and now the client again:

❯ cargo run client 0.0.0.0:8080
    Finished dev [unoptimized + debuginfo] target(s) in 0.09s
     Running `target/debug/capnproto-demo client '0.0.0.0:8080'`
Connected to TCP Stream

That's better. This seems to indicate that the client is connecting to the server (there is no error on connection), so we can assume we have a basic server and a client that can connect to it.

Magical flying clients

At this point the capnp docs only describe how to use the RPC framework for C++, so I had to dive into the code samples, break down the intention of the code, and rummage around to see what is happening just under the surface. I tried to refer back to the C++ docs, but there are numerous differences in usage and implementation.

The capnp-rpc README has some information that helps, and contains the following magical description:

Then you can convert your object into a capability client like this:

(capnp example)

let client: foo_capnp::bar::Client = capnp_rpc::new_client(MyBar {});

This new client can now be sent across the network. You can use it as the bootstrap capability when you construct an RpcSystem, and you can pass it in RPC method arguments and results.

Say what now?

So my server makes a client and sends the client to the... client?

Ok sure, I think I follow. Let's make the server-client thing then:

The generated code also includes a Server trait for each of your interfaces. To create an RPC-enabled object, you must implement that trait.

So for this schema (capnp example):

@0xa7ed6c5c8a98ca40;

interface Bar {
    baz @0 (x :Int32) -> (y :Int32);
}

interface Qux {
    quux @0 (bar :Bar) -> (y :Int32);
}

I need to build this (capnp example):

struct MyBar {}

impl ::foo_capnp::bar::Server for MyBar {
     fn baz(&mut self,
            params: ::foo_capnp::bar::BazParams,
            mut results: ::foo_capnp::bar::BazResults)
        -> Promise<(), ::capnp::Error>
     {
         // `pry!` is defined in capnp_rpc. It's analogous to `try!`.
         results.get().set_y(pry!(params.get()).get_x() + 1);

         Promise::ok(())
     }
}

That looks ok - the generated code seems to have types for the params and results of the baz interface method already. Let's start with that for our PointTracker interface. We will call it PointTrackerImpl rather than MyPoint (which is also the convention in the rest of the examples):

// point_tracker is part of the code generated from our schema
use crate::server::point_capnp::point_tracker;

pub struct Point {
    x: f32,
    y: f32,
}

struct PointTrackerImpl {
    points: Vec<Point>,
}

impl point_tracker::Server for PointTrackerImpl {
    fn add_point(
        &mut self,
        params: point_tracker::AddPointParams,
        mut results: point_tracker::AddPointResults,
    ) -> Promise<(), ::capnp::Error> {
        let point_client = pry!(params.get()).get_p();

        if let Ok(received_point) = point_client {
            self.points.push(Point {
                x: received_point.get_x(),
                y: received_point.get_y(),
            });
        }
        results.get().set_total_points(self.points.len() as u64);

        Promise::ok(())
    }
}

(This code is in server.rs)

and now we can point at this point-tracker from our server code, using the boiler-plate from the hello-world example:

use capnp::capability::Promise;
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};

use futures::{AsyncReadExt, FutureExt};
use std::net::ToSocketAddrs;


// existing code...

...


pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() != 3 {
        println!("usage: {} server ADDRESS:PORT", args[0]);
        return Ok(());
    }

    let addr = args[2]
        .to_socket_addrs()
        .unwrap()
        .next()
        .expect("could not parse address");

    tokio::task::LocalSet::new()
        .run_until(async move {
            let listener = tokio::net::TcpListener::bind(&addr).await?;

            // Cap'n Proto point_tracker client initialised here
            let point_tracker_client: point_tracker::Client =
                capnp_rpc::new_client(PointTrackerImpl { points: Vec::new() });

            println!("Server running");
            loop {
                let (stream, _) = listener.accept().await?;
                stream.set_nodelay(true)?;
                let (reader, writer) =
                    tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();

                let network = twoparty::VatNetwork::new(
                    reader,
                    writer,
                    rpc_twoparty_capnp::Side::Server,
                    Default::default(),
                );

                let rpc_system =
                    RpcSystem::new(Box::new(network), Some(point_tracker_client.clone().client));

                tokio::task::spawn_local(Box::pin(rpc_system.map(|_| ())));
            }
        })
        .await
}

(This code is in server.rs)

And our server is nearly done!


Vat?

If that was a little bit like drawing an owl, I understand. It's ok - we're going to break that last chunk of code down a bit and explain what the Cap'n is doing.

The Promise we are using above is part of the RPC system and provides the async plumbing within the RPC runtime. The actual response, as you will eventually figure out after some head-scratching, is built up through results.get() and the setter calls made on it.

So in the add_point() implementation of the point_tracker::Server trait on PointTrackerImpl we are doing the following:

  • Get the point from the request params via pry! (see the sketch after this list), which gives us a Result
  • If the Result is Ok, grab the x & y coordinates for the point directly from the message
  • Set the total number of points on the results message
  • Return Promise::ok(()) so the RPC system can send the response
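
A quick note on the pry! macro used in the first step: it behaves like the old try! macro (or the ? operator), except that on an error it returns a failed Promise from the surrounding function rather than an Err. Conceptually it does something like this - a simplified sketch, not the real macro:

use capnp::capability::Promise;

// Simplified re-implementation of pry!'s idea: evaluate the expression,
// and on Err early-return a rejected Promise from the enclosing function.
macro_rules! pry_sketch {
    ($expr:expr) => {
        match $expr {
            Ok(value) => value,
            Err(err) => return Promise::err(::capnp::Error::from(err)),
        }
    };
}

fn demo(input: ::capnp::Result<i32>) -> Promise<i32, ::capnp::Error> {
    let value = pry_sketch!(input); // rejected Promise on failure, value otherwise
    Promise::ok(value + 1)
}

This is why add_point can bail out of a synchronous-looking function while still returning something the RPC event loop can drive.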

This part is a little fiddly and I had to fart around with the available helper code and examples to shape this. I'm not 100% happy with it but it will do as an example for our purposes here.

So this is our "Server" but it needs to be Served somehow. In the main() function of server.rs we set up something a little odd now. A two-party-vat network.

To explain this I will first share some information from the capnp docs site. Halfway through a list of features on the RPC Protocol page there is a vague reference:

Three-way interactions. A network of Cap’n Proto vats (nodes) can pass object references to each other and automatically form direct connections as needed.

In case you missed it, this is the (only?) reference to the 'Vat' word we can see in the code. Apparently a Vat is a node. Cool. Not sure why they didn't call a node a.. "Node". 🤷 But ok. Now we know.

So this magical code...

let network = twoparty::VatNetwork::new(
    reader,
    writer,
    rpc_twoparty_capnp::Side::Server,
    Default::default(),
);

...takes an input stream and an output stream (the two halves of the TcpStream we accepted and wrapped with Tokio's compat layer - out of scope for this article, so please go explore Tokio articles), a Side (Server or Client), and some receive options. It's easier to see this with the types annotated by Rust Analyzer:

[Screenshot: twoparty::VatNetwork::new with parameter types annotated by Rust Analyzer]

It returns a capnp RPC vat (node) network.

Then we set up the RPC system:

let rpc_system =
    RpcSystem::new(Box::new(network), Some(point_tracker_client.clone().client));

A lot going on there. Let's get the Rust Analyzer view again:

[Screenshot: RpcSystem::new with types annotated by Rust Analyzer]

Ok, we have a new RPC system that holds a pointer (Box) to the VatNetwork we made above. It also takes a "bootstrap" param, which is the server-client thing we made earlier. Because we go around the accept loop once per connection, and each connection's RpcSystem needs its own handle to the capability, we clone the client each time.

Finally we call map() on the RpcSystem and spawn it as a task, which is what actually starts it. The map() looks like a mild oddity, but it makes sense: RpcSystem is itself a Future that resolves with a final Result when its connection ends, and map(|_| ()) simply discards that value so the spawned task's output is ().
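
If you did want to do something with that final value, the closure passed to map receives it. So (as a sketch, using the same variables as above) you could log errors on disconnect instead of discarding them:

// RpcSystem resolves with a Result when the connection shuts down;
// log any error instead of silently throwing it away.
tokio::task::spawn_local(Box::pin(rpc_system.map(|result| {
    if let Err(e) = result {
        eprintln!("rpc connection ended with error: {}", e);
    }
})));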

Make the RPC Client

The client code is built similarly, except there is no loop. I left a comment in the client code above that says // RPC code, leaving a space for the capnp-rpc code. That code looks as follows:

            // Note: client.rs also needs these at the top for the code below:
            //     use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
            //     use futures::FutureExt;
            //     use crate::client::point_capnp::{point, point_tracker};
            let rpc_network = Box::new(twoparty::VatNetwork::new(
                reader,
                writer,
                rpc_twoparty_capnp::Side::Client,
                Default::default(),
            ));
            let mut rpc_system = RpcSystem::new(rpc_network, None);
            let point_tracker: point_tracker::Client =
                rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

            tokio::task::spawn_local(Box::pin(rpc_system.map(|_| ())));

            let mut request = point_tracker.add_point_request();

            // let's make a Point:
            let mut message = ::capnp::message::Builder::new_default();
            let mut new_point = message.init_root::<point::Builder>();
            new_point.set_x(5_f32);
            new_point.set_y(10_f32);

            request.get().set_p(new_point.into_reader())?;

            let reply = request.send().promise.await.unwrap();

            println!(
                "Total points in Point Tracker: {}",
                reply.get().unwrap().get_total_points()
            );

So again we are making a two-party vat network (node network), but this time we are using the capnp Side enum value of Client. We then boot the RPC system with the network and None, because we don't have server code on this side to attach.

In fact this is a mental hook for understanding what's happening under the surface: The None is replaced by a capability on the server side (our point-tracker server-client thingy, which is a capability::Client) - but because we are currently building the client side of the RPC, there is no capability to bootstrap the RPC system with. We just want to connect and call a remote procedure - but before that we need the interface from the other side.

The next line confusingly also says "bootstrap" (the same name as the RpcSystem::new() param). Yes, there are two things called Client and two things called bootstrap. It's a serious name-grokking challenge in this project, but the project is still evolving, so I will appreciate what it does and not how it looks...

Anyway, back to rpc_system.bootstrap(). This one is interesting - it's the opposite of the other bootstrap (which is a weak argument for the function sharing a name with that param...). This function takes a vat (node) ID and returns the "bootstrap interface" of the other side! This is why we attach the result to the point_tracker variable.
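
Putting the two sides next to each other (both copied from the code above) makes the symmetry easier to see:

// Server side: offer our capability as the bootstrap interface.
let rpc_system =
    RpcSystem::new(Box::new(network), Some(point_tracker_client.clone().client));

// Client side: nothing to offer, so pass None...
let mut rpc_system = RpcSystem::new(rpc_network, None);
// ...and instead ask the other side for *its* bootstrap interface.
let point_tracker: point_tracker::Client =
    rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);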

You might notice that we have to explicitly state the type as point_tracker::Client. This does actually make sense. If you remove the explicit type, this happens two lines down:

[Screenshot: compiler error on the add_point_request() call when the explicit type is removed]

The compiler has no idea whether point_tracker has an add_point_request() function or what it might return, because bootstrap() is generic over the capability type it hands back - this is just code, and nothing has been sent over the wire yet, so the type has to come from us.


Let's send some points!

Once the RPC System is up and running (with the .map() as before), we can send our message.

To make the request we use the add_point_request() function which is provided in the generated code.

Unfortunately a lot of the examples create the capnp message they are sending on the fly. This is not practical if the message needs to be created by some other code in our application. So in this example I'm manually creating a message and initialising it with point::Builder, as we did in Part 1, and then setting it as the p parameter of the request.
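
As an aside, when the point doesn't need to come from elsewhere in the application, you should be able to skip the separate message and initialise the point in place on the request - a sketch, assuming capnpc generates the usual init_p accessor for the struct-typed p field:

// Build the point directly inside the request's params,
// with no separate message::Builder.
let mut request = point_tracker.add_point_request();
{
    let mut p = request.get().init_p(); // point::Builder borrowed from the request
    p.set_x(5_f32);
    p.set_y(10_f32);
}
let reply = request.send().promise.await?;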

Finally, we send the message and use the RPC promise system as our async approach to await the response. And that's it!

If we run the server in one terminal window, we get the confirmation message that it is running:

[Screenshot: server terminal showing the "Server running" confirmation]

And if we run the client from another terminal window, we can start filling up the Point Tracker with points!

[Screenshot: client terminal showing the Point Tracker total increasing]

You can keep running the client and we can see the total points increasing as the server stores the points we are sending.

Final Words

This write-up took a little longer than I planned because, to be frank, the Cap'n Proto documentation (for Rust at least) is a bit pants at the moment (a UK colloquialism for "it needs some work"). However, I really like this project, and I've only touched the surface of what it's capable of in this small tour. With some better documentation and some evolution around its RPC features, I think this is going to be one to watch. It feels oddly cool to have the ability to extract data directly from network messages. I'm going to integrate capnp into a project I'm working on and may do a further write-up at that point.

There should be a "so what" at the end, I guess. There is. From an abstract perspective, network data requires conversion to a serialised stream of bytes that some protocol can send and then reconstruct (deserialise) back into something a network node can consume for its data processing purposes. We are progressing down the road of gigantic user data (my phone has half a TB of storage, don't tell me we're not going there - and Genshin Impact does a multi-gigabyte update every time you load it) combined with an increasing need to provide information immediately, and Cap'n Proto lets us remove one step of that process almost completely: the deconstruction/reconstruction step. Similarly to Rust's energy-saving side effect, this arguably makes Cap'n Proto one of the most energy-efficient protocols out there. I can imagine the Cap'n is going to be a big deal just a little further down that road.
