cloudfit-public-docs

Python Asyncio Part 5 – Mixing Synchronous and Asynchronous Code

In the previous parts of this series on Python asyncio I have introduced you to the basic concepts, basic syntax, a couple of useful more more advanced features, and some helpful libraries. In this part I intend to delve back into the details of the interfaces you may well find yourself needing to use if you need to combine synchronous and asynchronous code in the same project. This is something we have come across several times in BBC R&D’s cloudfit project, especially when making use of existing libraries which do not yet support asyncio directly.

The one thing you can’t do, and the one thing you really shouldn’t

You can’t run async code unless you have an event loop to run it in (without implementing some special work-around involving an object that fakes being an event loop, anyway). So awaiting asynchronous coroutines in a program that doesn’t have a running event loop means you need to start an event loop. Luckily this doesn’t come up much.

Conversely you absolutely can call non-async code from async-code, in fact it’s easy to do so. But if a method/function call might “block” (ie. take a long time before it returns) then you really shouldn’t.

The problem with this can neatly be seen in the following code:

import requests
import asyncio
import time

async def counter():
    now = time.time()
    print("Started counter")
    for i in range(0, 10):
        last = now
        await asyncio.sleep(0.001)
        now = time.time()
        print(f"{i}: Was asleep for {now - last}s")

async def main():
    t = asyncio.get_event_loop().create_task(counter())

    await asyncio.sleep(0)

    print("Sending HTTP request")
    r = requests.get('http://example.com')
    print(f"Got HTTP response with status {r.status_code}")

    await t

asyncio.get_event_loop().run_until_complete(main())

When this code is run the output looks something like this:

Started counter
Sending HTTP request
Got HTTP response with status 200
0: Was asleep for 0.019963502883911133s
1: Was asleep for 0.0012884140014648438s
2: Was asleep for 0.0012254714965820312s
3: Was asleep for 0.0011649131774902344s
4: Was asleep for 0.0011239051818847656s
5: Was asleep for 0.0012202262878417969s
6: Was asleep for 0.0012269020080566406s
7: Was asleep for 0.001184701919555664s
8: Was asleep for 0.0011556148529052734s
9: Was asleep for 0.00115203857421875s

As you can see the counter has paused during the whole time it takes to make the HTTP request. This is because the call requests.get is an ordinary synchronous IO call, and does not return until the http request has been completed. Since it is not asynchronous code the event loop can do nothing to interrupt it to let other tasks run, and so it “blocks the event loop” for as long as it runs.

Naturally this is a problem.

NOTE ICON NOTE: As well as synchronous (ie. blocking) IO there’s another class of Python methods which can easily block the event loop if you run them from async code: CPU-bound processes. A CPU-bound process is any piece of code which performs a lot of calculations, or otherwise occupies all the time on the CPU whilst it is running for an extended period of time. Some good examples of CPU bound processes would be training a neural network, compressing a raw video to H.264, or performing a series of fourier transforms on a large data set.

There are a number of techniques you can use to mitigate, and in some cases entirely eliminate, this problem, however.

Executors and Multithreading

Asyncio is fundamentally a single-threaded technology. Each event loop runs on a single thread, and multiplexes the thread’s runtime amongst different tasks. This can be a very efficient model of operation when you have an IO-bound task that is implemented using an asyncio-aware io library. Sadly not all tasks are IO-bound and not all libraries support asyncio.

This is where multithreading comes in. Just because your event loop runs on a single thread and multiplexes its use doesn’t mean that your program can’t also have other threads running at the same time. In fact it’s very useful to have a pool of threads available so that you can submit long-running blocking work to them and allow those long-running blocking calls to each occupy a thread of their own whilst they run.

Since this is such a useful pattern it should be no surprise that the asyncio library provides a way of doing this easily, using the event loop’s run_in_executor method. Thus:

import requests
import asyncio
import time

async def counter():
    now = time.time()
    print("Started counter")
    for i in range(0, 10):
        last = now
        await asyncio.sleep(0.001)
        now = time.time()
        print(f"{i}: Was asleep for {now - last}s")

async def main():
    t = asyncio.get_event_loop().create_task(counter())

    await asyncio.sleep(0)

    def send_request():
        print("Sending HTTP request")
        r = requests.get('http://example.com')
        print(f"Got HTTP response with status {r.status_code}")

    await asyncio.get_event_loop().run_in_executor(send_request)

    await t

