Author: MING; personal public account: Python programming time; personal WeChat: MrBensonwon
Hello, this is chapter 10 of the concurrent programming series.
Let's briefly review the last chapter so that this one ties in with it.
In the last chapter, we first saw how to create a coroutine object. There are two main ways:
- with the async keyword
- with the @asyncio.coroutine decorator
Once we have a coroutine object, we need an event loop to run it. The main steps are:
- Wrap the coroutine object in a Task object
- Create an event loop object to hold the tasks
- Hand the Task to the event loop and start it
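The three steps above can be sketched as follows. This is a minimal illustration rather than the code from the previous chapter: the coroutine name hello and its return value are placeholders, and the loop is created before the task only because loop.create_task() needs a loop object to exist.

```python
import asyncio

# Placeholder coroutine function (hypothetical, for illustration only)
async def hello():
    await asyncio.sleep(0.01)
    return 'hello'

# Create an event loop object
loop = asyncio.new_event_loop()
# Wrap the coroutine object in a Task bound to that loop
task = loop.create_task(hello())
# Hand the task to the loop and run until the task finishes
loop.run_until_complete(task)
result = task.result()
print('Task ret:', result)
loop.close()
```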
To make the relationship between generators and coroutines clearer, I also covered the difference between yield and async/await.
Finally, we looked at how to add a callback function to a coroutine.
So far, then, we have only dealt with a single task in a coroutine. That was not too hard, was it? I encourage you to reread it a few times and, above all, to write the code yourself rather than only read it.
In this chapter, we look at running multiple tasks with coroutines.
Concurrency in coroutines
Coroutines run concurrently in much the same way threads do. As an analogy, picture one person eating three steamed buns at once: after taking a bite of the first bun, he has to finish that mouthful before he can bite into another, so he works through the buns by alternating between them.
asyncio achieves concurrency by having several coroutines share the work: whenever one coroutine would block, it awaits, which hands control back to the event loop so that the other coroutines can keep running.
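The alternation described above can be made visible with a small sketch. The coroutine eat and the bun names are hypothetical; await asyncio.sleep(0) stands in for a blocking operation and simply yields control to the event loop once.

```python
import asyncio

async def eat(bun, bites, log):
    for bite in range(1, bites + 1):
        log.append('{}:{}'.format(bun, bite))
        # Yield control to the event loop so another coroutine can run
        await asyncio.sleep(0)

async def main():
    log = []
    # Run both coroutines concurrently on the same event loop
    await asyncio.gather(eat('bun1', 2, log), eat('bun2', 2, log))
    return log

log = asyncio.run(main())
print(log)
```

The log shows the bites interleaved between the two buns rather than one bun being finished before the other is started.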
The first step, of course, is to create a list of coroutines.
# coroutine function
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# coroutine objects
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# wrap the coroutines in tasks and collect them in a list
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]
The second step is to register these coroutines with the event loop.
There are two ways to do this; I will explain the differences later.
- Using asyncio.wait():
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- Using asyncio.gather():
# do not omit the * here
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
Finally, each task's return value can be read with task.result().
for task in tasks:
    print('Task ret: ', task.result())
The complete code is as follows:
import asyncio

# coroutine function
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# coroutine objects
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# wrap the coroutines in tasks and collect them in a list
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())
The output is as follows:
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
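One way to confirm that the three waits overlap is to time them. The sketch below is an illustration under assumptions, not the author's code: it shortens the delays to 0.1, 0.2 and 0.4 seconds, drops the print call from do_some_work, and uses asyncio.run() (available since Python 3.7) instead of managing the loop by hand.

```python
import asyncio
import time

async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    start = time.perf_counter()
    # The three sleeps run concurrently on one event loop
    results = await asyncio.gather(
        do_some_work(0.1), do_some_work(0.2), do_some_work(0.4))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)
print('elapsed: {:.2f}s'.format(elapsed))
```

If the coroutines ran one after another, the elapsed time would be close to the sum of the delays (0.7 s); running concurrently, it should be close to the longest delay (0.4 s).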
Nesting in coroutines
We define coroutines with async and use them for time-consuming I/O operations. We can also wrap several such operations in a larger coroutine, which gives us nested coroutines: one coroutine awaits another, and the two are chained together.
Let’s look at an example.
import asyncio

# inner coroutine function
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# outer coroutine function
async def main():
    # create three coroutine objects
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    # wrap the coroutines in tasks and collect them in a list
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    # [Key point]: await a list of tasks
    # dones: the completed tasks
    # pendings: the unfinished tasks
    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
If we use asyncio.gather() here instead, it looks like this:
# note that the return value differs from that of asyncio.wait
results = await asyncio.gather(*tasks)
for result in results:
    print('Task ret: ', result)
The output is the same (with asyncio.wait, dones is a set, so the order of the Task ret lines may differ):
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
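Nesting can go more than one level deep. The following sketch adds a hypothetical middle layer, batch, between main and do_some_work; the delays are shortened and the outer coroutine is started with asyncio.run() (Python 3.7+), which creates the loop, runs the coroutine, and closes the loop in one call. Neither batch nor these delays come from the original example.

```python
import asyncio

# innermost coroutine function
async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# middle layer (hypothetical): awaits several inner coroutines at once
async def batch(delays):
    return await asyncio.gather(*(do_some_work(d) for d in delays))

# outermost coroutine function: awaits the middle layer twice, in sequence
async def main():
    first = await batch((0.01, 0.02))
    second = await batch((0.03,))
    return first + second

results = asyncio.run(main())
for result in results:
    print('Task ret: ', result)
```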
On closer inspection, this example is simply adapted from the "Concurrency in coroutines" example above, and the result is exactly the same. All we did was move the steps of creating the coroutine objects and converting them to tasks into a coroutine function, so that an outer coroutine nests the inner ones.
In fact, if you look at the source code of asyncio.wait(), you will find that the following call
loop.run_until_complete(asyncio.wait(tasks))
only looks un-nested; internally it is nested as well.
The source is posted here for anyone interested; feel free to skip it. (The listing reflects the Python version in use when this was written; newer releases differ in detail.)
# inner coroutine function
async def _wait(fs, timeout, return_when, loop):
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending

# outer coroutine function
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_event_loop()

    fs = {ensure_future(f, loop=loop) for f in set(fs)}
    # [Key point]: await an internal coroutine
    return await _wait(fs, timeout, return_when, loop)
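The core of _wait is a "waiter" future that a done-callback resolves once enough tasks have finished. The following stripped-down sketch shows only that idea. The function wait_all is hypothetical, not part of asyncio: it handles the ALL_COMPLETED case only, has no timeout, and assumes a non-empty list of tasks.

```python
import asyncio

async def wait_all(tasks):
    """Hypothetical, simplified stand-in for asyncio.wait (ALL_COMPLETED only)."""
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    remaining = len(tasks)

    def on_done(_task):
        # Called by the event loop each time one task finishes
        nonlocal remaining
        remaining -= 1
        if remaining <= 0 and not waiter.done():
            waiter.set_result(None)

    for t in tasks:
        t.add_done_callback(on_done)
    try:
        # The outer coroutine suspends here until the last callback fires
        await waiter
    finally:
        for t in tasks:
            t.remove_done_callback(on_done)
    return {t for t in tasks if t.done()}

async def work(x):
    await asyncio.sleep(x)
    return x

async def main():
    tasks = [asyncio.ensure_future(work(d)) for d in (0.01, 0.02, 0.03)]
    done = await wait_all(tasks)
    return sorted(t.result() for t in done)

results = asyncio.run(main())
print(results)
```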
State in a coroutine
When we covered generators, we talked about the states a generator can be in. Coroutines (or, more precisely, Future objects and tasks) have states as well:
- Pending: the future has been created but has not run yet
- Running: the event loop is executing the task
- Done: the task has finished
- Cancelled: the task has been cancelled
You can run the following code from the command line with python3 xx.py:
import asyncio
import threading
import time

async def hello():
    print("Running in the loop...")
    flag = 0
    while flag < 1000:
        with open("F:\\test.txt", "a") as f:
            f.write("------")
        flag += 1
    print("Stop the loop")

if __name__ == '__main__':
    coroutine = hello()
    loop = asyncio.get_event_loop()
    task = loop.create_task(coroutine)

    # Pending: the task has not started yet
    print(task)
    try:
        t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
        # t1.daemon = True
        t1.start()

        # Running: the task is running
        time.sleep(1)
        print(task)
        t1.join()
    except KeyboardInterrupt as e:
        # cancel the task
        task.cancel()
        # Cancelled: the task has been cancelled
        print(task)
    finally:
        print(task)
If the program runs to completion, the printed state changes are Pending -> Pending (running) -> Finished.
If you press Ctrl+C right after starting it, the task is cancelled and the printed state changes are Pending -> Cancelling -> Cancelling.
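The states can also be observed without threads or file I/O by querying task.done() and task.cancelled() from inside a coroutine. The sketch below is an illustration under assumptions: the coroutines quick and slow and the helper snapshot are hypothetical names, and the labels it returns are chosen here for readability rather than taken from the Task repr.

```python
import asyncio

async def quick():
    return 'ok'

async def slow():
    await asyncio.sleep(10)

def snapshot(task):
    # Map the task's flags to a readable label (labels are our own choice)
    if task.cancelled():
        return 'cancelled'
    if task.done():
        return 'finished'
    return 'pending'

async def main():
    seen = []
    task_a = asyncio.ensure_future(quick())
    task_b = asyncio.ensure_future(slow())
    # Just created: neither task has had a chance to run
    seen.append((snapshot(task_a), snapshot(task_b)))
    await asyncio.sleep(0.01)  # let both tasks start
    # task_a has returned; task_b is suspended in its sleep
    seen.append((snapshot(task_a), snapshot(task_b)))
    task_b.cancel()
    try:
        await task_b
    except asyncio.CancelledError:
        pass
    seen.append((snapshot(task_a), snapshot(task_b)))
    return seen

seen = asyncio.run(main())
for states in seen:
    print(states)
```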
Gather and wait
Remember that there are two ways to register multiple coroutines with an event loop?
- Using asyncio.wait():
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- Using asyncio.gather():
# do not omit the * here
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
Both asyncio.gather and asyncio.wait are widely used in asyncio code, so it is worth studying their differences carefully.
As usual, let's start with an example by defining a coroutine function:
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))
1. They accept arguments differently
asyncio.wait
It takes a single iterable of tasks, such as a list.
Coroutines can be turned into tasks with asyncio.ensure_future:
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
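On older Python versions you could also pass the coroutines without converting them to tasks first. Passing bare coroutines to asyncio.wait() was deprecated in Python 3.8 and removed in 3.11, so prefer the form above.
loop = asyncio.get_event_loop()
tasks = [
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4)
]
loop.run_until_complete(asyncio.wait(tasks))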
asyncio.gather
It can also work from a list object, but the list must be unpacked with *, which cannot be omitted:
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
You can also pass the coroutines directly as separate positional arguments, because the first parameter of asyncio.gather() is *coros_or_futures, a variable-length positional parameter:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))
You can even gather groups of gathers:
loop = asyncio.get_event_loop()
group1 = asyncio.gather(*[factorial("A", i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])
loop.run_until_complete(asyncio.gather(group1, group2, group3))
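Because factorial only prints and returns nothing, the grouping is easier to see with a coroutine that returns a value. In the sketch below, square is a hypothetical stand-in for factorial, and the groups are created inside a running loop via asyncio.run(). Each inner gather contributes one nested list to the outer result.

```python
import asyncio

# Hypothetical stand-in for factorial that returns a value
async def square(x):
    await asyncio.sleep(0.01)
    return x * x

async def main():
    group1 = asyncio.gather(*[square(i) for i in range(1, 3)])
    group2 = asyncio.gather(*[square(i) for i in range(3, 6)])
    # The outer gather returns one list per inner group
    return await asyncio.gather(group1, group2)

results = asyncio.run(main())
print(results)
```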
2. They return different results
asyncio.wait
asyncio.wait returns two sets, dones and pendings:
- dones: the completed tasks
- pendings: the unfinished tasks
To get the return values, we have to collect them from the tasks ourselves:
dones, pendings = await asyncio.wait(tasks)
for task in dones:
    print('Task ret: ', task.result())
asyncio.gather
asyncio.gather returns the results to us directly, in the order the awaitables were passed in, so there is nothing to collect by hand:
results = await asyncio.gather(*tasks)
for result in results:
    print('Task ret: ', result)
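The difference in return shape shows up most clearly when the tasks finish out of order. In this sketch the coroutine work and the names 'slow' and 'fast' are hypothetical; the point is that gather keeps argument order while wait hands back unordered sets of tasks.

```python
import asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # 'slow' is listed first but finishes last
    gathered = await asyncio.gather(work('slow', 0.05), work('fast', 0.01))

    tasks = [asyncio.ensure_future(work('slow', 0.05)),
             asyncio.ensure_future(work('fast', 0.01))]
    dones, pendings = await asyncio.wait(tasks)
    # dones is a set of Task objects; results must be pulled out one by one
    waited = {task.result() for task in dones}
    return gathered, waited, len(pendings)

gathered, waited, n_pending = asyncio.run(main())
print(gathered)
print(waited, n_pending)
```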
3. wait offers more control
import asyncio
import random

async def coro(tag):
    await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

# wrap each coroutine in a task; since Python 3.11, asyncio.wait() rejects bare coroutines
tasks = [loop.create_task(coro(i)) for i in range(1, 11)]

# [control the number of tasks run]: return as soon as the first task completes
# FIRST_COMPLETED: return when the first task completes
# FIRST_EXCEPTION: return when the first exception is raised
# ALL_COMPLETED: return when all tasks have completed (default)
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("Tasks completed in the first round:", len(dones))

# [control time]: return after running for one second
dones2, pendings2 = loop.run_until_complete(
    asyncio.wait(pendings, timeout=1))
print("Tasks completed in the second round:", len(dones2))

# [default]: return after all tasks have completed
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))
print("Tasks completed in the third round:", len(dones3))

loop.close()

Sample output (the sleep times are random, so the second and third counts vary from run to run):
Tasks completed in the first round: 1
Tasks completed in the second round: 4
Tasks completed in the third round: 5
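For a repeatable version of the same three-stage pattern, the random delays can be replaced with fixed ones. The sketch below is an illustration under assumptions: the delays (0.05, 0.3 and 1.0 seconds) and the 0.6-second timeout are chosen here so that exactly one task finishes in each stage, and the code uses asyncio.run() rather than an explicit loop.

```python
import asyncio

async def coro(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.ensure_future(coro(d)) for d in (0.05, 0.3, 1.0)]

    # Stage 1: return as soon as any one task completes
    d1, p1 = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Stage 2: wait at most 0.6 s for the tasks that are still pending
    d2, p2 = await asyncio.wait(p1, timeout=0.6)
    # Stage 3: default behaviour, wait for everything that is left
    d3, p3 = await asyncio.wait(p2)
    return len(d1), len(d2), len(d3), len(p3)

counts = asyncio.run(main())
print(counts)
```

Note that a timeout does not cancel anything: tasks that are still running when the timeout expires are simply returned in the pending set and can be waited on again.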