Python Asyncio Part 4 – Library Support

In the first three parts of this series on Python asyncio I introduced the basic concepts, the basic syntax, and a couple of useful, more advanced features. In this part I intend to shift focus a little and give a quick rundown (with some worked examples) of useful libraries which make use of asyncio and which you can use in your own code to get real work done. These are all libraries which we have made use of in BBC R&D’s cloudfit project.

I will be including some elements from the Python standard library here, but for the most part this will be about third-party libraries. In particular I won’t be going into detail about the built-in classes for transports and protocols, which I think are a lot more useful for people writing libraries to extend asyncio than they are for those who are planning on writing programs that use it.

Making HTTP requests with aiohttp

aiohttp is a library which is designed to make interacting with the http protocol via asyncio straightforward. It contains support for both client and server implementations, but I’m going to concentrate on the client side here.

For those who have used the Python library requests (extremely widely used as of August 2020), which provides a traditional synchronous http client interface, the interface of aiohttp will prove quite familiar, since it is closely modelled on it.

Making http requests using aiohttp is pretty straightforward:

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession(trust_env=True) as session:
        async with session.get(
            'https://www.bbc.co.uk/rd/projects/cloud-fit-production'
        ) as resp:
            print(resp.status)
            print(await resp.text())

asyncio.run(main())

The important thing to note is that you need to create a session and enter it as an async context manager. Each individual request is also an async context manager which can be entered, and inside its context data can be read from the response using the attributes and methods of the response object.

The library has full support for GET, OPTIONS, PUT, POST, HEAD, DELETE, and other http requests, and for setting custom headers, for applying authentication, and for https as well as http. Support for web sockets is also built in, using a pretty straightforward interface:

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession(trust_env=True) as session:
        async with session.ws_connect(
            'wss://echo.websocket.org/'
        ) as ws:
            n = 0
            print(f"Sending 'hello{n}'")
            await ws.send_str(f'hello{n}')
            async for msg in ws:
                print(f"Received '{msg.data}'")

                # Stop after ten echoes; otherwise send the next message
                n += 1
                if n == 10:
                    return

                print(f"Sending 'hello{n}'")
                await ws.send_str(f'hello{n}')

asyncio.run(main())

(If, like me, you are behind an https proxy, you may have to pass a keyword argument called proxy to the ws_connect call.)

As of August 2020 http requests are likely to be one of the most common ways you’ll want to perform IO. In fact a lot of programs will be making a lot of http requests to acquire data and to submit results, with some sort of intermediate processing as a CPU-bound operation (and I’ll cover how to handle CPU-bound operations in asyncio programs in the next article).

When using aiohttp, and most other IO libraries, it is usually best to keep a long-lived session of some kind, typically as an asynchronous context manager which is entered near the start of the program and exited only when the program exits.

Finally, it is worth pointing out aiohttp_retry, which provides an async context manager which will handle the retrying of http requests if you encounter transient errors. In cloudfit we use this library when making very large numbers of http requests, to handle the occasional server errors and network burps that happen when performing large numbers of requests over a long period of time.

Managing async context managers with AsyncExitStack

When I’m writing asyncio programs I find that I frequently need to create async context managers to wrap up the code that provides the context for actions I need to do often. What’s more I often find that amongst the things I want to do on the entry of the custom context manager is enter several other async context managers, and on exit I want to exit them in the reverse order, handling exceptions properly.

When you’ve just got one you can easily write code like this:

class CustomACM(object):
    async def __aenter__(self):
        self.__inner_acm = InnerACM(...)
        self.__inner_acm_ctx = await self.__inner_acm.__aenter__()

        # Perform further setup for this class
        ...

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        if exc_v is None:
            try:
                # Perform custom clean up on success
                ...
            except Exception as e:
                (exc_t, exc_v, exc_tb) = (type(e), e, e.__traceback__)
        else:
            # Perform custom clean up on error
            ...

        return await self.__inner_acm.__aexit__(exc_t, exc_v, exc_tb)

And this will work, but when you start getting multiple context managers that you need to enter and exit, it quickly becomes more difficult to deal with. This is especially true when you need to work out what to do if one part of the code raises an exception, and when to pass it on to a later call to __aexit__ and when not to. It’s a mess.

There is a simple solution to this, of course: in many cases you can replace your ACM definition with something like:

from contextlib import asynccontextmanager

@asynccontextmanager
async def CustomACM():
    async with InnerACM1() as inner1:
        async with InnerACM2() as inner2:
            # Perform custom setup
            ...

            try:
                yield
            except Exception as e:
                # Perform custom clean up on failure
                ...

                # A bare raise re-raises the original exception unchanged
                raise
            else:
                # Perform custom clean up on success
                ...

But this won’t work if you need the class to do more than work as an async context manager, or if you need any kind of more detailed control over the behaviour of the class.

In that case it would be really useful if you could use some sort of class that handles all of this correctly with any number of context managers, and can be used easily.

This is where AsyncExitStack comes in. This class was added to contextlib in the standard library in Python 3.7; in earlier versions you’ll need to install async_exit_stack from PyPI.

What the class does is wrap up multiple context managers (both sync and async) in a single asynchronous context manager, so that you only have to call __aexit__ once at the end and it will unwind all of the context managers in the correct order with the correct passing of exceptions.

Its use ends up being very simple:

from contextlib import AsyncExitStack

class CustomACM(object):
    def __init__(self):
        self._exit_stack = AsyncExitStack()

    async def __aenter__(self):
        await self._exit_stack.__aenter__()
        self.__inner1 = await self._exit_stack.enter_async_context(InnerACM1())
        self.__inner2 = await self._exit_stack.enter_async_context(InnerACM2())
        self.__inner3 = self._exit_stack.enter_context(InnerCM())
        self.__inner4 = await self._exit_stack.enter_async_context(InnerACM3())
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        return await self._exit_stack.__aexit__(exc_t, exc_v, exc_tb)

    # Other methods, which can make use of the self.__inner<n> properties
    ...

This will create an asynchronous context manager which will enter a series of asynchronous and synchronous context managers when it is entered, and will provide your own custom methods which can then access these as needed. No extra malarkey is needed to handle the exception cases, because the exit stack will handle that for you.
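To see the unwinding order for yourself, here is a minimal sketch using only the standard library. The async_resource and sync_resource helpers are hypothetical stand-ins I've made up for real context managers; all they do is record when they are entered and exited.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager, contextmanager

events = []  # records the order in which contexts are entered and exited


@asynccontextmanager
async def async_resource(name):
    # Hypothetical stand-in for a real async context manager
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        events.append(f"exit {name}")


@contextmanager
def sync_resource(name):
    # Hypothetical stand-in for a real synchronous context manager
    events.append(f"enter {name}")
    try:
        yield name
    finally:
        events.append(f"exit {name}")


async def main():
    try:
        async with AsyncExitStack() as stack:
            await stack.enter_async_context(async_resource("a"))
            stack.enter_context(sync_resource("b"))
            await stack.enter_async_context(async_resource("c"))
            raise RuntimeError("something went wrong")
    except RuntimeError:
        # The stack has already unwound c, b and a by the time we get here
        events.append("error propagated")


asyncio.run(main())
print(events)
```

The contexts are exited in the reverse of the order in which they were entered, and the exception still reaches the caller once they have all been closed.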

NOTE: If you need to do some extra custom initialisation in addition to entering other CMs/ACMs, I’d suggest creating a super-simple ACM to handle this bit for you which can assume that the outer contexts have already been entered (and which can take their returned values as parameters), thus keeping the CustomACM class extremely clean.
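As a rough sketch of that suggestion (one possible arrangement, not the only one), the custom initialisation below lives in a small ACM of its own which is entered last on the exit stack. The names inner_acm and custom_setup are hypothetical and exist only for this illustration.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

log = []  # records what happens and in which order


@asynccontextmanager
async def inner_acm(name):
    # Hypothetical inner context manager
    log.append(f"enter {name}")
    try:
        yield name
    finally:
        log.append(f"exit {name}")


@asynccontextmanager
async def custom_setup(inner1, inner2):
    # Entered last, so both inner contexts are already open at this point
    log.append(f"custom setup using {inner1} and {inner2}")
    try:
        yield
    except Exception:
        log.append("custom clean up on error")
        raise
    else:
        log.append("custom clean up on success")


class CustomACM:
    def __init__(self):
        self._exit_stack = AsyncExitStack()

    async def __aenter__(self):
        stack = self._exit_stack
        await stack.__aenter__()
        self._inner1 = await stack.enter_async_context(inner_acm("inner1"))
        self._inner2 = await stack.enter_async_context(inner_acm("inner2"))
        await stack.enter_async_context(custom_setup(self._inner1, self._inner2))
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        return await self._exit_stack.__aexit__(exc_t, exc_v, exc_tb)


async def main():
    async with CustomACM():
        log.append("body")


asyncio.run(main())
print(log)
```

Because custom_setup is the last thing pushed onto the stack, its clean up runs first on exit, while the inner contexts are still open.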

Unit testing

The standard Python library unittest module contains support for testing async code as of Python 3.8. (If you are using Python 3.7 or earlier, look up asynctest.) Your test class must inherit from the unittest.IsolatedAsyncioTestCase class. This class accepts coroutine functions as test functions. There are asyncSetUp and asyncTearDown coroutine methods which can be used as well as the older setUp and tearDown methods. For example:

import unittest

from stuff import do_sync_stuff, do_async_stuff

expected_value = 1

class TestStuff(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self):
        # Setup happens here
        pass

    def test_synchronous(self):
        r = do_sync_stuff()
        self.assertEqual(r, expected_value)

    async def test_asynchronous(self):
        r = await do_async_stuff()
        self.assertEqual(r, expected_value)

This test case has two tests, one of which runs synchronously and the other asynchronously.

unittest.mock contains AsyncMock, which can be used to mock coroutine functions. It is an asynchronous version of MagicMock, and so will respond appropriately to many magic methods.

IMPORTANT: The instances of the class AsyncMock are callable mock objects that will behave like coroutine functions, not coroutine objects. This works well for the most common sorts of async code you are likely to write. If you need to mock more complex patterns involving passing around coroutine objects or futures you may have to define your own mocks starting with MagicMock, but this is not particularly difficult to do.
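The difference is easiest to see side by side. In this sketch (assuming Python 3.8 or later) fetch_and_double and await_future are hypothetical functions under test: the first awaits a coroutine function, so an AsyncMock fits, while the second calls an ordinary function that hands back a future, so I start from MagicMock instead.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def fetch_and_double(fetch):
    # Hypothetical code under test: awaits a coroutine function it is given
    return 2 * await fetch("key")


async def await_future(get_future):
    # Hypothetical code under test: calls a plain function returning a future
    return await get_future()


async def main():
    # AsyncMock behaves like a coroutine function: calling it gives an awaitable
    fetch = AsyncMock(return_value=21)
    doubled = await fetch_and_double(fetch)
    fetch.assert_awaited_once_with("key")

    # A plain function that returns a future is mocked with MagicMock
    future = asyncio.get_running_loop().create_future()
    future.set_result("done")
    get_future = MagicMock(return_value=future)
    result = await await_future(get_future)
    get_future.assert_called_once_with()

    return doubled, result


doubled, result = asyncio.run(main())
print(doubled, result)
```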

NOTE: The Unit Testing Python Asyncio Code post also provides a bit more detail on unit testing async code.

Mocking async Context Managers and Iterators

There are two possible methods for mocking async context managers and iterators - auto-speccing, and implementing the magic methods yourself.

Auto-speccing is described in the getting started pages on mocking in unittest. You provide a class which has the functions you need to use, and pass that as an argument to the mock. The auto-speccing process will detect whether methods on that class are coroutine functions or ordinary functions, and provide MagicMock or AsyncMock objects in response to calls as appropriate.
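A minimal sketch of this, assuming Python 3.8 or later, passes the class as the spec argument of a MagicMock. The Client class and the use_client coroutine are hypothetical and are defined here only for the illustration.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class Client:
    # Hypothetical class whose interface we want the mock to follow
    def describe(self):
        return "real client"

    async def fetch(self, key):
        raise NotImplementedError("talks to the network in real life")


async def use_client(client):
    # Hypothetical code under test: one ordinary call and one awaited call
    return client.describe(), await client.fetch("key")


# The spec tells the mock which attributes exist and which are coroutines
mock_client = MagicMock(spec=Client)
mock_client.describe.return_value = "mock client"
mock_client.fetch.return_value = "value"

outcome = asyncio.run(use_client(mock_client))
mock_client.describe.assert_called_once_with()
mock_client.fetch.assert_awaited_once_with("key")
print(outcome)
```

Here mock_client.fetch comes out as an AsyncMock because fetch is a coroutine function on the spec class, while mock_client.describe is an ordinary MagicMock. Attributes which don't exist on Client are rejected by the mock.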

NOTE: The examples given at the getting started link above are useful, but a bit magical. The mock async context manager demonstrates auto-speccing nicely, but the mock async generator example looks as if it shouldn’t work. There seems to be some extra magic at work when you set mock.__aiter__.return_value.

If auto-speccing isn’t appropriate for your use-case, you can also mock async generators and context managers by creating the appropriate magic methods. If you do this, take care that you use MagicMock where the function is not a coroutine and AsyncMock where it is. The material in Part 3 tells you which are which.
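Here is a sketch of the hand-rolled approach, again assuming Python 3.8 or later. The consume coroutine is a hypothetical piece of code under test which enters an async context manager and then iterates over what it gets back. __aenter__ and __aexit__ are coroutine methods, so they get AsyncMock; __aiter__ is an ordinary method, so the MagicMock's own support for it is used.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def consume(source):
    # Hypothetical code under test: enters a context, then iterates over it
    items = []
    async with source as stream:
        async for item in stream:
            items.append(item)
    return items


# __aiter__ is not a coroutine; MagicMock turns the list into an async iterator
stream = MagicMock()
stream.__aiter__.return_value = ["a", "b", "c"]

# __aenter__ and __aexit__ are coroutines, so they are mocked with AsyncMock
source = MagicMock()
source.__aenter__ = AsyncMock(return_value=stream)
source.__aexit__ = AsyncMock(return_value=False)

items = asyncio.run(consume(source))
source.__aenter__.assert_awaited_once()
source.__aexit__.assert_awaited_once()
print(items)
```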

Summary

This is obviously only scratching the surface of the available asyncio support libraries, but I hope that with access to an http client, a testing library, and a utility library for simplifying your coding you’ve got a good overview of the types of libraries that are available, and how they are usually implemented. If you need to access some IO protocol other than http (e.g. a connection to an SQL database) you will usually find that similar libraries exist for that use case.

On the other hand maybe you can’t use the existing libraries, or none exist. In that case you may have to implement your own wrapper around an existing synchronous Python driver for some protocol. To that end the next article will cover how to cope with blocking IO and CPU-bound processes in asyncio: Python Asyncio Part 5 – Mixing Synchronous and Asynchronous Code.