asyncio.get_event_loop().run_until_complete(main())

produces output like this:

Started counter
Sending HTTP request
0: Was asleep for 0.0016489028930664062s
1: Was asleep for 0.0019485950469970703s
2: Was asleep for 0.0011708736419677734s
3: Was asleep for 0.00118255615234375s
4: Was asleep for 0.001283884048461914s
5: Was asleep for 0.001234292984008789s
6: Was asleep for 0.0011649131774902344s
7: Was asleep for 0.0012319087982177734s
8: Was asleep for 0.001219034194946289s
9: Was asleep for 0.001234292984008789s
Got HTTP response with status 200

and as you can see this time the counter was not blocked by the HTTP request.

So how did that work? The method run_in_executor takes two or more arguments, and returns a future. The first argument can be used to specify which thread pool or process pool to use for running the code, but if you set it to None then it will use a default thread pool owned by the event loop, and this is almost always going to be what you want. The second parameter is a synchronous callable which is to be run in a thread. Any further parameters are passed through to the callable when it is called.

The returned future will be marked as done when the supplied callable has finished executing on its thread. If it returned a value that will be stored in the future as the result, if it raised an exception that will be stored in the future as its exception. This means that for many purposes you can treat the future returned by run_in_executor very much like the future returned by create_task, but with the knowledge that the tasks created with create_task will be multiplexed on the event loop’s thread, whilst those created by run_in_executor will be run on another thread entirely, and will begin executing almost immediately.

Wrapping blocking calls up to run on other threads using run_in_executor is probably the simplest and easiest way to make use of libraries not intended for asyncio usage when writing an asyncio program. There are certainly potential pitfalls (especially if the underlying library is not guaranteed thread safe), but on the whole a lot of code will make use of this technique, and you can even use it to create asyncio-like wrappers around synchronous code in a wide variety of circumstances. It’s particularly useful for CPU-bound code.

However there are some other techniques that you might also benefit from having in your toolbox, though they are likely to be used less often.

NOTE ICON NOTE: Some versions of Python have in the past erroneously annotated run_in_executor as being a coroutine method. This has never actually been the case, and all actual implementations of it have always been methods that returned futures. As of August 2020 this error has been corrected in the Python standard library and in typeshed, but if you have older versions you may see incorrect type annotations/documentation.

What about the Global Interpreter Lock?

You may have heard of Python’s Global Interpreter Lock, and how it prevents python from really running proper multithreaded code in a parallel fashion. This interacts with run_in_executor, but is not as bad as it might first appear.

The Python interpreter’s Global Interpreter Lock is a mutex which is always held by any thread that is currently interpreting Python instructions within a single process. As a result it’s usually not possible for two Python threads to be actually running python code simultaneously, though they can switch back and forth as often as between individual instructions.

However, if a Python method calls out to native code for some purpose then the GIL will normally be released. Hence multiple threads can be running simultaneously if all but one of them are currently running native code.

The native code that implements blocking IO in Python releases the GIL during its blocking period; this means that IO-bound tasks using blocking IO do not lock up other python threads whilst they block.

For CPU-bound tasks this can be more of an issue. Many libraries for performing CPU-bound operations call out to native code to do their heavy processing, and release the GIL so that other threads will not be blocked. However if you need to run a CPU-bound task written in pure python in the background whilst your other code is running then problems can arise. Tasks in other threads will run slower, experiencing frequent pauses as the interpreter switches back and forth between threads.

To work around this you can construct an object of class concurrent.futures.ProcessPoolExecutor and pass it as the first parameter of run_in_executor, instead of None. This will cause your code to be run not in another thread, but in another process entirely, and thus the GIL will not be shared.

Going the other way with run_coroutine_threadsafe

Very occasionally you might be in the situation of running synchronous code on its own thread which needs to call asynchronous code on the event loop’s thread. In this specific instance the event loop provides a synchronous method run_coroutine_threadsafe which can be used to do what you want.

WARNING ICON WARNING: This technique is only useful when you are inside a synchronous method which is running on one thread but you know that there is also an event loop running on another thread. You can’t use it if the synchronous method you are in is running on the event loop’s thread, and you can’t use it if no event loop is running.

