Ξ

Eight different ways to implement an asyncronous loop in python

Published on 2023-01-29 code python linux

asyncio was first added to the python standard library more than 10 years ago. Asynchronous I/O had already been possible before that, by using libraries such as twisted or gevent. But asyncio was an attempt to bring the community together and standardize on a common solution.

So far this didn't really work out for me. Each time I have to work with asyncio I get frustrated. I find myself longing for the simplicity of callbacks in JavaScript.

But maybe I just don't understand asyncio properly yet. I learn best by trying to recreate the thing I want to learn about. So in this post I will retrace the history of asynchronous programming. I will concentrate on python, but I guess much of this translates to other languages. Hopefully this will allow me to better understand and appreciate what asyncio is doing. And hopefully you will enjoy accompanying me on that journey.

If you are interested, all eight implementations are available on github.

Setup

The following script outputs random numbers at random intervals:

#!/bin/bash
while true; do
    sleep $(($RANDOM % 5))
    echo $RANDOM
done

I will run two instances of that script in parallel:

proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

The processes will start immediately and run in parallel to our python code. We have to make sure to stop them when the program exits, e.g. because we press Ctrl-C:

def cleanup():
    proc1.terminate()
    proc2.terminate()
    proc1.wait()
    proc2.wait()

We cannot use proc.stdout.readline() because it blocks until a complete line is available. So here is some code that helps us to get the last complete line we received:

class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]

reader1 = LineReader(proc1.stdout)
reader2 = LineReader(proc2.stdout)

What we want to do is to always render the latest complete line from each process as well as the current time:

def render():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join([now, reader1.line, reader2.line]))

With these basics out of the way, we can start with the loop itself.

Implementation 1: Blocking Loop

Putting it all together we get our first implementation: The blocking loop:

import datetime
import os
import subprocess


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]


def cleanup():
    proc1.terminate()
    proc2.terminate()
    proc1.wait()
    proc2.wait()


def render():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join([now, reader1.line, reader2.line]))


proc1 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
proc2 = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

reader1 = LineReader(proc1.stdout)
reader2 = LineReader(proc2.stdout)

try:
    while True:
        for reader in [reader1, reader2]:
            reader.read_line()
        render()
finally:
    cleanup()

In this version, reader.read_line() will block until data is available. So it will first wait for data from proc1, then wait for data from proc2, then render, repeat. This is not really async yet.

Implementation 2: Busy Loop

import fcntl

def set_nonblock(fd):
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

set_nonblock(proc1.stdout.fileno())
set_nonblock(proc2.stdout.fileno())

try:
    while True:
        for reader in [reader1, reader2]:
            try:
                reader.read_line()
            except BlockingIOError:
                pass
        render()
finally:
    cleanup()

These are just the parts of the code that changed. I used fnctl to set the file descriptor to non-blocking mode. In this mode, os.read() will raise a BlockingIOError if there is nothing to read. This is great because we cannot get stuck on a blocking read. However, this loop will just keep trying and fully saturate the CPU. This is called a busy loop and obviously not what we want.

Implementation 3: Sleepy Loop

import time

try:
    while True:
        for reader in [reader1, reader2]:
            try:
                reader.read_line()
            except BlockingIOError:
                pass
        time.sleep(1)
        render()
finally:
    cleanup()

By simply adding a sleep() we get the benefits of both of the first two implementation: We cannot get stuck on a blocking read, but we also do not end up in a busy loop. This is still far from perfect though: If data arrives quickly we introduce a very noticeable delay of 1 second. And if data arrives slowly we wake up much more often than would be needed. We can adjust the sleep duration to the specific case, but it will never be perfect.

Implementation 4: Select Loop

import selectors

selector = selectors.DefaultSelector()

selector.register(proc1.stdout, selectors.EVENT_READ, reader1)
selector.register(proc2.stdout, selectors.EVENT_READ, reader2)

try:
    while True:
        for key, mask in selector.select(10):
            key.data.read_line()
        render()
finally:
    cleanup()

