Python asyncio

There are many network models and many schemes for achieving high concurrency: multi-threading, multi-processing, and so on. With threads or processes, I/O scheduling is largely left to the operating system, whereas with coroutines the scheduling comes from the user, who can yield control inside a function. Coroutines make it possible to run concurrent tasks efficiently. Python introduced coroutines in 3.4, but they were still based on generator objects; 3.5 added dedicated syntax for them. Here is a brief introduction to using asyncio. Asyncio is not the only coroutine implementation: Tornado and gevent provide similar functionality.

  • Event loop (event_loop): the program runs an infinite loop with which the programmer registers coroutines; when the right event occurs, the corresponding coroutine is called.

  • Coroutine: the coroutine object. A function defined with the async keyword; calling it does not execute the function immediately but returns a coroutine object. Coroutine objects need to be registered with the event loop so the loop can run them.

  • Task: a coroutine object is a function that can be suspended, and a task is a further wrapper around the coroutine that also records its state.

  • Future: represents the result of a task that will (or may not) be executed in the future. It is not fundamentally different from Task.

  • async/await keywords: introduced in Python 3.5 to define coroutines; async defines a coroutine, and await suspends the coroutine at a blocking asynchronous call.

The above concepts are not easy to understand on their own; they are interrelated and work together. Let's look at some examples first and then come back to these concepts, and they will be easier to grasp.

Define a coroutine

Defining a coroutine is simple, using the async keyword, just like defining a normal function:

import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print('TIME: ', now() - start)


The async keyword defines a coroutine, which is an object; a coroutine cannot run on its own. It has to be added to an event loop, which calls it at the appropriate time. asyncio.get_event_loop creates an event loop, and run_until_complete registers the coroutine with the loop and starts the loop. Since this example has only one coroutine, you see the following output:

Waiting:  2
TIME:  0.0004658699035644531

Creating a Task

A coroutine object cannot run directly; when it is registered with the event loop, it is the run_until_complete method that wraps it in a task object. A Task is a subclass of Future; it saves the coroutine's state after it runs so the result can be retrieved later.

import asyncio
import time

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)

You can see that the output is:

<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>
TIME:  0.0003490447998046875

After the task is created and before it is added to the event loop, it is in the pending state. Because there is no time-consuming blocking operation in do_some_work, the task completes quickly and then shows the finished state.

asyncio.ensure_future(coroutine) and loop.create_task(coroutine) can both create a task, and the argument to run_until_complete is a future object. When a coroutine is passed in, it is automatically wrapped in a Task internally, and Task is a subclass of Future. isinstance(task, asyncio.Future) prints True.
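A quick check of that relationship, reusing the task from the example above:

print(isinstance(task, asyncio.Task))    # True
print(isinstance(task, asyncio.Future))  # True, since Task is a subclass of Future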

Bind a callback

The last parameter of the callback is the Future object, from which the coroutine's return value can be retrieved. If the callback needs more than one argument, the extra ones can be passed in with a partial function (functools.partial).

import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Callback: ', future.result())

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)

With functools.partial it looks like this:

import functools

def callback(t, future):
    print('Callback:', t, future.result())

task.add_done_callback(functools.partial(callback, 2))

As you can see, the callback is invoked when the coroutine finishes executing, and the result of the coroutine is obtained through the future parameter. The task we created and the future object passed to the callback are actually the same object.
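A minimal way to verify this, assuming the task and callback setup from the example above (the identity check is added for illustration):

def callback(future):
    print('Callback: ', future.result())
    print(future is task)  # True: the future passed in is the task we created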

Getting the future's result

Callbacks have long been the nightmare of asynchronous programming; programmers prefer to write asynchronous code in a synchronous style to escape callback hell. In the callback above we used the result method of the Future object. In the earlier example without a bound callback, we saw that the Task ends up in the finished state, and at that point the task's result method can be read directly.

async def do_some_work(x):
    print('Waiting {}'.format(x))
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task ret: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))

You can see the output:

Waiting: 2
Task ret: Done after 2s
TIME: 0.0003650188446044922

Blocking and await

With async you can define a coroutine object, and with await you can suspend it at a time-consuming operation, just as yield in a generator cedes control of the function. When a coroutine hits an await, the event loop suspends that coroutine and runs other coroutines; it comes back to this coroutine once the others are suspended or finished.

Time-consuming operations are usually IO operations, such as network requests and file reads. We use asyncio.sleep to simulate IO operations. The purpose of coroutines is also to make these IO operations asynchronous.

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task ret: ', task.result())
print('TIME: ', now() - start)  

While sleeping, the coroutine uses await to relinquish control. In other words, when a blocking point is reached, await hands control of the coroutine back to the loop so that it can schedule other coroutines. Our example now contains a (simulated) time-consuming blocking operation.
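As a small sketch (not part of the original example), here is the difference between a blocking call and an awaitable one inside a coroutine; only the awaitable form lets the loop run other coroutines in the meantime:

import asyncio
import time

async def blocking(x):
    time.sleep(x)           # blocks the entire event loop for x seconds

async def non_blocking(x):
    await asyncio.sleep(x)  # suspends only this coroutine; the loop keeps running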

Concurrency and parallelism

Concurrency and parallelism have always been confusing concepts. Concurrency means multiple tasks need to be handled over the same period of time, while parallelism means multiple tasks are actually executing at the same instant. Take tutoring as an example: concurrency is one teacher helping several students with their work during the same period, switching between them; parallelism is several teachers each helping a student at the same time. In short, it is the difference between one person alternately eating three steamed buns and three people each eating one at the same time, where eating one bun counts as one task.

asyncio achieves concurrency by running multiple coroutines: whenever a task blocks, it awaits, and the other coroutines continue working. Create a list of coroutines and register them with the event loop.

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

print('TIME: ', now() - start)


The results are as follows

Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
TIME:  4.003541946411133


The total time is about 4s. The 4-second block is long enough for the first two coroutines to finish, whereas a synchronous, sequential version would take at least 7 seconds. Here we used asyncio to achieve concurrency. asyncio.wait(tasks) can also be replaced with asyncio.gather(*tasks); the former takes a list of tasks, while the latter takes the tasks as separate positional arguments, as shown in the sketch below.
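A minimal sketch of the gather variant, assuming the same tasks list as in the example above; gather also preserves the order of the results:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

for task in tasks:
    print('Task ret: ', task.result())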

Nesting coroutines

We use async to define a coroutine for a time-consuming IO operation, and we can also wrap it in further coroutines, nesting them: one coroutine awaits another coroutine, and they are chained together.

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)


If asyncio.gather is used to collect the coroutine objects instead, the value returned by await is the list of the coroutines' results.

    results = await asyncio.gather(*tasks)

    for result in results:
        print('Task ret: ', result)

The outermost run_until_complete can also return the result of the main coroutine: instead of processing the results inside main, return the awaited value and handle it outside.

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task ret: ', result)


Or use asyncio.wait and return the done and pending sets it produces:

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())

for task in done:
    print('Task ret: ', task.result())

You can also use asyncio's as_completed method:

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)


The calling and composition of coroutines is therefore very flexible. Especially when it comes to handling results, deciding how to return them and where to suspend takes accumulated experience and some forward-looking design.

Stopping coroutines

We have now seen several common uses of coroutines, all of which revolve around the event loop. A Future object has several states:

  • Pending
  • Running
  • Done
  • Cancelled

When a future is created, the task is in the pending state; while the event loop is executing it, it is running; when it completes, it is done. If you need to stop the event loop, you must cancel the tasks first. asyncio.Task can be used to obtain the tasks of the event loop.
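As a minimal sketch (not from the original article), a task can also be cancelled programmatically; the coroutine name below is illustrative:

import asyncio

async def never_ending():
    await asyncio.sleep(3600)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(never_ending())
task.cancel()  # request cancellation before the loop runs the task

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    print(task.cancelled())  # True: the task ends up in the cancelled state

The full example below instead triggers the cancellation with Ctrl+C: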

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)


Right after starting the event loop, press Ctrl+C; this raises a KeyboardInterrupt in run_until_complete. Then loop over asyncio.Task.all_tasks() and cancel each future. You can see output like the following:

Waiting:  1
Waiting:  2
Waiting:  2
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>,
 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME:  0.8858370780944824

If the cancel succeeds, loop.stop() must be followed by starting the event loop again with run_forever() before closing it; otherwise an exception is raised:

Task was destroyed but it is pending!
task: <Task pending coro=<do_some_work() done,

Cancelling the tasks one by one in a loop is one option, but as above we can also wrap the list of tasks inside main and call the event loop outside main. main then becomes the outermost task, so handling that single wrapping task is enough.

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    done, pending = await asyncio.wait(tasks)

    for task in done:
        print('Task ret: ', task.result())

start = now()

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

Event loops for different threads

Often the event loop is just there to register coroutines, but some coroutines need to be added to the loop dynamically. An easy way to do this is with multiple threads: the current thread creates an event loop and then starts it in a new thread, so the current thread is not blocked.

import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)


After the above code starts, the current thread is not blocked, and the new thread executes the more_work calls registered with call_soon_threadsafe in order. Because more_work itself blocks with time.sleep, the two calls take roughly 6 + 3 = 9 seconds in total.

Coroutines in the new thread

import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)


In the example above, a new_loop is created in the main thread and run as an infinite event loop in a child thread. The main thread then registers new coroutine objects with run_coroutine_threadsafe. This allows coroutines to run concurrently on the child thread's event loop without blocking the main thread; the total execution time is about 6s.

Master-worker mode

For concurrent tasks, a producer-consumer model is usually adopted. Handling a queue can follow a master-worker pattern: the master fetches messages from the queue, and the workers process them.

For simplicity, and because coroutines are better suited to a single-threaded style, our main thread listens to the queue and the child thread processes it. A redis queue is used here: the main thread runs an infinite loop that consumes the queue.

    # rcon is a redis connection; new_loop runs in a child thread as shown above
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)

Add some data to the queue:

127.0.0.1:6379[3]> lpush queue 2
(integer) 1
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 1

You can see the output:

Waiting  2
Done 2
Waiting  5
Waiting  1
Done 1
Waiting  1
Done 1
Done 5

We pushed a 5-second task followed by 1-second tasks, and the child thread executed them concurrently: during the 5-second await, the child thread completed two 1-second tasks.

Stopping the child thread

If everything works, the above example is fine. However, to stop the program, Ctrl+C raises a KeyboardInterrupt. Let's change the main loop:

try:
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
    print(e)
    new_loop.stop()


This doesn't work. Even though the main thread catches the KeyboardInterrupt exception, the child thread does not exit. To solve this, set the child thread as a daemon thread, so that it exits together with the main thread.

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

try:
    while True:
        # print('start rpop')
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except KeyboardInterrupt as e:
    print(e)
    new_loop.stop()

Now when the program is stopped, the main thread exits, the daemon child thread exits along with it, and the child thread's coroutine tasks are stopped.
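A minimal sketch of the daemon setting, using the standard threading API; marking the worker thread as a daemon before starting it lets it die with the main thread:

t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)  # equivalent to t.daemon = True
t.start()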

aiohttp

When consuming the queue, we used asyncio's sleep to simulate a time-consuming IO operation. At one point an SMS service needed to call a remote SMS API inside the coroutine, which required aiohttp for asynchronous HTTP requests. The code looks roughly like this:

server.py

import time
from flask import Flask, request

app = Flask(__name__)

@app.route('/<int:x>')
def index(x):
    time.sleep(x)
    return "{} It works".format(x)

@app.route('/error')
def error():
    time.sleep(3)
    return "error!"

if __name__ == '__main__':
    app.run(debug=True)

The /<int:x> route simulates the SMS interface, and /error simulates the alarm sent after a request fails.

async-customer.py

import time
import asyncio
from threading import Thread

import redis
import aiohttp

def get_redis():
    connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)
    return redis.Redis(connection_pool=connection_pool)

rcon = get_redis()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            return await resp.text()

async def do_some_work(x):
    print('Waiting ', x)
    try:
        ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))
        print(ret)
    except Exception as e:
        try:
            print(await fetch(url='http://127.0.0.1:5000/error'))
        except Exception as e:
            print(e)
    else:
        print('Done {}'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)
t.start()

try:
    while True:
        task = rcon.rpop("queue")
        if not task:
            time.sleep(1)
            continue
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
    print('error')
    new_loop.stop()
finally:
    pass

One thing to note is that we wrap the fetch calls inside do_some_work in try/except. Without that, an exception raised in the coroutine would not stop the child thread's event loop, and the main thread would not exit either; there seems to be no way to propagate an exception from the child thread to the main thread. (If anyone has found a better way, please let me know.)

Redis also offers a blocking way to consume the queue:

try:
    while True:
        _, task = rcon.brpop("queue")
        asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)
except Exception as e:
    print('error', e)
    new_loop.stop()
finally:
    pass

With the brpop method, the call blocks and a task is consumed only when a message arrives. After testing, the brpop approach seems better suited to this queue-consumption model.

127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1
127.0.0.1:6379[3]> lpush queue 5
(integer) 1
127.0.0.1:6379[3]> lpush queue 1
(integer) 1

You can see the result

Waiting  5
Waiting  1
Waiting  1
200
1 It works
Done 1
200
1 It works
Done 1
200
5 It works
Done 5

Coroutine consumers

Using the main thread to listen to the queue and the child thread's event loop as the worker is one approach. There is another way to implement this master-worker-like scheme: make the queue-listening logic itself an infinitely looping coroutine, and create several such coroutines at program startup to get a concurrent effect.

import time
import asyncio

import redis

now = lambda: time.time()

def get_redis():
    connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)
    return redis.Redis(connection_pool=connection_pool)

rcon = get_redis()

async def worker():
    print('Start worker')

    while True:
        start = now()
        task = rcon.rpop("queue")
        if not task:
            await asyncio.sleep(1)
            continue
        print('Wait ', int(task))
        await asyncio.sleep(int(task))
        print('Done ', task, now() - start)

def main():
    asyncio.ensure_future(worker())
    asyncio.ensure_future(worker())

    loop = asyncio.get_event_loop()
    try:
        loop.run_forever()
    except KeyboardInterrupt as e:
        print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

In this way, you can start several more workers to listen to the queue and achieve the same effect.
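A minimal sketch of starting several workers at once, assuming the worker coroutine above; the worker count here is an arbitrary choice:

def main(worker_count=4):
    # schedule several queue-listening coroutines on the same event loop
    for _ in range(worker_count):
        asyncio.ensure_future(worker())

    loop = asyncio.get_event_loop()
    loop.run_forever()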

Conclusion

The above is a brief introduction to asyncio, focusing on the relationship between the event loop, coroutines, tasks, and futures. Asynchronous programming differs from ordinary synchronous programming in that special attention must be paid to designing the program's execution flow; after all, it is a little different from previous coding experience. But if you think about it, our brains naturally run asynchronous coroutines when we do things: you can write a few more lines of code while waiting for the tea to brew.

Related code files for Gist

Threaded Asynchronous Magic and How to Wield It