
Python Asyncio Part 2 – Awaitables, Tasks, and Futures

Having already covered the basic concepts in Python Asyncio Part 1 – Basic Concepts and Patterns, in this part of the series I will be going into more depth on the actual syntax used when employing this library in Python code. Many of the examples used here are based on code we have actually used as part of BBC R&D’s cloudfit project.

Writing Asynchronous Code

The most basic tool in the tool kit of an asynchronous programmer in Python is the new keyword async def, which is used to declare an asynchronous coroutine function in the same way that def is used to define a normal synchronous function.

TERMINOLOGY: In this article I will refer to async def as a keyword, and in future articles I will refer to async for and async with as keywords. Strictly speaking this isn’t true. In fact async is a keyword and so is def, but since you can’t use async by itself, only in combination with another keyword, I think it’s much more convenient and less confusing to think of async def as a single keyword that happens to have a space in the middle of it. It certainly behaves like one in terms of language usage.

So for example:

async def example_coroutine_function(a, b):
    # Asynchronous code goes here
    ...

def example_function(a, b):
    # Synchronous code goes here
    ...

In the above example we define a coroutine function example_coroutine_function and an ordinary function example_function. The code block that forms the body of the definition is slightly different in the two cases. The code block for example_function is ordinary synchronous Python, whilst the code-block for example_coroutine_function is asynchronous Python.

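IMPORTANT!:

A declaration of a coroutine function using async def looks deceptively similar to the declaration of an ordinary function using def. Most of the time writing one is pretty similar, however there are some key differences, which are very important for asynchronous programming. The def keyword creates a callable object whose code block is run when the object is called. The async def keyword also creates a callable object, but calling it does not run the code block: instead the call returns an object of class Coroutine, and the code block only runs once that object is handed to one of the facilities asyncio provides for running a coroutine, most commonly the await keyword.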

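TERMINOLOGY: It’s pretty common for people to be sloppy in their terminology and use the word “coroutine” to refer to any of three things: the code block inside an async def statement, the callable object that the async def statement creates, or the object of class Coroutine that this callable returns when it is called.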

In this series I will try to keep it clear which of these I’m talking about at any particular point. In particular I will usually say “coroutine object” for an object of class Coroutine, and “coroutine function” for the callable that returns it. When I need to refer to the code block specifically (which is not often) I will refer to it as a “code block inside an async def statement which defines a coroutine function”.
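As a minimal sketch of the difference between a coroutine function and a coroutine object (the name greet and the calls list are purely illustrative, not part of any library), calling the coroutine function produces a coroutine object without running any of its code, and the code block only runs once that object is actually run:

```python
import asyncio
import inspect

calls = []  # records each time the code block actually runs


async def greet(name):
    # Hypothetical coroutine function used only for illustration
    calls.append(name)
    return f"hello {name}"


# Calling the coroutine function does not run its code block ...
coro = greet("world")
is_coroutine_object = inspect.iscoroutine(coro)
ran_before = len(calls) > 0

# ... the block only runs when the coroutine object is run, here via asyncio.run
result = asyncio.run(coro)
ran_after = len(calls) > 0
```

Here asyncio.run is used simply as a convenient way to run the coroutine object from ordinary synchronous code.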

TYPING NOTE: If you are using the typing library then the declaration of coroutine functions can be a little confusing at times.

async def example_coroutine_function(a: A, b: B) -> C:
  ...

defines example_coroutine_function as a callable that takes two parameters of types A and B and returns an object of type Coroutine[Any, Any, C]. It’s pretty rare that you’ll need to refer to this return type explicitly.

If you’re curious about the two Any type parameters in the above definition they’re related to the way that the event loop works. The first type parameter actually indicates the type of the values that the coroutine will pass to the event loop whenever it yields, whilst the second represents the type of the values that the event loop will pass back to the coroutine whenever it is reawakened. In practice the actual types of these objects are determined by the internal machinery of the event loop’s implementation, and should never need to be referred to explicitly in client code unless you are writing your own event loop implementation (which is a pretty advanced topic way beyond the scope of these articles).
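For the rare case where the return type does need spelling out, the following sketch shows what the annotations might look like. The function add and the variable names are made up for this example, and int stands in for the type C above:

```python
import asyncio
from typing import Any, Callable, Coroutine


async def add(a: int, b: int) -> int:
    return a + b


# The coroutine function itself is a callable that returns a Coroutine
adder: Callable[[int, int], Coroutine[Any, Any, int]] = add

# Calling it gives a coroutine object whose eventual result is an int
pending: Coroutine[Any, Any, int] = adder(1, 2)

total: int = asyncio.run(pending)
```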

The await Keyword and Awaitables

One of the new keywords added to the language to support asyncio is await. This keyword is, in many ways, the very core of asynchronous code. It can only be used inside asynchronous code blocks (ie. in the code block of an async def statement defining a coroutine function), and it is used as an expression which takes a single parameter and returns a value.

Eg.

    r = await a

is a valid Python statement which will perform the await action on the object a and return a value which will be assigned to r. Exactly what will happen when this await statement is executed will depend upon what the object a is.

A coroutine object is “awaitable” (it can be used in an await statement). Recall that when you are executing asynchronous code you are always doing so in the context of a “Task”, which is an object maintained by the Event Loop, and that each Task has its own call stack. The first time a Coroutine object is awaited the code block inside its definition is executed in the current Task, with its new code context added to the top of the call stack for this Task, just like a normal function call. When the code block reaches its end (or otherwise returns) then execution moves back to the await statement that called it. The return value of the await statement is the value returned by the code block. If a Coroutine object is awaited a second time this raises an exception. In this way you can think of awaiting a Coroutine object as being very much like calling a function, with the notable difference that the Coroutine object’s code block can contain asynchronous code, and so can pause the current task during running, which a function’s code block cannot.
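A small sketch of this behaviour, using a hypothetical coroutine function add: the first await runs the code block and evaluates to its return value, whilst a second await of the same coroutine object raises a RuntimeError.

```python
import asyncio


async def add(a, b):
    return a + b


async def main():
    coro = add(1, 2)
    first = await coro       # runs the code block and gives its return value
    try:
        await coro           # the same coroutine object cannot be reused
    except RuntimeError as exc:
        return first, type(exc).__name__
    return first, None


first_result, second_attempt = asyncio.run(main())
```

If the value is needed more than once, store the result of the first await, or call the coroutine function again to get a fresh coroutine object.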

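In fact there are three types of objects that are awaitable: a Coroutine object, a Future object (including any subclass of asyncio.Future, such as a Task), and any object of a class which implements the __await__ magic method.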

That last one is there so that writers of libraries can create their own new classes of objects which are awaitable and do something special when awaited. It’s usually a good idea to make your custom awaitable objects either behave like a Coroutine object or like a Future object, and document which in the class’s doc strings. Making custom awaitable classes like this is a somewhat more advanced topic, though one that may come up when writing asyncio wrappers for synchronous io libraries, for example.
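As a rough illustration of a custom awaitable, the sketch below defines a hypothetical class DelayedValue (not part of asyncio) that runs a block of code when awaited, in the manner of a coroutine object. It is one simple way of implementing __await__, assuming that delegating to an internal coroutine is acceptable:

```python
import asyncio


class DelayedValue:
    """Hypothetical awaitable: awaiting it runs a block of code, like a coroutine."""

    def __init__(self, value, delay=0):
        self.value = value
        self.delay = delay

    async def _produce(self):
        await asyncio.sleep(self.delay)
        return self.value

    def __await__(self):
        # __await__ must return an iterator; delegating to a coroutine
        # object's own __await__ is one simple way to obtain one
        return self._produce().__await__()


async def main():
    return await DelayedValue("ready")


custom_result = asyncio.run(main())
```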

TYPING NOTE: If you are using typing then there is an abstract class Awaitable which is generic, so that Awaitable[R] for some type R means “anything which is awaitable, and when used in an await statement will return something of type R”.
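A short sketch of how Awaitable[R] might be used in practice. The functions double and make_three are invented for this example; the point is only that the same annotation accepts both a coroutine object and a future:

```python
import asyncio
from typing import Awaitable


async def double(source: Awaitable[int]) -> int:
    # Accepts anything that produces an int when awaited
    value = await source
    return 2 * value


async def make_three() -> int:
    return 3


async def main():
    from_coroutine = await double(make_three())

    future = asyncio.get_running_loop().create_future()
    future.set_result(5)
    from_future = await double(future)

    return from_coroutine, from_future


doubled = asyncio.run(main())
```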

One of the most important points to get across is that the currently executing Task cannot be paused by any means other than awaiting a future (or a custom awaitable object that behaves like one). And that is something which can only happen inside asynchronous code. So any await statement might cause your current task to pause, but is not guaranteed to. Conversely any statement which is not an await statement (or an async for or async with under certain circumstances which will be explained in the next post) cannot cause your current Task to be paused.

This means that the traditional multithreaded code problems of data races where different threads of execution both alter the same value are severely reduced in asynchronous code, but not entirely eliminated. In particular for the purposes of data shared between Tasks on the same event loop all synchronous code can be considered “atomic”. To illustrate what this means consider the following code:

import asyncio

async def get_some_values_from_io():
    # Some IO code which returns a list of values
    ...

vals = []

async def fetcher():
    while True:
        io_vals = await get_some_values_from_io()

        for val in io_vals:
            vals.append(val)

async def monitor():
    while True:
        print(len(vals))

        await asyncio.sleep(1)

async def main():
    t1 = asyncio.create_task(fetcher())
    t2 = asyncio.create_task(monitor())
    await asyncio.gather(t1, t2)

asyncio.run(main())

then even though both fetcher and monitor access the global variable vals they do so in two tasks that are running in the same event loop. For this reason it is not possible for the print statement in monitor to run unless fetcher is currently asleep waiting for io. This means that it is not possible for the length of vals to be printed whilst the for loop is only part-way through running. So if the get_some_values_from_io always returns 10 values at a time (for example) then the printed length of vals will always be a multiple of ten. It is simply not possible for the print statement to execute at a time when vals has a non-multiple of ten length.

On the other hand if there was an await statement inside the for loop this would no longer be guaranteed.
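The claim can be checked with a bounded variant of the example. In this sketch the IO function is replaced by a stand-in that yields to the event loop once and then returns ten values, the infinite loops are replaced by fixed counts, and the monitor records the lengths it sees instead of printing them; all of these substitutions are assumptions made purely so that the program terminates and can be inspected:

```python
import asyncio

vals = []
observed = []  # lengths of vals seen by the monitor


async def get_some_values_from_io():
    await asyncio.sleep(0)       # stand-in for real IO: yields to the event loop
    return list(range(10))


async def fetcher(rounds):
    for _ in range(rounds):
        io_vals = await get_some_values_from_io()

        for val in io_vals:
            vals.append(val)     # no await in this loop, so it cannot be interrupted


async def monitor(samples):
    for _ in range(samples):
        observed.append(len(vals))
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(fetcher(5), monitor(20))


asyncio.run(main())
```

Every length recorded in observed is a multiple of ten, because the monitor can only run at the points where fetcher is paused in an await.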

NOTE: The create_task calls above are redundant, since asyncio.gather wraps any coroutine objects it is given in tasks itself. The body of main could be reduced to await asyncio.gather(fetcher(), monitor()).

Futures

A Future object is a type of awaitable. Unlike a coroutine object, when a future is awaited it does not cause a block of code to be executed. Instead a future object can be thought of as representing some process that is ongoing elsewhere and which may or may not yet be finished.

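When you await a future one of three things happens. If the process the future represents has finished and returned a value, the await statement immediately returns that value. If it has finished and raised an exception, the await statement immediately raises that exception. If it has not yet finished, the current Task is paused until it has, at which point the await statement returns the value or raises the exception as just described.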

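All Future objects f have the following synchronous interface in addition to being awaitable: f.done() returns True if the process the future represents has finished; f.exception() returns the exception the process raised, or None if it finished without raising; and f.result() returns the value the process returned, or raises the exception it raised. Calling f.exception() or f.result() before the process has finished raises asyncio.InvalidStateError.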

It’s important to note that there is no way for a future that is done to ever change back into one that is not yet done. A future becoming done is a one-time occurrence.
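The sketch below walks a single future through these states. The "process elsewhere" is simulated by asking the event loop to call set_result on the future, which is an assumption made for the sake of a self-contained example; in real code a library would normally complete the future for you:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    f = loop.create_future()

    done_before = f.done()      # nothing has completed the future yet
    try:
        f.result()
    except asyncio.InvalidStateError:
        early_result = "InvalidStateError"

    # Stand-in for some process elsewhere finishing: the event loop
    # completes the future as soon as the current task pauses
    loop.call_soon(f.set_result, 42)

    awaited_value = await f     # pauses this task until the future is done
    return done_before, early_result, awaited_value, f.done(), f.result(), f.exception()


future_states = asyncio.run(main())
```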

IMPORTANT!: The distinction between a Coroutine and a Future is important. A Coroutine’s code will not be executed until it is awaited. A future represents something that is executing anyway, and simply allows your code to wait for it to finish, check if it has finished, and fetch the result if it has.
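To make the distinction concrete, this sketch creates a coroutine object and a task (a kind of future) from the same hypothetical coroutine function work, then lets the event loop run once before awaiting either of them:

```python
import asyncio

log = []  # records which code blocks have actually run


async def work(name):
    log.append(name)


async def main():
    coro = asyncio.sleep(0)                    # placeholder, replaced below
    coro.close()
    coro = work("coroutine")                   # nothing runs yet
    task = asyncio.create_task(work("task"))   # scheduled to run regardless

    await asyncio.sleep(0)                     # pause so the event loop can run other tasks
    before_awaiting = list(log)

    await coro                                 # only now does the coroutine's code run
    await task
    return before_awaiting, list(log)


before_awaiting, after_awaiting = asyncio.run(main())
```

By the time either object is awaited the task has already run, whilst the bare coroutine object has done nothing at all.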

IMPORTANT!: Objects which implement the __await__ magic method may do almost anything when awaited. They might behave more like Coroutines, or more like Futures. They may do something else entirely. The documentation for the class in question should usually make it clear what their behaviour is.

You probably won’t create your own futures very often unless you are implementing new libraries that extend asyncio. However you will find that library functions often return futures. If you do need to create your own future directly you can do it with a call to

f = asyncio.get_running_loop().create_future()

On the other hand you will probably find that you use a related method, create_task quite often …

TYPING NOTE: If you want to specify that a variable is a Future then you can use the asyncio.Future class as a type annotation. If you want to specify that the Future’s result should be of a specific type, R, then you can use the following notation:

f: asyncio.Future[R]

(in Python 3.6 you will need to wrap asyncio.Future[R] in quotes for this to work correctly, but in later versions of Python this is no longer needed).

Tasks

As described in the previous article each event loop contains a number of tasks, and every coroutine that is executing is doing so inside a task. So the question of how to create a task seems like an important one.

Creating a task is a simple matter, and is itself a synchronous call (no await is needed), although asyncio.create_task does need an event loop to be running when it is called:

async def example_coroutine_function():
    ...

t = asyncio.create_task(example_coroutine_function())

NOTE: In Python 3.6 the function asyncio.create_task is not available, but you can still create a task using:

t = asyncio.get_event_loop().create_task(example_coroutine_function())

this is exactly the same, but a little more verbose.

The method create_task takes a coroutine object as a parameter and returns a Task object, which inherits from asyncio.Future. The call creates the task inside the event loop for the current thread, and schedules it to start executing at the beginning of the coroutine’s code-block. The returned future will be marked as done() only when the task has finished execution. As you might expect the return value of the coroutine’s code block is the result() which will be stored in the future object when it is finished (and if it raises then the exception will be caught and stored in the future).
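A brief sketch of a task acting as a future for its coroutine. The coroutine functions succeed and fail are invented for this example, and asyncio.wait is used only as a convenient way to pause until both tasks are done without re-raising the stored exception:

```python
import asyncio


async def succeed():
    return "ok"


async def fail():
    raise ValueError("boom")


async def main():
    t1 = asyncio.create_task(succeed())
    t2 = asyncio.create_task(fail())

    await asyncio.wait([t1, t2])   # pause until both tasks are done

    # The return value and the exception have been stored in the tasks
    return t1.done(), t1.result(), t2.done(), type(t2.exception()).__name__


task_outcomes = asyncio.run(main())
```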

Creating a task to wrap a coroutine is a synchronous call, so it can be done anywhere, including inside synchronous or asynchronous code. If you do it in asynchronous code then the event loop is already running (since it is currently executing your asynchronous code), and when it next gets the opportunity (ie. next time your current task pauses) it might make the new task active.
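The ordering described here can be observed directly. In this sketch (with a hypothetical child coroutine and an order list used only for bookkeeping) the new task does not start until the task that created it pauses:

```python
import asyncio

order = []


async def child():
    order.append("child started")


async def main():
    task = asyncio.create_task(child())
    order.append("after create_task")   # the child has not run yet

    await asyncio.sleep(0)              # current task pauses; the child gets its turn
    order.append("after pause")

    await task


asyncio.run(main())
```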

When you do it in synchronous code, however, chances are that the event loop is not yet running. Manually manipulating event loops is discouraged by the Python documentation. Unless you are developing libraries extending asyncio functionality, you should probably avoid trying to create a task from synchronous code.

If you do need to call a single piece of async code in an otherwise synchronous script, you can use asyncio.run().
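As a sketch of what this looks like in a plain script (compute is a made-up coroutine function), calling asyncio.create_task when no event loop is running fails with a RuntimeError, whereas asyncio.run starts a loop, runs the coroutine and hands back its result:

```python
import asyncio


async def compute():
    return 42


# No event loop is running at module level, so create_task refuses
coro = compute()
try:
    asyncio.create_task(coro)
    create_task_error = None
except RuntimeError as exc:
    create_task_error = type(exc).__name__
    coro.close()   # tidy up the coroutine object that was never run

# asyncio.run starts an event loop, runs the coroutine and returns its result
answer = asyncio.run(compute())
```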

Running async programs

With the introduction of asyncio.run() in Python 3.7, and the removal of the loop parameter from many asyncio functions in Python 3.10, managing event loops is something that you are unlikely to come across, unless you are developing an async library. The event loop objects are still there and accessible. There is a whole page in the docs discussing them. If you are working in Python 3.7 or greater, rejoice and give thanks for asyncio.run().

asyncio.run(coro) will run coro, and return the result. It will always start a new event loop, and it cannot be called when the event loop is already running. This leads to a couple of obvious ways to run your async code.
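Both properties can be seen in a few lines. In this sketch inner and outer are hypothetical coroutine functions; outer deliberately tries to call asyncio.run whilst an event loop is already running, and catches the resulting RuntimeError:

```python
import asyncio


async def inner():
    return "inner result"


async def outer():
    coro = inner()
    try:
        asyncio.run(coro)      # an event loop is already running this code
    except RuntimeError:
        coro.close()           # tidy up the coroutine object that was never run
        return "nested run refused"
    return "nested run allowed"


top_level = asyncio.run(inner())   # returns the coroutine's result
nested = asyncio.run(outer())
```

Inside asynchronous code the way to run inner is simply to await it.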

The first is to have everything in async coroutines, and have a very simple entry function:

import asyncio

async def get_data_from_io():
    ...

async def process_data(data):
    ...

async def main():
    while True:
        data = await get_data_from_io()
        await process_data(data)

asyncio.run(main())

The second is to wrap each coroutine call in a separate run command. Note that this forgoes all of the benefits of asyncio. Still, there might be the odd script where this is the right thing to do.

import asyncio

async def get_data_from_io():
    ...

async def process_data(data):
    ...

def main():
    while True:
        data = asyncio.run(get_data_from_io())
        asyncio.run(process_data(data))

main()

Note that these simple examples don’t make use of the ability of async code to work on multiple tasks concurrently. A more sensible example is given at the end. As you work with asyncio in Python, you’ll learn about more sophisticated ways to manage your work, but this is enough to get you started.

Manual event loop interaction

If you’re using Python 3.6, and you need to run coroutines from ordinary sync code (which you probably will, if you want to start something), then you will need to start the event loop. There are two methods for doing this:

asyncio.get_event_loop().run_forever()

will cause the event loop to run forever (or until explicitly killed). This isn’t usually particularly useful. Much more useful is:

r = asyncio.get_event_loop().run_until_complete(f)

which takes a single parameter. If the parameter is a future (such as a task) then the loop will be run until the future is done, returning its result or raising its exception. So putting it together:

async def example_coroutine_function():
    ...

loop = asyncio.get_event_loop()
t = loop.create_task(example_coroutine_function())
r = loop.run_until_complete(t)

will create a new task which executes example_coroutine_function inside the event loop until it finishes, and then return the result.

In fact this can be simplified further since if you pass a coroutine object as the parameter to run_until_complete then it automatically calls create_task for you.
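A sketch of the simplified form follows. Note one substitution: it creates the loop explicitly with asyncio.new_event_loop() rather than calling asyncio.get_event_loop(), on the assumption that the code may be run on a recent Python version, where get_event_loop() can warn or fail if no loop has been set up; the loop is also closed afterwards:

```python
import asyncio


async def example_coroutine_function():
    await asyncio.sleep(0)
    return "finished"


loop = asyncio.new_event_loop()
try:
    # Passing the coroutine object directly; the loop wraps it in a task itself
    r = loop.run_until_complete(example_coroutine_function())
finally:
    loop.close()
```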

How to yield control

There is no simple command for yielding control to the event loop so that other tasks can run. In most cases in an asyncio program this is not something you will want to do explicitly, preferring to allow control to be yielded automatically when you await a future returned by some underlying library that handles some type of IO.

However occasionally you do need to, and in particular it’s quite useful during testing and debugging. As a result there is a recognised idiom for doing this if you need to. The statement:

await asyncio.sleep(0)

will pause the current task and allow other tasks to be executed. The way this works is by using the function asyncio.sleep which is provided by the asyncio library. This function takes a single parameter which is a number of seconds, and returns an awaitable which, when awaited, waits on a future that is not marked done yet but which will be when the specified number of seconds have passed.

Specifying a count of zero seconds works to interrupt the current task if other tasks are pending, but otherwise doesn’t do anything since the sleep time is zero.

The implementation of asyncio.sleep in the standard library has been optimised to make this an efficient operation.

When using asyncio.sleep with a non-zero parameter it’s worth noting that just because the future will become done when the number of seconds has passed does not mean that your task will always wake back up at that time. In fact it may wake back up at any point after that time, since it can only awaken when there’s no other task being run on the event loop.
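The late wake-up is easy to provoke. In this sketch a hypothetical sleeper task asks to sleep for 0.01 seconds, but the main task then occupies the event loop with a blocking time.sleep call (something you would normally avoid in asynchronous code), so the sleeper cannot resume until that call has finished:

```python
import asyncio
import time


async def sleeper():
    start = time.monotonic()
    await asyncio.sleep(0.01)          # asks to be woken after 0.01 seconds
    return time.monotonic() - start    # how long the sleep actually took


async def main():
    task = asyncio.create_task(sleeper())
    await asyncio.sleep(0)             # let the sleeper start its sleep

    time.sleep(0.1)                    # blocking call: no other task can run meanwhile

    return await task


elapsed = asyncio.run(main())
```

The measured time is roughly the length of the blocking call rather than the 0.01 seconds requested.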

Summary

A diagram summarising the types of awaitable objects and their relationships. Coroutine inherits from Awaitable. Future also inherits from Awaitable. Task inherits from Future.

Making an actual program

So that concludes our run down of the basic syntax for writing asynchronous code. With just this you can already create a perfectly good async program which can instantiate multiple tasks and allow them to be swapped in and out. The following example is a fully working Python program using only the things included in this post:

import asyncio

async def counter(name: str):
    for i in range(0, 100):
        print(f"{name}: {i!s}")
        await asyncio.sleep(0)

async def main():
    tasks = []
    for n in range(0, 4):
        tasks.append(asyncio.create_task(counter(f"task{n}")))

    while True:
        tasks = [t for t in tasks if not t.done()]
        if len(tasks) == 0:
            return

        await tasks[0]

asyncio.run(main())

This program will run four tasks, each of which prints the numbers from 0 to 99 and, after each print, yields control to allow the other tasks to take over. It neatly demonstrates that asyncio allows multiple things to be done interleaved.
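The interleaving can also be checked rather than just watched. The variant below is a sketch under a few assumptions: it shortens the count to 3, records each line in a list instead of printing it, and replaces the polling loop in main with asyncio.gather, which is a different (and shorter) way of waiting for all the tasks:

```python
import asyncio

output = []  # recorded instead of printed, so the order can be inspected


async def counter(name: str, limit: int):
    for i in range(limit):
        output.append(f"{name}: {i}")
        await asyncio.sleep(0)   # yield so the other tasks get a turn


async def main():
    # gather wraps each coroutine object in a task and waits for all of them
    await asyncio.gather(*(counter(f"task{n}", 3) for n in range(4)))


asyncio.run(main())
```

The recorded output starts with one line from each of the four tasks before any task reaches its second number.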

To actually do anything useful you’ll need to make use of one of the libraries that implement IO, such as aiohttp, and when you do you might well find that there are a few things in their interfaces which I haven’t covered in this post. Specifically you’ll probably find that the interface makes use of async with and possibly also async for. So those will be the subject of the next post in this series: Python Asyncio Part 3 – Asynchronous Context Managers and Asynchronous Iterators.