Python asyncio: Great for I/O, do you use it for other things?

zenulabidin

Ali Sherief

Posted on January 14, 2020

In this post I'm going to explain how to use asyncio to wait efficiently for I/O events.

Low-level API methods are not covered here.

There are many benefits to doing I/O asynchronously in your application:

  • The threads doing the I/O spend less time sleeping and waking up, especially if you have a thread pool that handles I/O requests.
  • Assuming the bottleneck is reading from a file, socket or shared memory, your application becomes more scalable: you can just add more threads to the pool.
  • If your application is bursty, meaning I/O requests arrive in rapid bursts, asynchronous I/O offloads most of the waiting to the kernel, because a callback is only invoked after the read or write has finished.
  • ... Or you're just writing a GUI framework and don't care about the above. For comparison, the .NET Framework uses a lot of threads for the GUIs that it makes.

What is asyncio?

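asyncio is a cross-platform module in the Python standard library that lets you write asynchronous functions (coroutines) and run them concurrently without explicitly managing threads. Coroutines take turns on a single thread, so they run concurrently rather than truly in parallel. The examples need Python 3.7 or later, because asyncio.run() and asyncio.get_running_loop() were added in 3.7. The async and await keywords themselves date from Python 3.5; Python 3.4 used a different syntax (the @asyncio.coroutine decorator and yield from), and the module isn't available at all in versions older than 3.4.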

To start an asynchronous program, pass a coroutine to asyncio.run(). It creates an event loop, runs the coroutine to completion, and then closes the loop.

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    # loop.time() reads the event loop's own monotonic clock
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())
# Outputs the current date every second for five seconds.

The above example does use asyncio, but while the main program thread is blocked in asyncio.run(), only one coroutine is actually running, which is not what we want. There is a way to run multiple coroutines (jobs) at once.

Although asyncio.run() blocks the calling program (which otherwise has a single thread of execution) until it finishes, the coroutines inside it can run concurrently.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*. If gather() itself is
    # cancelled, every call that has not finished yet is
    # cancelled too.
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

#Task A: Compute factorial(2)...
#Task B: Compute factorial(2)...
#Task C: Compute factorial(2)...
#<one second later...>
#Task A: factorial(2) = 2
#Task B: Compute factorial(3)...
#Task C: Compute factorial(3)...
#<one second later...>
#Task B: factorial(3) = 6
#Task C: Compute factorial(4)...
#<one second later...>
#Task C: factorial(4) = 24

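Since computing factorials with a time delay isn't really the point of this post, here is an example that reads and writes files. Note that open(), read() and write() are ordinary blocking calls here; the asyncio.sleep() calls stand in for the waiting, and they are the only points where the coroutines take turns. The /dev paths assume a Unix-like system.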

import asyncio

async def readbytes(name, number):
    with open(name, "rb") as f:
        s = f.read(number)
        await asyncio.sleep(number)
        print(f"File contents: {s}")

async def writebytes(name, number, string):
    with open(name, "w") as f:
        f.write(string)
        await asyncio.sleep(number)
        print(f"Wrote: {string}")

async def baz():
    await asyncio.gather(
        readbytes("/dev/urandom", 2),
        readbytes("/dev/random", 3),
        readbytes("/dev/zero", 4),
        writebytes("/tmp/spam", 2, "spamspam"),
        writebytes("/tmp/eggs", 3, "eggseggs"),
        writebytes("/tmp/bar", 4, "barbar"),
    )

asyncio.run(baz())

Let's take a look at the syntax and the statements that were used in the examples above.

async keyword

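All functions that will be called asynchronously must be defined with the async keyword. Calling such a function does not run it; it returns a coroutine object that you hand to asyncio.run() or await. If you leave async out, any await inside the function is a SyntaxError, and passing the result of a normal function to asyncio.run() is rejected with an error rather than run (for example "ValueError: a coroutine was expected, got None" on Python 3.7). The message "RuntimeError: no running event loop" comes from asyncio.get_running_loop() when it is called while no loop is running.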
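
As a small sketch of that difference (greet and plain are made-up names for illustration), calling an async function only creates a coroutine object, and nothing in its body runs until an event loop drives it:

```python
import asyncio
import inspect

async def greet():
    # Runs only once an event loop drives the coroutine.
    return "hello"

def plain():
    # An ordinary function: calling it runs the body immediately.
    return "hello"

coro = greet()                                 # nothing has executed yet
is_coro = inspect.iscoroutine(coro)            # a coroutine object
result = asyncio.run(coro)                     # the event loop runs the body
plain_is_coro = inspect.iscoroutine(plain())   # just a str, not a coroutine

print(is_coro, result, plain_is_coro)  # True hello False
```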

await keyword

This keyword suspends the current asynchronous function until the awaited operation finishes, handing control back to the event loop so that other tasks can run in the meantime.
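
To make the hand-over visible, here is a minimal sketch (worker and order are hypothetical names) in which two coroutines record each step they take. Each await gives the other one a turn, so the steps end up interleaved:

```python
import asyncio

order = []

async def worker(name):
    for step in range(2):
        order.append(f"{name}{step}")
        # Suspend here so the event loop can resume the other task.
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(worker("a"), worker("b"))

asyncio.run(main())
print(order)  # ['a0', 'b0', 'a1', 'b1']
```

Without the await line, worker("a") would run both of its steps before worker("b") ever started.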

asyncio.sleep()

You use this after the await keyword to sleep for the desired number of seconds. Unlike time.sleep(), it only suspends the current coroutine, so other tasks keep running.
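
A quick way to see this is to time several sleeps that run together. In this sketch (nap is a made-up helper), three 0.2-second sleeps finish in roughly 0.2 seconds overall rather than 0.6, because they overlap:

```python
import asyncio
import time

async def nap(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def main():
    start = time.monotonic()
    # All three sleeps are pending at the same time.
    await asyncio.gather(nap(0.2), nap(0.2), nap(0.2))
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")
```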

asyncio.gather()

This is also used after await when you want to run multiple functions concurrently. They don't have to be the same function as in the factorial example above; you can pass any mix of coroutines in one call. Their return values come back as a list, in the order the arguments were given.

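If one of the functions raises an uncaught exception, it propagates to the code awaiting gather(); the other functions are not cancelled and keep running. Pass return_exceptions=True to receive exceptions as ordinary results instead.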
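
The following sketch illustrates both behaviours; ok and boom are hypothetical helpers invented for this example, and the delays are arbitrary:

```python
import asyncio

async def ok(value, delay):
    await asyncio.sleep(delay)
    return value

async def boom():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # Results come back in argument order, not completion order.
    ordered = await asyncio.gather(ok("slow", 0.05), ok("fast", 0.01))
    # With return_exceptions=True, errors are returned as results
    # instead of being raised from the await.
    mixed = await asyncio.gather(ok("fine", 0.02), boom(),
                                 return_exceptions=True)
    return ordered, mixed

ordered, mixed = asyncio.run(main())
print(ordered)  # ['slow', 'fast']
print(mixed)    # ['fine', ValueError('boom')]
```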

asyncio.get_running_loop()

When used inside an asynchronous function this returns the event loop that is currently running. The event loop is the object that schedules and runs asynchronous functions and callbacks. It can only be called from a coroutine or a callback; if no loop is running it raises RuntimeError.
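
Here is a short sketch of the two cases, using the made-up function names outside and inside:

```python
import asyncio

def outside():
    # No event loop is running here, so the call raises RuntimeError.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return str(exc)
    return None

async def inside():
    # Inside a coroutine started by asyncio.run() a loop is running.
    loop = asyncio.get_running_loop()
    return loop.is_running()

outside_error = outside()
inside_running = asyncio.run(inside())
print(outside_error)   # no running event loop
print(inside_running)  # True
```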

time() method in event loop object

According to the Python documentation, this method returns a float representing the current time in seconds, measured by the event loop's internal monotonic clock. It is not a wall-clock timestamp, so it is mostly useful for timing intervals: add a number (say 42) to it and you have a deadline 42 seconds in the future.

Cancelling tasks

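You don't have to wait for an asynchronous function to complete. If you have its Task object, call the task's cancel() method; asyncio.CancelledError is then raised inside the function at its next await. Stopping the event loop with its stop() method is not a substitute: it halts every task, and under asyncio.run() it raises "RuntimeError: Event loop stopped before Future completed."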

You can limit how long you are willing to wait with await asyncio.wait_for(very_long_function(), timeout=1.0). If the timeout expires, the function is cancelled and asyncio.TimeoutError is raised.
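
A minimal sketch of the timeout in action; very_long_function here is a stand-in that just sleeps, and the 0.1-second timeout is chosen only to keep the example fast:

```python
import asyncio

async def very_long_function():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        return await asyncio.wait_for(very_long_function(), timeout=0.1)
    except asyncio.TimeoutError:
        # wait_for() has already cancelled very_long_function() by now.
        return "timed out"

outcome = asyncio.run(main())
print(outcome)  # timed out
```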

You can also protect an async function from being interrupted when the code awaiting it is cancelled. In that case the function itself continues to run, but the caller is no longer blocked on it. To use this, write await asyncio.shield(some_function()). The caller still sees an asyncio.CancelledError when it is cancelled.
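
The sketch below shows the idea under simple assumptions: caller, finished and the delays are invented for illustration. The caller is cancelled while it waits, yet the shielded some_function() still runs to the end:

```python
import asyncio

finished = []

async def some_function():
    await asyncio.sleep(0.05)
    finished.append("some_function")

async def caller():
    # Cancelling caller() raises CancelledError at this await, but
    # the shielded some_function() keeps running in its own task.
    await asyncio.shield(some_function())

async def main():
    task = asyncio.create_task(caller())
    await asyncio.sleep(0.01)   # let caller() reach its await
    task.cancel()
    try:
        await task
        caller_cancelled = False
    except asyncio.CancelledError:
        caller_cancelled = True
    await asyncio.sleep(0.1)    # give some_function() time to finish
    return caller_cancelled

caller_cancelled = asyncio.run(main())
print(caller_cancelled, finished)  # True ['some_function']
```

In longer-lived programs the Python documentation advises keeping your own reference to the shielded task so that it cannot be garbage-collected while it is still running.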

Here's an example that shows how a task could be cancelled.
This will not print 42 or anything else because the task is cancelled before it gets to run.

import asyncio

async def nested():
    await asyncio.sleep(1)
    print("42")

async def foo():
    # Schedule nested() to run soon concurrently
    # with "foo()".
    task = asyncio.ensure_future(nested())
    # In Python 3.7+ you can also use:
    #task = asyncio.create_task(nested())
    # Request cancellation. CancelledError is raised in nested()
    # the next time it would run, so print() is never reached.
    task.cancel()
    await asyncio.sleep(3)

asyncio.run(foo())

As you can see, there is a new function used here called asyncio.ensure_future(), and in Python 3.7 and later you also have asyncio.create_task(), which behaves the same way when given a coroutine. All you need to know about them is that they schedule the asynchronous function call passed as the argument to run on the event loop and return a Task object. (The documentation lists ensure_future() under the low-level API and recommends create_task() for new code.)
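
As a brief sketch of what that Task object gives you (compute is a hypothetical coroutine), you can check whether it has finished and await it to get its return value:

```python
import asyncio

async def compute():
    await asyncio.sleep(0.01)
    return 42

async def main():
    task = asyncio.ensure_future(compute())   # scheduled, not finished yet
    is_task = isinstance(task, asyncio.Task)
    done_before = task.done()
    result = await task                       # wait for it, get its value
    return is_task, done_before, task.done(), result

summary = asyncio.run(main())
print(summary)  # (True, False, True, 42)
```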

Why I'm not covering the low level API

To be clear, the low level API is just Event Loop, Futures, Transports, Protocols and Policies.

Now that the tutorial is over, I admit that from this point on the discussion is going to be biased. The reason why I don't like the low-level functions is that they feel... a little redundant. The event loop has methods that mimic things in multiprocessing. While asyncio is a great way to offload I/O, it takes more effort to make it perform OK at computations, because a long computation blocks the event loop. The only way to yield is with await, after all.
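
To illustrate the extra effort, here is one possible sketch that uses the low-level loop.run_in_executor() method to push blocking work onto a worker thread. The names blocking_work and heartbeat are made up, and time.sleep() merely stands in for a long computation or blocking call. The heartbeat keeps ticking only because the event loop itself is not blocked:

```python
import asyncio
import time

def blocking_work(seconds):
    # Stand-in for a long computation or a blocking library call.
    time.sleep(seconds)
    return "work done"

async def heartbeat(ticks):
    # Keeps counting only while the event loop is free to run it.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)

async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    beat = asyncio.ensure_future(heartbeat(ticks))
    # Passing None selects the loop's default thread pool executor.
    result = await loop.run_in_executor(None, blocking_work, 0.2)
    beat.cancel()
    return result, len(ticks)

result, tick_count = asyncio.run(main())
print(result, tick_count)
```

A thread pool keeps the loop responsive, but pure-Python number crunching in a thread still competes for the interpreter lock, which is part of why computation takes more care than I/O here.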

There are other features that can be classified as part of the high-level asyncio API, like synchronization primitives, queues and subprocesses. The primitives and queues are not thread-safe, and all of these require special care to avoid deadlocks. As far as I know, thread-safe primitives and queues already exist in the threading, queue and multiprocessing modules (including multiprocessing managers), and you don't need to make a bunch of threads to create multiple subprocesses running at the same time.

So I guess it all comes down to which module or combination of modules you prefer for implementing concurrent functions, and each has its own advantages and drawbacks. asyncio is still a fairly young module (it first shipped in Python 3.4) and more features are being added to it in each version.

Image by Gerd Altmann from Pixabay
