This is the 8th day of my participation in the August Text Challenge.
1. Implementation of coroutines
1. Event loop
An event loop can be understood as an endless loop that checks for runnable code and executes it.
# Pseudocode
task_list = [task1, task2, task3, ...]

while True:
    ready_tasks, completed_tasks = check every task in task_list and return the "ready" and "completed" tasks

    for ready_task in ready_tasks:
        execute ready_task

    for completed_task in completed_tasks:
        remove completed_task from task_list

    if every task in task_list is completed:
        terminate the loop
import asyncio

# Create or retrieve an event loop
loop = asyncio.get_event_loop()

# Put the task into the task list (task is a placeholder for a coroutine object or Task)
loop.run_until_complete(task)
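The pseudocode above can be made concrete with a toy scheduler. This is only a minimal sketch and not how asyncio is implemented internally: it assumes each task is a plain generator that yields whenever it would wait, and the names `countdown` and `run_all` are made up for illustration.

```python
from collections import deque

def countdown(name, n, log):
    """A toy task: records one step, then yields control back to the loop."""
    while n > 0:
        log.append(f"{name}:{n}")
        n -= 1
        yield  # pause here so the loop can run another task

def run_all(tasks):
    """Round-robin loop: run each task one step, drop it once it has finished."""
    task_list = deque(tasks)
    while task_list:                 # the loop ends when every task is completed
        task = task_list.popleft()
        try:
            next(task)               # execute the ready task up to its next yield
        except StopIteration:
            continue                 # completed: do not put it back in the list
        task_list.append(task)       # not finished: schedule it again

log = []
run_all([countdown("A", 2, log), countdown("B", 3, log)])
print(log)
```

The two tasks take turns, one step each, until both are exhausted, which is the same check, execute, remove cycle the pseudocode describes.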
2. Get started quickly
Coroutine function: a function defined with async def.
Coroutine object: the object obtained by calling a coroutine function.
async def func():
    pass

result = func()
Note: Calling a coroutine function creates a coroutine object; the code inside the function is not executed.
If you want to run the code inside the coroutine function, you must hand the coroutine object to the event loop.
import asyncio

async def func():
    print("Am I being executed?")

result = func()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
asyncio.run(result)  # Python 3.7+
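The difference between creating a coroutine object and running it can be checked directly. The sketch below is illustrative only; the `calls` list and the return value 42 are made up so that the effect is observable.

```python
import asyncio
import inspect

calls = []

async def func():
    calls.append("ran")
    return 42

coro = func()                      # creates a coroutine object only
print(inspect.iscoroutine(coro))   # True
print(calls)                       # []: the body has not run yet

value = asyncio.run(coro)          # the event loop now drives the coroutine
print(calls, value)
```

Only after `asyncio.run()` hands the object to an event loop does the function body execute and produce its return value.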
3. await
await + awaitable object (coroutine object, Future, Task object -> I/O wait)
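As a quick illustration of the three kinds of awaitable listed above, the following sketch awaits a coroutine object, a Task, and a Future in turn. The helper `work()` and the short 0.01 s delays are assumptions chosen to keep the example fast.

```python
import asyncio

async def work():
    await asyncio.sleep(0.01)   # stand-in for real I/O
    return "coroutine"

async def main():
    loop = asyncio.get_running_loop()

    # 1. A coroutine object
    from_coroutine = await work()

    # 2. A Task object wrapping a coroutine
    task = asyncio.create_task(work())
    from_task = await task

    # 3. A Future whose result is set later by a scheduled callback
    fut = loop.create_future()
    loop.call_later(0.01, fut.set_result, "future")
    from_future = await fut

    return from_coroutine, from_task, from_future

results = asyncio.run(main())
print(results)
```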
Example 1:
import asyncio

async def func():
    print('start')
    response = await asyncio.sleep(2)
    print('result:{}'.format(response))

asyncio.run(func())
Example 2:
import asyncio

async def other():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return 'result'

async def func():
    print('Execute the code inside the coroutine function')
    # While this coroutine is suspended, the event loop can execute another coroutine (task)
    response = await other()
    print('I/O request ends with :{}'.format(response))

asyncio.run(func())
Example 3:
import asyncio

async def other():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return 'result'

async def func():
    print('Execute the code inside the coroutine function')
    # Execution continues only after other() has finished and returned its result
    response1 = await other()
    print('I/O request ends with :{}'.format(response1))
    response2 = await other()
    print('I/O request ends with :{}'.format(response2))

asyncio.run(func())
4. Task object
Tasks are used to schedule coroutines concurrently. asyncio.create_task(coroutine object) creates a Task, which adds the coroutine to the event loop to be scheduled for execution. Besides asyncio.create_task(), you can also use the lower-level loop.create_task() or asyncio.ensure_future() functions. Manually instantiating Task objects is not recommended.
Note: asyncio.create_task() was added in Python 3.7. Before Python 3.7, use the lower-level asyncio.ensure_future() function instead.
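To see what "scheduled concurrently" buys in practice, the sketch below times two 0.2 s waits run back to back and then run as Tasks. The helper names `io_job`, `sequential`, `overlapped`, and `timed` are made up for illustration, and exact timings will vary by machine.

```python
import asyncio
import time

async def io_job(delay):
    await asyncio.sleep(delay)
    return delay

async def sequential():
    # Each await finishes before the next coroutine even starts
    await io_job(0.2)
    await io_job(0.2)

async def overlapped():
    # Both tasks are scheduled first, so their waits overlap
    t1 = asyncio.create_task(io_job(0.2))
    t2 = asyncio.ensure_future(io_job(0.2))  # lower-level alternative
    await t1
    await t2

def timed(coro_func):
    start = time.perf_counter()
    asyncio.run(coro_func())
    return time.perf_counter() - start

seq_time = timed(sequential)
con_time = timed(overlapped)
print(f"sequential: {seq_time:.2f}s, overlapped: {con_time:.2f}s")
```

The sequential version should take roughly the sum of the two delays, while the Task-based version should take roughly the longer of the two.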
Example 1:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"

async def main():
    print("Main start")
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    print("Main end")
    # When a coroutine hits an I/O operation, the event loop automatically switches to other tasks.
    # await here waits for the corresponding coroutine to finish and obtains its result.
    result1 = await task1
    result2 = await task2
    print(result1, result2)

asyncio.run(main())
Example 2:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"

async def main():
    print("Main start")
    task_list = [
        asyncio.create_task(func()),
        asyncio.create_task(func())
    ]
    print("Main end")
    # done holds the finished tasks; pending holds any still running when the timeout expires
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)

asyncio.run(main())
Example 3:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return value"

task_list = [
    func(),
    func(),
]

# Note: passing bare coroutine objects to asyncio.wait() is deprecated since Python 3.8 and removed in Python 3.11
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
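Example 3 above passes bare coroutine objects to asyncio.wait(), which newer Python versions (3.11 and later) no longer accept. One possible adjustment, offered as a sketch rather than as the author's code, is to collect the coroutines with asyncio.gather() inside a `main()` coroutine. The sleep is shortened here so the example finishes quickly.

```python
import asyncio

async def func():
    await asyncio.sleep(0.01)
    return "Return value"

async def main():
    # gather() wraps each coroutine in a Task and returns the results in call order
    return await asyncio.gather(func(), func())

results = asyncio.run(main())
print(results)
```

Unlike asyncio.wait(), which returns sets of finished and pending tasks, gather() returns the results themselves as a list.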
5. asyncio.Future object
Task inherits from Future, and a Task's handling of await results is built on the Future object.
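The inheritance relationship can be checked at runtime. The following sketch assumes the default asyncio event loop; the coroutine `answer()` and its return value are made up for illustration.

```python
import asyncio

async def answer():
    return 42

async def main():
    task = asyncio.create_task(answer())
    is_future = isinstance(task, asyncio.Future)
    value = await task
    # Future-style accessors are available on the finished Task
    return is_future, task.done(), task.result(), value

is_subclass = issubclass(asyncio.Task, asyncio.Future)
summary = asyncio.run(main())
print(is_subclass, summary)
```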
Example 1:
import asyncio

async def main():
    # Get the current event loop
    loop = asyncio.get_running_loop()

    # Create a task (Future object) that does nothing
    fut = loop.create_future()

    # Wait for the final result of the task (Future object); no result is ever set, so this waits forever
    await fut

asyncio.run(main())
Example 2:
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")

async def main():
    # Get the current event loop
    loop = asyncio.get_running_loop()

    # Create a task (Future object) that does nothing
    fut = loop.create_future()

    # Create a Task bound to set_after; inside that function, fut is assigned a result after 2 s.
    # Manually setting the Future's final result is what allows the wait on fut to end.
    await loop.create_task(set_after(fut))

    # Wait for the final result of the Future object
    data = await fut
    print(data)

asyncio.run(main())
6. concurrent.futures.Future object
The object used when implementing asynchronous operations with a thread pool or process pool.
import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

# Create a thread pool
pool = ThreadPoolExecutor(max_workers=5)

# Or create a process pool
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)
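The example above only prints the Future objects. To read the value a worker returned, call result() on the Future, as in the sketch below; the `square` function is a made-up stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor

def square(value):
    return value * value

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(square, i) for i in range(5)]
    # result() blocks the calling thread until that Future has finished
    squares = [fut.result() for fut in futures]

print(squares)
print(all(fut.done() for fut in futures))
```

Because result() blocks the calling thread, it cannot be used with await directly.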
In real code the two kinds of Future may need to be mixed. For example, 80% of a CRM project may be based on coroutine asynchronous programming, while the MySQL part does not support it and has to be run in a thread or process pool instead.
import asyncio
import concurrent.futures
import time

def func1():
    # A time-consuming, blocking operation
    time.sleep(2)
    return "SSS"

async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor (default ThreadPoolExecutor).
    # Step 1: internally, ThreadPoolExecutor.submit is called to request a thread from the
    #         pool to execute func1, and a concurrent.futures.Future object is returned.
    # Step 2: asyncio.wrap_future is called to wrap the concurrent.futures.Future object as an
    #         asyncio.Future object, because concurrent.futures.Future does not support await.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print("default thread pool", result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print("custom thread pool:", result)

    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print("custom process pool:", result)

asyncio.run(main())
Example: asyncio + a module that does not support asynchrony
import asyncio
import requests

async def down_img(url):
    print("Download:", url)
    loop = asyncio.get_event_loop()
    # The requests module does not support asynchronous operations by default,
    # so the call is handed to the default thread pool
    future = loop.run_in_executor(None, requests.get, url)
    response = await future

    # Save the image to a local file
    file_name = url.split('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response.content)
    print("Download completed")

if __name__ == "__main__":
    url_list = [
        'https://img.tupianzj.com/uploads/allimg/202003/9999/800802cd3b.jpg',
        'https://img.tupianzj.com/uploads/allimg/202003/9999/0dfc390267.jpg',
        'https://img.tupianzj.com/uploads/allimg/160206/9-1602061H626.jpg',
        'https://img.tupianzj.com/uploads/allimg/202003/9999/064a2dff13.jpg',
        'https://img.tupianzj.com/uploads/allimg/160125/9-160125101553.jpg'
    ]
    task_list = [down_img(url) for url in url_list]
    loop = asyncio.get_event_loop()
    # Note: passing bare coroutine objects to asyncio.wait() is deprecated since Python 3.8 and removed in Python 3.11
    loop.run_until_complete(asyncio.wait(task_list))
7. Asynchronous iterators
What is an asynchronous iterator?
An object that implements the __aiter__() and __anext__() methods. __anext__() must return an awaitable object. async for resolves the awaitables returned by an asynchronous iterator's __anext__() method until it raises a StopAsyncIteration exception. Introduced by PEP 492.
What is an asynchronous iterable?
An object that can be used in an async for statement. It must return an asynchronous iterator from its __aiter__() method. Introduced by PEP 492.
import asyncio

class Reader(object):
    """Custom asynchronous iterator (also an asynchronous iterable)"""

    def __init__(self):
        self.count = 0

    async def read_line(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.read_line()
        if val is None:
            raise StopAsyncIteration
        return val

async def func():
    obj = Reader()
    # async for can only be used inside a coroutine function
    async for item in obj:
        print(item)

asyncio.run(func())
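To make the protocol described above more tangible, the sketch below runs the same iterator twice: once with async for and once by calling __anext__() by hand until StopAsyncIteration is raised. The `Countdown` class is a hypothetical iterator invented for this illustration.

```python
import asyncio

class Countdown:
    """Hypothetical async iterator yielding n, n-1, ..., 1."""

    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)   # give the event loop a chance to switch
        self.n -= 1
        return self.n + 1

async def with_async_for():
    return [item async for item in Countdown(3)]

async def by_hand():
    items = []
    iterator = Countdown(3).__aiter__()
    while True:
        try:
            item = await iterator.__anext__()
        except StopAsyncIteration:
            break
        items.append(item)
    return items

auto_items = asyncio.run(with_async_for())
manual_items = asyncio.run(by_hand())
print(auto_items, manual_items)
```

Both versions collect the same items, which shows that async for is essentially a loop over awaited __anext__() calls.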
8. Asynchronous context manager
An object that controls the environment in an async with statement by defining __aenter__() and __aexit__() methods.
import asyncio

class AsyncContextManger:
    def __init__(self, conn):
        self.conn = conn

    async def do_something(self):
        return "Awesome!"

    async def __aenter__(self):
        # Simulate asynchronous setup work on entering the block
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Simulate asynchronous cleanup work on leaving the block
        await asyncio.sleep(1)

async def func():
    # async with can only be used inside a coroutine function
    async with AsyncContextManger("conn") as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())