
Python Asyncio Part 3 – Asynchronous Context Managers and Asynchronous Iterators

Python Asyncio Part 1 – Basic Concepts and Patterns covered the basic concepts, and Python Asyncio Part 2 – Awaitables, Tasks, and Futures covered the basic notation. In this part of the series I will go into detail on two additional features provided by asyncio: asynchronous context managers and asynchronous iterators. Both are widely used in library interfaces, so you need them to make proper use of the technology. Many of the examples used here are based on code we have actually used as part of BBC R&D’s cloudfit project.

Asynchronous Context Managers

If you’re an experienced Python programmer you might well have used context managers a fair bit and written context managers to make your own code cleaner and easier. But if you haven’t then the Pythontips book has a good description of what they are and how they work.

Asynchronous context managers are, fairly logically, an extension of the concept of context managers to work in an asynchronous environment, and you will find that they are used a lot in asyncio-based library interfaces.

An asynchronous context manager is an object which can be used in an async with statement. An example of this is shown below:

async with FlowProvider(store_url) as provider:
    async with provider.open_read(flow_id, config=config) as reader:
        frames = await reader.read(720, count=480)

        # Do other things using reader
        ...

    # Do other things using provider
    ...

# Do something with frames
...

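In the above example the call to FlowProvider returns an asynchronous context manager, and so does provider.open_read. Conceptually this code is relatively easy to follow: each async with statement enters its context and binds the resulting object to the name given in the as clause (provider or reader), the indented code-block then runs, and when the block ends, whether normally or because of an exception, the context is exited again. The inner context is exited before the outer one.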

NOTE: Actually provider and reader continue to refer to the objects they refer to even after the end of the code-blocks, but there’s no guarantee that these objects will be usable for anything after their context has been exited. Use with care.

This is essentially the same as the process and use of a normal synchronous context manager using the with statement. The difference is a simple one:

The setup and teardown performed on entry and exit are performed by awaiting asynchronous coroutines

This small difference is quite important. It means that the code provided in the asynchronous context manager for entry and exit from the context can be asynchronous code (i.e. it can contain await statements), and also that async with itself can only be used in a context where asynchronous code is allowed (e.g. inside the code-block of a coroutine function).

USEFUL: If an object needs particular setup to be performed before it is used and/or particular actions to be taken when its use is over then make it a context manager and use it to wrap the block that uses it.

If the setup or shutdown actions involve IO then make the object an asynchronous context manager so that IO can be performed asynchronously.

Note, however, that FlowProvider and provider.open_read are not coroutine methods. They are normal synchronous callables that return asynchronous context manager objects. This is normal. It’s pretty rare to see a coroutine returning an asynchronous context manager.

TYPING NOTE: If you are using the typing library then there is an abstract generic type provided for asynchronous context managers, AsyncContextManager[T], where T is the type of the object which will be bound by the as clause of the async with statement.

In fact, the async with statement is really just a shorthand for writing out a more complex block of code involving await statements:

async with AsyncCM as ctx:
    ...

# Is the same as:

ctx = await AsyncCM.__aenter__()
try:
    ...
except BaseException as e:
    # __aexit__ is called for every exception, including cancellation
    if not await AsyncCM.__aexit__(type(e), e, e.__traceback__):
        raise
else:
    await AsyncCM.__aexit__(None, None, None)

As such you can easily define your own asynchronous context managers by creating classes which implement the magic coroutine methods:

async def __aenter__(self):
    ...

async def __aexit__(self, exc_t, exc_v, exc_tb):
    ...

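The parameters that __aexit__ takes and the return values of these coroutines are defined as follows: the value returned by __aenter__ is the object bound by the as clause; __aexit__ receives the type, value, and traceback of any exception raised in the code-block, or None for all three if the block completed normally; and if __aexit__ returns a true value the exception is suppressed, otherwise it propagates once __aexit__ has finished.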

This behaviour neatly mirrors the magic methods __enter__ and __exit__ which are used when defining synchronous context managers.
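As a minimal sketch of these two coroutines in action, the class below records each step so the order of events can be inspected. The names LoggedResource, events, and demo are hypothetical and exist only for this illustration, and asyncio.sleep(0) stands in for whatever asynchronous setup and teardown a real resource would need.

```python
import asyncio


class LoggedResource:
    """Hypothetical async context manager that records what happens to it."""

    def __init__(self, suppress=False):
        self.suppress = suppress
        self.events = []

    async def __aenter__(self):
        await asyncio.sleep(0)  # stand-in for asynchronous setup IO
        self.events.append("enter")
        return self  # this is the object bound by the "as" clause

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        await asyncio.sleep(0)  # stand-in for asynchronous teardown IO
        # All three arguments are None if the block finished normally
        if exc_t is None:
            self.events.append("exit")
        else:
            self.events.append(f"exit after {exc_t.__name__}")
        return self.suppress  # a true value swallows the exception


async def demo(suppress):
    resource = LoggedResource(suppress=suppress)
    try:
        async with resource as r:
            r.events.append("body")
            raise ValueError("boom")
    except ValueError:
        resource.events.append("propagated")
    return resource.events


if __name__ == "__main__":
    print(asyncio.run(demo(suppress=False)))
    print(asyncio.run(demo(suppress=True)))
```

With suppress=False the ValueError reaches the caller after __aexit__ has run; with suppress=True it is swallowed and execution continues after the async with block.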

WARNING: It is a common pattern in older code (even in the Python standard library) to perform IO operations in the __init__ method of an object. This should really be discouraged because __init__ is synchronous and so can never perform asynchronous IO. A better pattern is to make the object a context manager and perform synchronous IO in the __enter__ and __exit__ methods and asynchronous equivalents in the __aenter__ and __aexit__ coroutines.

USEFUL: Although common, it is not required that the __enter__ or __aenter__ methods/coroutines return self. In some cases it may be very useful to have the two routines each construct and return some sort of object representing an “open session” on the resource that the whole context manager object represents. This allows __enter__ and __aenter__ to return different objects, with slightly different interfaces. For example, methods with the same name could be synchronous methods in the object returned by __enter__ and asynchronous coroutines in the object returned by __aenter__.

This allows code that looks like the following:

# Perform some IO operations synchronously
with RemoteResource(*some_parameters) as connection:
    connection.send(some_data)
    new_data = connection.recv()

# Perform the same IO operations asynchronously
async with RemoteResource(*some_parameters) as connection:
    await connection.send(some_data)
    new_data = await connection.recv()

which can lead to very readable and clear code and easy changing between synchronous and asynchronous code.
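One way this pattern could be implemented is sketched below. Everything here is an assumption made for illustration: EchoResource, _SyncSession, and _AsyncSession are hypothetical names, and the "connection" simply echoes back whatever was sent rather than talking to a real remote resource.

```python
import asyncio


class _SyncSession:
    """Session object handed out by __enter__: plain methods."""

    def __init__(self):
        self._buffer = []

    def send(self, data):
        self._buffer.append(data)

    def recv(self):
        return self._buffer.pop(0)


class _AsyncSession:
    """Session object handed out by __aenter__: coroutine methods."""

    def __init__(self):
        self._buffer = []

    async def send(self, data):
        await asyncio.sleep(0)  # stand-in for asynchronous IO
        self._buffer.append(data)

    async def recv(self):
        await asyncio.sleep(0)  # stand-in for asynchronous IO
        return self._buffer.pop(0)


class EchoResource:
    """Hypothetical resource usable with both "with" and "async with"."""

    def __enter__(self):
        return _SyncSession()

    def __exit__(self, exc_t, exc_v, exc_tb):
        return False  # do not suppress exceptions

    async def __aenter__(self):
        return _AsyncSession()

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        return False  # do not suppress exceptions


def sync_roundtrip(data):
    with EchoResource() as connection:
        connection.send(data)
        return connection.recv()


async def async_roundtrip(data):
    async with EchoResource() as connection:
        await connection.send(data)
        return await connection.recv()
```

Because neither __enter__ nor __aenter__ returns self, the synchronous and asynchronous sessions can share method names without any ambiguity about which flavour the caller is holding.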

USEFUL: In fact there’s an even easier way to define your own asynchronous context managers using the decorator @asynccontextmanager. In Python 3.7+ it’s provided by contextlib in the standard library. For Python 3.6 you will need to install async_generator from PyPI to get access to it. It is used as follows:

from contextlib import asynccontextmanager  # Python 3.7+


@asynccontextmanager
async def ExampleAsyncCM(a_param, b_param):
    # Perform setup that would go in __aenter__
    ...

    yield obj  # obj should be the object that will be bound in the as clause

    # Perform teardown that would go in __aexit__
    ...

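In particular, if the code-block of the async with statement raises an exception which would normally be passed into the __aexit__ coroutine as parameters, then in an asynchronous context manager defined this way the exception will be raised by the yield statement. Teardown code that must always run should therefore be wrapped in a try ... finally around the yield.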
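The sketch below illustrates that behaviour. The names example_async_cm, log, and demo are hypothetical; the point is only to show where an exception from the async with block surfaces inside the decorated generator, and that a finally clause still runs the teardown.

```python
import asyncio
from contextlib import asynccontextmanager

log = []  # hypothetical record of what happened, for illustration only


@asynccontextmanager
async def example_async_cm(name):
    log.append(f"setup {name}")
    try:
        yield name  # the value bound by the "as" clause
    except KeyError as exc:
        # An exception raised in the async with block surfaces here
        log.append(f"caught {type(exc).__name__}")
        raise  # re-raise so that the caller still sees it
    finally:
        await asyncio.sleep(0)  # stand-in for asynchronous teardown IO
        log.append(f"teardown {name}")


async def demo():
    log.clear()
    try:
        async with example_async_cm("a") as value:
            log.append(f"body {value}")
            raise KeyError("missing")
    except KeyError:
        log.append("propagated")
    return list(log)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Leaving out the raise in the except clause would suppress the exception, which is the equivalent of returning a true value from __aexit__.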

Asynchronous Iterators

Iterators and Generators are a common tool in Python. There’s a good description of how they work at Pythontips. Asynchronous Iterators and Asynchronous Generators are natural asynchronous analogues in much the same way that Asynchronous Context Managers are asynchronous analogues to Context Managers.

Abstractly an iterable represents a source of data which can be looped over with a for loop, and so an async iterable represents a source of data which can be looped over with an async for loop. Using an async iterable is straightforward:

async for grain in reader.get_grains():
    # Do something with each grain object
    ...

In the above code the method reader.get_grains returns an asynchronous iterable object, and the loop draws elements from it one by one, assigning each to the local variable grain within the loop body, much like a normal for loop running over an iterable. The difference is that the method used to extract the next element from the asynchronous iterator derived from the iterable is an asynchronous coroutine method, and its output is awaited.

TERMINOLOGY: An asynchronous iterator is an object from which items can be drawn directly for use in the loop; an asynchronous iterable is an object from which you can get an asynchronous iterator to draw from. In practice all asynchronous iterators are also asynchronous iterables and all asynchronous iterables can be used to make an asynchronous iterator trivially, so it’s rare to need to distinguish them too clearly.

So in fact the async for construction is a shorthand for a longer piece of code using await statements:

async for a in async_iterable:
    await do_a_thing(a)

# Is equivalent to

it = async_iterable.__aiter__()
while True:
    try:
        a = await anext(it)
    except StopAsyncIteration:
        break

    await do_a_thing(a)

For this reason much like await and async with the async for loop can only be used in a context where asynchronous code is permitted (such as inside the code block of an asynchronous coroutine function defined with async def).

Note the use of the anext(async_iterator_object) notation above. This was introduced in Python 3.10, and is analogous to the built-in next(iterator_object) shorthand for iterator_object.__next__(). If you are using Python 3.9 or earlier, you must await async_iterator_object.__anext__() directly instead.

USEFUL: An Async Iterator might be useful for representing a remote resource which requires some time-consuming IO to be performed each time another object is pulled from it.

In fact, since coroutines don’t have to pause each time they’re awaited, it’s perfectly possible to use an asynchronous iterator to conceal an optimised loading strategy that loads resources in the background (by adding tasks to the event loop) and only pauses the current task when an object is needed that has not been loaded yet.
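A rough sketch of such a background-loading strategy is shown below. It is not taken from any particular library: load_item and PrefetchingIterator are hypothetical names, the sleep stands in for real IO, and the iterator only ever loads one item ahead.

```python
import asyncio


async def load_item(index):
    """Hypothetical slow loader; the sleep stands in for network IO."""
    await asyncio.sleep(0.01)
    return index * index


class PrefetchingIterator:
    """Async iterator that keeps the next load running in the background."""

    def __init__(self, count):
        self._count = count
        self._next_index = 0
        self._pending = None  # task loading the item to be returned next

    def _schedule(self):
        # Start loading the next item as a task, if there is one left
        if self._next_index < self._count:
            self._pending = asyncio.create_task(load_item(self._next_index))
            self._next_index += 1
        else:
            self._pending = None

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._pending is None and self._next_index == 0:
            self._schedule()  # first call: start the first load
        if self._pending is None:
            raise StopAsyncIteration
        task = self._pending
        self._schedule()  # start loading the following item straight away
        return await task  # only pauses if this item is not loaded yet


async def collect(count):
    return [item async for item in PrefetchingIterator(count)]


if __name__ == "__main__":
    print(asyncio.run(collect(4)))
```

While the loop body is busy with one item, the task for the following item is already making progress on the event loop. A production version would also need to cancel the outstanding task if the consumer stops iterating early.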

Implementing your own async iterables is relatively easy. You just need to implement the magic method:

def __aiter__(self):
    ...

to return an asynchronous iterator (note that __aiter__ is not a coroutine method). Implementing your own asynchronous iterator is also easy. You just have to create an object which implements the following magic methods:

def __aiter__(self):
    return self

async def __anext__(self):
    ...

where __aiter__ must return self, and __anext__ should be a coroutine method which will return the next item in the iterator each time it is awaited.

NOTE: Although not strictly required, it is common to implement your custom iterable so that each time __aiter__ is called it returns a new async iterator that starts again at the beginning of the sequence of items to be returned.
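To make the split between iterable and iterator concrete, here is a small sketch in which the iterable hands out a fresh iterator every time it is looped over. Countdown and CountdownIterator are hypothetical names used only for this example.

```python
import asyncio


class CountdownIterator:
    """Async iterator: holds the position within one pass over the data."""

    def __init__(self, start):
        self._current = start

    def __aiter__(self):
        return self  # an iterator is its own iterable

    async def __anext__(self):
        if self._current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stand-in for asynchronous IO
        value = self._current
        self._current -= 1
        return value


class Countdown:
    """Async iterable: creates a new iterator for every loop."""

    def __init__(self, start):
        self._start = start

    def __aiter__(self):
        return CountdownIterator(self._start)


async def demo():
    countdown = Countdown(3)
    first = [n async for n in countdown]
    second = [n async for n in countdown]  # starts again from the top
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Looping over a CountdownIterator directly a second time would produce nothing, because the iterator itself is exhausted after one pass.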

TYPING NOTE: If you are using the typing library then abstract classes AsyncIterator[T] and AsyncIterable[T] are provided to make typing easier.

Still, creating your async iterables by hand is more of a pain than creating synchronous iterables, which you would normally do using a generator. As such it should be no surprise that the Python developers also decided to include an asynchronous analogue to a generator in the form of asynchronous generators.

Async Generators

An async generator can be used as a shorthand method for defining an asynchronous iterator. It actually has a wider usage too which allows you to go beyond what the iterator interface allows, but I’m going to leave that until later (since it is pretty obscure and unlikely to be of much use in most cases).

So for a simple usage an asynchronous generator method is defined using async def much like how an asynchronous coroutine method is, but with the difference that the body of the method must contain at least one use of the keyword yield.

async def async_generator_method_example(param):
    ...
    ...

    yield something

    ...
    ...

    yield something_else

    ...
    ... # etc ...

IMPORTANT!: The only difference between the declaration of an async coroutine method and an async generator method is the absence or presence of yield in the code block. Notably there is no difference in the declaration line itself at the start of the method declaration. This can make it hard to spot when something is one or the other, and they are very different in usage. As such I would recommend that if you are using async generators you annotate them clearly to show what they are, using comments, doc strings, names, type annotations, or any other method you choose. The language will not help you here.

An async generator method is a synchronous method which returns an async generator object. It is not a coroutine method, and awaiting its return value will only lead to an exception.

async def coroutine_method():
    return 3

async def generator_method():
    yield 3

# This is correct
r = await coroutine_method()

# This will raise an exception!
r = await generator_method()

However the async generator object returned by the call is an example of an async iterator, so you can use it in an async for loop:

# This is fine, and will print 3
async for r in generator_method():
    print(r)
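The difference can also be checked at runtime. The sketch below repeats the two functions from above with return type annotations added (one possible way of marking which is which) and uses the standard inspect module to tell them apart; the demo function is a hypothetical wrapper added for illustration.

```python
import asyncio
import inspect
from typing import AsyncIterator


async def coroutine_method() -> int:
    return 3


async def generator_method() -> AsyncIterator[int]:
    yield 3


async def demo():
    results = {}
    results["awaited"] = await coroutine_method()
    try:
        await generator_method()  # an async generator object is not awaitable
    except TypeError as exc:
        results["error"] = type(exc).__name__
    results["looped"] = [r async for r in generator_method()]
    return results


if __name__ == "__main__":
    print(inspect.iscoroutinefunction(coroutine_method))
    print(inspect.isasyncgenfunction(generator_method))
    print(asyncio.run(demo()))
```

The annotation AsyncIterator[int] on generator_method is what a type checker would use to flag the mistaken await before the code is ever run.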

In particular, for a generator object g, the first time g.__anext__() is awaited the code in the generator’s code-block is executed until it reaches the first yield statement (or until the code-block ends or returns), and the value passed to that yield is the value returned by the await. Each subsequent time g.__anext__() is awaited the code resumes from where it left off and runs until the next yield statement, whose value is returned in turn. If the code-block reaches a return statement or the end of the block, the await of g.__anext__() raises StopAsyncIteration, which as we saw above is caught by the async for loop and causes the loop to exit normally.
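This stepping behaviour can be observed by driving a generator by hand. In the sketch below two_values, step_through, and trace are hypothetical names; trace simply records which parts of the generator body have run.

```python
import asyncio

trace = []  # hypothetical record of which parts of the body have executed


async def two_values(stop_early):
    trace.append("started")
    yield "first"
    trace.append("resumed")
    if stop_early:
        return  # ends the iteration; no value is allowed here
    yield "second"
    trace.append("finished")


async def step_through(stop_early):
    trace.clear()
    g = two_values(stop_early)  # calling the function runs none of the body
    values = []
    while True:
        try:
            # Runs the body up to the next yield and returns its value
            values.append(await g.__anext__())
        except StopAsyncIteration:
            break  # the body returned or ran off the end
    return values, list(trace)


if __name__ == "__main__":
    print(asyncio.run(step_through(stop_early=False)))
    print(asyncio.run(step_through(stop_early=True)))
```

The while loop here is doing by hand exactly what an async for loop over g would do.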

WARNING: Do not raise StopAsyncIteration directly from inside an asynchronous generator to end the iteration. A StopAsyncIteration that escapes the generator’s code-block is replaced with a RuntimeError (this is specified in PEP 525), and some linters will treat it as an error. Use a return statement instead. Note that it is a syntax error to pass a value to the return statement in an asynchronous generator.

Advanced Asynchronous Generators

It’s possible to make more advanced use of an asynchronous generator, but doing so requires moving beyond what is allowed by the async for loop and the async iterator interface.

In point of fact each yield statement inside a generator can be made to return a value as well as taking one. So the following code is valid:

async def advanced_generator(y):
    for i in range(0, 10):
        x = await do_something(y)
        y = yield x

And to make use of this you can’t use the async for loop, and instead need to be more explicit:

it = advanced_generator(first_y)
x = await anext(it)

while True:
    y = await do_something_else(x)
    try:
        x = await it.asend(y)
    except StopAsyncIteration:
        break

This code passes values back and forth between the generator and the caller each time the generator is resumed. Specifically, it starts by creating the generator with the initial value first_y as y. It then awaits __anext__ once, which executes the start of the generator, including awaiting do_something, and then yields the value x that was returned from it. This value is passed back to the caller, which assigns it to x and begins to loop. Each iteration of the loop awaits do_something_else with the last value the generator yielded, and then sends the result into the generator, where it becomes the return value of the yield statement. When the generator’s for loop finishes, asend raises StopAsyncIteration and the caller’s loop exits.
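To see the exchange run end to end, the sketch below fills in the two helper coroutines with trivial arithmetic. The bodies of do_something and do_something_else are assumptions made purely so the example can run, the loop is shortened to three rounds, and run_exchange is a hypothetical wrapper that collects each value the generator yields.

```python
import asyncio


async def do_something(y):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return y * 2


async def do_something_else(x):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return x + 1


async def advanced_generator(y):
    for _ in range(3):
        x = await do_something(y)
        y = yield x  # y becomes whatever the caller passes to asend()


async def run_exchange(first_y):
    yielded = []
    it = advanced_generator(first_y)
    x = await it.__anext__()  # run up to the first yield
    while True:
        yielded.append(x)
        y = await do_something_else(x)
        try:
            x = await it.asend(y)  # resume the generator with y
        except StopAsyncIteration:
            break
    return yielded


if __name__ == "__main__":
    print(asyncio.run(run_exchange(1)))
```

Starting from 1, the generator doubles each value it receives and the caller adds one to each value it gets back, so the values yielded are 2, 6, and 14.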

I have yet to find a good use for this kind of advanced generator that can’t be done more clearly and easily some other way, but the facility is there if you need it.

Asynchronous Comprehensions

Sometimes even writing out a generator is too much effort and code, and you want to create an asynchronous iterator in a single line. The Python language provides another shorthand that can be used for this in the form of asynchronous generator comprehensions, which are an asynchronous analogue to the generator comprehensions long provided by the language.

The basic form of an asynchronous generator comprehension is:

it = (<async_expression> async for <variable> in <async_iterable> if <condition>)

this is shorthand for the following code:

async def _gen():
    async for <variable> in <async_iterable>:
        if <condition>:
            yield <async_expression>

it = _gen()

where the if <condition> clause can be omitted if not needed. As you can see this allows you to take one asynchronous generator and create another from it with a single line of code. What’s more the <async_expression>, <async_iterable>, and <condition> can contain asynchronous code, since they are going to be embedded in the body of an asynchronous generator, but the statement itself does not actually execute any asynchronous code (it merely creates the generator object) and so can be used anywhere, including in synchronous code.

This can be confusing on first view, since it means that code like this:

def sync_method(gen):
    # This is a synchronous method
    ...
    it = (
        await x.run()
        async for x in gen
        if not (await x.skip())
    )
    ...
    return it

is perfectly valid, even though it appears to embed await statements inside a synchronous method, where they are usually not allowed.
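A runnable version of this idea is sketched below, assuming Python 3.7 or later. Job, jobs, and consume are hypothetical names invented for the illustration; the important part is that sync_method is an ordinary function, and none of the awaits inside the comprehension execute until something iterates over the result.

```python
import asyncio


class Job:
    """Hypothetical work item with coroutine methods."""

    def __init__(self, value, skipped=False):
        self.value = value
        self.skipped = skipped

    async def skip(self):
        await asyncio.sleep(0)
        return self.skipped

    async def run(self):
        await asyncio.sleep(0)
        return self.value * 10


async def jobs():
    yield Job(1)
    yield Job(2, skipped=True)
    yield Job(3)


def sync_method(gen):
    # Synchronous function: building the comprehension awaits nothing,
    # it only creates an async generator object
    return (await x.run() async for x in gen if not (await x.skip()))


async def consume():
    it = sync_method(jobs())
    return [result async for result in it]


if __name__ == "__main__":
    print(asyncio.run(consume()))
```

The object returned by sync_method can only be consumed from asynchronous code, which is where the awaits inside it finally run.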

To add even more confusion there is a second type of asynchronous comprehension which cannot be used in synchronous code: the asynchronous list comprehension. And they look extremely similar.

An asynchronous list comprehension (which again may only appear in a context where asynchronous code is permitted such as in the body of a coroutine method) has the following basic form:

l = [<async_expression> async for <variable> in <async_iterable> if <condition>]

and this is shorthand for the following code:

async def _list():
    r = []
    async for <variable> in <async_iterable>:
        if <condition>:
            r.append(<async_expression>)
    return r

l = await _list()

which is subtly different from the generator version. In this case the implicit async def is used to create a coroutine function, which is then called and awaited. Since this await occurs in the context that the comprehension is located in this means that the comprehension can only be used in contexts where await would be valid.

NOTE: Asynchronous dictionary comprehensions and asynchronous set comprehensions can also be constructed in much the same way. Just like the list comprehensions they must be used only in asynchronous code.
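The three forms are shown side by side in the short sketch below. The helpers numbers and square are hypothetical, and demo has to be a coroutine function because each of these comprehensions is awaited on the spot.

```python
import asyncio


async def numbers(limit):
    for n in range(limit):
        await asyncio.sleep(0)  # stand-in for asynchronous IO
        yield n


async def square(n):
    await asyncio.sleep(0)  # stand-in for asynchronous work
    return n * n


async def demo():
    # Each comprehension runs to completion before the next line starts
    as_list = [await square(n) async for n in numbers(5) if n % 2 == 0]
    as_set = {n % 3 async for n in numbers(6)}
    as_dict = {n: await square(n) async for n in numbers(3)}
    return as_list, as_set, as_dict


if __name__ == "__main__":
    print(asyncio.run(demo()))
```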

Summary

This was a bit of a whistle-stop tour of asynchronous context managers, iterables, iterators, generators, and comprehensions. The main thing to take away from this is that Asynchronous Context Managers are extremely useful, and widely used throughout the interfaces of async libraries, so understanding them and using them properly is very important. Asynchronous iterators and their relatives are less common, but you will still encounter them from time to time, so it’s important to understand that they exist, even if you need to occasionally check back on how they work.

We have now covered the basic tools, language features, and syntax of Python asyncio. The next article in this series is going to delve into the wonderful world of the asyncio library and the various supporting libraries available on PyPI which will make your life easier when writing async code, and allow you to do interesting things one at a time (but not in any specific order). It can be found at Python Asyncio Part 4 – Library Support.