Author: MING. Personal public account: Python programming time. Personal WeChat: MrBensonwon


Hello, and welcome to chapter 10 of the concurrent programming series.

Let's first review what we covered in the last chapter so that this one ties in better.

In the last chapter, we first showed how to create a coroutine object. There are two main ways:

  • through the async keyword,
  • through the @asyncio.coroutine decorator (deprecated since Python 3.8 and removed in Python 3.11).

Once we have a coroutine object, we need an event loop to run it. The main steps are as follows:

  • Convert the coroutine object into a Task object
  • Define an event loop object to hold the tasks
  • Put the Task into the event loop and start it
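
The three steps above can be condensed into a minimal sketch. The coroutine greet() and its argument are made up for illustration, and asyncio.new_event_loop() is used here only so that the snippet does not depend on any loop created elsewhere.

```python
import asyncio


async def greet(name):
    # Hypothetical coroutine function; calling it only creates a coroutine object.
    await asyncio.sleep(0.01)
    return "hello, " + name


# Define the event loop object that will hold and run the task.
loop = asyncio.new_event_loop()
try:
    # Convert the coroutine object into a Task bound to this loop.
    task = loop.create_task(greet("coroutine"))
    # Put the task into the loop and run until it has finished.
    loop.run_until_complete(task)
    greeting = task.result()
finally:
    loop.close()

print(greeting)
```

Nothing inside greet() runs until the loop is started; creating the task only schedules it.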

To give you a better idea of how generators and coroutines differ, I also introduced the difference between yield and async/await.

Finally, we looked at how to add a callback function to a coroutine.
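
As a reminder of what such a callback looks like, here is a small sketch. The names compute() and on_done() are hypothetical; the only asyncio API assumed is Task.add_done_callback(), which calls the function with the finished task as its single argument.

```python
import asyncio

results = []


async def compute(x):
    # Hypothetical coroutine standing in for real work.
    await asyncio.sleep(0.01)
    return x * 2


def on_done(task):
    # Called by the event loop once the task has finished.
    results.append(task.result())


async def main():
    task = asyncio.ensure_future(compute(21))
    task.add_done_callback(on_done)
    await task
    # Give the loop one more iteration so any scheduled callback has run.
    await asyncio.sleep(0)


asyncio.run(main())
print(results)
```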

All right. To put it simply, the last chapter only dealt with a single task in a coroutine. That wasn't too hard, was it? I'd encourage you to read it a few times and, more importantly, write the code yourself rather than just read it.

So in this chapter, we're going to look at multitasking in coroutines.

Concurrency in coroutines

Coroutine concurrency works much like thread concurrency. As an analogy, picture one person eating three steamed buns at the same time: after taking a bite of the first bun, he has to swallow that mouthful before he can bite into another one. He keeps alternating between the buns like this.

asyncio achieves concurrency by having multiple coroutines share the work: whenever one coroutine would block, it awaits, and the other coroutines continue to run.

The first step, of course, is to create a list of coroutines.

# coroutine function
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# coroutine objects
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# convert the coroutines into tasks and collect them in a list
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

The second step is to register these coroutines with the event loop.

There are two ways to do this; I'll talk about the differences later.

  • Use asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

  • Use asyncio.gather()
# do not omit the * here
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

Finally, the return values can be read with task.result().

for task in tasks:
    print('Task ret: ', task.result())

The complete code is as follows:

import asyncio

# coroutine function
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# coroutine objects
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# convert the coroutines into tasks and collect them in a list
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task ret: ', task.result())

The output:

Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s
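
One way to convince yourself that the three coroutines really overlap is to time them. The sketch below is not from the original example: it reuses do_some_work() without the print, shortens the delays to 0.1, 0.2 and 0.4 seconds so it finishes quickly, and assumes Python 3.7+ for asyncio.run().

```python
import asyncio
import time


async def do_some_work(x):
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)


async def main():
    start = time.perf_counter()
    # All three coroutines wait at the same time.
    results = await asyncio.gather(
        do_some_work(0.1), do_some_work(0.2), do_some_work(0.4))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)
print('elapsed: {:.2f}s'.format(elapsed))
```

The elapsed time should be close to the longest delay (about 0.4s) rather than the sum of the delays (0.7s).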

Nesting in coroutines

We use async to define coroutines, which are meant for time-consuming I/O operations. We can also wrap several I/O operations together, which gives us nested coroutines: one coroutine awaits another, and that is how they are chained.

Let's look at an example.

import asyncio

# inner coroutine function
async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# outer coroutine function
async def main():
    # create three coroutine objects
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    # convert the coroutines into tasks and collect them in a list
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    # [Key point]: await a list of tasks
    # dones: the completed tasks
    # pendings: the unfinished tasks
    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Here, if you use asyncio.gather() instead, you get the results like this:

# note that the return value differs from that of asyncio.wait
results = await asyncio.gather(*tasks)
for result in results:
    print('Task ret: ', result)

The output is the same (with asyncio.wait, dones is a set, so the order of the Task ret lines may vary).

Waiting:  1
Waiting:  2
Waiting:  4
Task ret:  Done after 1s
Task ret:  Done after 2s
Task ret:  Done after 4s

On closer inspection, you can see that this example is adapted directly from the "Concurrency in coroutines" example above, and the result is exactly the same. The only change is that creating the coroutine objects and converting them into tasks is now wrapped inside a coroutine function: an outer coroutine nests the inner coroutines.
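
Nesting does not have to involve task lists at all. A minimal sketch of the idea, with three made-up coroutines (fetch, fetch_pair and main are hypothetical names), where each layer simply awaits the one below it:

```python
import asyncio


async def fetch(x):
    # Innermost coroutine: stands in for a single slow I/O call.
    await asyncio.sleep(0.01)
    return x * 10


async def fetch_pair(a, b):
    # Middle coroutine: awaits the inner one twice, one after the other.
    first = await fetch(a)
    second = await fetch(b)
    return first + second


async def main():
    # Outer coroutine: awaits the middle one.
    return await fetch_pair(1, 2)


total = asyncio.run(main())
print(total)  # 30
```

Note that the two awaits inside fetch_pair() run sequentially; to overlap them they would have to be wrapped in tasks or passed to asyncio.gather().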

In fact, if you look at the source code of asyncio.wait(), you will find that the following call

loop.run_until_complete(asyncio.wait(tasks))

does not look nested, but is actually nested internally.

The source code is posted here as well. Take a look if you are interested; if not, feel free to skip it.

# Internal coroutine function
async def _wait(fs, timeout, return_when, loop):
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending

# External coroutine function
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_event_loop()

    fs = {ensure_future(f, loop=loop) for f in set(fs)}
    # [Key point]: await an internal coroutine
    return await _wait(fs, timeout, return_when, loop)
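
The core trick in _wait is a counter plus a "waiter" future that a done-callback resolves. The following is a stripped-down sketch of that pattern only; wait_all() is a hypothetical helper written for illustration, not the library's implementation, and it assumes a non-empty list and ignores timeout and return_when.

```python
import asyncio


async def wait_all(futures):
    """Simplified stand-in for the ALL_COMPLETED case; futures must be non-empty."""
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    counter = len(futures)

    def on_completion(fut):
        # Runs once per finished future; the last one releases the waiter.
        nonlocal counter
        counter -= 1
        if counter <= 0 and not waiter.done():
            waiter.set_result(None)

    for fut in futures:
        fut.add_done_callback(on_completion)
    try:
        await waiter
    finally:
        for fut in futures:
            fut.remove_done_callback(on_completion)
    return {fut for fut in futures if fut.done()}


async def work(x):
    await asyncio.sleep(x)
    return x


async def main():
    tasks = [asyncio.ensure_future(work(d)) for d in (0.01, 0.02, 0.03)]
    done = await wait_all(tasks)
    return sorted(t.result() for t in done)


finished = asyncio.run(main())
print(finished)
```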

State in a coroutine

Remember that when we talked about generators, we covered the states of a generator. Likewise, let's look at the states of a coroutine (or, to be more precise, of a Future or Task object).

  • Pending: the future has been created but not yet executed
  • Running: the event loop is executing the task
  • Done: the task has finished executing
  • Cancelled: the task has been cancelled

You can run the following code manually with python3 xx.py to observe these states.

import asyncio
import threading
import time

async def hello():
    print("Running in the loop...")
    flag = 0
    while flag < 1000:
        with open("F:\\test.txt", "a") as f:
            f.write("-- -- -- -- -- -")
        flag += 1
    print("Stop the loop")

if __name__ == '__main__':
    coroutine = hello()
    loop = asyncio.get_event_loop()
    task = loop.create_task(coroutine)

    # Pending: the task has not started yet
    print(task)
    try:
        # run the event loop in a separate thread
        t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
        # t1.daemon = True
        t1.start()

        # Running: the task is being executed
        time.sleep(1)
        print(task)
        t1.join()
    except KeyboardInterrupt as e:
        # cancel the task
        task.cancel()
        # Cancelled: the task has been cancelled
        print(task)
    finally:
        print(task)

If it runs to completion, the state changes Pending -> Pending: Running -> Finished are printed.

If you press Ctrl+C right after starting it, the task is cancelled and the state changes Pending -> Cancelling -> Cancelling are printed.
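
The same states can also be checked programmatically rather than by reading the printed task. The sketch below is an assumption-laden alternative to the threaded script: it uses a hypothetical short_job() coroutine, runs everything on one thread with asyncio.run(), and inspects Task.done() and Task.cancelled().

```python
import asyncio


async def short_job():
    await asyncio.sleep(0.01)
    return 'ok'


async def main():
    states = []

    # A task that runs to completion.
    task = asyncio.ensure_future(short_job())
    states.append(('created', task.done(), task.cancelled()))
    await task
    states.append(('finished', task.done(), task.cancelled()))

    # A task that is cancelled before it can finish.
    doomed = asyncio.ensure_future(short_job())
    doomed.cancel()
    try:
        await doomed
    except asyncio.CancelledError:
        pass
    states.append(('cancelled', doomed.done(), doomed.cancelled()))
    return states


states = asyncio.run(main())
for row in states:
    print(row)
```

Each row holds a label, then done() and cancelled(); a cancelled task counts as done as well.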

Gather and wait

Remember that I said there are two ways to register multiple coroutines with an event loop?

  • Use asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

  • Use asyncio.gather()
# do not omit the * here
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

asyncio.gather and asyncio.wait are both widely used in asyncio code, so it is worth studying them carefully.

As usual, let's start with an example and define a coroutine function.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

1. They accept arguments differently

asyncio.wait

The tasks must be passed as a single list object that contains multiple tasks.

A coroutine can be turned into a task with asyncio.ensure_future.

tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

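On older Python versions you can also pass the coroutines directly, without converting them to tasks first. Note that passing bare coroutine objects to asyncio.wait() was deprecated in Python 3.8 and removed in Python 3.11, so on current versions the form above is required.

loop = asyncio.get_event_loop()

tasks = [
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4)
]

loop.run_until_complete(asyncio.wait(tasks))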
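
A form that should behave the same on old and new Python versions is to wrap every coroutine explicitly before handing the list to asyncio.wait(). This is only a sketch: square() is a hypothetical coroutine, and asyncio.run() assumes Python 3.7+.

```python
import asyncio


async def square(n):
    await asyncio.sleep(0.01)
    return n * n


async def main():
    # Wrap each coroutine in a task before passing the list to wait().
    tasks = [asyncio.ensure_future(square(n)) for n in (2, 3, 4)]
    done, pending = await asyncio.wait(tasks)
    return sorted(t.result() for t in done), len(pending)


squares, unfinished = asyncio.run(main())
print(squares, unfinished)
```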

asyncio.gather

It can also take the items of a list, but they must be unpacked with *, which cannot be omitted.

tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

The coroutines can also be passed directly as separate arguments, because the first parameter of asyncio.gather() is *coros_or_futures, a variable-length list of positional arguments.

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))

You can even do this:

loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[factorial("A", i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])

loop.run_until_complete(asyncio.gather(group1, group2, group3))
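
When gathers are nested like this, the results come back nested too. The sketch below assumes a hypothetical factorial_value() that returns its result instead of printing it (the article's factorial() returns nothing), and creates the groups inside a running loop via asyncio.run().

```python
import asyncio


async def factorial_value(number):
    # Hypothetical variant of factorial() that returns instead of printing.
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(0)
        f *= i
    return f


async def main():
    group1 = asyncio.gather(*[factorial_value(i) for i in range(1, 3)])
    group2 = asyncio.gather(*[factorial_value(i) for i in range(1, 5)])
    # Gathering the two groups yields one list per group.
    return await asyncio.gather(group1, group2)


grouped = asyncio.run(main())
print(grouped)  # [[1, 2], [1, 2, 6, 24]]
```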

2. They return results differently

asyncio.wait

asyncio.wait returns dones and pendings:

  • dones: the completed tasks
  • pendings: the unfinished tasks

To get the results, we have to collect them manually with task.result().

dones, pendings = await asyncio.wait(tasks)

for task in dones:
    print('Task ret: ', task.result())

asyncio.gather

asyncio.gather returns the values to us directly, so there is no need to collect them manually.

results = await asyncio.gather(*tasks)

for result in results:
    print('Task ret: ', result)
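
A further practical difference is ordering: gather returns results in the order the awaitables were passed in, while wait returns sets. The sketch below illustrates this with a hypothetical echo_after() coroutine in which the slowest call is deliberately listed first.

```python
import asyncio


async def echo_after(delay, value):
    await asyncio.sleep(delay)
    return value


async def main():
    # The slowest coroutine comes first on purpose.
    gathered = await asyncio.gather(
        echo_after(0.03, 'slow'), echo_after(0.01, 'fast'))

    tasks = [asyncio.ensure_future(echo_after(0.03, 'slow')),
             asyncio.ensure_future(echo_after(0.01, 'fast'))]
    done, pending = await asyncio.wait(tasks)
    # done is a set, so read results from the original list to keep order.
    waited = [t.result() for t in tasks]
    return gathered, type(done).__name__, waited


gathered, done_type, waited = asyncio.run(main())
print(gathered)   # ['slow', 'fast']
print(done_type)  # set
print(waited)     # ['slow', 'fast']
```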

3. wait offers finer control

import asyncio
import random

async def coro(tag):
    await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

# wrap each coroutine in a task (bare coroutines are rejected by
# asyncio.wait on Python 3.11+)
tasks = [loop.create_task(coro(i)) for i in range(1, 11)]

# [Control the number of finished tasks]: return as soon as the first task completes
# FIRST_COMPLETED: return when the first task completes
# FIRST_EXCEPTION: return when the first exception is raised
# ALL_COMPLETED: return when all tasks have completed (default)
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("Tasks completed in the first wait:", len(dones))

# [Control the running time]: return after running for one second
dones2, pendings2 = loop.run_until_complete(
    asyncio.wait(pendings, timeout=1))
print("Tasks completed in the second wait:", len(dones2))

# [Default]: return after all tasks have completed
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))
print("Tasks completed in the third wait:", len(dones3))

loop.close()

The output (the second and third numbers vary from run to run because the delays are random):

Tasks completed in the first wait: 1
Tasks completed in the second wait: 4
Tasks completed in the third wait: 5
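
Because the example above uses random delays, its counts change between runs. A deterministic sketch of the same two controls, with fixed delays chosen purely for illustration (0.02s, 0.2s and 0.6s) and a 0.4s timeout, makes the behaviour easier to predict. It also cancels whatever is still pending, since tasks left over after a timeout are not cancelled by wait itself.

```python
import asyncio


async def coro(tag, delay):
    await asyncio.sleep(delay)
    return tag


def names(tasks):
    # Helper: sorted results of a collection of finished tasks.
    return sorted(t.result() for t in tasks)


async def main():
    tasks = [asyncio.ensure_future(coro(tag, delay))
             for tag, delay in (('a', 0.02), ('b', 0.2), ('c', 0.6))]

    # Return as soon as any one task finishes.
    done1, pending1 = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    # Wait for the rest, but give up after 0.4 seconds.
    done2, pending2 = await asyncio.wait(pending1, timeout=0.4)

    # Cancel the leftovers and let the cancellation be processed.
    for task in pending2:
        task.cancel()
    await asyncio.gather(*pending2, return_exceptions=True)

    return names(done1), names(done2), len(pending2)


first, second, leftover = asyncio.run(main())
print(first, second, leftover)
```

With these delays, the first wait should return only task a, the second should pick up b, and c should be the one left pending at the timeout.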