Having already covered the basic concepts in Python Asyncio Part 1 – Basic Concepts and Patterns, and the basic notation in Python Asyncio Part 2 – Awaitables, Tasks, and Futures, in this part of the series I will be going into detail on two additional features provided by asyncio which are widely used in library interfaces, and hence are really needed to make proper use of the technology. Many of the examples used here are based on code we have actually used as part of BBC R&D’s cloudfit project.
If you’re an experienced Python programmer you might well have used context managers a fair bit and written context managers to make your own code cleaner and easier. But if you haven’t then the Pythontips book has a good description of what they are and how they work.
Asynchronous context managers are, fairly logically, an extension of the concept of context managers to work in an asynchronous environment, and you will find that they are used a lot in asyncio-based library interfaces.
An asynchronous context manager is an object which can be used in an async with statement. An example of this is shown below:
async with FlowProvider(store_url) as provider:
    async with provider.open_read(flow_id, config=config) as reader:
        frames = await reader.read(720, count=480)
        # Do other things using reader
        ...
    # Do other things using provider
    ...
# Do something with frames
...
In the above example the call to FlowProvider returns an asynchronous context manager, and so does provider.open_read. Conceptually this code is relatively easy to follow:
1. Some setup is performed by FlowProvider and the result is bound to the name provider.
2. Some setup is performed by provider.open_read and the result is bound to the name reader.
3. reader is available to use, so we can await reader.read, which is a coroutine returning a list of frames.
4. The other operations using reader are performed.
5. When the inner async with statement has completed, some tidy-up and resource deallocation is performed for reader.
6. The other operations using provider are performed.
7. When the outer async with statement has completed, some tidy-up and resource deallocation is performed for provider.
8. reader and provider have both had their clean-up done, but variables like frames are still accessible and hold their values.
NOTE: Actually provider and reader continue to refer to the objects they refer to even after the end of the code-blocks, but there’s no guarantee that these objects will be usable for anything after their context has been exited. Use with care.
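The ordering of setup and teardown described above can be observed with a small self-contained sketch. The Resource class here is hypothetical: it only stands in for objects like the provider and reader, and simply records when it is entered and exited.

```python
import asyncio

events = []  # records the order in which things happen


class Resource:
    """Hypothetical stand-in for an object such as a provider or reader."""

    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        await asyncio.sleep(0)  # stands in for asynchronous setup IO
        events.append(f"enter {self.name}")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # stands in for asynchronous teardown IO
        events.append(f"exit {self.name}")
        return False  # do not suppress exceptions


async def main():
    events.clear()
    async with Resource("provider") as provider:
        async with Resource("reader") as reader:
            frames = [1, 2, 3]  # placeholder for data read via reader
            events.append("use reader")
        events.append("use provider")
    # frames is still bound here, after both contexts have exited
    events.append(f"frames has {len(frames)} items")
    return list(events)


print(asyncio.run(main()))
```

The inner context is exited before the code that uses only the outer one runs, and the outer context is exited last.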
This is essentially the same as the process and use of a normal synchronous context manager using the with statement. The difference is a simple one:
The setup and teardown performed on entry and exit are performed by awaiting asynchronous coroutines.
This small difference is quite important. It means that the code provided in the asynchronous context manager for entry and exit from the context can be asynchronous code (i.e. it can contain await expressions), and also that async with itself can only be used in a context where asynchronous code is allowed (e.g. inside the code-block of a coroutine function).
USEFUL: If an object needs particular setup to be performed before it is used and/or particular actions to be taken when its use is over then make it a context manager and use it to wrap the block that uses it.
If the setup or shutdown actions involve IO then make the object an asynchronous context manager so that IO can be performed asynchronously.
Note, however, that FlowProvider and provider.open_read are not coroutine methods. They are normal callables that return asynchronous context manager objects. This is normal. It’s pretty rare to see a coroutine returning an asynchronous context manager.
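TYPING NOTE: If you are using the typing library then there is an abstract type class provided for asynchronous context managers, AsyncContextManager[T], where T is the type of the object which will be bound by the as clause of the async with statement. (From Python 3.9 onwards contextlib.AbstractAsyncContextManager[T] can be subscripted directly, and the typing alias is deprecated.)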
In fact, the async with statement is really just a shorthand for writing out a more complex block of code involving await expressions:
async with AsyncCM as ctx:
    ...
# Is roughly the same as:
ctx = await AsyncCM.__aenter__()
try:
    ...
except BaseException as e:
    # A truthy return value from __aexit__ suppresses the exception
    if not await AsyncCM.__aexit__(type(e), e, e.__traceback__):
        raise
else:
    await AsyncCM.__aexit__(None, None, None)
As such you can easily define your own asynchronous context managers by creating classes which implement the magic coroutine methods:
async def __aenter__(self):
    ...
async def __aexit__(self, exc_t, exc_v, exc_tb):
    ...
The parameters that __aexit__ takes and the return values of these coroutines are defined as follows:
- The return value of __aenter__ may be anything. Whatever value it returns is the object which will be bound by any as clause in the async with statement when it is used.
- If the code block of the async with statement reaches its end without an exception then __aexit__ will be called with all three parameters as None and its return value will be ignored.
- If the code block of the async with statement raises an exception then __aexit__ will be called with the type of the exception, the exception object itself, and a “traceback” associated with the exception as the three parameters (the traceback object can be passed to some functions in the traceback library to format it into a nice string which can be printed to the user indicating where in the code the exception occurred). If it returns True (or anything that evaluates as truthy) then the system will assume that the exception has been handled and corrected for, and will not propagate it any further. If it returns False, None, anything that evaluates as falsy, or nothing at all then the exception will continue to propagate.
This behaviour neatly mirrors the magic methods __enter__ and __exit__ which are used when defining synchronous context managers.
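As a minimal sketch of these rules, the hypothetical class below (SuppressTimeout is not from any library) records what __aexit__ receives and returns a truthy value only for TimeoutError, so that exception is swallowed while any other exception propagates.

```python
import asyncio


class SuppressTimeout:
    """Hypothetical async context manager that swallows TimeoutError only."""

    def __init__(self):
        self.seen = None  # exception type most recently passed to __aexit__

    async def __aenter__(self):
        return "session"  # this value is bound by the `as` clause

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        self.seen = exc_type
        # A truthy return value tells Python the exception has been handled.
        return exc_type is not None and issubclass(exc_type, TimeoutError)


async def demo():
    results = []
    cm = SuppressTimeout()

    async with cm as value:
        results.append(value)
    results.append(cm.seen)  # None: the block finished normally

    async with cm:
        raise TimeoutError("too slow")  # suppressed by __aexit__
    results.append(cm.seen)

    try:
        async with cm:
            raise ValueError("bad input")  # not suppressed
    except ValueError:
        results.append("ValueError propagated")
    return results


print(asyncio.run(demo()))
```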
WARNING: It is a common pattern in older code (even in the Python main library) to perform IO operations in the __init__ method of an object. This should really be discouraged because __init__ is synchronous and so can never perform asynchronous IO. A better pattern is to make the object a CM and perform synchronous IO in the __enter__ and __exit__ methods and asynchronous equivalents in the __aenter__ and __aexit__ coroutines.
USEFUL: Although common it is not required that the __enter__ or __aenter__ methods/coroutines return self. In some cases it may be very useful to have the two routines each construct and return some sort of object representing an “open session” on the resource that the whole CM object represents. This allows __enter__ and __aenter__ to return different objects, with slightly different interfaces, for example methods with the same name could be synchronous methods in the object returned by __enter__ and asynchronous coroutines in the object returned by __aenter__.
This allows code that looks like the following:
# Perform some IO operations synchronously
with RemoteResource(*some_parameters) as connection:
    connection.send(some_data)
    new_data = connection.recv()
# Perform the same IO operations asynchronously
async with RemoteResource(*some_parameters) as connection:
    await connection.send(some_data)
    new_data = await connection.recv()
which can lead to very readable and clear code and easy changing between synchronous and asynchronous code.
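One way such a dual-interface object might be put together is sketched below. Everything here is an assumption made for illustration: the session classes are hypothetical, and an in-memory list stands in for the real remote connection, so "sending" just stores the data and "receiving" returns the last item stored.

```python
import asyncio


class _SyncSession:
    """Hypothetical session object returned by __enter__."""

    def __init__(self, buffer):
        self._buffer = buffer

    def send(self, data):
        self._buffer.append(data)

    def recv(self):
        return self._buffer[-1]


class _AsyncSession:
    """Hypothetical session object returned by __aenter__."""

    def __init__(self, buffer):
        self._buffer = buffer

    async def send(self, data):
        await asyncio.sleep(0)  # stands in for asynchronous IO
        self._buffer.append(data)

    async def recv(self):
        await asyncio.sleep(0)  # stands in for asynchronous IO
        return self._buffer[-1]


class RemoteResource:
    """Supports both `with` and `async with`, returning different sessions."""

    def __init__(self):
        self._buffer = []  # in-memory stand-in for the remote end

    def __enter__(self):
        return _SyncSession(self._buffer)

    def __exit__(self, exc_type, exc, tb):
        return False

    async def __aenter__(self):
        return _AsyncSession(self._buffer)

    async def __aexit__(self, exc_type, exc, tb):
        return False


def use_sync():
    with RemoteResource() as connection:
        connection.send("hello")
        return connection.recv()


async def use_async():
    async with RemoteResource() as connection:
        await connection.send("hello")
        return await connection.recv()


print(use_sync(), asyncio.run(use_async()))
```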
USEFUL: In fact there’s an even easier way to define your own asynchronous context managers using the decorator @asynccontextmanager. In Python 3.7+ it’s provided by contextlib in the standard library. For Python 3.6 you will need to install async_generator from PyPI to get access to it. It is used as follows:
@asynccontextmanager
async def ExampleAsyncCM(a_param, b_param):
    # Perform setup that would go in __aenter__
    ...
    yield obj  # obj should be the object that will be bound in the as clause
    # Perform teardown that would go in __aexit__
    ...
In particular if the code-block of the async with statement raises an exception which would normally be passed into the __aexit__ coroutine as parameters then in an ACM defined this way the exception will be raised by the yield statement. Teardown that must always run should therefore be placed in a finally block around the yield.
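The following sketch shows the exception arriving at the yield. The function name and the log list are made up for the example; the generator catches KeyError (which suppresses it, just as a truthy return from __aexit__ would) and uses finally so the teardown always runs.

```python
import asyncio
from contextlib import asynccontextmanager

log = []  # records what the context manager does


@asynccontextmanager
async def example_async_cm(name):
    log.append(f"setup {name}")
    try:
        yield name.upper()  # the value bound by the `as` clause
    except KeyError:
        # The exception raised inside the `async with` block surfaces here.
        log.append("KeyError handled")
    finally:
        log.append(f"teardown {name}")


async def demo():
    log.clear()
    async with example_async_cm("a") as obj:
        log.append(f"body got {obj}")
    async with example_async_cm("b"):
        raise KeyError("missing")
    return list(log)


print(asyncio.run(demo()))
```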
Iterators and Generators are a common tool in Python. There’s a good description of how they work at Pythontips. Asynchronous Iterators and Asynchronous Generators are natural asynchronous analogues in much the same way that Asynchronous Context Managers are asynchronous analogues to Context Managers.
Abstractly an iterable represents a source of data which can be looped over with a for loop, and so an async iterable represents a source of data which can be looped over with an async for loop. Using an async iterable is straightforward:
async for grain in reader.get_grains():
    # Do something with each grain object
    ...
In the above code the method reader.get_grains returns an asynchronous iterable object, and the loop draws elements from it one by one, assigning each to the local variable grain within the loop body, much like a normal for loop running over an iterable. The difference is that the method used to extract the next element from the asynchronous iterator derived from the iterable is an asynchronous coroutine method, and its output is awaited.
TERMINOLOGY: An asynchronous iterator is an object from which items can be drawn directly for use in the loop, an asynchronous iterable is an object from which you can get an asynchronous iterator to draw from. In practice all asynchronous iterators are also asynchronous iterables and all asynchronous iterables can be used to make an asynchronous iterator trivially, so it’s rare to need to distinguish them too clearly.
So in fact the async for construction is a shorthand for a longer piece of code using await expressions:
async for a in async_iterable:
    await do_a_thing(a)
# Is equivalent to
it = async_iterable.__aiter__()
while True:
    try:
        a = await anext(it)
    except StopAsyncIteration:
        break
    await do_a_thing(a)
For this reason, much like await and async with, the async for loop can only be used in a context where asynchronous code is permitted (such as inside the code block of an asynchronous coroutine function defined with async def).
Note the use of the anext(async_iterator_object) notation above. anext is a built-in function introduced in Python 3.10, and is analogous to next(iterator_object), which is shorthand for iterator_object.__next__(). If you are using Python 3.9 or earlier, you must await async_iterator_object.__anext__() directly instead.
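If the same code has to run on both sides of that version boundary, one option is a small helper along the following lines. The names next_item, drain and ticker are invented for this sketch; the drain function is simply the hand-written equivalent of an async for loop that collects items into a list.

```python
import asyncio
import sys


async def ticker(n):
    """Tiny async generator used as a data source for the example."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def next_item(async_iterator):
    """Fetch one item, using anext() where available (Python 3.10+)."""
    if sys.version_info >= (3, 10):
        return await anext(async_iterator)
    return await async_iterator.__anext__()


async def drain(async_iterable):
    """Hand-written equivalent of collecting items with `async for`."""
    items = []
    it = async_iterable.__aiter__()
    while True:
        try:
            items.append(await next_item(it))
        except StopAsyncIteration:
            break
    return items


print(asyncio.run(drain(ticker(3))))
```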
USEFUL: An Async Iterator might be useful for representing a remote resource which requires some time consuming IO to be performed each time another object is pulled from it.
In fact since coroutines don’t have to pause each time they’re awaited it’s perfectly possible to use an asynchronous iterator to conceal an optimised loading strategy that acts to load resources in the background (by adding tasks to the runloop) and only pauses the current task when an object is needed if that object has not been loaded yet.
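A very simple version of such a background-loading strategy might look like the sketch below. It is only an illustration under stated assumptions: load is a hypothetical slow operation, and the Prefetching class naively starts every load as a task as soon as iteration begins, so each __anext__ only has to wait if its item has not finished loading yet.

```python
import asyncio


async def load(key):
    """Hypothetical slow loader; the sleep stands in for real IO."""
    await asyncio.sleep(0.01)
    return key * 10


class _PrefetchIterator:
    def __init__(self, tasks):
        self._tasks = iter(tasks)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            task = next(self._tasks)
        except StopIteration:
            raise StopAsyncIteration from None
        # Returns immediately if the task has already finished.
        return await task


class Prefetching:
    """Async iterable that starts all loads when iteration begins."""

    def __init__(self, keys):
        self._keys = list(keys)

    def __aiter__(self):
        # `async for` calls this inside a running event loop,
        # so tasks can be scheduled here.
        tasks = [asyncio.create_task(load(k)) for k in self._keys]
        return _PrefetchIterator(tasks)


async def collect():
    return [value async for value in Prefetching([1, 2, 3])]


print(asyncio.run(collect()))
```

A real implementation would probably want to bound the number of outstanding tasks and cancel any that are left over if the consumer stops iterating early; this sketch does neither.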
Implementing your own async iterables is relatively easy: you just need to implement the magic method:
def __aiter__(self):
    ...
to return an asynchronous iterator (note that __aiter__ is not a coroutine method). Implementing your own asynchronous iterator is also easy: you just have to create an object which implements the following magic methods:
def __aiter__(self):
    return self
async def __anext__(self):
    ...
where __aiter__ must return self, and __anext__ should be a coroutine method which returns the next item in the iterator each time it is awaited, and raises StopAsyncIteration when there are no more items.
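Putting the two pieces together, a minimal sketch of a hand-written iterable and iterator pair could look like this. Countdown and CountdownIterator are hypothetical names; the iterable hands out a fresh iterator on each call to __aiter__, so it can be looped over more than once.

```python
import asyncio


class CountdownIterator:
    """Async iterator: holds the position within one pass over the data."""

    def __init__(self, start):
        self._current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._current <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # stands in for asynchronous work per item
        value = self._current
        self._current -= 1
        return value


class Countdown:
    """Async iterable: creates a new iterator each time it is looped over."""

    def __init__(self, start):
        self._start = start

    def __aiter__(self):
        return CountdownIterator(self._start)


async def demo():
    countdown = Countdown(3)
    first = [n async for n in countdown]
    second = [n async for n in countdown]  # starts again from the beginning
    return first, second


print(asyncio.run(demo()))
```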
NOTE: Although not strictly required it is common to implement your custom iterable so that each time __aiter__ is called it returns a new async iterator that starts again at the beginning of the sequence of items to be returned.
TYPING NOTE: If you are using the typing library then abstract classes AsyncIterator[T] and AsyncIterable[T] are provided to make typing easier.
Still, creating your async iterables by hand is more of a pain than creating synchronous iterables, which you would normally do using a generator. As such it should be no surprise that the Python developers also decided to include an asynchronous analogue to a generator in the form of asynchronous generators.
An async generator can be used as a shorthand method for defining an asynchronous iterator. It actually has a wider usage too which allows you to go beyond what the iterator interface allows, but I’m going to leave that until later (since it is pretty obscure and unlikely to be of much use in most cases).
So for a simple usage an asynchronous generator method is defined using async def much like how an asynchronous coroutine method is, but with the difference that the body of the method must contain at least one use of the keyword yield.
async def async_generator_method_example(param):
    ...
    ...
    yield something
    ...
    ...
    yield something_else
    ...
    ...  # etc ...
IMPORTANT!: The only difference between the declaration of an async coroutine method and an async generator method is the absence or presence of yield in the code block. Notably there is no difference in the declaration line itself at the start of the method declaration. This can make it hard to spot when something is one or the other, and they are very different in usage. As such I would recommend that if you are using async generators you annotate them clearly to show what they are, using comments, doc strings, names, type annotations, or any other method you choose. The language will not help you here.
An async generator method is a synchronous method which returns an async generator object. It is not a coroutine method, and awaiting its return value will only lead to an exception.
async def coroutine_method():
    return 3
async def generator_method():
    yield 3
# This is correct
r = await coroutine_method()
# This will raise an exception!
r = await generator_method()
However the async generator object returned by the call is an example of an async iterator, so you can use it in an async for loop:
# This is fine, and will print 3
async for r in generator_method():
    print(r)
In particular, for a generator object g, the first time g.__anext__() is awaited the code in the generator’s code-block is executed up until it reaches the first yield statement (or until the code block ends or returns), and the value passed to that yield is the value returned by the await. Each subsequent time that g.__anext__() is awaited the code continues running from where it left off until it gets to the next yield statement, and that statement’s value is returned. If the generator method’s code-block reaches a return statement or the end of the block then the await of g.__anext__() raises StopAsyncIteration, which as we saw above is caught by the async for loop and causes the loop to exit normally.
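This stepping behaviour can be made visible by driving a generator by hand. In the sketch below (the generator and the trace list are invented for the example) nothing in the generator body runs when the generator object is created; each awaited __anext__ runs it up to the next yield, and the final one raises StopAsyncIteration.

```python
import asyncio

trace = []  # records the order in which code runs


async def two_values():
    trace.append("started")
    yield "first"
    trace.append("resumed")
    yield "second"
    trace.append("finished")


async def demo():
    trace.clear()
    g = two_values()
    trace.append("created")  # the generator body has not run yet
    a = await g.__anext__()  # runs up to the first yield
    b = await g.__anext__()  # resumes and runs up to the second yield
    try:
        await g.__anext__()  # runs off the end of the body
    except StopAsyncIteration:
        trace.append("StopAsyncIteration")
    return a, b, list(trace)


print(asyncio.run(demo()))
```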
WARNING: Do not raise StopAsyncIteration directly from inside an asynchronous generator. Python does not treat it as a normal end of iteration; instead it is converted into a RuntimeError, and some linters will flag it as an error. Use a bare return statement instead. It is a syntax error to pass a value to the return statement in an asynchronous generator.
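The difference between the two ways of stopping can be checked with a short sketch. The two generator functions are made up for the example, and the behaviour shown for the explicit raise is what I would expect on recent CPython versions, where the exception reaches the caller as a RuntimeError rather than ending the loop cleanly.

```python
import asyncio


async def stops_with_return(limit):
    for i in range(10):
        if i == limit:
            return  # ends the iteration cleanly
        yield i


async def stops_with_raise(limit):
    for i in range(10):
        if i == limit:
            raise StopAsyncIteration  # not a clean way to stop
        yield i


async def demo():
    good = [i async for i in stops_with_return(3)]
    try:
        bad = [i async for i in stops_with_raise(3)]
    except RuntimeError as exc:
        bad = f"RuntimeError: {exc}"
    return good, bad


print(asyncio.run(demo()))
```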
It’s possible to make more advanced use of an asynchronous generator, but doing so requires moving beyond what is allowed by the async for loop and the async iterator interface.
In point of fact each yield statement inside a generator can be made to return a value as well as taking one. So the following code is valid:
async def advanced_generator(y):
    for i in range(0, 10):
        x = await do_something(y)
        y = yield x
And to make use of this you can’t use the async for loop, and instead need to be more explicit:
it = advanced_generator(first_y)
x = await anext(it)
while True:
    y = await do_something_else(x)
    try:
        x = await it.asend(y)
    except StopAsyncIteration:
        break
This code passes values back and forth between the generator and the caller each time the generator is resumed. Specifically it starts by creating the generator with the initial value first_y as y. It then awaits anext(it) (that is, __anext__) once, which executes the start of the generator, including awaiting do_something, and then yields the value x that was returned from it. This value is yielded back to the caller, which assigns it to x and begins to loop. Each iteration of the loop awaits do_something_else with the last value the generator yielded back, and then sends the result into the generator where it becomes the return value of the yield expression.
I have yet to find a good use for this kind of advanced generator that can’t be done more clearly and easily some other way, but the facility is there if you need it.
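For completeness, here is a small runnable illustration of the mechanism, using a running average as a stand-in task. It is only a sketch to show asend in action (the generator name is invented), not a recommendation of the pattern.

```python
import asyncio


async def running_average():
    """Async generator that receives numbers via asend() and yields their mean."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # receives whatever the caller sends in
        await asyncio.sleep(0)  # stands in for asynchronous work
        total += value
        count += 1
        average = total / count


async def demo():
    gen = running_average()
    await gen.__anext__()  # prime the generator: runs to the first yield
    results = [await gen.asend(v) for v in (10, 20, 60)]
    await gen.aclose()  # shut the generator down explicitly
    return results


print(asyncio.run(demo()))
```

The first __anext__ call is needed because a value cannot be sent into a generator that has not yet reached a yield.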
Sometimes even writing out a generator is too much effort and code, and you want to create an asynchronous iterator in a single line. The Python library provides another shorthand that can be used for this in the form of asynchronous generator comprehensions, which are an asynchronous analogue to the generator comprehensions long provided in the Python library.
The basic form of an asynchronous generator comprehension is:
it = (<async_expression> async for <variable> in <async_iterable> if <condition>)
this is shorthand for the following code:
async def _gen():
    async for <variable> in <async_iterable>:
        if <condition>:
            yield <async_expression>
it = _gen()
where the if <condition> clause can be omitted if not needed. As you can see this allows you to take one asynchronous iterable and create a new asynchronous generator from it with a single line of code. What’s more the <async_expression> and <condition> can contain asynchronous code, since they are going to be embedded in the body of an asynchronous generator, but the statement itself does not actually execute any asynchronous code (it merely creates the generator object) and so can be used anywhere, including in synchronous code. (One caveat: the <async_iterable> expression of the first async for clause is evaluated immediately in the enclosing scope, so it can only contain await where that scope already allows it.)
This can be confusing on first view, since it means that code like this:
def sync_method(gen):
    # This is a synchronous method
    ...
    it = (
        await x.run()
        async for x in gen
        if not (await x.skip())
    )
    ...
    return it
is perfectly valid, even though it appears to embed await expressions inside a synchronous method, where they are usually not allowed.
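A complete, runnable variant of this idea is sketched below, assuming a reasonably recent Python (3.8 or later is assumed here). All of the names are invented for the example: make_pipeline is an ordinary synchronous function that builds the asynchronous generator, and nothing is awaited until the generator is consumed inside a coroutine.

```python
import asyncio


async def numbers(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def double(x):
    await asyncio.sleep(0)
    return 2 * x


async def is_odd(x):
    await asyncio.sleep(0)
    return x % 2 == 1


def make_pipeline(source):
    # A plain synchronous function: this only creates the generator object,
    # the awaits inside it run later, when the generator is iterated.
    return (
        await double(x)
        async for x in source
        if not (await is_odd(x))
    )


async def consume():
    pipeline = make_pipeline(numbers(6))
    return [item async for item in pipeline]


print(asyncio.run(consume()))
```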
To add even more confusion there is a second type of asynchronous comprehension which cannot be used in synchronous code: the asynchronous list comprehension. And they look extremely similar.
An asynchronous list comprehension (which again may only appear in a context where asynchronous code is permitted such as in the body of a coroutine method) has the following basic form:
l = [<async_expression> async for <variable> in <async_iterable> if <condition>]
and this is shorthand for the following code:
async def _list():
    r = []
    async for <variable> in <async_iterable>:
        if <condition>:
            r.append(<async_expression>)
    return r
l = await _list()
which is subtly different from the generator version. In this case the implicit async def is used to create a coroutine function, which is then called and awaited. Since this await occurs in the context that the comprehension is located in this means that the comprehension can only be used in contexts where await would be valid.
NOTE: Asynchronous dictionary comprehensions and asynchronous set comprehensions can also be constructed in much the same way. Just like the list comprehensions they must be used only in asynchronous code.
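The three forms can be seen side by side in a short sketch. The words generator and the length coroutine are made up for the example; each comprehension is evaluated immediately, inside a coroutine, and produces an ordinary list, set, or dictionary.

```python
import asyncio


async def words():
    for w in ("alpha", "beta", "alpha", "gamma"):
        await asyncio.sleep(0)
        yield w


async def length(w):
    await asyncio.sleep(0)
    return len(w)


async def demo():
    as_list = [w async for w in words()]
    as_set = {w async for w in words()}
    as_dict = {w: await length(w) async for w in words() if w != "beta"}
    return as_list, as_set, as_dict


print(asyncio.run(demo()))
```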
This was a bit of a whistle stop tour of the asynchronous context managers, iterables, iterators, generators, and comprehensions. The main thing to take away from this is that Asynchronous Context Managers are extremely useful, and widely used throughout the interfaces of async libraries, so understanding them and using them properly is very important. Asynchronous iterators and their relatives are less common, but you will still encounter them from time to time, so it’s important to understand that they exist, even if you need to occasionally check back on how they work.
We have now covered the basic tools, language features, and syntax of Python asyncio. The next article in this series is going to delve into the wonderful world of the asyncio library and the various supporting libraries available on PyPI which will make your life easier when writing async code, and allow you to do interesting things one at a time (but not in any specific order). It can be found at Python Asyncio Part 4 – Library Support.