1. Introduction

When performing IO-intensive tasks, programs often block while waiting for IO. For example, in a web crawler that uses the requests library, if a website responds slowly, the program simply sits waiting for the response, and crawl efficiency ends up very low.

To solve this problem, this article explores Python's asynchronous coroutines, which are very effective at speeding up IO-intensive tasks. Applied to a web crawler, they can improve crawl efficiency by as much as a hundred times.

Note: the coroutines here are implemented with async/await, which requires Python 3.5 or later.

2. Basic understanding

Before we look at asynchronous coroutines, we need to understand some basic concepts: blocking and non-blocking, synchronous and asynchronous, multiprocessing, and coroutines.

2.1 Blocking

A blocked state is one in which a program is suspended because it has not obtained the computing resources it needs. A program is said to block on an operation while it waits for that operation to complete and cannot do anything else in the meantime.

Common forms of blocking include blocking on network I/O, disk I/O, and user input. Blocking is everywhere: even during a CPU context switch no process can really get any work done, so in that sense they are all blocked, and on a multi-core CPU the core performing the context switch is likewise unavailable.

2.2 Non-blocking

A program is said to be non-blocking if it does not block while waiting for an operation and can continue to do other things.

Non-blocking behavior does not exist at every program level or under every circumstance. A program can be non-blocking only when its level of encapsulation is large enough to contain independent subprogram units.

Non-blocking exists because blocking exists; it is precisely because blocking operations are time-consuming and inefficient that we make them non-blocking.

2.3 Synchronous

When different program units must rely on some form of communication to coordinate with each other during execution in order to complete a task, we say those program units execute synchronously.

For example, when updating product inventory in a shopping system, a "row lock" is used as the communication signal to force the different update requests to execute one after another in a queue, so the inventory-update operation is synchronous.
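As a rough in-process analogy (a minimal sketch that uses threading.Lock instead of a database row lock; the names are made up for illustration):

import threading

inventory = 10
lock = threading.Lock()   # the lock acts as the "communication signal"

def buy_one():
    global inventory
    with lock:            # forces the updates to run one after another
        inventory -= 1

threads = [threading.Thread(target=buy_one) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(inventory)   # always 5, because the updates were synchronized

The lock makes every update wait its turn, which is exactly what "synchronous" means here.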

In short, synchronization means order.

2.4 Asynchronous

When different program units can complete a task without communicating or coordinating with each other, those unrelated program units can run asynchronously.

For example, when a crawler downloads web pages, the scheduler can move on to other tasks as soon as it has called the downloader, without communicating with it to coordinate behavior. The downloading and saving of different web pages are unrelated operations that never need to notify or coordinate with one another, and the completion time of these asynchronous operations is not deterministic.

In short, asynchrony means disorder.

2.5 Multiprocessing

Multiprocessing takes advantage of a CPU's multiple cores to execute several tasks in parallel at the same time, which can greatly improve execution efficiency.

2.6 Coroutines

A coroutine, also known as a micro-thread or fiber, is a lightweight user-mode thread.

Coroutines have their own register context and stack. When a coroutine switch is scheduled, the register context and stack are saved elsewhere; when the coroutine is switched back in, the saved register context and stack are restored. A coroutine therefore preserves the state of its previous call, that is, a specific combination of all its local state, and each re-entry is equivalent to resuming from where the previous call left off.

Coroutines essentially run in a single process. Compared with multiprocessing, they avoid the overhead of thread context switches and of locking and synchronizing around atomic operations, and the programming model is much simpler.

We can use coroutines to implement asynchronous operations. In a web crawler, for example, after sending a request we have to wait some time for the response; during that wait the program can do plenty of other things and switch back to continue processing once the response arrives. This makes full use of the CPU and other resources, and that is the advantage of asynchronous coroutines.
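As a quick illustration (jumping ahead a little to asyncio, which is introduced in the next section), here is a minimal sketch in which asyncio.sleep() stands in for a slow IO operation; the function names are made up for the example:

import asyncio

async def fetch():
    print('start waiting for a response')
    await asyncio.sleep(1)   # stand-in for a slow IO operation such as a network request
    print('response received')

async def other_work():
    print('doing other work while fetch() is suspended')

tasks = [asyncio.ensure_future(fetch()), asyncio.ensure_future(other_work())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

fetch() prints its first line and suspends at the await, other_work() runs to completion in the meantime, and about a second later fetch() resumes and prints its second line.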

3. Asynchronous coroutine usage

Let's look at how coroutines are implemented. Python introduced the concept of coroutines in version 3.4, but that version was still based on generator objects. Python 3.5 added async/await, which makes coroutines much more convenient to write.

The Python library most commonly used for coroutines is asyncio, so this article uses asyncio as the basis for everything that follows.

First we need to understand the following concepts:

  • Event loop: essentially an infinite loop. We can register functions with it, and when a condition is met the corresponding handler is called.

  • Coroutine: a coroutine object, which can be registered with the event loop to be called by it. We can use the async keyword to define a method; calling that method does not execute it immediately but instead returns a coroutine object.

  • Task: a further wrapper around a coroutine object that additionally tracks the task's state.

  • Future: represents the result of a task that will (or will not) be executed in the future. In practice it is essentially the same as a task.

We also need to know the async/await keywords, which appeared only in Python 3.5 and are used specifically for defining coroutines: async defines a coroutine, and await suspends execution while waiting on a blocking operation.

3.1 Defining coroutines

First, let's define a coroutine and see how it differs from an ordinary function when executed. The code is as follows:

import asyncio

async def execute(x):
    print('Number:', x)

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

Running results:

Coroutine: <coroutine object execute at 0x1034cf830>
After calling execute
Number: 1
After calling loop

First we import the asyncio package so that we can use async and await. Then we use async to define an execute() method that takes a number and prints it when executed.

We then call this method directly, but instead of executing, it returns a coroutine object. Next we create an event loop with the get_event_loop() method, register the coroutine with the event loop by calling the loop object's run_until_complete() method, and start it. Only then do we see the execute() method print its output.

As you can see, a method defined with async becomes a coroutine object when called; it cannot be executed directly and must be registered with the event loop to run.

A task is a further wrapper around the coroutine object that carries extra run states such as running and finished, which we can use to check the execution status of the coroutine.

In the example above, when we pass the coroutine object to the run_until_complete() method, what it actually does first is wrap the coroutine in a task. We can also declare the task explicitly, as follows:

import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

Running results:

Coroutine: <coroutine object execute at 0x10e0f7830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop

Here we define the loop object first, then call its create_task() method to convert the coroutine object into a task and print it, finding that it is in the pending state. We then add the task to the event loop for execution, print it again, and see that its state has changed to finished and its result is 1, which is exactly what the execute() method returns.

Another way to define a task is asyncio's ensure_future() method, which returns a task object directly and does not depend on the loop, so we can define the task in advance even before the loop has been declared:

import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

Running results:

Coroutine: <coroutine object execute at 0x10aa33830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop

The effect is exactly the same.

3.2 Binding callback

We can also bind a callback method to a task, as shown in the following example:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

Here we define a request() method that requests Baidu and returns its response, but the method itself contains no print() statement. We then define a callback() method that takes one argument, the task, and prints the task's result with print(). With a coroutine object and a callback method defined, what we want is for the callback() method to be executed once the coroutine finishes.

How do we tie the two together? Simply call add_done_callback(), passing the callback() method to the wrapped task. When the task finishes executing, callback() is invoked with the task itself as its argument, so it can call the task's result() method to retrieve the result.

Running results:

Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>

In fact, even without a callback, we can simply call the task's result() method directly after it has finished, as shown below:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

The result is the same:

Task: <Task pending coro=<request() running at demo.py:4>>
Task: <Task finished coro=<request() done, defined at demo.py:4> result=<Response [200]>>
Task Result: <Response [200]>

3.3 Multitasking coroutines

In the example above we only executed the request once. What if we want to execute the request multiple times? We can define a list of tasks that can then be executed using asyncio’s wait() method, as shown in the following example:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())

Here we create a list of five tasks with a list comprehension, pass the list to asyncio's wait() method, and register that in the event loop, which launches the five tasks. Finally, we print the result of each task. The output is as follows:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

As you can see, the five tasks are executed one after another and we obtain each task's result.
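As a side note (a sketch that assumes the request() coroutine and the loop from the example above), asyncio.gather() is an alternative to wait() that collects the results directly, in the same order as the coroutines passed in:

# Alternative to asyncio.wait(): gather() returns the results directly,
# in the same order as the coroutines passed in.
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*[request() for _ in range(5)]))
for result in results:
    print('Task Result:', result)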

3.4 Coroutine implementation

So far we have seen async, coroutine, task, callback, and so on, but the advantages of coroutines may not be obvious yet. Don't worry: the cases above were just groundwork. Now let's take a proper look at what coroutines can do for IO-intensive tasks!

In the code above we used a network request as an example. A network request involves a time-consuming wait: we request a page and then wait for it to respond and return a result. Such time-consuming waits are typically IO operations, such as file reads and network requests, and this is exactly where coroutines shine: whenever a wait is needed, the current task can be suspended temporarily so that other work runs instead, which avoids burning time idling on a single task and makes full use of resources.

To show the advantages of coroutines, we first need a suitable test environment. The best approach is to simulate a page that takes a while to return its result. The code above used Baidu, but Baidu responds too quickly and the response time also depends on the local network speed, so the best option is to simulate a slow server locally. Here we use Flask.

If Flask is not installed, run the following command to install it:

pip3 install flask

Then write the server code as follows:

from flask import Flask
import time

app = Flask(__name__)

@app.route('/')
def index():
    time.sleep(3)
    return 'Hello! '

if __name__ == '__main__':
    app.run(threaded=True)

Here we define a Flask service whose main entry point is index(). It sleeps for 3 seconds before returning its result, so every request to this endpoint takes at least 3 seconds; in other words, we have simulated a slow service interface.

Note that run() is called with threaded=True, which starts Flask in multi-threaded mode; otherwise it uses only a single thread by default. Without multi-threading, concurrent requests can only be handled one at a time, so even if our client requests the service asynchronously with coroutines, the requests would still queue up on the server and the bottleneck would simply move there. Enabling multi-threaded mode is therefore essential.

Flask should be running on 127.0.0.1:5000 by default when started, and the console output is as follows:

 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

Let’s use the above method again:

import asyncio
import requests
import time

start = time.time()

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

Here we again create five tasks, pass the task list to the wait() method, and register it in the event loop for execution.

The running results are as follows:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.049368143081665

As you can see, this is no different from ordinary sequential requests: the tasks still run one after another and take about 15 seconds in total, roughly 3 seconds per request. Where is the asynchronous processing we were promised?

The point is that asynchronous processing requires suspension: when a task has to wait for an IO result, the current task is suspended and other tasks run instead, so that resources are fully used. The code above just runs straight through without ever suspending anything, so how could it possibly be asynchronous?

To implement asynchrony, we turn to await. With await we can suspend an operation that involves waiting and yield control: when the event loop encounters an await while running a coroutine, it suspends that coroutine and runs another one, switching back only when the other coroutines have themselves suspended or finished.

So, we might change the request() method in our code to something like this:

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await requests.get(url)
    print('Get response from', url, 'Result:', response.text)

Here we simply added an await in front of the requests call, but if we run it we get the following output and error:

Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Cost time: 15.048935890197754
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
  File "demo.py", line 10, in request
    status = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

This time the await was actually reached, yet we still ended up with an error. The error says that the Response object returned by requests cannot be used in an await expression. According to the official documentation, the object after await must be one of the following:

  • A native coroutine object returned from a native coroutine function.

  • A generator-based coroutine object returned from a function decorated with types.coroutine().

  • An object with an __await__ method returning an iterator (a minimal sketch of this case follows below).

For details, see https://www.python.org/dev/peps/pep-0492/#await-expression.
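For the third case, here is a minimal sketch of a custom awaitable, i.e. an object that defines __await__() returning an iterator (the class name Waitable is made up for illustration):

import asyncio

class Waitable:
    def __await__(self):
        # __await__() must return an iterator; here we simply delegate to a
        # real coroutine, whose own __await__() provides one.
        return asyncio.sleep(1, result='done').__await__()

async def main():
    result = await Waitable()   # allowed because Waitable defines __await__()
    print(result)               # prints 'done' after about a second

loop = asyncio.get_event_loop()
loop.run_until_complete(main())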

The Response returned by requests does not meet any of these criteria, hence the error.

We can instead wrap the request in a method of our own defined with async, turning it into a coroutine object. Let's rewrite the code like this:

import asyncio
import requests
import time

start = time.time()

async def get(url):
    return requests.get(url)

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

Here we move the page request into a separate get() method defined with async, so calling it gives us a coroutine object. Let's run it:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447

Still not asynchronous. This shows that merely wrapping IO-bound code in an async method is not enough! To be truly asynchronous, we need a request library that itself supports asynchronous operation, and that is where aiohttp comes in.

3.5 Using aiohttp

aiohttp is a library that supports asynchronous requests; working together with asyncio, it makes asynchronous request operations very easy.

The installation method is as follows:

pip3 install aiohttp

The official documentation is at https://aiohttp.readthedocs.io/. It is divided into two parts, Client and Server; refer to the official documentation for details.

Let’s use aiohttp and change the code to look like this:

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    # Use the session as an async context manager so it is closed properly
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        result = await response.text()
        return result

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

Here we change the request library from requests to aiohttp and make the request with the get() method of aiohttp's ClientSession class. The result is as follows:

Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 3.0199508666992188

Success! The five requests now take about 3 seconds instead of 15, roughly a fifth of the original time.

In this code we use await in front of the get() call. While the five coroutines are executing, whenever an await is encountered the current coroutine is suspended and another coroutine runs instead, and this continues until the other coroutines are themselves suspended or finished.

Walk through the first task: when it reaches its first await (the await get(url) call) it is suspended, but the first step inside get(), creating the ClientSession, is non-blocking, so it resumes almost immediately and continues until the next await, the session.get() request. That request takes a long time, so the task stays suspended. What happens next? The event loop looks for a coroutine that is not currently suspended and runs the second task, and so on, until the fifth task's session.get() has been issued and every task is suspended. With all tasks pending there is nothing to do but wait. After about 3 seconds the responses come back almost simultaneously, the tasks are woken up one after another, and they print their results. Total time: about 3 seconds!

This is where asynchronous operations come into their own: when a task hits an operation that would block, it is suspended and the program moves on to other tasks instead of idly waiting, making good use of CPU time rather than wasting it on IO waits.

You might now think: in the example above, after the network requests are sent, the following three seconds are pure waiting, and in three seconds the CPU could handle far more than five tasks. So couldn't we launch 10, 20, 50, 100, even 1000 tasks together and get all the results in roughly 3 seconds, since all of them wait together once suspended?

In theory, yes, but with a caveat: the server would have to accept an unlimited number of simultaneous requests while still returning results normally, that is, withstand unlimited load. Ignoring IO transmission delay as well, you could then indeed run an unbounded number of tasks together and get the results in the expected time.
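In practice, rather than firing thousands of requests at a server at once, it is common to cap how many requests are in flight at the same time. A minimal sketch using asyncio.Semaphore, which could replace the get() helper above (the limit of 10 is an arbitrary value chosen for illustration):

import asyncio
import aiohttp

# Allow at most 10 requests to be in flight at the same time
# (10 is an arbitrary illustrative limit).
semaphore = asyncio.Semaphore(10)

async def get(url):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            response = await session.get(url)
            return await response.text()

Tasks beyond the limit simply wait at the semaphore until a slot frees up, which keeps the load on the server bounded.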

Let’s set the number of tasks to 100 and try again:

tasks = [asyncio.ensure_future(request()) for _ in range(100)]

The time cost is as follows:

Cost time: 3.106252670288086

The total running time is still about 3 seconds; the small extra amount is just IO delay.

As you can see, with asynchronous coroutines we can complete hundreds or even thousands of network requests in almost the same amount of time. Applied to a crawler, the speedup is considerable.

3.6 Comparison with single-process and multi-process

You might also wonder how long the example above would take without asynchronous coroutines, using a single process or multiple processes instead. Let's test it.

First, let's time the plain sequential single-process version:

import requests
import time

start = time.time()

def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)

for _ in range(100):
    request()

end = time.time()
print('Cost time:', end - start)

The time cost:

Cost time: 305.16639709472656

Let’s test with multiple processes, using the multiprocessing library:

import requests
import time
import multiprocessing

start = time.time()

def request(_):
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)

cpu_count = multiprocessing.cpu_count()
print('Cpu count:', cpu_count)
pool = multiprocessing.Pool(cpu_count)
pool.map(request, range(100))

end = time.time()
print('Cost time:', end - start)

Here I use the Pool class from multiprocessing, i.e. a process pool. My computer's CPU has 8 cores, so the pool size here is 8.

Running time:

Cost time: 48.17306900024414

So multiprocessing is indeed much more efficient than a single sequential process.

3.7 Combining with multiprocessing

Since both asynchronous coroutines and multiprocessing speed up network requests, why not combine the two? At PyCon 2018, John Reese from Facebook presented on the respective strengths of asyncio and multiprocessing and introduced a new library called aiomultiprocess. If you are interested, the talk is at https://www.youtube.com/watch?v=0kXaLh8Fz3k.

The library is installed as follows:

pip3 install aiomultiprocess

Python 3.6 or later is required.

Using this library, we can rewrite the above example as follows:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

start = time.time()

async def get(url):
    # Same helper as before, using async with so the session is closed properly
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        result = await response.text()
        return result

async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result

coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

end = time.time()
print('Cost time:', end - start)

This uses multiprocessing and asynchronous coroutines together. Against our test server the end result is, of course, much the same as the purely asynchronous version:

Cost time: 3.1156570434570312

Because of how my test endpoint works, the fastest possible response is 3 seconds, so the extra time is essentially IO transmission delay. In real crawling, however, we run into far more varied situations: using asynchronous coroutines to avoid blocking on one side and multiprocessing to exploit multiple cores on the other, the time savings can be very significant.

That covers the basic use of coroutines in Python. I hope it helps.