What we actually want to do is sleep until one of the file descriptors is ready. That is exactly what selectors are for. There are different system calls that can be used to implement a selector. It got its name from select, but nowadays your are more likely to use epoll. The selectors module will automatically pick the best option.

In the code above I also added a timeout of 10 seconds to the select call. This allows us to update the time even if none of the file descriptors become available for some time.

So with this implementation we have our first real async loop, and from here on out we will stick with selectors and only restructure the code surrounding them.

Implementation 5: Callback Loop

class Loop:
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.times = []

    def set_timeout(self, callback, timeout):
        now = time.time()
        self.times.append((callback, now + timeout))

    def set_interval(self, callback, timeout):
        def wrapper():
            callback()
            self.set_timeout(wrapper, timeout)
        self.set_timeout(wrapper, 0)

    def register_file(self, file, callback):
        self.selector.register(file, selectors.EVENT_READ, callback)

    def unregister_file(self, file):
        self.selector.unregister(file)

    def run(self):
        while True:
            now = time.time()
            timeout = min((t - now for _, t in self.times), default=None)

            for key, mask in self.selector.select(timeout):
                key.data()

            keep = []
            now = time.time()
            for callback, t in self.times:
                if t < now:
                    callback()
                else:
                    keep.append((callback, t))
            self.times = keep


def callback1():
    try:
        reader1.read_line()
    except ValueError:
        loop.unregister_file(proc1.stdout)
    render()


def callback2():
    try:
        reader2.read_line()
    except ValueError:
        loop.unregister_file(proc2.stdout)
    render()


loop = Loop()
loop.register_file(proc1.stdout, callback1)
loop.register_file(proc2.stdout, callback2)
loop.set_interval(render, 10)

try:
    loop.run()
finally:
    cleanup()

This implementation improves on the previous one by being much more modular. You can register files with callbacks that will be executed whenever the file is ready. There is also a much more sophisticated system for timeouts and intervals, similar to what you might know from JavaScript.

Aside: Everything is a File

So far our loops can react to files and timeouts, but is that enough? My first hunch is that in unix, "everything is a file", so this should get us pretty far. But let's take a closer look.

Implementation 6: Generator Loop

We are getting closer to asyncio, but there is still a lot of conceptual ground to cover. Before we get to async/await, we have to talk about generators.

Motivation

As I said in the introduction, I personally really like the callback approach. It is simple, just a selector and some callback functions. Compared to that I find asyncio with its coroutines and tasks and futures and awaitables and transports and protocols and async iterators and executors just confusing.

But I recently read Nathaniel J. Smith's posts on trio, an alternative async loop for python, and I must admit that there are some solid arguments for async/await there. It boils down to this:

Splitting asynchronous execution into a setup and a callback phase does more harm then good.

Let's look at an example:

In all the code samples so far I created subprocesses and made sure that they are terminated when the process exits:

proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

try:
    do_something()
finally:
    proc.terminate()
    proc.wait()

Now let's say this is not the whole program, but just one function. And let's further say that do_something() is asynchronous:

def foo():
    proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)

    def callback():
        proc.terminate()
        proc.wait()

    loop.set_timeout(callback, 10)

We have a couple of issues here:

Functions are a curious concept that allows us to get stack traces, easily share state, and do cleanup. The callback approach tries to get by without these benefits. The async/await approach instead tries to keep them by allowing to pause the execution of functions.

The yield expression

The yield expression has been part of python since PEP 255 (2001) and got extended considerably in PEP 342 (2005). It allows to pause execution of a function and hand control back to the caller.

In its simplest form it can be used in a for loop:

def foo():
    print('yielding 1')
    yield 1
    print('yielding 2')
    yield 2

for x in foo():
    print(x)

## yielding 1
## 1
## yielding 2
## 2

Another common use is to define context managers:

from contextlib import contextmanager

@contextmanager
def bar():
    proc = subprocess.Popen(['./random.sh'], stdout=subprocess.PIPE)
    try:
        yield proc
    finally:
        proc.terminate()
        proc.wait()

