This is the 8th day of my participation in the August Text Challenge.

1. Implementation of coroutines

1. Event loop

An event loop can be understood as an endless loop that detects tasks and executes their code.

# Pseudocode
task_list = [task1, task2, task3, ...]

while True:
    ready_tasks, completed_tasks = check every task in task_list and return the "ready" and "completed" ones

    for ready_task in ready_tasks:
        execute ready_task

    for completed_task in completed_tasks:
        remove completed_task from task_list

    if all tasks in task_list are completed:
        terminate the loop

import asyncio

# Create or retrieve an event loop
loop = asyncio.get_event_loop()

# Put the task into the task list ("task" is a placeholder for a coroutine or Task)
loop.run_until_complete(task)
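
The pseudocode above can be made concrete with a toy loop. This is only a sketch: it uses plain generators as stand-in tasks, and the names countdown and run_all are made up for illustration. It is not how asyncio is implemented internally.

```python
from collections import deque


def countdown(name, n, log):
    """A stand-in task: each yield hands control back to the loop."""
    while n > 0:
        log.append(f"{name}:{n}")
        n -= 1
        yield  # pause here, like a coroutine waiting on I/O


def run_all(tasks):
    """Toy event loop: keep cycling until every task has finished."""
    ready = deque(tasks)
    while ready:                  # plays the role of "while True" above
        task = ready.popleft()
        try:
            next(task)            # run the task up to its next pause
        except StopIteration:
            continue              # completed: drop it from the task list
        ready.append(task)        # not finished: schedule it again


log = []
run_all([countdown("a", 2, log), countdown("b", 1, log)])
print(log)  # ['a:2', 'b:1', 'a:1']
```

The two tasks take turns: each one runs until it pauses, and the loop ends once no task is left.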

2. Get started quickly

Coroutine function: a function defined with async def function_name.

Coroutine object: the object obtained by calling a coroutine function, i.e. coroutine_function().

async def func():
    pass

result = func()

Note: Calling a coroutine function only creates a coroutine object; the code inside the function is not executed.

To run the code inside the coroutine function, you must hand the coroutine object to the event loop.

import asyncio

async def func():
    print("Am I being executed?")
    
result = func()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
asyncio.run(result)  # Python 3.7+
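
To check that calling a coroutine function really does not run its body, a small sketch like the following can help. The calls list is a hypothetical helper used only to record when the body executes.

```python
import asyncio
import inspect

calls = []


async def func():
    calls.append("ran")
    return 42


coro = func()                        # creates the coroutine object only
is_coro = inspect.iscoroutine(coro)
ran_before = list(calls)             # still empty: the body has not started

value = asyncio.run(coro)            # the event loop drives the body
print(is_coro, ran_before, calls, value)  # True [] ['ran'] 42
```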

3. await

await + awaitable object (coroutine object, Future, or Task object -> I/O wait)

Example 1:

import asyncio


async def func():
    print('start')
    response = await asyncio.sleep(2)
    print('result:{}'.format(response))


asyncio.run(func())

Example 2:

import asyncio


async def other():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return 'result'


async def func():
    print('Execute the code inside the coroutine function')

    # While the current coroutine (task) is suspended on I/O,
    # the event loop can execute other coroutines (tasks).
    response = await other()

    print('I/O request ends with :{}'.format(response))


asyncio.run(func())

Example 3:

import asyncio


async def other():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return 'result'


async def func():
    print('Execute the code inside the coroutine function')

    # Wait for other() to finish and take its return value
    response1 = await other()
    print('I/O request ends with :{}'.format(response1))

    response2 = await other()
    print('I/O request ends with :{}'.format(response2))


asyncio.run(func())

4. Task object

Tasks are used to schedule coroutines concurrently. asyncio.create_task(coroutine_object) wraps a coroutine in a Task and adds it to the event loop, where it is scheduled for execution. Besides asyncio.create_task(), you can also use the lower-level loop.create_task() or asyncio.ensure_future() functions. Manually instantiating Task objects is not recommended.

Note: asyncio.create_task() was added in Python 3.7. Before Python 3.7, use the lower-level asyncio.ensure_future() function instead.
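
As a rough illustration of what "scheduling concurrently" means, the sketch below compares awaiting two coroutines one after another with wrapping them in Tasks first. The names work, sequential, with_tasks, and events are made up for this example, and the short sleep stands in for real I/O.

```python
import asyncio

events = []


async def work(name):
    events.append(f"{name} start")
    await asyncio.sleep(0.01)        # stand-in for an I/O wait
    events.append(f"{name} end")
    return name


async def sequential():
    # Each await finishes completely before the next coroutine starts
    await work("a")
    await work("b")


async def with_tasks():
    # Both coroutines are scheduled before either one is awaited
    t1 = asyncio.create_task(work("c"))
    t2 = asyncio.create_task(work("d"))
    await t1
    await t2


asyncio.run(sequential())
asyncio.run(with_tasks())
print(events)
```

In the sequential run, "a end" is recorded before "b start". With Tasks, both "c start" and "d start" are recorded before either task ends, because the loop switches to the second task while the first is sleeping.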

Example 1:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"


async def main():
    print("Main start")

    # Create Task objects; the func() coroutines are added to the event loop right away
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())

    print("Main end")

    # When a running coroutine hits an I/O operation, the loop automatically switches to other tasks.
    # await here waits for the corresponding coroutine to finish and fetches its result.
    result1 = await task1
    result2 = await task2

    print(result1, result2)


asyncio.run(main())


Example 2:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"


async def main():
    print("Main start")

    task_list = [
        asyncio.create_task(func()),
        asyncio.create_task(func())
    ]

    print("Main end")

    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)


asyncio.run(main())


Example 3:

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"


task_list = [
    func(),
    func(),
]

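async def main():
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+,
    # so wrap them in Tasks once the event loop is running.
    tasks = [asyncio.create_task(coro) for coro in task_list]
    return await asyncio.wait(tasks)


done, pending = asyncio.run(main())
print(done)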
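
Printing the set returned by asyncio.wait() shows Task objects rather than return values. A minimal sketch of pulling the values out, assuming a hypothetical coroutine func(n) that returns n * 10:

```python
import asyncio


async def func(n):
    await asyncio.sleep(0.01)
    return n * 10


async def main():
    tasks = [asyncio.create_task(func(n)) for n in (1, 2, 3)]
    done, pending = await asyncio.wait(tasks)
    # done is an unordered set of Task objects; result() unwraps each one
    return sorted(t.result() for t in done), len(pending)


results, pending_count = asyncio.run(main())
print(results, pending_count)  # [10, 20, 30] 0
```

Because the set is unordered, the results are sorted here before being returned.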

5. asyncio.Future object

Task inherits from Future; the way a Task handles the result of an awaited object is built on the Future object.

Example 1:

import asyncio


async def main():
    # Get the current event loop
    loop = asyncio.get_running_loop()

    # Create a task (Future object) that does nothing
    fut = loop.create_future()

    # Wait for the final result of the task (Future object).
    # No result is ever set, so this waits forever.
    await fut


asyncio.run(main())

Example 2:

import asyncio


async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")


async def main():
    # Get the current event loop
    loop = asyncio.get_running_loop()

    # Create a task (Future object) that does nothing
    fut = loop.create_future()

    # Create a Task bound to set_after; inside that function, fut is assigned a result after 2 s.
    # Manually setting the final result of the Future is what allows fut to finish.
    await loop.create_task(set_after(fut))

    # Wait for the final result of the task (Future object)
    data = await fut
    print(data)


asyncio.run(main())

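
The relationship between Task and Future can be checked directly. The following is a small sketch with a made-up coroutine compute(); it only inspects the Task through the Future interface.

```python
import asyncio


async def compute():
    await asyncio.sleep(0.01)
    return "done"


async def main():
    task = asyncio.create_task(compute())
    is_future = isinstance(task, asyncio.Future)  # Task is a Future subclass
    pending_first = not task.done()               # no result stored yet
    value = await task                            # await reads the stored result
    return is_future, pending_first, task.done(), value


summary = asyncio.run(main())
print(summary)  # (True, True, True, 'done')
```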

6. concurrent.futures.Future object

This is the object used when implementing asynchronous operations with a thread pool or a process pool.

import time

from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor


def func(value):
    time.sleep(1)
    print(value)


# Create a thread pool
pool = ThreadPoolExecutor(max_workers=5)

# Or create a process pool instead
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)
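
The objects printed by the loop above are concurrent.futures.Future instances. As a minimal sketch (square is a hypothetical worker function), their values can be collected with result():

```python
from concurrent.futures import Future, ThreadPoolExecutor


def square(value):
    return value * value


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(square, i) for i in range(4)]
    all_futures = all(isinstance(f, Future) for f in futures)
    # result() blocks the calling thread until the worker has finished
    squares = [f.result() for f in futures]

print(all_futures, squares)  # True [0, 1, 4, 9]
```

Note that result() blocks the calling thread, which is why this kind of Future cannot be awaited directly inside a coroutine.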

In later projects the two kinds of Future may have to be mixed. For example, 80% of a CRM project may be based on coroutine asynchronous programming, while the MySQL driver it uses does not support coroutines, so that part has to run in threads or processes.

import asyncio
import concurrent
import time

from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor


def func1():
    # A time-consuming blocking operation
    time.sleep(2)
    return "SSS"


async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor (a ThreadPoolExecutor by default).
    # Step 1: internally, ThreadPoolExecutor.submit is called to request a thread
    #         from the pool to execute func1; it returns a concurrent.futures.Future object.
    # Step 2: asyncio.wrap_future wraps the concurrent.futures.Future object
    #         into an asyncio.Future object, because concurrent.futures.Future
    #         does not support await syntax.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print("default thread pool", result)

    # 2. Run in a custom thread pool
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print("custom thread pool:", result)

    # 3. Run in a custom process pool
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print("custom process pool:", result)


asyncio.run(main())

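
The two steps described in the comments can also be written out by hand. This is only a sketch of the idea, not the exact internals of run_in_executor; blocking_io is a hypothetical stand-in for any blocking call.

```python
import asyncio
import concurrent.futures
import time


def blocking_io():
    time.sleep(0.05)                 # stand-in for a blocking call
    return "SSS"


async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        cf_future = pool.submit(blocking_io)         # concurrent.futures.Future
        aio_future = asyncio.wrap_future(cf_future)  # awaitable asyncio.Future
        kinds = (
            isinstance(cf_future, concurrent.futures.Future),
            isinstance(aio_future, asyncio.Future),
        )
        return kinds, await aio_future


kinds, value = asyncio.run(main())
print(kinds, value)  # (True, True) SSS
```

In everyday code, loop.run_in_executor() is the more convenient entry point; the explicit version is shown only to make the wrapping step visible.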

Example: asyncio + a module that does not support asynchronous operations

import asyncio
import concurrent
import time

from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

import requests


async def down_img(url):
    # Send a network request to download the image
    print("Download:", url)

    loop = asyncio.get_event_loop()
    # The requests module does not support asynchronous operations by default,
    # so run it in the default thread pool
    future = loop.run_in_executor(None, requests.get, url)

    response = await future

    file_name = url.split('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response.content)
    print("Download completed")


if __name__ == "__main__":
    url_list = [
        'https://img.tupianzj.com/uploads/allimg/202003/9999/800802cd3b.jpg',
        'https://img.tupianzj.com/uploads/allimg/202003/9999/0dfc390267.jpg',
        'https://img.tupianzj.com/uploads/allimg/160206/9-1602061H626.jpg',
        'https://img.tupianzj.com/uploads/allimg/202003/9999/064a2dff13.jpg',
        'https://img.tupianzj.com/uploads/allimg/160125/9-160125101553.jpg',
    ]

    async def main():
        # asyncio.wait() needs Task objects on Python 3.11+, so wrap each coroutine
        task_list = [asyncio.create_task(down_img(url)) for url in url_list]
        await asyncio.wait(task_list)

    asyncio.run(main())


7. Asynchronous iterators

What is an asynchronous iterator

An object that implements the __aiter__() and __anext__() methods. __anext__() must return an awaitable object. async for resolves the awaitables returned by an asynchronous iterator's __anext__() method until it raises a StopAsyncIteration exception. Introduced by PEP 492.

What is an asynchronous iterable?

An object that can be used in an async for statement. It must return an asynchronous iterator from its __aiter__() method. Introduced by PEP 492.

import asyncio


class Reader(object):
    """Custom asynchronous iterator (also an asynchronous iterable)"""

    def __init__(self):
        self.count = 0

    async def read_line(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.read_line()
        if val is None:
            raise StopAsyncIteration
        return val


async def func():
    obj = Reader()
    async for item in obj:
        print(item)


asyncio.run(func())
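
To see what async for does with __aiter__() and __anext__(), the loop can be spelled out by hand. The sketch below uses a hypothetical Countdown iterator and a helper manual_loop; both names are made up for illustration.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator that yields n, n-1, ..., 1."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)       # give the event loop a chance to switch
        self.n -= 1
        return self.n + 1


async def manual_loop(source):
    """Roughly what 'async for item in source' does under the hood."""
    items = []
    iterator = source.__aiter__()
    while True:
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            break
        items.append(item)
    return items


async def main():
    by_hand = await manual_loop(Countdown(3))
    by_syntax = [item async for item in Countdown(3)]
    return by_hand, by_syntax


by_hand, by_syntax = asyncio.run(main())
print(by_hand, by_syntax)  # [3, 2, 1] [3, 2, 1]
```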

8. Asynchronous context manager

An object that controls the environment inside an async with statement by defining the __aenter__() and __aexit__() methods.

import asyncio


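class AsyncContextManager:

    def __init__(self, conn):
        self.conn = conn

    async def do_something(self):
        # Asynchronous work done inside the context
        return "Awesome!"

    async def __aenter__(self):
        # Asynchronous setup; asyncio.sleep(1) stands in for opening a connection
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Asynchronous cleanup; asyncio.sleep(1) stands in for closing the connection
        await asyncio.sleep(1)


async def func():
    async with AsyncContextManager("conn") as f: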
        result = await f.do_something()
        print(result)


asyncio.run(func())
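
One property worth checking is that the exit method also runs when the body raises. The sketch below assumes a hypothetical Resource class that records each step in an events list.

```python
import asyncio

events = []


class Resource:
    """Hypothetical resource that records when it is entered and exited."""

    async def __aenter__(self):
        await asyncio.sleep(0)       # stand-in for async setup
        events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0)       # stand-in for async cleanup
        events.append(f"exit:{exc_type.__name__ if exc_type else None}")
        return False                 # do not swallow the exception


async def main():
    async with Resource():
        events.append("body")
    try:
        async with Resource():
            raise ValueError("boom")
    except ValueError:
        events.append("caught")


asyncio.run(main())
print(events)
# ['enter', 'body', 'exit:None', 'enter', 'exit:ValueError', 'caught']
```

Returning False from __aexit__() lets the exception propagate after cleanup; returning True would suppress it.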