Python 3 - An Intro to asyncio

The asyncio module was added to Python in version 3.4 as a provisional package, which means that asyncio could receive backwards incompatible changes or even be removed in a future release of Python. According to the documentation, asyncio "provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives". This chapter is not meant to cover everything you can do with asyncio, but you will learn how to use the module and why it is useful.

If you need something like asyncio in an older version of Python, then you might want to take a look at Twisted or gevent.


Definitions

The asyncio module provides a framework that revolves around the event loop. An event loop basically waits for something to happen and then acts on the event; in other words, it says "when event A happens, react with function B". It is responsible for handling such things as I/O and system events. Asyncio actually has several loop implementations available to it. The module will default to the one that is likely to be the most efficient for the operating system it is running under; however, you can explicitly choose the event loop if you so desire.
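For instance, a minimal sketch of explicitly creating asyncio's selector-based loop and installing it as the current event loop might look like this:

import asyncio

# Explicitly create a selector-based event loop instead of relying on
# the platform default, then install it as the current event loop.
loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(loop)

print(loop)  # shows which loop implementation is in use
loop.close()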

Think of a server as it waits for someone to come along and ask for a resource, such as a web page. If the website isn't very popular, the server will be idle for a long time. But when it does get a hit, then the server needs to react. This reaction is known as event handling. When a user loads the web page, the server will check for and call one or more event handlers. Once those event handlers are done, they need to give control back to the event loop. To do this in Python, asyncio uses coroutines.

A coroutine is a special function that can give up control to its caller without losing its state. A coroutine is a consumer and an extension of a generator. One of their big benefits over threads is that they don't use very much memory to execute. Note that when you call a coroutine function, it doesn't actually execute. Instead it will return a coroutine object that you can pass to the event loop to have it executed either immediately or later on.
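Jumping ahead slightly to the async def syntax covered in the next section, here is a tiny sketch of that behavior (my_coro and its return value are just for illustration):

import asyncio

async def my_coro():
    return 42

coro = my_coro()  # nothing has executed yet; this is just a coroutine object
print(coro)       # e.g. <coroutine object my_coro at 0x...>

loop = asyncio.get_event_loop()
print(loop.run_until_complete(coro))  # now the coroutine actually runs and returns 42
loop.close()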

One other term you will likely run across when you are using the asyncio module is future. A future is basically an object that represents the result of work that hasn't completed. Your event loop can watch future objects and wait for them to finish. When a future finishes, it is set to done. Asyncio also supports locks and semaphores.
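As a rough sketch of that idea (the set_after_delay coroutine here is made up for illustration), you can create an empty future, hand it to some other piece of code that will fill it in later, and then wait for it:

import asyncio

async def set_after_delay(future):
    # Pretend to do some work, then publish the result on the future
    await asyncio.sleep(1)
    future.set_result('work is done')

async def waiter(loop):
    future = loop.create_future()       # an empty Future; not done yet
    loop.create_task(set_after_delay(future))
    result = await future               # suspends until the future is done
    print(result)

loop = asyncio.get_event_loop()
loop.run_until_complete(waiter(loop))
loop.close()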

The last piece of information I want to mention is the Task. A Task is a wrapper for a coroutine and a subclass of Future. You can even schedule a Task using the event loop.


async and await

The async and await keywords were added in Python 3.5 to define native coroutines and make them a distinct type from generator-based coroutines. If you'd like an in-depth description of async and await, you will want to check out PEP 492.

In Python 3.4, you would create a coroutine like this:


# Python 3.4 coroutine example
import asyncio

@asyncio.coroutine
def my_coro():
    yield from func()

This decorator still works in Python 3.5, and the types module also received an update in the form of a coroutine function, which you can use to make a generator-based function behave like a native coroutine. Starting in Python 3.5, you can use async def to syntactically define a coroutine function. So the function above would end up looking like this:

import asyncio

async def my_coro():
    await func()

When you define a coroutine in this manner, you cannot use yield inside the coroutine function. Instead, it must use return or await statements to return values to the caller. Note that the await keyword can only be used inside an async def function.

The async and await keywords can be considered an API for asynchronous programming. The asyncio module is just a framework that happens to use async / await to program asynchronously. There is actually a project called curio that proves this concept, as it is a separate implementation of an event loop that uses async / await under the covers.


A Bad Coroutine Example

While it is certainly helpful to have a lot of background information into how all this works, sometimes you just want to see some examples so you can get a feel for the syntax and how to put things together. So with that in mind, let's start out with a simple example!

A fairly common task that you will want to complete is downloading a file from some location whether that be an internal resource or a file on the Internet. Usually you will want to download more than one file. So let's create a pair of coroutines that can do that:

import asyncio
import os
import urllib.request

async def download_coroutine(url):
    """
    A coroutine to download the specified url
    """
    request = urllib.request.urlopen(url)
    filename = os.path.basename(url)

    with open(filename, 'wb') as file_handle:
        while True:
            chunk = request.read(1024)
            if not chunk:
                break
            file_handle.write(chunk)
    msg = 'Finished downloading {filename}'.format(filename=filename)
    return msg

async def main(urls):
    """
    Creates a group of coroutines and waits for them to finish
    """
    coroutines = [download_coroutine(url) for url in urls]
    completed, pending = await asyncio.wait(coroutines)
    for item in completed:
        print(item.result())


if __name__ == '__main__':
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(urls))
    finally:
        event_loop.close()

In this code, we import the modules that we need and then create our first coroutine using the async syntax. This coroutine is called download_coroutine and it uses Python's urllib to download whatever URL is passed to it. When it is done, it will return a message that says so.

The other coroutine is our main coroutine. It basically takes a list of one or more URLs and queues them up. We use asyncio's wait function to wait for the coroutines to finish. Of course, to actually start the coroutines, they need to be added to the event loop. We do that at the very end where we get an event loop and then call its run_until_complete method. You will note that we pass in the main coroutine to the event loop. This starts running the main coroutine which queues up the second coroutine and gets it going. This is known as a chained coroutine.

The problem with this example is that it really isn't asynchronous at all. The download_coroutine function doesn't do anything asynchronously: urllib is a blocking library, and we never use await or yield from inside the coroutine either. A better way to do this would be to use the aiohttp package. Let's look at that next!


A Better Coroutine Example

The aiohttp package is designed for creating asynchronous HTTP clients and servers. You can install it with pip like this:

pip install aiohttp

Once that's installed, let's update our code to use aiohttp so that we can download the files:

import aiohttp
import asyncio
import async_timeout
import os


async def download_coroutine(session, url):
    with async_timeout.timeout(10):
        async with session.get(url) as response:
            filename = os.path.basename(url)
            with open(filename, 'wb') as f_handle:
                while True:
                    chunk = await response.content.read(1024)
                    if not chunk:
                        break
                    f_handle.write(chunk)
            return await response.release()


async def main(loop):
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]

    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [download_coroutine(session, url) for url in urls]
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

You will notice here that we import a couple of new items: aiohttp and async_timeout. The latter is actually one of aiohttp's dependencies and allows us to create a timeout context manager.

Let's start at the bottom of the code and work our way up. In the bottom conditional statement, we start our asynchronous event loop and call our main function. In the main function, we create a ClientSession object that we pass on to our download coroutine function for each of the URLs we want to download. In download_coroutine, we create an async_timeout.timeout() context manager that basically creates a timer of X seconds. When the seconds run out, the context manager ends or times out. In this case, the timeout is 10 seconds.

Next we call our session's get() method, which gives us a response object. Now we get to the part that is a bit magical. When you use the content attribute of the response object, it returns an instance of aiohttp.StreamReader, which allows us to download the file in chunks of whatever size we'd like. As we read the file, we write it out to local disk. Finally we call the response's release() method, which will finish the response processing.

According to aiohttp's documentation, because the response object was created in a context manager, it technically calls release() implicitly. But in Python, explicit is usually better and there is a note in the documentation that we shouldn't rely on the connection just going away, so I believe that it's better to just release it in this case.

There is one part that is still blocking here and that is the portion of the code that actually writes to disk. While we are writing the file, we are still blocking. There is another library called aiofiles that we could use to try and make the file writing asynchronous too, but I will leave that update to the reader.
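If you want a head start on that update, a rough sketch of the write loop using aiofiles might look like the following (write_chunks is a hypothetical helper name, and this assumes aiofiles is installed):

import aiofiles

async def write_chunks(response, filename):
    # A sketch: stream the response to disk without blocking the event loop
    async with aiofiles.open(filename, 'wb') as f_handle:
        while True:
            chunk = await response.content.read(1024)
            if not chunk:
                break
            await f_handle.write(chunk)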


Scheduling Calls

You can also schedule calls to regular functions using the asyncio event loop. The first method we'll look at is call_soon. The call_soon method will basically call your callback or event handler as soon as it can. It works as a FIFO queue, so if some of the callbacks take a while to run, then the others will be delayed until the previous ones have finished. Let's look at an example:

import asyncio
import functools


def event_handler(loop, stop=False):
    print('Event handler called')
    if stop:
        print('stopping the loop')
        loop.stop()
    
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.call_soon(functools.partial(event_handler, loop))
        print('starting event loop')
        loop.call_soon(functools.partial(event_handler, loop, stop=True))
        
        loop.run_forever()
    finally:
        print('closing event loop')
        loop.close() 

Most of asyncio's scheduling functions do not accept keyword arguments for your callback, so we need the functools module if we want to pass keywords to our event handler. Our regular function will print some text out to stdout whenever it is called. If you happen to set its stop argument to True, it will also stop the event loop.

The first time we call it, we do not stop the loop. The second time we call it, we do stop the loop. The reason we want to stop the loop is that we've told it to run_forever, which will put the event loop into an infinite loop. Once the loop is stopped, we can close it. If you run this code, you should see the following output:

starting event loop
Event handler called
Event handler called
stopping the loop
closing event loop

There is a related function called call_soon_threadsafe. As the name implies, it works the same way as call_soon, but it's thread-safe.
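As a quick sketch of when that matters (the worker and stop_loop functions here are made up for illustration), a thread other than the one running the loop can safely hand it a callback like this:

import asyncio
import threading

def stop_loop(loop):
    print('stopping the loop from a callback')
    loop.stop()

def worker(loop):
    # This runs in a separate thread, so we must not call loop.call_soon()
    # directly; call_soon_threadsafe hands the callback to the loop safely.
    loop.call_soon_threadsafe(stop_loop, loop)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    threading.Timer(1, worker, args=(loop,)).start()
    try:
        loop.run_forever()
    finally:
        loop.close()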

If you want to actually delay a call until some time in the future, you can do so using the call_later function. In this case, we could change our call_soon call to the following:

loop.call_later(1, event_handler, loop)

This will delay calling our event handler for one second, then it will call it and pass the loop in as its first parameter.

If you want to schedule a specific time in the future, then you will need to grab the loop's time rather than the computer's time. You can do so like this:

current_time = loop.time()

Once you have that, then you can just use the call_at function and pass it the time that you want it to call your event handler. So let's say we want to call our event handler five minutes from now. Here's how you might do it:

loop.call_at(current_time + 300, event_handler, loop)

In this example, we take the current loop time that we grabbed and add 300 seconds, or five minutes, to it. By doing so, we delay calling our event handler for five minutes! Pretty neat!
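Putting call_later and call_at together, here is a small runnable sketch that reuses the event_handler function from above (with a five second delay instead of five minutes so the demo finishes quickly):

import asyncio
import functools

def event_handler(loop, stop=False):
    print('Event handler called')
    if stop:
        print('stopping the loop')
        loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    current_time = loop.time()
    try:
        # call the handler after a one second delay
        loop.call_later(1, event_handler, loop)
        # call it again five seconds from now and stop the loop
        loop.call_at(current_time + 5,
                     functools.partial(event_handler, loop, stop=True))
        loop.run_forever()
    finally:
        loop.close()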


Tasks

Tasks are a subclass of a Future and a wrapper around a coroutine. They give you the ability to keep track of when they finish processing. Because they are a type of Future, other coroutines can wait for a task and you can also grab the result of a task when it's done processing. Let's take a look at a simple example:

import asyncio


async def my_task(seconds):
    """
    A task to do for a number of seconds
    """
    print('This task is taking {} seconds to complete'.format(
        seconds))
    await asyncio.sleep(seconds)
    return 'task finished'
    
    
if __name__ == '__main__':
    my_event_loop = asyncio.get_event_loop()
    try:
        print('task creation started')
        task_obj = my_event_loop.create_task(my_task(seconds=2))
        my_event_loop.run_until_complete(task_obj)
    finally:
        my_event_loop.close()
        
    print("The task's result was: {}".format(task_obj.result()))

Here we create an asynchronous function that accepts the number of seconds it will take for the function to run. This simulates a long running process. Then we create our event loop and a task object by calling the event loop object's create_task function. The create_task function accepts the coroutine that we want to turn into a task. Then we tell the event loop to run until the task completes. At the very end, we grab the result of the task, since it has finished.

Tasks can also be canceled very easily by using their cancel method. Just call it when you want to end a task. Should a task get canceled when it is waiting for another operation, the task will raise a CancelledError.
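Here is a short sketch of cancellation in action, reusing the my_task coroutine from above:

import asyncio

async def my_task(seconds):
    print('This task is taking {} seconds to complete'.format(seconds))
    await asyncio.sleep(seconds)
    return 'task finished'

async def cancelling_main(loop):
    task_obj = loop.create_task(my_task(seconds=10))
    await asyncio.sleep(1)     # let the task start running
    task_obj.cancel()          # ask the task to stop
    try:
        await task_obj
    except asyncio.CancelledError:
        print('The task was cancelled')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(cancelling_main(loop))
    finally:
        loop.close()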


Wrapping Up

At this point, you should know enough to start working with the asyncio library on your own. The asyncio library is very powerful and allows you to do a lot of really cool and interesting tasks. The Python documentation is a great place to start learning the asyncio library.