The method run_coroutine_threadsafe takes two parameters. The first should be a coroutine object (not a callable coroutine method, a future, or anything else, just a coroutine object), and the second should be the event loop object itself.

The call creates a new task (using create_task) which wraps the given coroutine. It then wraps it up inside an object of class concurrent.futures.Future which is quite similar to the asyncio.Future object, but intended for use in multithreaded environments.

This returned object has the done, result, and exception methods much like the futures we’ve discussed before, however result and exception behave subtly differently. Specifically if called when the future is not yet done they do not raise an exception. Instead they block the current thread and do not return until the future is done. In many ways you can think of a call to result() on a concurrent.futures.Future as being analogous to using an asyncio future in an await statement.

Using non-blocking IO and periodic polling

Sometimes a library will provide an interface which allows “non-blocking IO”. Which is to say that instead of a call to something.read() blocking until there is some data to read it might be possible to call it with parameters which put it in non-blocking mode, so that the call something.read(block=False) will always return immediately, but if there was no data available to read it will indicate this in some fashion (such as raising an exception or returning None). This being the case you could easily create some code like the following:

async def read_async(data_source):
    while True:
        r = data_source.read(block=False)
        if r is not None:
            return r
        else:
            await asyncio.sleep(0.01)

Which would work as a quick and dirty version of an asynchronous read coroutine for the data_source. Specifically when awaited it would periodically check for available data, returning it if there is some, but otherwise going to sleep until the time for the next check.

This is unlikely to be the most efficient way of taking a synchronous interface and making it asynchronous, but it might be simpler than other options in some cases, and perhaps it could be performant enough.

Watching file descriptors

Your final option for using synchronous IO-bound routines in an asynchronous fashion is to make use of the low-level mechanisms provided by the operating system and the event loop for just this purpose. This is what the writers of asyncio libraries will usually be doing, and is likely to be the most efficient result, though also in many cases the most difficult and complicated to write.

If your underlying library can provide a “file descriptor” (an operating system provided object which identifies a resource that can be waited on) then you can make use of that using the event loop’s methods: add_reader and add_writer. These two methods let you specify a synchronous callback method which should be called every time a specific file descriptor indicates that it is available for reading or writing, respectively. Depending on what the underlying mechanism for data transfer is it’s possible that the callback will fire even though there isn’t yet enough data to read (or enough space to writer to), so you do need to check each time it’s called. But nonetheless this allows the creation of a significantly improved version of the read_async coroutine from the previous example:

async def read_async(data_source):
    loop = asyncio.get_running_loop()
    fd = data_source.get_fd()
    fut = loop.create_future()

    def __check_for_read():
        try:
            r = data_source.read(block=False)
        except Exception as e:
            loop.remove_reader(fd)
            fut.set_exception(e)
        else:
            if r is not None:
                loop.remove_reader(fd)
                fut.set_result(r)

    loop.add_reader(fd, __check_for_read)
    return await fut

This version of the coroutine no longer sleeps periodically, instead it creates a future, then adds a callback to the event loop which will be called whenever there is data on the underlying file descriptor to be read, and then awaits the future. Each time the callback is called it tries to read data from the data source, and if it can it sets the future to done with the data as its result, if it can’t it does nothing. If an exception is raised by the call to read the data source then this exception is propagated to the future as well.

If you can manage it this is likely to be a better way of providing an asynchronous wrapper around a synchronous IO-bound process, but it won’t work for CPU-bound processes, or for anything that doesn’t provide you with a file descriptor to work with.

NOTE ICON NOTE: The asyncio library also provides more specific tools for dealing with network sockets making use of Transport and Protocol classes. If you are writing a library which will provide async access to a specific network protocol or converting an existing such library from synchronous to asynchronous functioning then these will be extremely useful to you. Otherwise I would recommend steering clear of them.

Summary

With everything in this part and the preceding parts of this series you should now be able to write code that makes use of asyncio for most development purposes, and also write useful wrappers to allow you to interact with non-asyncio libraries which you have to use. All in all I hope this has been an instructive series of articles, and has helped demystify this portion of Python. I primarily intended these articles to cover the things I wish I had known when I started using this. But if you have any suggestions for topics that should be covered but aren’t feel free to get in touch.