with bar() as proc:
    do_something()

A function that uses yield is called a generator function. Instead of a normal value it returns a generator. The first example can be rewriting roughly like this:

class FooGenerator:
    def __init__(self):
        self.state = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.state == 0:
            self.state += 1
            return 1
        elif self.state == 1:
            self.state += 1
            return 2
        else:
            raise StopIteration()

gen = iter(FooGenerator())
while True:
    try:
        x = next(gen)
        print(x)
    except StopIteration:
        break

It is important to distinguish these two conceptual layers. For example, raising StopIteration only makes sense in a generator, not in a generator function.

There are a few more things you can do with generators:

For a more in-depth discussion of generators I can recommend the introduction to async/await by Brett Cannon.

The Loop

import datetime
import os
import selectors
import subprocess
import time

selector = selectors.DefaultSelector()
data = ['', '', '']


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]


class Task:
    def __init__(self, gen):
        self.gen = gen
        self.files = set()
        self.times = set()
        self.done = False
        self.result = None

    def set_result(self, result):
        self.done = True
        self.result = result

    def init(self):
        try:
            self.files, self.times = next(self.gen)
        except StopIteration as e:
            self.set_result(e.value)

    def wakeup(self, files, now):
        try:
            if self.done:
                return
            elif any(t < now for t in self.times) or files & self.files:
                self.files, self.times = self.gen.send((files, now))
        except StopIteration as e:
            self.set_result(e.value)

    def close(self):
        self.gen.close()


def run(gen):
    task = Task(gen)
    try:
        task.init()
        while not task.done:
            now = time.time()
            timeout = min((t - now for t in task.times), default=None)
            files = {key.fileobj for key, mask in selector.select(timeout)}
            task.wakeup(files, time.time())
        return task.result
    finally:
        task.close()


def sleep(t):
    yield set(), {time.time() + t}


def gather(*generators):
    subtasks = [Task(gen) for gen in generators]
    try:
        for task in subtasks:
            task.init()
        while True:
            wait_files = set().union(
                *[t.files for t in subtasks if not t.done]
            )
            wait_times = set().union(
                *[t.times for t in subtasks if not t.done]
            )
            files, now = yield wait_files, wait_times
            for task in subtasks:
                task.wakeup(files, now)
            if all(task.done for task in subtasks):
                return [task.result for task in subtasks]
    finally:
        for task in subtasks:
            task.close()


def render():
    data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join(data))


def popen(cmd, i):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    reader = LineReader(proc.stdout)
    selector.register(proc.stdout, selectors.EVENT_READ)
    try:
        while True:
            yield {proc.stdout}, set()
            reader.read_line()
            data[i] = reader.line
            render()
    except ValueError:
        pass
    finally:
        selector.unregister(proc.stdout)
        proc.terminate()
        proc.wait()


def clock():
    while True:
        yield from sleep(10)
        render()


def amain():
    yield from gather(
        popen(['./random.sh'], 1),
        popen(['./random.sh'], 2),
        clock(),
    )


run(amain())

This is the complete code and not just the changed bits because there are so many changes all over the place. I hope with all the buildup it doesn't seem too crazy. Still there are some new concepts that I will try to expand on.

First, note how all the setup and teardown for each individual subprocess is now bundled in popen(). This is exactly what I was talking about before: We can now trivially do cleanup. This is made possible by the little yield expression that pokes a hole in this function somewhere in the middle.

The terminals on the one end of the communication are expressions like yield {file}, set() or yield set(), {timeout}. This means "pause this generator until this condition is met".

On the other end of the communication there is run() which will figure out which files are available and send that up the chain.

In between there is gather() which takes the information from both ends and figures out which of its subtasks should be unpaused. Most other function just pass through the messages by using yield from.

All this is mediated by Task objects which keep track of the conditions and state of generators.

Implementation 7: async/await Loop

