Understanding Asynchronous IO With Python 3.4's Asyncio And Node.js

Introduction

I spent this summer working on a web platform running on Node.js. This was the first time I worked full-time with Node.js, and one thing that became quite apparent after a few weeks was that many developers, including myself at the time, lack clarity on exactly how the asynchronous features of Node.js work and how they are implemented at a lower level. Since I believe the only way to use a platform efficiently is to have a clear understanding of how it works, I decided to dig deeper. This curiosity also made me start playing around with implementing similar asynchronous features in other languages, in particular Python, my go-to language for experimenting and learning. This led me to Python 3.4's asynchronous IO library, asyncio, which intersected with my existing interest in coroutines (see my post on combinatorial generation using coroutines in Python). This post explores the questions and answers that came up while I was learning more about this subject, which I hope can help clarify things for others as well.

All the Python code is intended for Python 3.4. This is mostly because Python 3.4 introduces the selectors module as well as asyncio. For earlier versions of Python, libraries such as Twisted, gevent, and tornado provide similar functionality.

In the early examples below, I chose to almost entirely ignore the issue of error handling and exceptions. This was done mostly for the sake of simplicity, and it should be noted that proper handling of exceptions should be a very important aspect of the type of code we see below. I will provide a few examples of how Python 3.4's asyncio module handles exceptions at the end.

Getting Started: Hello World Revisited

Let's start by writing a program to solve a very simple problem. We will use this problem and minor variations of it for the rest of the section to demonstrate the ideas.

Write a program to print "Hello world!" every three seconds, and at the same time wait for input from the user. Each line of user input will contain a single positive number \(n\). As soon as input is entered, calculate and output the Fibonacci number \(F(n)\) and continue to wait for more input.

Note that there's a chance the periodic "Hello world!" is inserted in the middle of user input, but we do not care about that.

Those of you familiar with Node.js and JavaScript might already have a solution in mind that will likely look something like this:

var log_execution_time = require('./utils').log_execution_time;

var fib = function fib(n) {
    if (n < 2) return n;
    return fib(n - 1) + fib(n - 2);
};

var timed_fib = log_execution_time(fib);

var sayHello = function sayHello() {
    console.log(Math.floor((new Date()).getTime() / 1000) + " - Hello world!");
};

var handleInput = function handleInput(data) {
    var n = parseInt(data.toString(), 10);
    console.log('fib(' + n + ') = ' + timed_fib(n));
};

process.stdin.on('data', handleInput);
setInterval(sayHello, 3000);

As you can see, this is quite easy to do in Node.js. All we have to do is set an interval timer to print "Hello world!" and attach an event handler to the data event of process.stdin and we are done. Simple to understand on an abstract level, and very easy to use. It just works! But how? To answer this let's try to do the exact same thing in Python.

Also notice that we use a log_execution_time decorator to output the time it takes to calculate the Fibonacci number. Here's the definition of this decorator in Python:

from functools import wraps
from time import time


def log_execution_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time()
        return_value = func(*args, **kwargs)
        message = "Executing {} took {:.03} seconds.".format(func.__name__,
                                                             time() - start)
        print(message)
        return return_value
    return wrapper

And similarly, in JavaScript:

// We do not care about handling the "this" parameter correctly in our examples.
// Do not use this decorator where that's needed!
module.exports.log_execution_time = function log_execution_time(func) {
    var wrapper = function() {
        var start = (new Date()).getTime();
        var return_value = func.apply(this, arguments);
        var message = "Calculation took " + ((new Date()).getTime() - start) / 1000 + " seconds";
        console.log(message);
        return return_value;
    };
    return wrapper;
};

The algorithm used here to calculate the Fibonacci numbers is intentionally the slowest one of all (exponential running time). This post is not about Fibonacci numbers (see this post on that subject, as there is a logarithmic-time algorithm), and I actually want the code to be slow to demonstrate some of the concepts below. Here's the Python code for it, which will be used multiple times below.

from log_execution_time import log_execution_time


def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n


timed_fib = log_execution_time(fib)

So, back to the task at hand. How do we even begin? Python does not provide a built-in setInterval or setTimeout. So a first possible solution is to use OS-level concurrency for this. Let's look at using two threads to do what we need. We will look at threads in some more detail in a bit.

from threading import Thread
from time import sleep
from time import time
from fib import timed_fib


def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)


def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))


def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()

if __name__ == '__main__':
    main()

Quite simple as well. But are the thread-based Python solution and the Node.js solution equivalent? Let's do an experiment. As we discussed, our Fibonacci number calculation code is very slow, so let's try a rather large number, say 37 for Python and 45 for Node.js (JavaScript is quite a bit faster than Python at numerical calculations).

$ python3.4 hello_threads.py
1412360472 - Hello world!
37
1412360475 - Hello world!
1412360478 - Hello world!
1412360481 - Hello world!
Executing fib took 8.96 seconds.
fib(37) = 24157817
1412360484 - Hello world!

As you can see, the calculation took about 9 seconds to finish, but the "Hello world!" message kept being printed while it was running. Let's try it with Node.js:

$ node hello.js
1412360534 - Hello world!
1412360537 - Hello world!
45
Calculation took 12.793 seconds
fib(45) = 1134903170
1412360551 - Hello world!
1412360554 - Hello world!
1412360557 - Hello world!

With Node.js on the other hand, the printing of the "Hello world!" message is paused while the Fibonacci number is calculated. Let's see how this makes sense.

Event Loops and Threads

To understand the difference in behaviour of the two solutions in the previous section, we need to have a simple understanding of threads and event loops. Let's start with threads. Think of a thread as a single sequence of instructions and the CPU's current state in executing them (CPU state refers to e.g. register values, in particular the next instruction register).

A simple synchronous program often runs on a single thread, which is why if an operation needs to wait for something, say an IO operation or a timer, the execution of the program is paused until the operation is finished. One of the simplest blocking operations is sleep. In fact, that's all sleep does, namely blocking the thread it is executed on for the given length of time. A process can have multiple threads running in it. Threads in the same process share the same process-level resources, such as memory and its address space, file descriptors, etc.

The operating system is in charge of handling threads, and the scheduler in the OS takes care of jumping between threads in a process (and between processes, but we are not too concerned with that part, since it is outside the scope of this post.) The operating system's scheduler will choose when to put a thread on pause and give control of the CPU to another thread for execution. This is called a context switch, and involves saving of the context of the current thread (e.g. CPU register values) and then loading the state of the target thread. Context switching can be somewhat expensive in that it itself requires CPU cycles.

There are many reasons the OS might choose to switch to another thread. Examples can be that another higher priority process or thread requires immediate attention (for example, code that handles hardware interrupts), that the thread itself asks to be paused for a while (e.g. in sleep), or because the thread has used the dedicated time it was assigned (this is also called the thread quantum) and will have to go back into a queue to be scheduled to continue execution.

Going back to our solutions above, the Python solution is clearly multi-threaded. This explains why the two tasks are run concurrently, and why the calculation of the large Fibonacci number, which is CPU intensive, is not blocking the execution of the other thread.

But what about Node.js? Based on the fact that the calculation blocks the other task, it appears that our code is running on a single thread. And this is in fact how Node.js is implemented. As far as the operating system is concerned, your application is running in a single thread. (I am simplifying things a little bit here, since depending on the platform libuv might use thread pools for some of the IO events, but even that doesn't change the fact that your JavaScript code is still running on a single thread.)

There are a few reasons you might want to avoid threads in certain situations. One is that threads can be expensive, both computationally and in terms of resources. The other is that the truly concurrent behaviour of threads, along with shared memory, means concurrency issues such as deadlocks and race conditions enter the picture, leading to more complex code and the need to keep thread safety in mind while programming. (Of course, these are relative, and there's a time and place for threads. But that's beside the point of this article!)

Let's see if we can solve the above problem without using multi-threading. To do so, we will imitate what Node.js uses behind the scenes: an event loop. First, we will need a way to poll stdin for input availability, that is, a system call that asks if a file descriptor (in this case stdin) has input available for reading or not. Depending on the operating system, there are a variety of system calls for this, such as poll, select, kqueue, etc. In Python 3.4, the selectors module provides an abstraction over these system calls so you can use them (somewhat) safely on a variety of machines.

Once we have the polling functionality, our event loop will be very simple: in each iteration of the loop, we check to see if there's input available for reading, and if so we read and process it. After that, we check to see if more than three seconds have passed since the last printing of "Hello world!" and if so, we print it. Let's give this a shot.

import selectors
import sys
from time import time
from fib import timed_fib


def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


def print_hello():
    print("{} - Hello world!".format(int(time())))


def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 100 milliseconds for input to be available
        for key, mask in selector.select(0.1):
            process_input(key.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()


if __name__ == '__main__':
    main()

And the output:

$ python3.4 hello_eventloop.py
1412376429 - Hello world!
1412376432 - Hello world!
1412376435 - Hello world!
37
Executing fib took 9.7 seconds.
fib(37) = 24157817
1412376447 - Hello world!
1412376450 - Hello world!

And as expected, because we are using a single thread, the program acts the same way as Node.js does, that is, the calculation blocks the running of the "Hello world!" task. Great, this is neat! But our solution is rather hard-coded for the specific problem. In the next sections, we will look at generalizing our event loop code to be a bit more powerful and easier to program for, first using callbacks and then using coroutines.
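
To see the polling call in isolation, here is a minimal sketch that swaps stdin for a pipe so that it runs without any user input. The pipe, the poll_once helper, and the 1024-byte read size are assumptions made for illustration, and registering a pipe with a selector is only expected to work on POSIX systems.

```python
import os
import selectors


def poll_once(selector, timeout):
    """Return the data read from every ready file descriptor ([] on timeout)."""
    chunks = []
    for key, mask in selector.select(timeout):
        chunks.append(os.read(key.fd, 1024))
    return chunks


# A pipe stands in for stdin here (illustrative assumption, POSIX only).
read_fd, write_fd = os.pipe()
selector = selectors.DefaultSelector()
selector.register(read_fd, selectors.EVENT_READ)

nothing_yet = poll_once(selector, 0.1)   # nothing written yet: select times out
os.write(write_fd, b"37\n")
after_write = poll_once(selector, 0.1)   # the read end is now ready

selector.close()
os.close(read_fd)
os.close(write_fd)
```

The first call returns an empty list after the timeout, which is exactly the case in which the event loop above moves on to check its timer.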

Event Loops With Callbacks

A natural generalization of the previous section's event loop is to allow for generic event handlers. This can be achieved relatively easily using callbacks: for each event type (in our case there are only two, input on stdin and timers going off), allow the user to add arbitrary functions as event handlers. The code is simple enough that we might as well just jump to it directly. There is only one bit that's a little tricky, and it's the use of bisect.insort to handle timer events. The algorithm here is to keep the list of timer events sorted, with the timers due earliest first. This way, at each iteration of the event loop, we just have to check whether there are any timers and, if there are, start at the beginning and run all timers that have expired. bisect.insort makes this easier by inserting the item at the correct index in the list. There are various other approaches to this but this is the one I opted for.

from bisect import insort
from collections import namedtuple
from fib import timed_fib
from time import time
import selectors
import sys


Timer = namedtuple('Timer', ['timestamp', 'handler'])


class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)

            # Handle timer events
            while self._timers and self._timers[0].timestamp < time():
                handler = self._timers[0].handler
                del self._timers[0]
                handler()

    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)

    def add_timer(self, wait_time, callback):
        timer = Timer(timestamp=time() + wait_time, handler=callback)
        insort(self._timers, timer)

    def stop(self):
        self._running = False


def main():
    loop = EventLoop()

    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)

    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()


if __name__ == '__main__':
    main()
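
One caveat about the sorted timer list above: insort compares whole Timer tuples, so if two timers ever end up with exactly the same timestamp, the comparison falls through to the handlers, and functions cannot be ordered in Python 3. As one of the "various other approaches", here is a hedged sketch of a heap-based queue with a sequence number as a tie-breaker. The TimerQueue class and its method names are hypothetical and are not part of the event loop above.

```python
import heapq
import itertools


class TimerQueue:
    """Timers ordered by (timestamp, sequence number). Hypothetical helper."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()

    def add(self, timestamp, handler):
        # The counter breaks ties, so two handlers are never compared.
        heapq.heappush(self._heap, (timestamp, next(self._counter), handler))

    def pop_expired(self, now):
        """Remove and return handlers with timestamp <= now, earliest first."""
        expired = []
        while self._heap and self._heap[0][0] <= now:
            _, _, handler = heapq.heappop(self._heap)
            expired.append(handler)
        return expired


queue = TimerQueue()
queue.add(5.0, lambda: "late")
queue.add(1.0, lambda: "first")
queue.add(1.0, lambda: "second")   # same timestamp: insertion order is kept
results = [handler() for handler in queue.pop_expired(now=2.0)]
```

Here results holds "first" and "second", while the timer due at 5.0 stays in the queue.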

This looks quite simple, and in practice, this is the method most commonly used in Node.js code as well. However, in more complicated applications, this style of writing asynchronous code, especially once error handling is added, quite quickly becomes what is known as callback hell. To quote Guido van Rossum on callbacks:

It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code. - Guido van Rossum

There are multiple alternative approaches to this, such as promises and coroutines (and about a million NPM libraries for each alternative). The one I prefer the most is coroutines (it's no secret that I think coroutines are very cool!). The next section goes over implementing a similar event loop that uses coroutines as tasks.

Event Loops With Coroutines

A coroutine is a function that can "return" while still remembering the state in which it is returning (value of local variables, and what the next instruction should be). This will then allow the coroutine to be called again, which results in it continuing from where it left off. This form of "returning" is often called yielding. I go into much more detail on coroutines and their implementation in Python in my combinatorial generation using coroutines article. Below I provide a much quicker introduction to them before we use them in the example.

In Python, the yield keyword can be used to create coroutines. When used as a simple statement, such as yield value, the given value is yielded, and control is given back to the caller. To continue the coroutine starting from the instruction after the yield statement, the caller needs to use the built-in next function. When used as an expression, such as y = yield x, the value x is yielded, and to continue the coroutine, the coroutine's send method can be used, in which case the value given to send will be sent back to the coroutine as the value returned by the expression (and hence assigned to y in this example).
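
As a quick illustration of next and send, here is a minimal sketch. The running_total coroutine is a made-up example and is not used anywhere else in this post.

```python
def running_total():
    """Coroutine that yields the running sum of the values sent to it."""
    total = 0
    while True:
        # The current total is yielded; whatever the caller passes to
        # send() becomes the value of the yield expression.
        received = yield total
        total += received


coroutine = running_total()
first = next(coroutine)        # runs up to the first yield, which yields 0
second = coroutine.send(10)    # resumes with received = 10, then yields 10
third = coroutine.send(5)      # resumes with received = 5, then yields 15
```

Note that the first step has to be next (or send(None)), since there is no paused yield expression to receive a value yet.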

This means that we can write our asynchronous code as coroutines, and simply yield when we need to wait on an asynchronous operation. To do this, we simply yield the task or other coroutine whose value we will need to continue. The code will then look very sequential and similar to synchronous code. Here's a simple example of what the Fibonacci portion of our solution will look like:

def read_input():
    while True:
        line = yield sys.stdin
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

Of course, for this to work we will need an event loop that can handle coroutines. To achieve this, we will maintain a queue of tasks to be run by the event loop. When input is available or a timer goes off (or, more generally, any other event that we care about occurs), we have a list of coroutines that need to continue (possibly with a value to be sent to them). Each task carries a stack variable that keeps track of the chain of coroutines waiting on it, each depending on the next to finish. This is based on the "Trampoline" example provided in PEP 342. I also use functools.partial as the Python equivalent of Function.prototype.bind in JavaScript, namely to partially apply a function by binding parameter values to it (this is often loosely called currying).

This is what it would look like:

from bisect import insort
from collections import deque
from collections import namedtuple
from fib import timed_fib
from functools import partial
from time import time
import selectors
import sys
import types


Timer = namedtuple('Timer', ['timestamp', 'handler'])


class sleep_for_seconds(object):
    """
    Yield an object of this type from a coroutine to have it "sleep" for the
    given number of seconds.
    """
    def __init__(self, wait_time):
        self._wait_time = wait_time


class EventLoop(object):
    """
    Implements a simplified coroutine-based event loop as a demonstration.
    Very similar to the "Trampoline" example in PEP 342, with exception
    handling taken out for simplicity, and selectors added to handle file IO
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()

        # Queue of functions scheduled to run
        self._tasks = deque(tasks)

        # (coroutine, stack) pair of tasks waiting for input from stdin
        self._tasks_waiting_on_stdin = []

        # List of (time_to_run, task) pairs, in sorted order
        self._timers = []

        # Register for polling stdin for input to read
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def resume_task(self, coroutine, value=None, stack=()):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            self.schedule(result, None, (coroutine, stack))
        elif isinstance(result, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + result._wait_time)
        elif result is sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
        elif stack:
            self.schedule(stack[0], result, stack[1])

    def schedule(self, coroutine, value=None, stack=(), when=None):
        """
        Schedule a coroutine task to be run, with value to be sent to it, and
        stack containing the coroutines that are waiting for the value yielded
        by this coroutine.
        """
        # Bind the parameters to a function to be scheduled as a function with
        # no parameters.
        task = partial(self.resume_task, coroutine, value, stack)
        if when:
            insort(self._timers, Timer(timestamp=when, handler=task))
        else:
            self._tasks.append(task)

    def stop(self):
        self._running = False

    def do_on_next_tick(self, func, *args, **kwargs):
        self._tasks.appendleft(partial(func, *args, **kwargs))

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for task, stack in self._tasks_waiting_on_stdin:
                    self.schedule(task, line, stack)
                self._tasks_waiting_on_stdin.clear()

            # Next, run the next task
            if self._tasks:
                task = self._tasks.popleft()
                task()

            # Finally run time scheduled tasks
            while self._timers and self._timers[0].timestamp < time():
                task = self._timers[0].handler
                del self._timers[0]
                task()

        self._running = False


def print_every(message, interval):
    """
    Coroutine task to repeatedly print the message at the given interval
    (in seconds)
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)


def read_input(loop):
    """
    Coroutine task to repeatedly read new lines of input from stdin, treat
    the input as a number n, and calculate and display fib(n).
    """
    while True:
        line = yield sys.stdin
        if line == 'exit':
            loop.do_on_next_tick(loop.stop)
            continue
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

Note that this implementation also lets us add a simple do_on_next_tick function, which more or less does what process.nextTick does in Node.js. I use it to implement a simple "type exit to quit" feature. (I didn't really have to use do_on_next_tick, though; I could have just called loop.stop() directly!)
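
For readers unfamiliar with functools.partial, which schedule and do_on_next_tick rely on, here is a small sketch of what it does. The resume function is a hypothetical stand-in for resume_task and only formats a message.

```python
from functools import partial


def resume(name, value, stack=()):
    """Hypothetical stand-in for EventLoop.resume_task."""
    return "{} resumed with {!r} (stack depth {})".format(
        name, value, len(stack))


# Bind all the arguments now; the result can be called later with none,
# which is how the event loop stores its pending tasks.
task = partial(resume, "fib_task", "37", ("caller",))
message = task()
```

Calling task() is equivalent to calling resume("fib_task", "37", ("caller",)), so the queue only ever needs to hold zero-argument callables.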

Another interesting thing to point out here is that we can re-implement our recursive Fibonacci algorithm using coroutines instead of recursive calls, and in doing so we can have it run in "parallel" to other coroutines, including the one that prints hello. This is what it would look like:

from event_loop_coroutine import EventLoop
from event_loop_coroutine import print_every
import sys


def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b


def read_input(loop):
    while True:
        line = yield sys.stdin
        n = int(line)
        fib_n = yield fib(n)
        print("fib({}) = {}".format(n, fib_n))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

The output for this program will be:

$ python3.4 fib_coroutine.py
1412727829 - Hello world!
1412727832 - Hello world!
28
1412727835 - Hello world!
1412727838 - Hello world!
fib(28) = 317811
1412727841 - Hello world!
1412727844 - Hello world!
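
The reason the coroutine version of fib can interleave with the hello task is that every yield hands control back to whoever is driving it. The following stripped-down sketch shows that stepping on its own, without the EventLoop class: a hypothetical run function drives the nested generators with an explicit stack and counts how many separate steps were needed. In the event loop above, each of those steps is a separate entry in the task queue, which is what gives other tasks a chance to run in between.

```python
import types


def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b


def run(coroutine):
    """Drive nested generators with an explicit stack (illustrative only)."""
    stack = []      # coroutines waiting for the result of a sub-coroutine
    value = None    # value to send into the coroutine being resumed
    steps = 0
    while True:
        result = coroutine.send(value)
        steps += 1
        if isinstance(result, types.GeneratorType):
            # A sub-coroutine was yielded: park the caller, run the callee.
            stack.append(coroutine)
            coroutine, value = result, None
        elif stack:
            # A plain value acts as the "return value" for the waiting caller.
            coroutine, value = stack.pop(), result
        else:
            return result, steps


result, steps = run(fib(10))
```

Like the event loop above, this sketch leaves out exception handling and never resumes a coroutine after it has yielded its final value.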

Not Reinventing The Wheel

In the previous two sections, we went over the general ideas that go into implementing an event loop to allow us to write asynchronous code using either callbacks or coroutines. This was great for the purposes of experimenting and learning about the ideas, but in practice there are already quite mature libraries for Python that provide event loops. In addition, Python 3.4 ships with the asyncio module, which has event loops and coroutines for IO operations, networking, and more. Let's first solve the above problem using asyncio and then look at a few more interesting examples.

import asyncio
import sys
from time import time
from fib import timed_fib


def process_input():
    text = sys.stdin.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


@asyncio.coroutine
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        yield from asyncio.sleep(3)


def main():
    loop = asyncio.get_event_loop()
    loop.add_reader(sys.stdin, process_input)
    loop.run_until_complete(print_hello())


if __name__ == '__main__':
    main()

Notice how @asyncio.coroutine is used to decorate coroutines, and how yield from, as opposed to just yield, is used to get values from other coroutines.
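
The difference between yield and yield from can be seen with plain generators, with no event loop involved. In this small sketch (inner and outer are made-up names), yield from passes along everything the inner generator yields, and then evaluates to the inner generator's return value:

```python
def inner():
    yield "inner step 1"
    yield "inner step 2"
    return "inner result"      # becomes the value of `yield from inner()`


def outer():
    # Re-yields inner's values to our caller, then receives its return value.
    result = yield from inner()
    yield "outer got: " + result


steps = list(outer())
```

With a plain yield inner() instead, outer would hand the generator object itself to its caller, which is why the hand-written event loop in the previous section had to check for generators and manage the stack manually.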

Handling Exceptions

Python's coroutines allow an exception to be thrown in the stack frame of the coroutine, and have it be caught at the point the coroutine has paused. Let's look at a simple example:

def coroutine():
    print("Starting")
    try:
        yield "Let's pause until continued."
        print("Continuing")
    except Exception as e:
        yield "Got an exception: " + str(e)


def main():
    c = coroutine()
    next(c)  # Execute until the first yield
    # Now throw an exception at the point where the coroutine has paused
    value = c.throw(Exception("Have an exceptional day!"))
    print(value)


if __name__ == '__main__':
    main()

Which outputs:

Starting
Got an exception: Have an exceptional day!

This makes it rather easy to have one unified way of handling errors using exceptions, in both synchronous and asynchronous code, provided the event loop catches and propagates exceptions properly. For example, let's look at an example with chained coroutines and an event loop:

import asyncio


@asyncio.coroutine
def A():
    raise Exception("Something went wrong in A!")


@asyncio.coroutine
def B():
    a = yield from A()
    return a + 1


@asyncio.coroutine
def C():
    try:
        b = yield from B()
        print(b)
    except Exception as e:
        print("C got exception:", e)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(C())


if __name__ == '__main__':
    main()

Output:

C got exception: Something went wrong in A!

In this example, coroutine C relies on the result of B, which in turn relies on the result of A, which decides to throw an exception. As you can see, the exception gets propagated all the way to C, which catches it and prints the message. This behaves almost exactly the same as synchronous code would. No more catching and passing of errors through callbacks manually!

Of course, this example is rather theoretical and uninspired. Let's look at a real example: let's write some code to asynchronously grab the external IP address of the computer using ipify. Since asyncio does not ship with an HTTP client (yet, anyway!) we have to go to the TCP level and write the HTTP request and parse the response ourselves. Since we are doing this with a very specific API in mind (and as an example, not production code!), let's actually go ahead and do this. In practice, using a library meant for exactly this, for example aiohttp, is of course a much better idea. Let's see what this looks like:

import asyncio
import json


host = 'api.ipify.org'
request_headers = {'User-Agent': 'python/3.4',
                   'Host': host,
                   'Accept': 'application/json',
                   'Accept-Charset': 'UTF-8'}


@asyncio.coroutine
def write_headers(writer):
    for key, value in request_headers.items():
        writer.write((key + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')
    yield from writer.drain()


@asyncio.coroutine
def read_headers(reader):
    response_headers = {}
    while True:
        line_bytes = yield from reader.readline()
        line = line_bytes.decode().strip()
        if not line:
            break
        key, value = line.split(':', 1)
        response_headers[key.strip()] = value.strip()
    return response_headers


@asyncio.coroutine
def get_my_ip_address(verbose):
    reader, writer = yield from asyncio.open_connection(host, 80)
    writer.write(b'GET /?format=json HTTP/1.1\r\n')
    yield from write_headers(writer)
    status_line = yield from reader.readline()
    status_line = status_line.decode().strip()
    # Limit the split so multi-word reasons such as "Not Found" stay intact
    http_version, status_code, status = status_line.split(' ', 2)
    if verbose:
        print('Got status {} {}'.format(status_code, status))
    response_headers = yield from read_headers(reader)
    if verbose:
        print('Response headers:')
        for key, value in response_headers.items():
            print(key + ': ' + value)
    # Assume the content length is sent by the server, which is the case
    # with ipify
    content_length = int(response_headers['Content-Length'])
    # readexactly waits for the full body; read(n) may return fewer bytes
    response_body_bytes = yield from reader.readexactly(content_length)
    response_body = response_body_bytes.decode()
    response_object = json.loads(response_body)
    writer.close()
    return response_object['ip']


@asyncio.coroutine
def print_my_ip_address(verbose):
    try:
        ip_address = yield from get_my_ip_address(verbose)
        print("My IP address is:")
        print(ip_address)
    except Exception as e:
        print("Error: ", e)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_my_ip_address(verbose=True))
    finally:
        loop.close()


if __name__ == '__main__':
    main()

Again, notice the similarity to synchronous code: no callbacks, no complicated error handling, just easy and very readable code. Let's see how it works, without any errors:

$ python3.4 ipify.py
Got status 200 OK
Response headers:
Content-Length: 21
Server: Cowboy
Connection: keep-alive
Via: 1.1 vegur
Content-Type: application/json
Date: Fri, 10 Oct 2014 03:46:31 GMT
My IP address is:
<my IP address here, hidden for privacy!>

On the other hand, if something goes wrong, for example if I am not connected to the internet, here is the output:

$ python3.4 ipify.py
Error:  [Errno 8] nodename nor servname provided, or not known

This is one of the main advantages to using coroutines for asynchronous code in my opinion: error handling will be perfectly consistent with synchronous code. For example, in the above, it doesn't matter if one of the chained coroutines fails, or if one of the synchronous calls fails, the exception is caught and handled the exact same way.

Relying On The Results of Multiple Independent Coroutines

In the above examples, we wrote asynchronous code that was inherently sequential, meaning each statement in a coroutine relies on the previous statements finishing before continuing. Sometimes, we want to execute a set of independent tasks and use their results as they complete, without caring about the order they run in. For example, in a web crawler, we might want to send asynchronous requests to all the links on a web page and add the responses to a queue to be processed as we go.

Coroutines allow for writing asynchronous code that flows very sequentially, but for running independent tasks and processing their results either all at once or as they come, callbacks may, at first, seem to be better. However, Python 3.4's asyncio comes with built-in functions for precisely these two scenarios, namely functions asyncio.as_completed and asyncio.gather.

Let's look at a simple example in which we need to load three URLs. We do it in two ways: first by processing the results as they come in using asyncio.as_completed, and then by processing them only once they have all finished loading using asyncio.gather. Instead of actually loading URLs, I chose to have a simple coroutine that pauses for a random number of seconds. Here's the code:

import asyncio
import random


@asyncio.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield from asyncio.sleep(wait_time)
    print('Done: URL {} took {}s to get!'.format(url, wait_time))
    return url, wait_time


@asyncio.coroutine
def process_as_results_come_in():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    for coroutine in asyncio.as_completed(coroutines):
        url, wait_time = yield from coroutine
        print('Coroutine for {} is done'.format(url))


@asyncio.coroutine
def process_once_everything_ready():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    results = yield from asyncio.gather(*coroutines)
    print(results)


def main():
    loop = asyncio.get_event_loop()
    print("First, process results as they come in:")
    loop.run_until_complete(process_as_results_come_in())
    print("\nNow, process results once they are all ready:")
    loop.run_until_complete(process_once_everything_ready())


if __name__ == '__main__':
    main()

And the output:

$ python3.4 gather.py
First, process results as they come in:
Done: URL URL2 took 2s to get!
Coroutine for URL2 is done
Done: URL URL3 took 3s to get!
Coroutine for URL3 is done
Done: URL URL1 took 4s to get!
Coroutine for URL1 is done

Now, process results once they are all ready:
Done: URL URL1 took 1s to get!
Done: URL URL2 took 3s to get!
Done: URL URL3 took 4s to get!
[('URL1', 1), ('URL2', 3), ('URL3', 4)]
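
A note for readers on newer interpreters: the generator-based @asyncio.coroutine style used throughout this post targets Python 3.4, and the decorator was removed in Python 3.11. Below is a hedged sketch of the same two patterns using the async/await syntax (Python 3.5+) and asyncio.run (Python 3.7+). The very short random waits and the urls parameter are changes made for this sketch so that it runs quickly and returns values that can be inspected.

```python
import asyncio
import random


async def get_url(url):
    wait_time = random.uniform(0.01, 0.05)   # short waits, for the sketch only
    await asyncio.sleep(wait_time)
    return url, wait_time


async def process_as_results_come_in(urls):
    finished = []
    for future in asyncio.as_completed([get_url(url) for url in urls]):
        url, wait_time = await future
        finished.append(url)     # completion order, not submission order
    return finished


async def process_once_everything_ready(urls):
    # gather returns results in the order the awaitables were passed in
    return await asyncio.gather(*(get_url(url) for url in urls))


urls = ['URL1', 'URL2', 'URL3']
completion_order = asyncio.run(process_as_results_come_in(urls))
gathered = asyncio.run(process_once_everything_ready(urls))
```

As in the output above, the order of completion_order depends on the random waits, while gathered always follows the order of urls.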

Digging Deeper

There is a lot that I didn't cover here; Futures and libuv, to name a few. There is also Guido's talk on asynchronous IO in Python 3.4. There are likely many other resources that I am forgetting to include so feel free to recommend some in the comments below.
