Roger Torres (he/him/ele)
Posted on September 12, 2021
Quoting its first announcement, "Tokio is a platform for writing fast networking code in Rust [and] is primarily intended as a foundation for other libraries".
The central piece of this platform, the runtime, is also named Tokio, and that is what this post is about. Understanding the Tokio runtime is vital for understanding the platform as a whole and, given the current state of things, for understanding how to write asynchronous code in Rust.
What is an asynchronous runtime?
Core Rust provides the types needed to build asynchronous applications. However, when building, say, an asynchronous network application, we find ourselves needing a lot of boilerplate code. We can write it ourselves, or we can use a library that gives it to us ready-made (and probably better-made). That is what an asynchronous runtime such as Tokio does: it provides the building blocks to construct such an application.
Futures
Let us start by taking a look at what core Rust brings to the table, so we can better understand what we would lack if we were to build an asynchronous network application all by ourselves.
Note: I have already written an introduction to async Rust, so this will be a condensed explanation.
Asynchronous Rust allows us to create concurrent applications. It does so via the async/.await syntax. Basically, blocks and functions declared with async desugar into a block or function that returns an implementation of a trait called Future. The futures produced this way are state machines, so they can keep track of the progress made in a certain operation, which means they can stop processing at some point and, when executed again, continue from where they stopped. On a higher level, we might say that a future is the representation of a value that may or may not be ready yet, a duality expressed by an enum called Poll that has two variants: Pending and Ready(T). The Future trait also has a method, called poll(), that tries to make as much progress as possible within the future (thus driving the state machine forward). This method, poll(), is first executed when we .await the future. To .await the future is, ultimately, to deliver it to a scheduler (formerly known as executor) that will poll() it. If it is processed through completion, Ready(T) is returned; otherwise, Pending is returned and the scheduler keeps the future aside, waiting for a request to poll() it again. This request comes from the driver (formerly known as reactor), which is an I/O event loop.
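To make the Pending/Ready duality concrete, here is a minimal sketch that implements Future by hand and polls it manually, with no runtime involved. CountDown and NoopWaker are hypothetical names made up for this illustration. Because the waker does nothing, the only reason the future ever gets polled again is that main() keeps calling poll() in a loop, which is, very crudely, the job a scheduler does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that is Pending for `remaining` polls, then Ready.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // A real future would hand the waker to an I/O source;
            // here we simply ask to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// A waker that does nothing when woken: enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = CountDown { remaining: 2 };
    // Play scheduler: keep polling until the future is ready.
    let mut polls = 0;
    loop {
        polls += 1;
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Pending => println!("poll #{polls}: Pending"),
            Poll::Ready(value) => {
                println!("poll #{polls}: Ready({value})");
                break;
            }
        }
    }
}
```

Running it prints two Pending lines followed by "poll #3: Ready(done)".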
Core Rust does not provide these last two, the scheduler and the driver. That is why we need a crate to help us with them. Furthermore, we also need some time-related utilities to handle all this scheduling.
Needless to say, this is precisely what the Tokio runtime provides. Quoting its documentation:
Unlike other Rust programs, asynchronous applications require runtime support. In particular, the following runtime services are necessary:
- An I/O event loop, called the driver, which drives I/O resources and dispatches I/O events to tasks that depend on them.
- A scheduler to execute tasks that use these I/O resources.
- A timer for scheduling work to run after a set period of time.
Scheduler
When you code an async function for the first time, you realize that the place from which you are calling this function also has to be async. And if you go all the way up and try to make your main() function async, Rust will tell you that "main function is not allowed to be async".
Asking Rust to explain this error gives us a hint:
$ rustc --explain E0752
`fn main()` or the specified start function is not allowed to be `async`. Not having a correct async runtime library setup may cause this error.
A quick search on the web is enough to provide the solution: we have to add tokio as a dependency and use this attribute macro:
#[tokio::main]
async fn main() {
    // ...
}
However, even though it certainly works, a question remains…
Why?
Because at some point the futures have to be dealt with, and there is nothing above the main() function in a Rust program, so whatever handles them has to be below main(). Another way to put it is to say that main() is the entry door of your program. The operating system running the binary knows nothing about futures, so they have to be managed "inside the house", that is, after we have entered the program. So, main() has to be synchronous.
If that is the case, how does Tokio manage to make main() async if the top-level function cannot be async? Well, it does not. #[tokio::main] will desugar async fn main() into something like this:
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // ...
        })
}
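The core idea, a synchronous function that drives a root future until it is ready, can be sketched with the standard library alone. The block_on() below is a toy of our own, not Tokio's: it has no driver, no timer and no task queue. It polls the future and, whenever it gets Pending, parks the current thread until a Waker unparks it.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy runtime: drive a single root future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until some waker calls wake() (spurious wakeups just re-poll).
            Poll::Pending => thread::park(),
        }
    }
}

// A plain, synchronous main: the root future lives "below" it.
fn main() {
    let answer = block_on(async {
        // A nested .await: part of the same tree, driven by the same block_on call.
        let half = async { 21 }.await;
        half * 2
    });
    println!("{answer}");
}
```

Tokio's block_on() follows the same contract (run a future to completion on the calling thread), but with a real scheduler and driver behind it.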
When using the attribute macro #[tokio::main], we are building a runtime below main(), a runtime that will handle the tree of futures. Why am I calling it a tree? Because a future may .await other futures. I will talk more about multiple .await calls later. For now, let us move on with this idea of handling a tree of futures.
Handling the tree of futures
Consider the example below.
#[tokio::main]
async fn main() {
    let (foo, bar) = tokio::join!(foo(), bar());
    println!("{}{}", foo, bar);
}

// Tokio's listener is used so that accept() yields to the scheduler
// while waiting, instead of blocking the whole thread.
async fn foo() -> &'static str {
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`foo()` is finished");
            "foo"
        }
        Err(_error) => "error",
    }
}

async fn bar() -> &'static str {
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8081").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`bar()` is finished");
            "bar"
        }
        Err(_error) => "error",
    }
}
Tip: run the code above, connect to both 0.0.0.0:8080 and 0.0.0.0:8081 using your browser, and check the result in the terminal where you ran the program.
When we .await foo() (here, through join!), we are handing foo()'s future to the runtime scheduler, the one responsible for calling poll() on it. Futures are executed by the scheduler as part of tasks. You might think of a task as a thread that is handled not by the OS scheduler, but by the runtime scheduler (tasks are a kind of virtual/green thread).
This will run foo() as far as possible towards completion, which means that the scheduler will not preemptively stop it to run something else in its stead (as the OS does with its threads). For the Tokio scheduler, as long as a task is doing relevant work, it may keep working. In more technical jargon, tasks run until they yield. In our example, foo() runs until it starts listening at port 8080. If you're trying to find which part of our code explicitly yields the task, give up: it is not there. We don't code yields; they happen inside .await, when an awaited future returns Poll::Pending and that Pending propagates up to the task.
After foo() yields, join! (a macro that .awaits) moves on to bar(), which will run until it starts listening at port 8081. At this point, as both functions have yielded, we have two futures waiting to be polled again, and they may be polled in any order. Now, imagine that foo() and/or bar() .await async functions inside them, adding more futures for the same task to drive. In a scenario like this, we have a tree of futures. One important thing to understand here is that we have a "root-future" (the async book calls it “top-level future”, but I will stick with “root”); in this case, it is the future returned by the async block in main (the one you will find inside block_on() in the desugared example). And this is important for at least two reasons.
First, a task is responsible for a whole tree of futures. So, let's say we have an async fn main(). As we saw, under the hood this is a normal main() that will block_on() an async block. If inside this future we .await another future, it will be dealt with by the same task, as it is part of the same tree.
Second, it points to the threshold between concurrency and parallelism. If you just .await or join! futures, they will never run simultaneously because, in the end, our main() is blocking on the root-future, and its node-futures are polled as part of that same task, hence in the same OS thread. In other words, your async program will have concurrency, but not parallelism.
Revisiting our example, even if ports 8080 and 8081 are accessed at the same time, foo() and bar() will not run in parallel: their work is interleaved, one piece after the other, because they are futures of the same tree. Sure, this is no big deal here, but if you remember that we are talking about network applications and extrapolate from this silly example, you will quickly see that this cannot be right.
Scheduling parallel tasks
As mentioned above, main() is the entry point of our program, so everything we are doing is below it. And what we have below it (what #[tokio::main] desugars to) is a runtime that was built using new_multi_thread(): a multi-threaded Tokio runtime. So far, we have been using only one thread: the one running the task created by block_on(). If we want parallelism, we need to hand our futures to the runtime itself, so they can become root-futures and, as such, new tasks. To achieve such parallelism, that is, to allow the runtime to execute our tasks on different workers of its thread pool, we have to spawn them.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

// Shared map; keys and values are string literals, hence 'static.
type Db = Arc<Mutex<HashMap<&'static str, &'static str>>>;

#[tokio::main]
async fn main() {
    let db: Db = Default::default();
    // Each spawned future becomes the root-future of a new task.
    tokio::spawn(foo(db.clone()));
    tokio::spawn(bar(db.clone()));
    handle(db).await;
}

async fn foo(db: Db) {
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`foo()` is finished");
            // The std Mutex is held only briefly and never across an .await.
            db.lock().unwrap().insert("f", "foo");
        }
        Err(_error) => println!("error"),
    }
}

async fn bar(db: Db) {
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8081").await.unwrap();
    match listener.accept().await {
        Ok(_pair) => {
            println!("`bar()` is finished");
            db.lock().unwrap().insert("b", "bar");
        }
        Err(_error) => println!("error"),
    }
}

async fn handle(db: Db) {
    loop {
        if let Ok(lock) = db.try_lock() {
            if lock.len() == 2 {
                println!("{}{}", lock.get("f").unwrap(), lock.get("b").unwrap());
                break;
            }
        }
        // Check again a bit later instead of spinning on the lock.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
In the example above, foo() and bar() become root-futures in their own right, and as handle() is the single future within the block_on() future, we end up with three different trees of futures. That way, if all three functions have work to do at the “same” time, they can be executed on three different threads: handle() on the thread that called block_on(), and foo() and bar() on the runtime's worker threads (assuming it has at least two).
I might have gone a little over the top by using Arc<Mutex<HashMap>>, since this could be dealt with more easily with a JoinHandle, which tokio::spawn() returns. My reasoning was that using the smart pointers made it easier to see the parallelism, as awaiting a JoinHandle looks very similar to a plain .await.
Going beyond
If you want to go above and beyond, a good place to start is to understand how Tokio employs a work-stealing technique to manage its multithreaded scheduler.
Driver
Let us reconsider our previous example. After foo() and bar() both yield, which happens once they start listening at 0.0.0.0, their futures have returned Poll::Pending. As there is still work to be done, the scheduler will not get rid of them, but it will not poll() them again on its own either; it will poll() them again on request.
In this case, what triggers the need to poll() them again is a connection to 0.0.0.0. However, if neither foo() nor bar() is actually running, which process will pull the trigger? Something has to be running to mediate between our listeners on 0.0.0.0 and the scheduler. That is the role of the driver, which is what Tokio calls its I/O event loop.
Before diving into the driver, though, let us talk a bit more about what makes it necessary: a pending future.
Pending future
Maybe this topic belongs to the scheduler, but as it is vital for an understanding of the driver, I think it also fits here.
When we poll a future, it receives a Context as an argument. Currently, this Context is just a wrapper for a &Waker. This &Waker is a reference to the Waker belonging to the task that called poll(). The Waker has a method, wake(), that is called by the driver, so the task (which owns the Waker) becomes aware that it should poll() the future once again.
The following flow is an illustrative example of how it works:
1. A future is .awaited.
2. As such, it is handed to the scheduler task that was created by block_on().
3. This task will poll() the future, which will do some work until it reaches the point where it has to yield; let's say it is listening at some address, as our foo() was.
4. Before yielding, the future clones the &Waker received as an argument in poll(). That “binds” the future and the task.
5. It yields, returning Poll::Pending.
6. When the operating system's I/O receives a connection on that address, it lets the driver know.
7. The driver calls wake() on the Waker stored by the future, and this wakes up the task.
8. The awoken task will then poll() the future again. If it returns Pending, the new Waker passed by this last poll() will be cloned and the process restarts.
The 6th and 7th steps describe the role of the driver as an interface between the OS and the task scheduler. This means that the driver* will perform system calls to the OS, such as kqueue in BSD/macOS, IOCP in Windows or epoll in Linux (and now we are hearing more and more about io_uring, which Tokio supports through the separate, experimental tokio-uring crate).
* It is not really the driver that makes these system calls. The interaction is actually between the driver and mio, so it is mio that interacts with the OS. That being said, I will abstract it away here, so we can depict a simplified conversation between Tokio's driver and the OS I/O, which comprises the 6th step above.
The driver, being an event loop, will keep polling the OS using one of these system calls. Let us revisit our foo() example. If the driver polls the OS and finds out that there was a connection at 0.0.0.0:8080, it will then wake() the task so that it can poll() the future.
Sure, there is a myriad of details left out. For example, how exactly does the communication between the scheduler and the driver work? Nevertheless, I will respect the beginners tag with which I marked this post and stop here. (Also, if I write posts for beginners, it is not only because I think we still lack introductory content, but also because of my own current limitations; and here we are teetering on the edge of my knowledge gap 🙃).
Timer
The module tokio::time is part of the runtime and provides utilities for tracking time. I don't have much to say about it, but, for the sake of completeness, I will quote the part of the documentation that explains what this module provides:
- Sleep is a future that does no work and completes at a specific Instant in time.
- Interval is a stream yielding a value at a fixed period. It is initialized with a Duration and repeatedly yields each time the duration elapses.
- Timeout: Wraps a future or stream, setting an upper bound to the amount of time it is allowed to execute. If the future or stream does not complete in time, then it is canceled and an error is returned.
I will stop here for today. I feel there is a lot missing, but this post is already longer than I wanted. Hopefully, we will be able to revisit some topics as we move on to talk about the other crates.
See you there!
Cover photo by Pawel Nolbert