From here it is a small step to async/await. Generators that are used for asynchronous execution have already been called "coroutines" in PEP 342. PEP 492 (2015) deprecated that approach in favor of "native coroutines" and async/await.

Native coroutines are not really different from generator-based coroutines. You can still get the underlying generator by calling coro.__await__(). New syntax was introduced to keep the two concepts apart: Iterators use yield, coroutines use async/await. These two code snippets are more or less identical:

async def foo():
    await sleep(10)
class FooCoroutine:
    def __await__(self):
        return (yield from sleep(10).__await__())

This is a minor change, but still it changes the syntax all over the place. So here is the full async/await implementation:

import datetime
import os
import selectors
import subprocess
import time

selector = selectors.DefaultSelector()
data = ['', '', '']


class LineReader:
    def __init__(self, file):
        self.file = file
        self.buffer = b''
        self.line = ''

    def read_line(self):
        chunk = os.read(self.file.fileno(), 1024)
        if not chunk:
            raise ValueError
        self.buffer += chunk
        lines = self.buffer.split(b'\n')
        if len(lines) > 1:
            self.line = lines[-2].decode('utf-8')
        self.buffer = lines[-1]
        return self.line


class AYield:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return (yield self.value)


class Task:
    def __init__(self, coro):
        self.gen = coro.__await__()
        self.files = set()
        self.times = set()
        self.done = False
        self.result = None

    def set_result(self, result):
        self.done = True
        self.result = result

    def init(self):
        try:
            self.files, self.times = next(self.gen)
        except StopIteration as e:
            self.set_result(e.value)

    def wakeup(self, files, now):
        try:
            if self.done:
                return
            elif any(t < now for t in self.times) or files & self.files:
                self.files, self.times = self.gen.send((files, now))
        except StopIteration as e:
            self.set_result(e.value)

    def close(self):
        self.gen.close()


def run(coro):
    task = Task(coro)
    try:
        task.init()
        while not task.done:
            now = time.time()
            timeout = min((t - now for t in task.times), default=None)
            files = {key.fileobj for key, mask in selector.select(timeout)}
            task.wakeup(files, time.time())
        return task.result
    finally:
        task.close()


async def sleep(t):
    await AYield((set(), {time.time() + t}))


async def gather(*coros):
    subtasks = [Task(coro) for coro in coros]
    try:
        for task in subtasks:
            task.init()
        while True:
            wait_files = set().union(
                *[t.files for t in subtasks if not t.done]
            )
            wait_times = set().union(
                *[t.times for t in subtasks if not t.done]
            )
            files, now = await AYield((wait_files, wait_times))
            for task in subtasks:
                task.wakeup(files, now)
            if all(task.done for task in subtasks):
                return [task.result for task in subtasks]
    finally:
        for task in subtasks:
            task.close()


def render():
    data[0] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join(data))


async def popen(cmd, i):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    reader = LineReader(proc.stdout)
    selector.register(proc.stdout, selectors.EVENT_READ)
    try:
        while True:
            await AYield(({proc.stdout}, set()))
            reader.read_line()
            data[i] = reader.line
            render()
    except ValueError:
        pass
    finally:
        selector.unregister(proc.stdout)
        proc.terminate()
        proc.wait()


async def clock():
    while True:
        await sleep(10)
        render()


async def amain():
    await gather(
        popen(['./random.sh'], 1),
        popen(['./random.sh'], 2),
        clock(),
    )


run(amain())

Implementation 8: asyncio

So which kinds of loop does asyncio use? After reading PEP 3156 I would say: That's complicated.

At the core, asyncio is a simple callback loop. The relevant functions are called add_reader(file, callback) and call_later(delay, callback).

But then asyncio adds a second layer using async/await. A simplified version looks roughly like this:

import asyncio


class Future:
    def __init__(self):
        self.callbacks = []
        self.result = None
        self.execution = None
        self.done = False

    def _set_done(self):
        self.done = True
        for callback in self.callbacks:
            callback(self)

    def set_result(self, result):
        self.result = result
        self._set_done()

    def set_exception(self, exception):
        self.exception = exception
        self._set_done()

    def add_done_callback(self, callback):
        self.callbacks.append(callback)

    def __await__(self):
        yield self


