Reinvent the wheel to understand Python asyncio.
Andrea Tedeschi
Posted on March 29, 2023
Over the last few years, coroutine-based concurrency and the async/await syntax have exploded across many languages, and Python is no exception.
I've found that people who started using it recently (and weren't around while it was being implemented) run into various kinds of problems while coding or debugging.
In this article we're gonna write our own multitasking library, using neither asyncio nor the async and await keywords, after exploring the basics of concurrency.
This post assumes you're already familiar with Python, Python iterators and generators, and with socket programming.
What is concurrency anyway?
According to Wikipedia:
Concurrent computing is a form of computing in which several computations are executed concurrently—during overlapping time periods
In practice it means that while our function a() is executing, other functions may execute too, running to completion in an interleaved manner.
However, our program is still executing one thing at a time (as a single CPU core is capable of).
Parallelism (executing more things at once) is a special form of concurrency, but we're not talking about it today.
How do we achieve concurrency in our programs?
The easiest way to write a concurrent program is by using threads: you spawn a function in a thread, it starts running, and any time the opportunity arises the CPU switches between threads.
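For example, here's a minimal sketch of the threaded approach (the worker function and names are illustrative):

import threading
import time

def worker(name: str, steps: int) -> None:
    for i in range(steps):
        print(f"{name}: step {i}")
        time.sleep(0.1)  # sleeping gives the OS a chance to switch threads

t1 = threading.Thread(target=worker, args=("t1", 3))
t2 = threading.Thread(target=worker, args=("t2", 3))
t1.start()
t2.start()
t1.join()
t2.join()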
However, there are well-known problems with threaded programming, like synchronization, memory usage, and having no control over context switches, which all combined lead to scalability limitations (you can find useful resources and articles on those problems online). Developers looked for something more lightweight and scalable that could be combined with multi-threading if needed, and they came up with... iterators.
Concurrency with iterators
How do we achieve concurrency with iterators? There are two core concepts to keep in mind:
- Interleaving.
- Execution during overlapping time periods.
If you think about how we can interleave the execution of different units of code without spawning threads, you'll probably realize that you need a way to pause/resume each unit of code.
Look at this very basic implementation of an iterator:
class ConcurrentUnit:
    def __init__(self, to: int):
        self._to = to
        self._i = -1

    def __iter__(self):
        return self

    def __next__(self):
        self._i += 1
        if self._i >= self._to:
            raise StopIteration
        return self._i
As you already know, a for loop just keeps calling .__next__() until StopIteration is raised. Let's abuse this to execute code concurrently.
from typing import TypeVar

T = TypeVar('T')

class ConcurrentUnit:
    def __init__(self, to: int, return_value: T):
        self._to = to
        self._i = -1
        self._return_value = return_value

    def __iter__(self):
        return self

    def __next__(self):
        self._i += 1
        if self._i >= self._to:
            raise StopIteration(self._return_value)
        return self._i

if __name__ == '__main__':
    cu1 = ConcurrentUnit(5, 'cu1')
    cu2 = ConcurrentUnit(3, 'cu2')
    tasks = [cu1, cu2]
    while tasks:
        t = tasks.pop(0)
        try:
            step = next(t)
            print(step)
        except StopIteration as e:
            print(e.value)
        else:
            tasks.append(t)
If you run that code, the output will be:
0
0
1
1
2
2
3
cu2
4
cu1
You can see that our units executed in an interleaved manner during overlapping periods of time, so yes, even without any benefit yet, we have written concurrent code. Let's look at it in detail.
The ConcurrentUnit class should be very easy to understand: from a behavioral point of view it simulates range(x) (I've omitted start to keep it simple), but it also has a return_value parameter with a generic type annotation, which enables returning values from the execution. The return_value is bound to the StopIteration raised by __next__, and we have to handle it manually by calling __next__ in a try/except block (we can't simply use a for loop, which would swallow the exception silently).
In our main block we create two concurrent units (there could have been more), store them in a list (like we would with a scheduler, more on this later) and run our loop:
- First we pop the first unit from the list.
- We call next on the unit in a try/except block, and print the result out.
- If it raises StopIteration we get the return_value and print it out. At this point we know that our unit is done.
- Otherwise, we know that our unit is not done, so we append it back to the list.
Python Generators
The code above is very odd, and wrapping the logic of functions in iterator classes will very soon lead to big spaghetti code.
Luckily for us, Python has generators, which let us declare functions that behave like iterators; in addition, they bind the value of the return statement to the StopIteration instance.
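A minimal illustration of that binding:

def f():
    yield 1
    return "done"

g = f()
print(next(g))  # 1
try:
    next(g)
except StopIteration as e:
    print(e.value)  # "done": the return value travels on the exception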
We can convert our above code into:
from collections.abc import Generator

def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"

if __name__ == '__main__':
    cu1 = concurrent_unit(5)
    cu2 = concurrent_unit(3)
    tasks = [cu1, cu2]
    while tasks:
        t = tasks.pop(0)
        try:
            step = next(t)
            print(step)
        except StopIteration as e:
            print(e.value)
        else:
            tasks.append(t)
and the code will behave the same.
There's an important concept you need to understand before we can move on, and that's the difference between generator objects and generator functions.
A generator function is a function that just returns a generator object: it does not execute any code other than creating the generator object. You can recognize one because its body contains at least one yield.
The resulting generator object then implements the iteration protocol, with __next__ executing code up to the next yield statement.
This concept applies to coroutines too: async def functions are coroutine functions that return coroutine objects when called.
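A quick demonstration (illustrative names):

def gen_fn():
    print("body started")  # not executed when gen_fn() is called
    yield 1

g = gen_fn()    # only creates the generator object: prints nothing
print(next(g))  # now the body runs: prints "body started", then 1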
From now on, when I say generator I may refer either to functions or to objects; the context will make it clear.
Build our own concurrency library
In this section we're gonna use generators to develop the basics of our concurrency library.
First we'll define a Task object that wraps a generator and provides a layer of abstraction over our crude spaghetti. Then we'll write a scheduler to handle task execution. Let's dive into it.
from collections.abc import Generator
from typing import Any, TypeVar

T = TypeVar("T")

class Task:
    def __init__(
        self,
        generator: Generator[Any, Any, T],
        *,
        name: str | None = None,
        debug: bool = False,
    ) -> None:
        self._generator = generator
        self._name = name
        self._debug = debug
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done = False

    def __repr__(self) -> str:
        return f"<Task: {self._name}, done: {self._done}>"

    def _step(self) -> None:
        if self._done:
            raise RuntimeError(f"{self}: Cannot step a done task")
        try:
            step = self._generator.send(None)
            if self._debug:
                print(f"{self}: {step}")
        except StopIteration as e:
            self._done = True
            self._result = e.value
        except Exception as e:
            self._done = True
            self._exception = e
            if self._debug:
                print(f"{self}: Exception: {e}")

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def done(self) -> bool:
        return self._done
Our Task class stores a generator object and has three attributes worth looking at:
- _done, indicating whether the task can be considered completed or not.
- _result, holding the generator's return value, if any.
- _exception, holding any exception other than StopIteration that our generator may raise.
The _step method builds upon the execution logic we used before with iterators: it represents a single "step" of our task. It calls next on self._generator (gen.send(None) is the same as next(gen)) and, if we get either a result (wrapped in a StopIteration) or an exception, stores it in the corresponding attribute.
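To make the mechanics concrete, here's how a Task can be driven by hand, outside any scheduler (a small illustrative snippet reusing the concurrent_unit generator from above):

t = Task(concurrent_unit(3), name="demo", debug=True)
while not t.done():
    t._step()        # prints each yielded value via debug
print(t.result())    # run for 3 times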
You may be asking yourself: "why is he just storing the exception instead of raising it?". I'll answer that question in the next section. For now, let's go on and build a scheduler for our tasks:
from collections.abc import Callable, Generator
from typing import Any, TypeVar

from .tasks import Task

T = TypeVar("T")

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0

    def create_task(
        self, generator: Generator[Any, Any, T], *, name: str | None = None
    ) -> Task:
        task = Task(
            generator,
            name=name or f"Task-{self._tasks_counter}",
            debug=self._debug,
        )
        self._tasks.append(task)
        self._tasks_counter += 1
        return task

    def run_until_complete(
        self,
        generator: Generator[Any, Any, T],
        *,
        task_name: str | None = None,
    ) -> T:
        main_task = self.create_task(generator, name=task_name)
        while not main_task._done:
            # note: removing an item while iterating means the task right
            # after a finished one gets skipped for the current round
            for task in self._tasks:
                task._step()
                if task._done:
                    self._tasks.remove(task)
        if main_task._exception:
            raise main_task._exception
        return main_task._result
Looks familiar? Our event loop has a method to create new task objects and one for running them.
run_until_complete takes a generator, creates a task from it (main_task) and then runs all the scheduled tasks until main_task completes. The execution logic is no different from our first proof of concept with iterators: we iterate through self._tasks, run "one step" of each item, and any time a task is done we remove it from the list.
def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"

if __name__ == '__main__':
    loop = EventLoop(debug=True)
    t1 = loop.create_task(concurrent_unit(2))
    t2 = loop.create_task(concurrent_unit(3))
    loop.run_until_complete(concurrent_unit(5))

# output
# <Task: Task-0, done: False>: 0
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-0, done: False>: 1
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: Task-2, done: False>: 2
# <Task: Task-1, done: False>: 2
# <Task: Task-2, done: False>: 3
# <Task: Task-2, done: False>: 4
Cool, but what about await?
As I stated before, we're not allowed to use the async and await keywords, so how are we going to achieve the same functionality here?
We've learned so far that coroutines are just iterators (or better, generators), so how do we await an iterator? If you thought of for loops, you're right. Let's look at the example below:
def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"

if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        yield 'a'

    loop.run_until_complete(main())

# output
# <Task: Task-0, done: False>: a
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-2, done: False>: 1
As you can see, t1 and t2 do not complete, and that's because main() completes before them. If you look at the run_until_complete source code, you'll see that when main_task is done we exit the while loop, no matter whether there are still unfinished tasks. While this is the intended behaviour, we need a way to wait for the completion of specific tasks before moving on, and we're gonna do that with for loops:
def concurrent_unit(to: int) -> Generator[int, None, str]:
    for i in range(to):
        yield i
    return f"run for {to} times"

if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        for step in concurrent_unit(5):
            yield step

    loop.run_until_complete(main(), task_name='main_task')

# output
# <Task: main_task, done: False>: 0
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: main_task, done: False>: 1
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: main_task, done: False>: 2
# <Task: main_task, done: False>: 3
# <Task: Task-2, done: False>: 2
# <Task: main_task, done: False>: 4
This time all the tasks completed.
Before diving into what's wrong with the above code, we have to thank Python and generators once more: instead of using an ugly for loop to replace await, we can use the yield from syntax. yield from gen behaves like for x in gen: yield x, except that it also hands the inner generator's return value to the caller (and forwards sent values and thrown exceptions). From now on, we'll use yield from as our await (and that's roughly what Python does under the hood too).
def main():
    t1 = loop.create_task(concurrent_unit(2))
    t2 = loop.create_task(concurrent_unit(3))
    yield from concurrent_unit(5)
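By the way, that return-value propagation is exactly why yield from is the better fit here; a small illustrative snippet:

def inner():
    yield 1
    return "inner done"

def outer():
    result = yield from inner()  # result is bound to inner's return value
    print(result)

for _ in outer():
    pass  # drives outer to completion and prints "inner done"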
Back to our refactored main(): as I stated before, it introduces some pitfalls. Since we can only yield from iterables, we can't await Task objects; in fact, you may have noticed that I didn't spawn a task from concurrent_unit(5). For consistency, we have to find a way to yield from tasks.
We could write a helper function that takes a task object and keeps calling _step until it's done, but that would conflict with the event loop calling _step too. We could make Task an iterator by defining __iter__ and __next__, and that would work (you can use yield from with iterators). However, generators are usually faster than hand-written iterators (I won't dive into it; if you're interested in the topic you can find useful resources online), so I opted to add a new method to the task interface: a generator function that just yields back to the event loop until the task is done.
class Task:
    def __init__(
        self,
        generator: Generator[Any, Any, T],
        *,
        name: str | None = None,
        debug: bool = False,
    ) -> None:
        self._generator = generator
        self._name = name
        self._debug = debug
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done = False

    def __repr__(self) -> str:
        return f"<Task: {self._name}, done: {self._done}>"

    def _step(self) -> None:
        if self._done:
            raise RuntimeError(f"{self}: Cannot step a done task")
        try:
            step = self._generator.send(None)
            if self._debug:
                print(f"{self}: {step}")
        except StopIteration as e:
            self._done = True
            self._result = e.value
        except Exception as e:
            self._done = True
            self._exception = e
            if self._debug:
                print(f"{self}: Exception: {e}")

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def done(self) -> bool:
        return self._done

    def wait(self) -> Generator[Any, Any, T]:
        while not self._done:
            yield
        if self._exception:
            raise self._exception
        return self._result
Now we can refactor our example with our new wait logic:
if __name__ == '__main__':
    loop = EventLoop(debug=True)

    def main():
        t1 = loop.create_task(concurrent_unit(2))
        t2 = loop.create_task(concurrent_unit(3))
        t3 = loop.create_task(concurrent_unit(5))
        # yield from task.wait() will either raise
        # task._exception (if it's not None) or
        # return task._result.
        # This means that exceptions do not propagate
        # until the task is awaited.
        result = yield from t3.wait()
        yield 'a'
        yield 'b'

    loop.run_until_complete(main(), task_name='main_task')

# <Task: main_task, done: False>: None
# <Task: Task-1, done: False>: 0
# <Task: Task-2, done: False>: 0
# <Task: Task-3, done: False>: 0
# <Task: main_task, done: False>: None
# <Task: Task-1, done: False>: 1
# <Task: Task-2, done: False>: 1
# <Task: Task-3, done: False>: 1
# <Task: main_task, done: False>: None
# <Task: Task-3, done: False>: 2
# <Task: main_task, done: False>: None
# <Task: Task-2, done: False>: 2
# <Task: Task-3, done: False>: 3
# <Task: main_task, done: False>: None
# <Task: main_task, done: False>: None
# <Task: Task-3, done: False>: 4
# <Task: main_task, done: False>: None
# <Task: main_task, done: False>: a
# <Task: main_task, done: False>: b
As you can see, we await the task by yielding from its wait method. Without this method, whenever we needed to retrieve the result of a task (either a value or an exception), we would have had to access the related attribute directly.
As Python asyncio does, we propagate exceptions via await: we don't re-raise them immediately after catching them, but when the task is awaited.
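Here's a small illustrative example of that behaviour (the failing generator is made up; it reuses our EventLoop):

loop = EventLoop(debug=False)

def boom():
    yield
    raise ValueError("oops")

def main():
    t = loop.create_task(boom())
    yield  # boom() runs and fails around here; nothing is raised yet
    yield
    try:
        yield from t.wait()  # the stored exception surfaces only now
    except ValueError as e:
        print("caught:", e)

loop.run_until_complete(main())  # prints "caught: oops"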
I want to mention one last thing about await in Python. A common misconception is that when we await a coroutine or a task, we are telling the event loop "don't do anything else until that coroutine either returns or raises". However, as you can see from the output of our last example, tasks scheduled before our yield from (and so our await) still run, interleaved with the awaited task (t3).
What await really tells the event loop (and that's an approximation, since the event loop isn't actually aware of it) is: "do not move the current task forward until the task I'm awaiting is done; in the meantime you can still run other scheduled tasks", where the current task is main and the awaited task is t3 in our context.
Again, that sentence describes the await behaviour, but it's not literally true, since a task can't control what the event loop does. In reality we are preventing the execution of the current task from going on before t3 completes, rather than giving instructions to the event loop.
Run blocking code
Sometimes you may need to use blocking functions (functions that can't yield back to the event loop). You can run such functions in threads to avoid blocking the event loop, and one of the most efficient ways to do that is with a thread pool.
Since we're just exploring core concepts, we're not gonna implement a thread pool ourselves; we'll just use a method that spawns a callable in a new thread. You can learn more about thread pools by searching on your own or by reading through the Python concurrent.futures.thread source code.
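For reference, this is roughly what the standard library's thread pool gives you out of the box (shown only for comparison; we'll spawn plain threads instead):

from concurrent.futures import ThreadPoolExecutor
import time

def blocking(seconds: float) -> str:
    time.sleep(seconds)
    return f"slept {seconds}s"

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(blocking, 0.5)  # runs in a worker thread
    print(future.result())               # blocks until the result is ready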
We can modify our event loop implementation a bit to handle a set of worker threads:
import threading
from collections.abc import Callable

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()

    def _spawn(self, callable: Callable[..., Any]):
        thread = threading.Thread(target=callable)
        thread.start()
        self._workers.add(thread)

    ...
The above code will work, but there are two problems with it:
- Whenever we want to pass arguments and keyword arguments we have to rely on functools.partial.
- We need a way to retrieve the execution result.
To solve those problems we can write a class that encapsulates all the attributes and logic we need, and update the EventLoop._spawn signature to match it:
class _Work:
    def __init__(
        self, fn: Callable[..., T], /, *args, **kwargs
    ) -> None:
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.result: T | None = None
        self.exception: Exception | None = None

    def run(self) -> None:
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.exception = e
        else:
            self.result = result
You may have noticed that a pattern has emerged: we have result and exception, and we could write a wait generator method like the one on Task to interoperate with non-blocking code.
To be cleaner, let's put our common waiting logic in a base interface:
from abc import ABC, abstractmethod
from collections.abc import Generator
from typing import Any, Generic, TypeVar

T = TypeVar('T')

class Waitable(ABC, Generic[T]):
    @abstractmethod
    def wait(self) -> Generator[Any, Any, T]:
        ...
We could then make Task and _Work inherit from Waitable.
However, if you start thinking about all the possible applications of that logic, you may come up with a better solution. While Task is a special case, the use case of _Work may recur in the future, so we should build a reusable interface for it:
class Waiter(Waitable[T]):
    def __init__(self) -> None:
        self._result: T | None = None
        self._exception: Exception | None = None
        self._done: bool = False

    def __repr__(self) -> str:
        return f'<Waiter: done: {self.done()}>'

    def done(self) -> bool:
        return self._done

    def result(self) -> T:
        return self._result

    def exception(self) -> Exception:
        return self._exception

    def set_result(self, result: T) -> None:
        if self._done:
            raise RuntimeError('Waiter is already done')
        self._done = True
        self._result = result

    def set_exception(self, exception: Exception) -> None:
        if self._done:
            raise RuntimeError('Waiter is already done')
        self._done = True
        self._exception = exception

    def wait(self) -> Generator[Any, Any, T]:
        while not self.done():
            yield
        if self._exception:
            raise self._exception
        return self._result
Waiter may remind you of Python's Future objects... and once more, you're right. We have defined an object that acts as a placeholder for a future result (either a value or an exception) that can be set by other functions. This is also a fundamental building block for synchronization.
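As a taste of that, here's a sketch (not part of the original code) of an Event-style primitive built on the same yield-until-done idea:

from collections.abc import Generator
from typing import Any

class Event:
    """Event-style primitive: tasks can wait until someone sets it."""

    def __init__(self) -> None:
        self._is_set = False

    def set(self) -> None:
        self._is_set = True

    def is_set(self) -> bool:
        return self._is_set

    def wait(self) -> Generator[Any, Any, None]:
        # yield control back to the event loop until the flag flips
        while not self._is_set:
            yield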
Let's use Waiter in the _Work class:
class _Work:
    def __init__(
        self, waiter: Waiter, fn: Callable[..., T], /, *args, **kwargs
    ) -> None:
        self.waiter = waiter
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self) -> None:
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            self.waiter.set_exception(e)
        else:
            self.waiter.set_result(result)
Now update the event loop implementation:
class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()

    def _spawn(self, work: _Work):
        thread = threading.Thread(target=work.run)
        thread.start()
        self._workers.add(thread)

    def run_in_thread(self, fn: Callable[..., T], /, *args, **kwargs) -> Waiter[T]:
        waiter = Waiter()
        work = _Work(waiter, fn, *args, **kwargs)
        self._spawn(work)
        return waiter

    ...
Let's try it out:
import time

if __name__ == '__main__':
    # set it to `True` to better understand the behavior
    loop = EventLoop(debug=False)

    def blockingf(i: int) -> str:
        time.sleep(i)
        return f'BLOCKING finished after {i} seconds'

    def genf(i: int) -> Generator[Any, Any, str]:
        for j in range(i):
            yield i
        return f'non-blocking finished after {i} iterations'

    def main():
        t1 = loop.create_task(genf(3))
        t2 = loop.create_task(genf(2))
        w1 = loop.run_in_thread(blockingf, 2)
        res_blocking = yield from w1.wait()
        res1 = yield from t1.wait()
        res2 = yield from t2.wait()
        print(res1, res2, res_blocking, sep='\n')

    loop.run_until_complete(main())

# output
# non-blocking finished after 3 iterations
# non-blocking finished after 2 iterations
# BLOCKING finished after 2 seconds
We have successfully mixed blocking code with "non-blocking" code.
In the next section we'll build a concurrent network service using what we have developed so far.
Sockets and Selectors
At the beginning of this post I asked for your knowledge of socket programming.
If you've ever built a TCP service with Python, you've eventually run into the old problem of handling concurrent connections.
The Python socket programming HOWTO has a section about non-blocking sockets and select (I recommend reading it if you haven't).
We're now going to combine it with what we have learned today, using selectors, which is a high-level interface over select.
If you want to read more about selectors you can go through the documentation, but for the purpose of this post it's enough to understand that with selectors we can:
- Register a socket object, waiting for either read or write events, and associate data with it (any object, even a callable).
- Update the event kind or the data of a registered socket.
- Unregister a registered socket.
- Call selector.select to get a list of ready sockets (with other information like the event type and our associated data). We can safely assume that calling methods on the socket objects returned by selector.select is not going to block.
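A minimal standalone sketch of that API (illustrative; the attached data here is just a string label):

import selectors
import socket

sel = selectors.DefaultSelector()
server = socket.create_server(("127.0.0.1", 0))  # ephemeral port
server.setblocking(False)

# wait for read events on the listening socket; the data argument
# can be anything we want handed back to us later
sel.register(server, selectors.EVENT_READ, data="accept")

events = sel.select(timeout=0)  # poll: returns immediately
for key, mask in events:
    print(key.fileobj, key.data, mask)  # the socket, our label, the event mask

sel.unregister(server)
server.close()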
First we need to implement methods to register/unregister sockets on the event loop:
import selectors

class EventLoop:
    def __init__(self, *, debug: bool = False) -> None:
        self._debug = debug
        self._tasks: list[Task] = []
        self._tasks_counter: int = 0
        self._workers: set[threading.Thread] = set()
        self._selector = selectors.DefaultSelector()

    ...

    def _create_waiter(self) -> Waiter:
        return Waiter()

    def add_reader(self, fd: int, callback: Callable[..., None]) -> None:
        try:
            self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ, callback)

    def remove_reader(self, fd: int) -> None:
        try:
            self._selector.unregister(fd)
        except KeyError:
            pass

    def add_writer(self, fd: int, callback: Callable[..., None]) -> None:
        try:
            self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE, callback)

    def remove_writer(self, fd: int) -> None:
        try:
            self._selector.unregister(fd)
        except KeyError:
            pass
selectors.DefaultSelector returns the best selector implementation for your platform, and _create_waiter is just a convenience method.
Then we have methods to register sockets (one for read events and one for write events) and matching methods to unregister them.
With that, we can define the non-blocking methods required to build our TCP server. For the scope of this post we'll only need three: socket.accept, socket.recv and socket.sendall.
import functools
import socket

class EventLoop:
    ...

    def _sock_recv(
        self, sock: socket.socket, nbytes: int, waiter: Waiter
    ) -> None:
        try:
            result = sock.recv(nbytes)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_recv(
        self, sock: socket.socket, nbytes: int
    ) -> Generator[Any, Any, bytes]:
        waiter = self._create_waiter()
        self.add_reader(
            sock.fileno(),
            functools.partial(self._sock_recv, sock, nbytes, waiter),
        )
        res = yield from waiter.wait()
        return res

    def _sock_sendall(
        self, sock: socket.socket, data: bytes, waiter: Waiter
    ) -> None:
        try:
            result = sock.sendall(data)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_sendall(
        self, sock: socket.socket, data: bytes
    ) -> Generator[Any, Any, None]:
        waiter = self._create_waiter()
        self.add_writer(
            sock.fileno(),
            functools.partial(self._sock_sendall, sock, data, waiter),
        )
        res = yield from waiter.wait()
        return res

    def _sock_accept(self, sock: socket.socket, waiter: Waiter) -> None:
        try:
            result = sock.accept()
        except (BlockingIOError, InterruptedError):
            return
        except Exception as e:
            waiter.set_exception(e)
        else:
            waiter.set_result(result)

    def sock_accept(
        self, sock: socket.socket
    ) -> Generator[Any, Any, tuple[socket.socket, Any]]:
        waiter = self._create_waiter()
        self.add_reader(
            sock.fileno(), functools.partial(self._sock_accept, sock, waiter)
        )
        res = yield from waiter.wait()
        return res

    def process_events(
        self,
        events: list[tuple[selectors.SelectorKey, int]],
    ) -> None:
        for key, mask in events:
            fileobj, callback = key.fileobj, key.data
            callback()
            if mask & selectors.EVENT_READ:
                self.remove_reader(fileobj)
            if mask & selectors.EVENT_WRITE:
                self.remove_writer(fileobj)

    def run_until_complete(
        self,
        generator: Generator[Any, Any, T],
        *,
        task_name: str | None = None,
    ) -> T:
        main_task = self.create_task(generator, name=task_name)
        while not main_task._done:
            ready = self._selector.select(0)
            if ready:
                self.process_events(ready)
            for task in self._tasks:
                task._step()
                if task._done:
                    self._tasks.remove(task)
        if main_task._exception:
            raise main_task._exception
        return main_task._result

    def close(self) -> None:
        for thread in self._workers:
            thread.join()
        self._selector.close()
To understand what's going on in that code, let's look at _sock_recv and sock_recv.
The former takes a socket and a Waiter object, tries to run socket.recv, and stores either the result or any exception on the waiter.
The latter is a generator function that creates the waiter object and calls add_reader to register the socket with _sock_recv as a callback (we use functools.partial to bind the arguments, to keep it simple). Then it awaits the waiter (yielding from its wait method) and returns its result. We know that Waiter.wait just yields until the waiter is done (which means _sock_recv has called either set_result or set_exception).
As you can see in run_until_complete, at each iteration we get the list of ready sockets that we have registered with add_reader or add_writer. As we said before, we can trust self._selector.select to give us only sockets that aren't gonna block, so we run the associated callbacks (like _sock_recv) via process_events.
So if we write:
...
data = yield from loop.sock_recv(sock_obj, 1024)
...
We are doing the following:
- We create a new waiter and schedule loop._sock_recv(sock_obj, 1024, waiter) to run as soon as loop._selector.select tells us that sock_obj is ready.
- We await the waiter. As we've learned before, sock_recv does not move on until yield from waiter.wait() is done, but other scheduled tasks still run.
- At some point, during an iteration of the while loop in run_until_complete, the selector gives us sock_obj and the scheduled callback (_sock_recv with the arguments bound in the first step). process_events runs the callback, which sets the result on our waiter.
- sock_recv gets the result of waiter (so it exits the yield from waiter.wait() line) and returns it.
We're now ready to build a concurrent tcp service. In the next section we'll build an echo service with what we've learned so far.
Concurrent echo service
At this point the implementation of our service should be straightforward.
import socket

loop = EventLoop()

def process_client(
    client: socket.socket, address: tuple[str, int]
) -> Generator[Any, Any, None]:
    print('New client:', address)
    try:
        while True:
            data = yield from loop.sock_recv(client, 1024)
            print(address, data)
            if not data:
                break
            yield from loop.sock_sendall(client, data)
    finally:
        client.close()
        print('Client closed:', address)

def main1():
    server = socket.create_server(
        ('127.0.0.1', 1234), family=socket.AF_INET, backlog=5, reuse_port=True
    )
    server.setblocking(False)
    while True:
        client, address = yield from loop.sock_accept(server)
        client.setblocking(False)
        loop.create_task(process_client(client, address))

loop.run_until_complete(main1())
In our main function, we create a "server" socket and set it to non-blocking. We then enter a while loop and accept new connections with loop.sock_accept. Any time we get one, we set it to non-blocking too and start a new task from process_client to handle it.
process_client itself just keeps awaiting loop.sock_recv to get data and, as long as it gets any, streams it back to the client with loop.sock_sendall.
Let's run it. I'll use two terminals with telnet to connect and send data.
The output should be something like:
New client: ('127.0.0.1', 39698)
New client: ('127.0.0.1', 39700)
('127.0.0.1', 39700) b'hello world 1\r\n'
('127.0.0.1', 39698) b'hello world 2\r\n'
('127.0.0.1', 39698) b'I go now\r\n'
('127.0.0.1', 39698) b''
Client closed: ('127.0.0.1', 39698)
('127.0.0.1', 39700) b'me too\r\n'
('127.0.0.1', 39700) b''
Client closed: ('127.0.0.1', 39700)
Conclusion
In the end, we developed a concurrent service without using async or await directly, replacing them with a similar underlying implementation in order to understand the core concepts behind them.
The aim of this post wasn't to provide a better-performing or cleaner implementation of Python asyncio, but to demystify async and await for asyncio beginners by re-building already existing features.
With those fundamental building blocks for concurrent programming, you can implement any kind of concurrent feature. Think about async queues. It would be as simple as:
from collections import deque
from collections.abc import Generator
from typing import Any, Generic, TypeVar

T = TypeVar("T")

class Queue(Generic[T]):
    def __init__(self, max_size: int = -1) -> None:
        self._max_size = max_size
        self._queue: deque[T] = deque()

    def __repr__(self) -> str:
        return f"<Queue max_size={self._max_size} size={len(self._queue)}>"

    def qsize(self) -> int:
        return len(self._queue)

    def empty(self) -> bool:
        return not self._queue

    def full(self) -> bool:
        if self._max_size < 0:
            return False
        return len(self._queue) >= self._max_size

    def put(self, item: T) -> Generator[Any, Any, None]:
        while self.full():
            yield
        self._queue.append(item)

    def get(self) -> Generator[Any, Any, T]:
        while self.empty():
            yield
        return self._queue.popleft()  # popleft, not pop: keep FIFO order

    def put_nowait(self, item: T) -> None:
        if self.full():
            raise RuntimeError(f"{self} is full")
        self._queue.append(item)

    def get_nowait(self) -> T:
        if self.empty():
            raise RuntimeError(f"{self} is empty")
        return self._queue.popleft()
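And a quick producer/consumer sketch to see it in action (illustrative; it reuses our EventLoop and Task.wait):

loop = EventLoop()
queue: Queue[int] = Queue(max_size=1)

def producer():
    for i in range(3):
        yield from queue.put(i)  # suspends while the queue is full

def consumer():
    for _ in range(3):
        item = yield from queue.get()  # suspends while the queue is empty
        print("got", item)

def main():
    p = loop.create_task(producer())
    c = loop.create_task(consumer())
    yield from p.wait()
    yield from c.wait()

loop.run_until_complete(main())
# got 0
# got 1
# got 2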
Or what about task cancellation? You can challenge yourself to implement it in the most efficient way (and update EventLoop.close to cancel remaining tasks).
I hope that this tour has been useful to you and I can't wait to read your feedback in the comments.