Introduction
I spent this summer working on a web platform running on Node.js. This was the
first time I worked full-time with Node.js, and one thing that became quite
apparent after a few weeks of working with it was that many developers,
including myself at the time, lack clarity on exactly how the asynchronous
features of Node.js work and how they are implemented at a lower level. Since
I believe the only way to use a platform efficiently is to have a clear
understanding of how it works, I decided to dig deeper. This curiosity also
made me start playing around with implementing similar asynchronous features in
other languages, in particular Python, it being my go-to language for
experimenting and learning. This led me to Python 3.4's asynchronous IO library,
asyncio, which intersected with my already existing interest in coroutines (see
my post on combinatorial generation using coroutines in Python).
This post is about exploring the questions and answers that came up while I was
learning more about this subject, which I hope can help clarify and answer some
questions for others as well.
All the Python code is intended for Python 3.4. This is mostly because
Python 3.4 introduces the selectors module as well as asyncio. For
earlier versions of Python, libraries such as Twisted, gevent, and Tornado
provide similar functionality.
In the early examples below, I chose to almost entirely ignore the issue of
error handling and exceptions. This was done mostly for the sake of simplicity;
proper handling of exceptions is a very important aspect of the type of code we
see below. I will provide a few examples of how Python 3.4's asyncio module
handles exceptions at the end.
Getting Started: Hello World Revisited
Let's start by writing a program to solve a very simple problem. We will use this problem and minor variations of it for the rest of the section to demonstrate the ideas.
Write a program to print "Hello world!" every three seconds, and at the same time wait for input from the user. Each line of user input will contain a single positive number n. As soon as input is entered, calculate and output the n-th Fibonacci number and continue to wait for more input.
Note that there's a chance the periodic "Hello world!" is inserted in the middle of user input, but we do not care about that.
Those of you familiar with Node.js and JavaScript might already have a solution in mind that will likely look something like this:
var log_execution_time = require('./utils').log_execution_time;
var fib = function fib(n) {
    if (n < 2) return n;
    return fib(n - 1) + fib(n - 2);
};
var timed_fib = log_execution_time(fib);
var sayHello = function sayHello() {
    console.log(Math.floor((new Date()).getTime() / 1000) + " - Hello world!");
};
var handleInput = function handleInput(data) {
    var n = parseInt(data.toString(), 10);
    console.log('fib(' + n + ') = ' + timed_fib(n));
};
process.stdin.on('data', handleInput);
setInterval(sayHello, 3000);
As you can see, this is quite easy to do in Node.js. All we have to do is set
an interval timer to print "Hello world!" and attach an event handler to the
data event of process.stdin, and we are done. Simple to
understand on an abstract level, and very easy to use. It just works! But how?
To answer this, let's try to do the exact same thing in Python.
Also notice that we use a log_execution_time decorator to output the
time it takes to calculate the Fibonacci number. Here's the definition of this
decorator in Python:
from functools import wraps
from time import time
def log_execution_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time()
        return_value = func(*args, **kwargs)
        message = "Executing {} took {:.03} seconds.".format(func.__name__,
                                                             time() - start)
        print(message)
        return return_value
    return wrapper
And similarly, in JavaScript:
// We do not care about handling the "this" parameter correctly in our examples.
// Do not use this decorator where that's needed!
module.exports.log_execution_time = function log_execution_time(func) {
    var wrapper = function() {
        var start = (new Date()).getTime();
        var return_value = func.apply(this, arguments);
        var message = "Calculation took " + ((new Date()).getTime() - start) / 1000 + " seconds";
        console.log(message);
        return return_value;
    };
    return wrapper;
};
The algorithm used here to calculate the Fibonacci numbers is intentionally the naive recursive one, which has exponential running time. This is because this post is not about Fibonacci numbers (see this post on that subject, as there is a logarithmic-time algorithm), and because I actually want the code to be slow to demonstrate some of the concepts below. Here's the Python code for it, which will be used multiple times below.
from log_execution_time import log_execution_time
def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n
timed_fib = log_execution_time(fib)
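To make "exponential running time" concrete, here is a small sketch that counts the calls the naive recursion makes. The `count_fib_calls` helper and its counter are illustrative additions, not part of the original code:

```python
def count_fib_calls(n):
    """Return (fib(n), number of calls) for the naive recursive algorithm."""
    calls = 0

    def fib(k):
        nonlocal calls
        calls += 1  # count every invocation, including the base cases
        return fib(k - 1) + fib(k - 2) if k > 1 else k

    return fib(n), calls


for n in (10, 20, 25):
    value, calls = count_fib_calls(n)
    print("fib({}) = {} after {} calls".format(n, value, calls))
```

The number of calls works out to 2 * fib(n + 1) - 1, so it grows as fast as the Fibonacci numbers themselves (roughly 1.6 times more work for each increment of n). That is why the runs below with n = 37 take several seconds.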
So, back to the task at hand. How do we even begin? Python does not provide a
built-in setInterval or setTimeout. So a first possible solution is
to use OS-level concurrency for this. Let's look at using two threads to do
what we need. We will look at threads in some more detail in a bit.
from threading import Thread
from time import sleep
from time import time
from fib import timed_fib
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)
def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))
def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()
if __name__ == '__main__':
    main()
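Here is a scaled-down, self-contained version of the same experiment that can run without a terminal. A hypothetical `busy_work` helper, a time-bounded busy loop, stands in for the slow Fibonacci call on the main thread, while a second thread records a timestamp every 20 milliseconds instead of printing every three seconds. The durations are arbitrary choices for the demo, and the exact tick count will vary by machine:

```python
import threading
import time


def ticker(ticks, stop, interval):
    """Record a timestamp every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        ticks.append(time.time())
        time.sleep(interval)


def busy_work(seconds):
    """Keep the CPU busy for roughly `seconds` seconds (stand-in for fib)."""
    end = time.time() + seconds
    count = 0
    while time.time() < end:
        count += 1
    return count


ticks = []
stop = threading.Event()
t = threading.Thread(target=ticker, args=(ticks, stop, 0.02), daemon=True)
t.start()

start = time.time()
busy_work(0.3)  # CPU-bound work on the main thread
finish = time.time()
stop.set()
t.join()

# Ticks that happened while the main thread was busy computing
during = [ts for ts in ticks if start <= ts <= finish]
print("ticks recorded while the main thread was busy:", len(during))
```

In CPython, the global interpreter lock (GIL) means the two threads take turns executing Python bytecode rather than running it truly in parallel, but the interpreter periodically forces a switch, so the ticker still gets to run while the main thread is busy.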
Quite simple as well. But are the thread-based Python solution and the Node.js solution equivalent? Let's do an experiment. As we discussed, our Fibonacci number calculation code is very slow, so let's try a rather large number, say 37 for Python and 45 for Node.js (JavaScript is quite a bit faster than Python at numerical calculations).
$ python3.4 hello_threads.py
1412360472 - Hello world!
37
1412360475 - Hello world!
1412360478 - Hello world!
1412360481 - Hello world!
Executing fib took 8.96 seconds.
fib(37) = 24157817
1412360484 - Hello world!
As you can see, it took about 9 seconds for the calculation to finish, but the "Hello world!" message was still printed while that calculation took place. Let's try it with Node.js:
$ node hello.js
1412360534 - Hello world!
1412360537 - Hello world!
45
Calculation took 12.793 seconds
fib(45) = 1134903170
1412360551 - Hello world!
1412360554 - Hello world!
1412360557 - Hello world!
With Node.js on the other hand, the printing of the "Hello world!" message is paused while the Fibonacci number is calculated. Let's see how this makes sense.
Event Loops and Threads
To understand the difference in behaviour between the two solutions in the previous section, we need a basic understanding of threads and event loops. Let's start with threads. Think of a thread as a single sequence of instructions together with the CPU's current state in executing them (CPU state refers to e.g. register values, in particular the instruction pointer, which holds the address of the next instruction to execute).
A simple synchronous program often runs on a single thread, which is why if an
operation needs to wait for something, say an IO operation or a timer, the
execution of the program is paused until the operation is finished. One of the
simplest blocking operations is sleep. In fact, that's all sleep
does, namely blocking the thread it is executed on for the given length of
time. A process can have multiple threads running in it. Threads in the same
process share the same process-level resources, such as memory and its address
space, file descriptors, etc.
The operating system is in charge of handling threads, and the scheduler in the OS takes care of jumping between threads in a process (and between processes, but we are not too concerned with that part, since it is outside the scope of this post). The operating system's scheduler will choose when to put a thread on pause and give control of the CPU to another thread for execution. This is called a context switch, and involves saving the context of the current thread (e.g. CPU register values) and then loading the state of the target thread. Context switching can be somewhat expensive in that it itself requires CPU cycles.
There are many reasons the OS might choose to switch to another thread:
another higher-priority process or thread might require immediate attention
(such as code that handles hardware interrupts), the thread itself might ask to
be paused for a while (e.g. in sleep), or the thread might have used up the
time slice it was assigned (also called the thread quantum) and have to go back
into a queue to be scheduled to continue execution.
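Because sleep blocks only the thread that calls it, the scheduler is free to run other threads in the meantime. In this minimal sketch (the `nap` helper and the 0.2-second duration are illustrative), two threads each sleep for 0.2 seconds; since the sleeps overlap, the total elapsed time stays close to 0.2 seconds rather than 0.4:

```python
import threading
import time


def nap(seconds, finished, name):
    time.sleep(seconds)  # blocks only this thread
    finished.append(name)


finished = []
threads = [threading.Thread(target=nap, args=(0.2, finished, name))
           for name in ("first", "second")]

start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start

# Both sleeps overlapped, so the total is close to 0.2s rather than 0.4s.
print("elapsed: {:.2f}s, finished: {}".format(elapsed, sorted(finished)))
```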
Going back to our solutions above, the Python solution is clearly multi-threaded. This explains why the two tasks are run concurrently, and why the calculation of the large Fibonacci number, which is CPU intensive, is not blocking the execution of the other thread.
But what about Node.js? It appears, based on the fact that the calculation is blocking the other task, that our code is running on a single thread. And this is in fact how Node.js is implemented. As far as the operating system is concerned your application is running in a single thread (I am simplifying things a little bit here, since depending on the platform libuv might use thread pools for some of the IO events, but even that doesn't change the fact that your JavaScript code is still running on a single thread.)
There are a few reasons you might want to avoid threads in certain situations. One is that threads can be expensive in terms of both computation and resources; the other is that the truly concurrent behaviour of threads, combined with shared memory, means concurrency issues such as deadlocks and race conditions enter the picture, leading to more complex code and the need to keep thread safety in mind while programming. (Of course, these are relative, and there's a time and place for threads. But that's beside the point of this article!)
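As a small illustration of the thread-safety burden: `counter += 1` is a read-modify-write sequence, so when several threads update a shared counter, updates can be lost unless the operation is protected. This sketch guards it with a `threading.Lock`; the thread count and iteration count are arbitrary:

```python
import threading

counter = 0
counter_lock = threading.Lock()


def increment(times):
    global counter
    for _ in range(times):
        # Without the lock, the read-modify-write of `counter += 1` from
        # several threads could interleave and lose updates.
        with counter_lock:
            counter += 1


threads = [threading.Thread(target=increment, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```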
Let's see if we can solve the above problem without using multi-threading. To
do so, we will imitate what Node.js uses behind the scenes: an event loop.
First, we will need a way to poll stdin for input availability, that
is, a system call that asks if a file descriptor (in this case stdin) has
input available for reading or not. Depending on the operating system, there
are a variety of system calls for this, such as poll, select,
kqueue, etc. In Python 3.4, the selectors module provides an
abstraction over these system calls so you can use them (somewhat) safely on a
variety of machines.
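A minimal sketch of that polling step with the selectors module. To keep it runnable without a terminal, a connected `socket.socketpair()` stands in for stdin (an assumption for the demo; it also matters on Windows, where select-based polling works only on sockets, not on stdin):

```python
import selectors
import socket

# A connected pair of sockets stands in for stdin.
reader, writer = socket.socketpair()
selector = selectors.DefaultSelector()
selector.register(reader, selectors.EVENT_READ)

# Nothing written yet: a zero timeout returns immediately with no events.
before = selector.select(0)

writer.sendall(b"37\n")

# Now the reading end reports "read readiness".
after = selector.select(0.1)
ready = [key.fileobj for key, mask in after if mask & selectors.EVENT_READ]
data = reader.recv(1024)

print(len(before), ready == [reader], data)

selector.unregister(reader)
selector.close()
reader.close()
writer.close()
```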
Once we have the polling functionality, our event loop will be very simple: in each iteration of the loop, we check to see if there's input available for reading, and if so we read and process it. After that, we check to see if more than three seconds have passed since "Hello world!" was last printed, and if so, we print it. Let's give this a shot.
import selectors
import sys
from time import time
from fib import timed_fib
def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))
def print_hello():
    print("{} - Hello world!".format(int(time())))
def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 100 milliseconds for input to be available
        for key, mask in selector.select(0.1):
            process_input(key.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()
if __name__ == '__main__':
    main()
And the output:
$ python3.4 hello_eventloop.py
1412376429 - Hello world!
1412376432 - Hello world!
1412376435 - Hello world!
37
Executing fib took 9.7 seconds.
fib(37) = 24157817
1412376447 - Hello world!
1412376450 - Hello world!
And as expected, because we are using a single thread, the program behaves the same way Node.js does: the calculation blocks the "Hello world!" task from running. Great, this is neat! But our solution is rather hard-coded for the specific problem. In the next sections, we will look at generalizing our event loop code to be a bit more powerful and easier to program for, first using callbacks and then using coroutines.
Event Loops With Callbacks
A natural generalization of the previous section's event loop is to allow for
generic event handlers. This can be achieved relatively easily using callbacks:
for each event type (in our case, we only have two of them, input on
stdin and timers going off), allow the user to add arbitrary functions
as event handlers. The code is simple enough that we might as well just jump to
it directly. There is only one part that's a bit tricky, and it's the use of
bisect.insort to handle timer events. The algorithm here is to keep the
list of timer events sorted, with the timers to run earliest first. This way,
at each iteration of the event loop, we just have to check to see if there are
any timers, and if there are, start at the beginning and run all timers that
have expired. bisect.insort makes this easier by inserting the item at the
correct index in the list. There are various other approaches to this, but this
is the one I opted for.
from bisect import insort
from collections import namedtuple
from fib import timed_fib
from time import time
import selectors
import sys
Timer = namedtuple('Timer', ['timestamp', 'handler'])
class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)
    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input. A timeout of 0 makes
            # select() return immediately, so this loop busy-polls.
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)
            # Handle timer events
            while self._timers and self._timers[0].timestamp < time():
                handler = self._timers[0].handler
                del self._timers[0]
                handler()
    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)
    def add_timer(self, wait_time, callback):
        timer = Timer(timestamp=time() + wait_time, handler=callback)
        insort(self._timers, timer)
    def stop(self):
        self._running = False
def main():
    loop = EventLoop()
    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)
    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()
if __name__ == '__main__':
    main()
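One subtlety with the `Timer` tuples above: `insort` compares whole tuples, so if two timers ever ended up with exactly the same timestamp, Python 3 would go on to compare the two handler functions and raise a `TypeError`. Below is a minimal sketch of one fix, which assumes a monotonically increasing sequence number as a tie-breaker. The `add_timer`/`run_expired` helpers are illustrative, and a fixed `now` value replaces `time()` so the result is deterministic:

```python
from bisect import insort
from collections import namedtuple
from itertools import count

# A sequence number between the timestamp and the handler breaks ties, so
# two timers with the same timestamp never fall back to comparing functions
# (which would raise TypeError in Python 3).
Timer = namedtuple('Timer', ['timestamp', 'seq', 'handler'])
_sequence = count()


def add_timer(timers, timestamp, handler):
    insort(timers, Timer(timestamp, next(_sequence), handler))


def run_expired(timers, now):
    """Pop and run every timer whose timestamp is <= now, earliest first."""
    while timers and timers[0].timestamp <= now:
        timers.pop(0).handler()


calls = []
timers = []
add_timer(timers, 5.0, lambda: calls.append('b'))
add_timer(timers, 1.0, lambda: calls.append('a'))
add_timer(timers, 5.0, lambda: calls.append('c'))  # same timestamp as 'b'
add_timer(timers, 9.0, lambda: calls.append('d'))

run_expired(timers, now=6.0)
print(calls, len(timers))  # ['a', 'b', 'c'] 1
```

Timers with equal timestamps now run in the order they were added. The standard library's `heapq` module is a common alternative to a sorted list for this kind of timer queue, since it avoids shifting list elements on every insertion and removal.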
This looks quite simple, and in practice, this is the method most commonly used in Node.js code as well. However, in more complicated applications, this style of writing asynchronous code, especially once error handling is added, quickly becomes what is known as callback hell. To quote Guido van Rossum on callbacks:
It requires super human discipline to write readable code in callbacks and if you don’t believe me look at any piece of JavaScript code. - Guido van Rossum
There are multiple alternative approaches to this, such as promises and coroutines (and about a million NPM libraries for each alternative). The one I prefer the most (it's no secret that I think coroutines are very cool!) is using coroutines. The next section goes over implementing a similar event loop that uses coroutines as tasks.
Event Loops With Coroutines
A coroutine is a function that can "return" while still remembering the state in which it is returning (value of local variables, and what the next instruction should be). This will then allow the coroutine to be called again, which results in it continuing from where it left off. This form of "returning" is often called yielding. I go into much more detail on coroutines and their implementation in Python in my combinatorial generation using coroutines article. Below I provide a much quicker introduction to them before we use them in the example.
In Python, the yield keyword can be used to create coroutines. When
used as a simple statement, such as yield value, the given value is
yielded, and control is given back to the caller. To continue the coroutine
starting from the instruction after the yield statement, the caller
needs to use the built-in next function. When used as an expression,
such as y = yield x, the value x is yielded, and to continue
the coroutine, the coroutine's send method can be used, in which case
the value given to send will be sent back to the coroutine as the value
returned by the expression (and hence assigned to y in this example).
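A minimal illustration of both forms, using a made-up `echo` generator: `next` starts the coroutine and runs it to its first `yield`, and each `send` resumes it, with the sent value becoming the result of the paused `yield` expression:

```python
def echo():
    """A tiny coroutine: yields a prompt, then yields back what it was sent."""
    received = yield "ready"  # `yield` used as an expression
    while True:
        received = yield "got " + str(received)


c = echo()
first = next(c)       # runs until the first yield -> "ready"
second = c.send(42)   # resumes; `received` becomes 42 -> "got 42"
third = c.send("hi")  # -> "got hi"
print(first, second, third)
```

Calling `c.send(None)` is equivalent to `next(c)`; sending any other value to a generator that has not started yet raises a `TypeError`, because there is no paused `yield` expression to receive it.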
This means that we can write our asynchronous code as coroutines, and simply yield when we need to wait on an asynchronous operation. To do this, we simply yield the task or other coroutine whose value we will need to continue. The code will then look very sequential and similar to synchronous code. Here's a simple example of what the Fibonacci portion of our solution will look like:
def read_input():
    while True:
        line = yield sys.stdin
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
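Before writing the event loop, it can help to play its role by hand. In this sketch, the input lines are hard-coded (standing in for real stdin), fib is the plain recursive version, and read_input records its results in a list, an addition made only so the outcome is easy to check:

```python
import sys


def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n


def read_input(results):
    while True:
        # Tell whoever is driving us that we are waiting on stdin; the line
        # of input arrives as the value of this yield expression.
        line = yield sys.stdin
        n = int(line)
        value = fib(n)
        results.append((n, value))
        print("fib({}) = {}".format(n, value))


results = []
task = read_input(results)
waiting_on = next(task)    # run to the first yield
for line in ["10", "20"]:  # pretend these lines were typed by the user
    if waiting_on is sys.stdin:
        waiting_on = task.send(line)
print(results)  # [(10, 55), (20, 6765)]
```

The yielded `sys.stdin` acts purely as a marker telling the driver what the coroutine is waiting for; the event loop in this section automates exactly this next/send exchange.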
Of course, for this to work, we will need an event loop that can handle
coroutines. To achieve this, we will maintain a queue of tasks to be run by the
event loop. When input is available, or a timer goes off (or more generally,
any other event that we care about), we have a list of coroutines that need to
continue (possibly with a value to be sent to them). With each task, we have a
bound stack variable that keeps track of the stack of coroutines to run
in the chain, each depending on the next to finish. This is based on the
example of a "Trampoline" provided in PEP 342. I also use
functools.partial as the Python equivalent of Function.prototype.bind in
JavaScript, namely to partially apply a function by binding parameter values to
it (often loosely called currying).
This is what it would look like:
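To isolate the trampoline idea from the IO and timer handling, here is a stripped-down sketch with no selectors and no timers. `partial` binds each step's arguments so the queue only holds zero-argument callables, and a caller waiting on a callee is remembered in a nested `(coroutine, stack)` tuple. The `run` function and the convention that a generator's first non-generator yield is its result are assumptions chosen to mirror the event loop in this section; it is a simplified illustration, not that implementation:

```python
from collections import deque
from functools import partial
import types


def run(main_coroutine):
    """
    Minimal trampoline sketch: a coroutine may yield another generator to
    "call" it; the first non-generator value a generator yields is treated
    as its result and sent back to the caller waiting on the stack.
    """
    tasks = deque()
    result_box = []

    def resume(coroutine, value, stack):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            # Run the callee first; remember the caller on the stack.
            tasks.append(partial(resume, result, None, (coroutine, stack)))
        elif stack:
            caller, rest = stack
            tasks.append(partial(resume, caller, result, rest))
        else:
            result_box.append(result)  # the outermost coroutine finished

    tasks.append(partial(resume, main_coroutine, None, ()))
    while tasks:
        tasks.popleft()()
    return result_box[0]


def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b


print(run(fib(10)))  # 55
```

Even though fib is written recursively, the Python call stack never grows deep: every step returns to the `while tasks` loop, and the chain of waiting callers lives in the `stack` tuples instead.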
from bisect import insort
from collections import deque
from collections import namedtuple
from fib import timed_fib
from functools import partial
from time import time
import selectors
import sys
import types
Timer = namedtuple('Timer', ['timestamp', 'handler'])
class sleep_for_seconds(object):
    """
    Yield an object of this type from a coroutine to have it "sleep" for the
    given number of seconds.
    """
    def __init__(self, wait_time):
        self._wait_time = wait_time
class EventLoop(object):
    """
    Implements a simplified coroutine-based event loop as a demonstration.
    Very similar to the "Trampoline" example in PEP 342, with exception
    handling taken out for simplicity, and selectors added to handle file IO
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()
        # Queue of functions scheduled to run
        self._tasks = deque(tasks)
        # (coroutine, stack) pair of tasks waiting for input from stdin
        self._tasks_waiting_on_stdin = []
        # List of (time_to_run, task) pairs, in sorted order
        self._timers = []
        # Register for polling stdin for input to read
        self._selector.register(sys.stdin, selectors.EVENT_READ)
    def resume_task(self, coroutine, value=None, stack=()):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            self.schedule(result, None, (coroutine, stack))
        elif isinstance(result, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + result._wait_time)
        elif result is sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
        elif stack:
            self.schedule(stack[0], result, stack[1])
    def schedule(self, coroutine, value=None, stack=(), when=None):
        """
        Schedule a coroutine task to be run, with value to be sent to it, and
        stack containing the coroutines that are waiting for the value yielded
        by this coroutine.
        """
        # Bind the parameters to a function to be scheduled as a function with
        # no parameters.
        task = partial(self.resume_task, coroutine, value, stack)
        if when:
            insort(self._timers, Timer(timestamp=when, handler=task))
        else:
            self._tasks.append(task)
    def stop(self):
        self._running = False
    def do_on_next_tick(self, func, *args, **kwargs):
        self._tasks.appendleft(partial(func, *args, **kwargs))
    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for task, stack in self._tasks_waiting_on_stdin:
                    self.schedule(task, line, stack)
                self._tasks_waiting_on_stdin.clear()
            # Next, run the next task
            if self._tasks:
                task = self._tasks.popleft()
                task()
            # Finally run time scheduled tasks
            while self._timers and self._timers[0].timestamp < time():
                task = self._timers[0].handler
                del self._timers[0]
                task()
        self._running = False
def print_every(message, interval):
    """
    Coroutine task to repeatedly print the message at the given interval
    (in seconds)
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)
def read_input(loop):
    """
    Coroutine task to repeatedly read new lines of input from stdin, treat
    the input as a number n, and calculate and display fib(n).
    """
    while True:
        line = yield sys.stdin
        if line == 'exit':
            loop.do_on_next_tick(loop.stop)
            continue
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()
if __name__ == '__main__':
    main()
Note that this implementation also lets us add a simple do_on_next_tick
function, which does more or less what process.nextTick does in Node.js. I
use it to implement a simple "type exit to quit" feature. (Although, I didn't
really have to use do_on_next_tick; I could have just called
loop.stop() directly!)
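The only difference between scheduling a task and do_on_next_tick in the loop above is which end of the deque the task is added to. A tiny sketch of that ordering outside the event loop (the `log` list is just for illustration):

```python
from collections import deque
from functools import partial

log = []
tasks = deque()

# Regular scheduling appends to the right of the queue...
tasks.append(partial(log.append, "scheduled first"))
tasks.append(partial(log.append, "scheduled second"))
# ...while "next tick" jumps the queue by appending to the left.
tasks.appendleft(partial(log.append, "next tick"))

while tasks:
    tasks.popleft()()

print(log)  # ['next tick', 'scheduled first', 'scheduled second']
```

In the event loop above, only one queued task runs per iteration, and expired timers run directly rather than through the queue, so "next tick" means "before any other queued task" rather than "immediately".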
Another interesting thing to point out here is that we can re-implement our recursive Fibonacci algorithm using coroutines instead of recursive calls, and in doing so we can have it run in "parallel" to other coroutines, including the one that prints hello. This is what it would look like:
from event_loop_coroutine import EventLoop
from event_loop_coroutine import print_every
import sys
def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b
def read_input(loop):
    while True:
        line = yield sys.stdin
        n = int(line)
        fib_n = yield fib(n)
        print("fib({}) = {}".format(n, fib_n))
def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()
if __name__ == '__main__':
    main()
The output for this program will be:
$ python3.4 fib_coroutine.py
1412727829 - Hello world!
1412727832 - Hello world!
28
1412727835 - Hello world!
1412727838 - Hello world!
fib(28) = 317811
1412727841 - Hello world!
1412727844 - Hello world!
Not Reinventing The Wheel
In the previous two sections, we went over the general ideas that go into
implementing an event loop to allow us to write asynchronous code using either
callbacks or coroutines. This was great for the purposes of experimenting and
learning about the ideas, but in practice, there are already quite mature
libraries for Python that provide event loops. In addition, Python 3.4 ships
with the asyncio module, which provides event loops and coroutines for IO
operations, networking, and more. Let's first solve the above problem using
asyncio and then look at a few more interesting examples.
import asyncio
import sys
from time import time
from fib import timed_fib
def process_input():
    text = sys.stdin.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))
@asyncio.coroutine
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        yield from asyncio.sleep(3)
def main():
    loop = asyncio.get_event_loop()
    loop.add_reader(sys.stdin, process_input)
    loop.run_until_complete(print_hello())
if __name__ == '__main__':
    main()
Notice how @asyncio.coroutine is used to decorate coroutines, and
yield from, as opposed to just yield, is used to obtain values from other
coroutines.
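The `@asyncio.coroutine` and `yield from` style is specific to older Python versions: Python 3.5 added the `async def`/`await` syntax, and `@asyncio.coroutine` was deprecated in 3.8 and removed in 3.11. Here is a sketch of print_hello in the newer syntax. It is bounded to a few iterations so it terminates (the `times` and `log` parameters are additions for the demo) and run with `asyncio.run` (Python 3.7+):

```python
import asyncio
import time


async def print_hello(times, interval, log):
    # `async def` + `await` is the Python 3.5+ spelling of
    # `@asyncio.coroutine` + `yield from`.
    for _ in range(times):
        message = "{} - Hello world!".format(int(time.time()))
        print(message)
        log.append(message)
        await asyncio.sleep(interval)


log = []
asyncio.run(print_hello(times=3, interval=0.1, log=log))
```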
Handling Exceptions
Python's coroutines allow an exception to be thrown into a paused coroutine: the exception is raised inside the coroutine's stack frame, at the point where it paused, and can be caught there. Let's look at a simple example:
def coroutine():
    print("Starting")
    try:
        yield "Let's pause until continued."
        print("Continuing")
    except Exception as e:
        yield "Got an exception: " + str(e)
def main():
    c = coroutine()
    next(c)  # Execute until the first yield
    # Now throw an exception at the point where the coroutine has paused
    value = c.throw(Exception("Have an exceptional day!"))
    print(value)
if __name__ == '__main__':
    main()
Which outputs:
Starting
Got an exception: Have an exceptional day!
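If the paused coroutine does not catch the thrown exception, it is re-raised out of `throw()` in the caller, and the generator is finished afterwards. A minimal sketch, using a variant of the coroutine above without the `try` block:

```python
def coroutine():
    print("Starting")
    yield "Let's pause until continued."
    print("Continuing")


c = coroutine()
next(c)
try:
    # The coroutine does not catch this, so it propagates out of throw()
    # and the generator is finished afterwards.
    c.throw(ValueError("nobody catches me"))
except ValueError as e:
    caught = str(e)
print("Propagated:", caught)

try:
    next(c)
except StopIteration:
    finished = True
print("Finished:", finished)
```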
This makes it rather easy to have one unified way of handling errors using exceptions, in both synchronous and asynchronous code, provided the event loop catches and propagates exceptions properly. For instance, let's look at chained coroutines running on an event loop:
import asyncio
@asyncio.coroutine
def A():
    raise Exception("Something went wrong in A!")
@asyncio.coroutine
def B():
    a = yield from A()
    return a + 1
@asyncio.coroutine
def C():
    try:
        b = yield from B()
        print(b)
    except Exception as e:
        print("C got exception:", e)
def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(C())
if __name__ == '__main__':
    main()
Output:
C got exception: Something went wrong in A!
In this example, coroutine C relies on the result of B, which in
turn relies on the result of A, which decides to throw an exception. As
you can see, the exception gets propagated all the way to C, which catches
it and prints the message. This behaves almost exactly the same
as synchronous code would. No more catching and passing of errors through
callbacks manually!
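For comparison, here is a sketch of the same chain in the `async def`/`await` syntax (Python 3.5+; `@asyncio.coroutine` is no longer available from Python 3.11). `C` returns the message only so the result can be checked:

```python
import asyncio


async def A():
    raise Exception("Something went wrong in A!")


async def B():
    a = await A()
    return a + 1  # never reached: A raises first


async def C():
    try:
        b = await B()
        print(b)
    except Exception as e:
        print("C got exception:", e)
        return str(e)


result = asyncio.run(C())
```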
Of course, this example is rather theoretical and uninspired. Let's look at a
real example: let's write some code to asynchronously grab the external IP
address of the computer using ipify. Since asyncio does not ship with an HTTP
client (yet, anyway!), we have to go to the TCP level and write the HTTP
request and parse the response ourselves. Since we are doing this with a very
specific API in mind (and as an example, not production code!), let's actually
go ahead and do this. In practice, using a library meant for exactly this, for
example aiohttp, is a much better idea, of course. Let's see what this looks like:
import asyncio
import json
host = 'api.ipify.org'
request_headers = {'User-Agent': 'python/3.4',
                   'Host': host,
                   'Accept': 'application/json',
                   'Accept-Charset': 'UTF-8'}
@asyncio.coroutine
def write_headers(writer):
    for key, value in request_headers.items():
        writer.write((key + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')
    yield from writer.drain()
@asyncio.coroutine
def read_headers(reader):
    response_headers = {}
    while True:
        line_bytes = yield from reader.readline()
        line = line_bytes.decode().strip()
        if not line:
            break
        key, value = line.split(':', 1)
        response_headers[key.strip()] = value.strip()
    return response_headers
@asyncio.coroutine
def get_my_ip_address(verbose):
    reader, writer = yield from asyncio.open_connection(host, 80)
    writer.write(b'GET /?format=json HTTP/1.1\r\n')
    yield from write_headers(writer)
    status_line = yield from reader.readline()
    status_line = status_line.decode().strip()
    # maxsplit=2 keeps multi-word reason phrases (e.g. "Not Found") intact
    http_version, status_code, status = status_line.split(' ', 2)
    if verbose:
        print('Got status {} {}'.format(status_code, status))
    response_headers = yield from read_headers(reader)
    if verbose:
        print('Response headers:')
        for key, value in response_headers.items():
            print(key + ': ' + value)
    # Assume the content length is sent by the server, which is the case
    # with ipify
    content_length = int(response_headers['Content-Length'])
    # readexactly() waits for all content_length bytes; read(n) may return
    # fewer if the body arrives in several pieces
    response_body_bytes = yield from reader.readexactly(content_length)
    response_body = response_body_bytes.decode()
    response_object = json.loads(response_body)
    writer.close()
    return response_object['ip']
@asyncio.coroutine
def print_my_ip_address(verbose):
    try:
        ip_address = yield from get_my_ip_address(verbose)
        print("My IP address is:")
        print(ip_address)
    except Exception as e:
        print("Error: ", e)
def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_my_ip_address(verbose=True))
    finally:
        loop.close()
if __name__ == '__main__':
    main()
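A plain `status_line.split(' ')` only works when the reason phrase is a single word: a status line such as `HTTP/1.1 404 Not Found` splits into four parts and breaks the three-way unpacking, while `split(' ', 2)` keeps the reason phrase intact. To exercise the parsing without network access, this sketch feeds a made-up response into an `asyncio.StreamReader` using `feed_data`/`feed_eof`. Constructing a reader directly like this is done here purely for testing; the asyncio documentation recommends getting readers from `open_connection()`. The sketch uses the `async`/`await` syntax of Python 3.5+ and `asyncio.run` from 3.7:

```python
import asyncio

# Canned response bytes, made up for this example.
RAW_RESPONSE = (
    b"HTTP/1.1 404 Not Found\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-Length: 2\r\n"
    b"\r\n"
    b"{}"
)


async def read_status_and_headers(reader):
    status_line = (await reader.readline()).decode().strip()
    # maxsplit=2 keeps a multi-word reason phrase such as "Not Found" intact.
    http_version, status_code, reason = status_line.split(' ', 2)
    headers = {}
    while True:
        line = (await reader.readline()).decode().strip()
        if not line:  # blank line ends the headers
            break
        key, value = line.split(':', 1)
        headers[key.strip()] = value.strip()
    return int(status_code), reason, headers


async def main():
    # Feed canned bytes into a StreamReader instead of opening a socket.
    reader = asyncio.StreamReader()
    reader.feed_data(RAW_RESPONSE)
    reader.feed_eof()
    status, reason, headers = await read_status_and_headers(reader)
    body = await reader.readexactly(int(headers['Content-Length']))
    return status, reason, headers, body


status, reason, headers, body = asyncio.run(main())
print(status, reason, headers, body)
```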
Again, notice the similarity to synchronous code: no callbacks, no complicated error handling, just easy and very readable code. Let's see how it works, without any errors:
$ python3.4 ipify.py
Got status 200 OK
Response headers:
Content-Length: 21
Server: Cowboy
Connection: keep-alive
Via: 1.1 vegur
Content-Type: application/json
Date: Fri, 10 Oct 2014 03:46:31 GMT
My IP address is:
<my IP address here, hidden for privacy!>
On the other hand, if something goes wrong, for example if I am not connected to the internet, here is the output:
$ python3.4 ipify.py
Error: [Errno 8] nodename nor servname provided, or not known
This is one of the main advantages to using coroutines for asynchronous code in my opinion: error handling will be perfectly consistent with synchronous code. For example, in the above, it doesn't matter if one of the chained coroutines fails, or if one of the synchronous calls fails, the exception is caught and handled the exact same way.
Relying On The Results of Multiple Independent Coroutines
In the above examples, we wrote asynchronous code that was inherently sequential, meaning each statement in a coroutine relies on the previous statements finishing before continuing. Sometimes, we want to execute a set of independent tasks and use their results as they complete, without caring about the order they run in. For example, when writing a web crawler, we might want to send asynchronous requests to all the links on a web page and add the responses to a queue to be processed as we go.
Coroutines allow for writing asynchronous code that flows very sequentially,
but for running independent tasks and processing their results either all at
once or as they come, callbacks may, at first, seem to be better. However,
Python 3.4's asyncio comes with built-in functions for precisely these
two scenarios, namely asyncio.as_completed and asyncio.gather.
Let's look at a simple example in which we need to load three URLs. We do it in
two ways: first by processing the results as they come in using
asyncio.as_completed, and then only once they have all
finished loading using asyncio.gather. Instead of actually loading URLs,
I chose to have a simple coroutine that pauses for a random number of seconds.
Here's the code:
import asyncio
import random
@asyncio.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield from asyncio.sleep(wait_time)
    print('Done: URL {} took {}s to get!'.format(url, wait_time))
    return url, wait_time
@asyncio.coroutine
def process_as_results_come_in():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    for coroutine in asyncio.as_completed(coroutines):
        url, wait_time = yield from coroutine
        print('Coroutine for {} is done'.format(url))
@asyncio.coroutine
def process_once_everything_ready():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    results = yield from asyncio.gather(*coroutines)
    print(results)
def main():
    loop = asyncio.get_event_loop()
    print("First, process results as they come in:")
    loop.run_until_complete(process_as_results_come_in())
    print("\nNow, process results once they are all ready:")
    loop.run_until_complete(process_once_everything_ready())
if __name__ == '__main__':
    main()
And the output:
$ python3.4 gather.py
First, process results as they come in:
Done: URL URL2 took 2s to get!
Coroutine for URL2 is done
Done: URL URL3 took 3s to get!
Coroutine for URL3 is done
Done: URL URL1 took 4s to get!
Coroutine for URL1 is done
Now, process results once they are all ready:
Done: URL URL1 took 1s to get!
Done: URL URL2 took 3s to get!
Done: URL URL3 took 4s to get!
[('URL1', 1), ('URL2', 3), ('URL3', 4)]
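The random wait times make the run above hard to reproduce. With fixed, hypothetical delays, the difference between the two functions becomes checkable: `as_completed` hands back results in completion order, while `gather` returns them in the order the awaitables were passed in. This sketch uses the `async`/`await` syntax and `asyncio.run` (Python 3.7+):

```python
import asyncio


async def get_url(url, wait_time):
    # Stand-in for a real request: just sleep for a fixed, known time.
    await asyncio.sleep(wait_time)
    return url, wait_time


async def main():
    delays = {'URL1': 0.3, 'URL2': 0.1, 'URL3': 0.2}

    # Results arrive in the order the sleeps finish.
    completed_order = []
    for future in asyncio.as_completed([get_url(u, d) for u, d in delays.items()]):
        url, _ = await future
        completed_order.append(url)

    # Results come back in the order the coroutines were passed in.
    gathered = await asyncio.gather(*(get_url(u, d) for u, d in delays.items()))
    return completed_order, gathered


completed_order, gathered = asyncio.run(main())
print(completed_order)  # ['URL2', 'URL3', 'URL1']
print(gathered)         # [('URL1', 0.3), ('URL2', 0.1), ('URL3', 0.2)]
```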
Digging Deeper
There is a lot that I didn't cover here; Futures and libuv, to name a few. There is also Guido's talk on asynchronous IO in Python 3.4. There are likely many other resources that I am forgetting to include so feel free to recommend some in the comments below.
The Toptal engineering blog also has a good post on using async in JavaScript titled Asynchronous JavaScript: From Callback Hell to Async and Await.