class Task:
    def __init__(self, coro):
        self.gen = coro.__await__()

    def wakeup(self, future=None):
        try:
            if future and future.exception:
                new_future = self.gen.throw(future.exception)
            else:
                new_future = next(self.gen)
            new_future.add_done_callback(self.wakeup)
        except StopIteration:
            pass


async def sleep(t):
    future = Future()
    loop.call_later(t, future.set_result, None)
    await future


async def amain():
    print('start')
    try:
        await sleep(5)
        loop.stop()
    finally:
        print('finish')


loop = asyncio.new_event_loop()
task = Task(amain())
task.wakeup()
loop.run_forever()

When we call task.wakeup(), the coroutine amain() starts executing. It prints 'foo', creates a future, and tells the loop to resolve that future in 5 seconds. Then it yields that future back down to wakeup(), which registeres itself as a callback on the future. Now the loop starts running, waits for 5 seconds, and then resolves the future. Because wakeup() was added as a callback, it is now called again and passes control back into amain(), which prints 'finish', stops the loop, and raises StopIteration.

In the earlier coroutine examples, I yielded files and timeouts as conditions. Since this version is hosted on a callback loop, it instead yields futures that wrap loop callbacks.

This approach works reasonably well. But I also see some issues with it.

Limited support for files

You may have noticed that I did not implement the full subprocess example this time. This is because asyncio's coroutine layer doesn't really support files.

Futures represent actions that are completed when the callback is called. File callbacks are called every time data is available for reading. This disconnect can probably be bridged somehow, but this post is already long enough and I didn't want to go down yet another rabbit hole.

Futures are not a monad

If you know some JavaScript you have probably come across Promises. Promises are basically the JavaScript equivalent of Futures. However, they have a much nicer API. They are basically a monad, and every Haskell fan can give you an impromptu lecture about the awesomeness of monads. Consider the following snippets that do virtually the same:

Promise.resolve(1)
    .then(x => x + 1)
    .finally(() => console.log('done'));
import asyncio

def increment(future):
    try:
        future2.set_result(future.result() + 1)
    except Exception as e:
        future2.set_exception(e)

def print_done(future):
    print('done')

loop = asyncio.new_event_loop()

future1 = loop.create_future()
future1.add_done_callback(increment)
future1.set_result(1)

future2 = loop.create_future()
future2.add_done_callback(print_done)

loop.run_until_complete(future2)

Naming Confusion

So far we have "Coroutines", "Futures", and "Tasks". The asyncio documentation also uses the term "Awaitables" for anything that implements __await__(), so both Coroutines and Futures are Awaitables.

What really makes this complicated is that Task inherits from Future. So in some places, Coroutines and Futures can be used interchangably because they are both Awaitables -- and in other places, Coroutines and Futures can be used interchangably because Coroutines can automatically be wrapped in Tasks which makes them Futures.

I wonder whether it would have been better to call Tasks "CoroutineFutures" instead. Probably not. That makes them sound like they are a simple wrapper, when in fact they are the thing that is actually driving most of the coroutine layer.

In any case I believe the asyncio documentation could benefit from a clear separation of layers. First should be a description of the high level coroutine API including sleep() and gather(). The second part could be about the callback layer, including call_later() and add_reader(). The third and final part could explain the low level plumbing for those people who want to dive deep. This is the only part that needs to mention terms like "Awaitable", "Task", or "Future".

Conclusion

These were eight different versions of asynchronous loops. I have certainly learned something. A bit about async primitives on linux and a lot about generators and coroutines in python. I hope this post serves as a helpful reference for future endeavors.

The big question remains: Which approach is better? The simple cleanup in the coroutine approach is a huge advantage, but it comes at the cost of significant complexity compared to callbacks. The thought that we have to limit ourselves to one of them is not great. So here's to hoping we will someday find an approach that combines the benefits of both.