\

\

Python learning and development

The Asyncio module provides tools for building concurrent applications using coroutines. It uses a single-thread, single-process approach to concurrency, in which the parts of the application cooperate with each other to display switching tasks, typically context switching occurs when the application blocks I/O operations such as waiting to read or write files, or requesting networks. Asyncio also supports scheduling code to run at a specific event in the future, allowing one coroutine to wait for another to complete in order to process system signals and recognize other events.

The concept of asynchronous concurrency

Most of the other concurrency models are written in a linear fashion. And it depends on the language runtime system or the underlying thread or process of the operating system to change the context appropriately, whereas asyncio-based applications require application code display to handle context switching. Asyncio provides a framework centered around an event loop, in which a program starts an infinite loop and registers some functions. When a satisfying event occurs, the corresponding coroutine function is called.

Event loop

Event loops are an efficient way to handle multiple concurrent events. Described in Wikipedia as “A programming architecture that waits for A program to allocate events or messages,” we can define event loops to simplify the use of polling methods to monitor events, collocially saying “perform B when A occurs.” Event loops make use of Poller objects, freeing programmers from control over the addition and removal of tasks and the control of events. Event loops use callback methods to know when an event has occurred. It is a “central processing device” provided by Asyncio and supports the following operations:

  • Register, execute, and cancel deferred calls (timeout)
  • Create Transports servers and clients that can be used for many types of communication
  • Transports to start processes and related and external communication programs
  • Delegate time-consuming function calls to a thread pool
  • The single-threaded (process) architecture also avoids the problem of multiple threads (process) modifying mutable state locks.

Applications that interact with an event loop explicitly register the code that will run, allowing the event loop to make the necessary calls to the application code when resources are available. For example, if there is no more data to read from a socket, the server cedes control to the event loop.

Future

A future is a data structure that represents the result of work that has not yet been completed. An event loop can monitor whether a Future object completes. This allows one part of the application to wait for another part to complete some work.

Task

Task is a subclass of Future that knows how to wrap and manage the execution of a coroutine. When the resources required by the task are available, the event loop schedules the task to allow and produces a result that can be consumed by other coroutines.

Asynchronous methods

Using Asyncio means that you need to write asynchronous methods all the time. A standard method looks like this:

def regular_double(x) :
    return 2 * x
Copy the code

And an asynchronous method:

async def async_double(x):
    return 2 * x
Copy the code

Asynchronous methods look the same as standard methods except async in front of them. Async is short for asynchronous. Standard functions are called synchronous functions to distinguish them from asynchronous functions. From the user’s point of view, asynchronous functions and synchronous functions have the following differences:

To call an asynchronous function, you must use the await keyword. Therefore, instead of writing regular_double(3), write await async_double(3). You cannot use await in synchronous functions or you will get an error. Syntax error:

def print_double(x):
    print(await async_double(x))   # <-- SyntaxError here
Copy the code

But in asynchronous functions, await is allowed:

async def print_double(x):
    print(await async_double(x))   # <-- OK!
Copy the code

coroutines

Start a coroutine

Common asynchronous methods are called coroutines. Asyncio event loops can start a coroutine in a number of different ways. For entry functions, the simplest way to do this is to use run_until_complete() and pass the coroutine directly to the method.

Import asyncio async def foo(): print(" this is a coroutine ") if __name__ == '__main__': loop = asyncio.get_event_loop() try: Print (" start running the coroutine ") coro = foo() print(" start running the coroutine ") loop.run_until_complete(coro) finally: print(" Start running the coroutine ") loop.close()Copy the code

The output

You start running coroutines and you go into an event loop which is a coroutine closing event loopCopy the code

This is the simplest example of a coroutine, so let’s look at the code above. The first step is to get an application of the event loop, which is the object loop defined. You can either use the default event loop or instantiate a specific loop class (such as uvloop), where the default loop run_until_complete(CORO) method is used to start the loop and stop the loop when the coroutine returns. The argument to run_until_complete is a futrue object. When a coroutine is passed in, it is automatically encapsulated internally as task, which is a subclass of Future. More on Task and future later.

Returns a value from a coroutine

Rewrite the above code into the following code

Import asyncio async def foo(): print(" this is a coroutine ") return "if __name__ == '__main__': loop = asyncio.get_event_loop() try: Print (" start running coroutine ") coro = foo() print(" enter event loop ") result = loop.run_until_complete(coro) Print (f"run_until_complete: {result}) finally: print(" close event loop ") loop.close()Copy the code

Run_until_complete gets the return value of the coroutine, or if no return value is given, like a function, None is returned by default.

Coroutines call coroutines

One coroutine can start another coroutine so that tasks can be encapsulated into different coroutines depending on what they are doing. We can use the await keyword in coroutines, chained scheduling coroutines, to form a coroutine task flow. As in the following example.

import asyncio async def main(): Print (" main coroutine ") print(" waiting for result1 coroutine to run ") res1 = await result1() print(" waiting for result2 coroutine to run ") res2 = await result2(res1) return (res1,res2) async def result1(): print(" this is a result1 coroutine ") return "result1" async def result2(arg): Print (" this is a result2 coroutine ") return f"result2 receives a parameter,{arg}" if __name__ == '__main__': loop = asyncio.get_event_loop() try: Result = loop.run_until_complete(main()) print(f" get return value :{result}") finally: print(" close event loop ") loop.close()Copy the code

The output

The main coroutine waits for the result1 coroutine to run and this is the result2 coroutine waiting for the result2 coroutine to run and this is the result2 coroutine getting the return value :('result1', 'result2 received a parameter,result1') closes the event loopCopy the code

Call ordinary functions in coroutines

There are methods to call ordinary functions in coroutines. The available keywords are call_soon,call_later, and call_AT.

call_soon

The call can be interpreted literally to return immediately.

loop.call_soon(callback, *args, context=None)
Copy the code

The callback function is called immediately in the next iteration of the time loop. Most callback functions support positional arguments rather than keyword arguments. If you want to use keyword arguments, it is recommended to further wrap the method with functools.aprtial(). The optional context keyword allows you to specify a custom contextvars.context for the callback to run. Use the current context when no context is provided. In Python 3.7, context support was added to asyncio coroutines. Context allows variables to be passed implicitly in some scenarios, such as database connection sessions, without having to be passed explicitly across all method calls. Let’s look at a specific usage example.

import asyncio import functools def callback(args, *, kwargs="defalut"): {args},{kwargs}") async def main(loop): Print (" register callback") loop.call_soon(callback, 1) Wrapped = functools.partial(callback, Kwargs ="not defalut") loop.call_soon(wrapped, 2) await asyncio.sleep(0.2) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()Copy the code

The output

Callback (1); defalut (2); not defalut (2)Copy the code

From the output we can see that we successfully called a normal function in the coroutine, printing 1 and 2 in order.

Sometimes we don’t want to call a function immediately, so we can delay call_later to call a function.

call_later

loop.call_later(delay, callback, *args, context=None)
Copy the code

How long does the event loop delay before executing the callback function? Let’s look at a small example with call_soon above

import asyncio def callback(n): print(f"callback {n} invoked") async def main(loop): Call_later (0.2, callback, 1) call_later(0.1, callback, 2) loop.call_soon(callback, 2) 3) await asyncio.sleep(0.4) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()Copy the code

The output

Register the Callbacks callback 3 Invoked callback 2 Invoked callback 1 invokedCopy the code

The following output is displayed: 1. Call_soon is executed before call_later, regardless of its location. 2.

call_at

loop.call_at(when, callback, *args, context=None)
Copy the code

The first parameter of call_at represents a monotone time, which is a little different from the system time we usually say. Here the time refers to the internal time of the event loop, which can be obtained by loop.time(), and then can be operated on. The latter arguments are the same as the previous two methods. In fact, inside call_later is the call_AT being called.

Import asyncio def call_back(n, loop): print(f"callback {n} "{loop.time()}") async def main(loop): Time () print(" callback", now) print(" callback", now) print(" callback") loop.call_at(now + 0.1, call_back, 1) Loop) loop.call_at(now + 0.2, call_back, 2, loop) loop.call_soon(call_back, 3, loop) await asyncio.sleep(1) if __name__ == '__main__': loop = asyncio.get_event_loop() try: Print (" enter event loop ") loop.run_until_complete(main(loop)) finally: print(" close loop ") loop.close()Copy the code

The output

4412.152849525 Cycle time 4412.152849525 Register callback callback 3 Run time 4412.152942526 callback 1 Callback 2 Run time 4412.354262512 Close the loopCopy the code

Because call_later is implemented internally through call_AT, I won’t go into that here.

Future

Get the result in Futrue

Future represents the result of work that has not yet been completed. An event loop can monitor the state of a Future object to indicate that it has completed. The Future object has several states:

  • Pending
  • Running
  • Done
  • Cancelled the task is “pending” when the future is created, while the event loop is running and “done” when the event loop is called. If the event loop needs to be stopped, the task should be Cancelled with its status as “Cancel”.
import asyncio def foo(future, result): Print (f" future :{future}") print(f" future :{result}") future.set_result(result) print(f" future :{future}") if __name__ == '__main__': loop = asyncio.get_event_loop() try: all_done = asyncio.Future() loop.call_soon(foo, all_done, "Future is done!" Run_until_complete (all_done) print(" return result ", result) finally: Print (" close event loop ") loop.close() print(" Get future result ", all_done.result())Copy the code

The output

<Future Pending cb=[_run_until_complete_cb() at / Library/Frameworks/Python framework Versions / 3.6 / lib/python3.6 / asyncio/base_events py: 176] > set the result of the future: the future is done!  <Future finished result='Future is done! Future is done! Future is done!Copy the code

After calling set_result, the state of the future object changes from pending to FINISHED. The all_done instance of the future retains the result provided to the method and can be used later.

Future objects use await

A future, like a coroutine, can get its results using the await keyword.

Import asyncio def foo(future, result): print(" set result to future", result) future.set_result(result) async def main(loop): All_done = asyncio.future () print(" Call function to get Future object ") loop.call_soon(foo, all_done, "The result") result = await all_done print(" get result in future ", result) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()Copy the code

The Future callback

When a Future is finished, it can execute some callbacks, which are called in the order in which they were registered:

import asyncio import functools def callback(future, n): print('{}: future done: {}'.format(n, future.result())) async def register_callbacks(all_done): Print (' register callback to future object ') all_done. Add_done_callback (functools.partial(callback, n=1)) all_done.add_done_callback(functools.partial(callback, n=2)) async def main(all_done): Await register_callbacks(all_done) print(' future result') all_done.set_result('the result') if __name__ == '__main__': loop = asyncio.get_event_loop() try: all_done = asyncio.Future() loop.run_until_complete(main(all_done)) finally: loop.close()Copy the code

Add a callback function to the funtrue task via the add_done_callback method, which will be called when the funture execution is complete. The result of coroutine execution is obtained with the parameter future. At this point, we’ve learned how to call a normal function in a coroutine and get its result.

Execute tasks concurrently

Tasks are one of the main ways to interact with event loops. Tasks can wrap coroutines, and they can track when coroutines complete. Tasks are subclasses of futures, so they are used in the same way as futures. Coroutines can wait for tasks, and each task has a result that can be retrieved after it completes. Because coroutines are stateless, we can wrap them as stateful tasks by using the create_task method. You can also cancel a task while it is running.

Import asyncio async def child(): print(" enter child coroutine ") return "the result" async def main(loop): Task = loop.create_task(child()) print(" Cancel to cancel the task ") task.cancel() try: Await task except asyncio.CancelledError: print(" Cancel task raise CancelledError ") else: Print (" get the result of the task ", task.result()) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main(loop)) finally: loop.close()Copy the code

The output

Wrapping the coroutine child as a task cancels the task with the Cancel method. CancelledError is raisedCopy the code

If we comment out task.cancel() above we get the normal result, as follows.

By wrapping the coroutine child as a task, you can cancel the task and enter the child coroutine to obtain the result of the taskCopy the code

In addition to wrapping coroutines as tasks with loop.create_task, you can also build a task using asyncio.ensure_future(coroutine). In python3.7 you can create tasks using asyncio.create_task.

Combination coroutines

A series of coroutines can be called with await chain, but there are times when we need to wait for multiple coroutines in a coroutine, such as when we are waiting for 1000 asynchronous network requests in a coroutine. If there is a request for access order, we can use another keyword wait or Gather to solve the problem. Wait suspends a coroutine until the background operation completes.

Wait for multiple coroutines

The use of the Task

Import asyncio async def num(n): try: await asyncio.sleep(n*0.1) return n except asyncio.cancellederror: Print (f) raise async def main(): Tasks = [num(I) for I in range(10)] complete, pending = await asyncio. Wait (Tasks, timeout=0.5) for I in complete: Print (" current number ",i.result()) if pending: print(" cancel pending task ") for p in pending: p.cancel() if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: loop.close()Copy the code

The output

Current numeral 1 Current numeral 2 Current Numeral 0 Current Numeral 4 Current Numeral 3 Cancel the unfinished task Numeral 5 Cancelled Numeral 9 cancelled Numeral 6 cancelled Numeral 8 cancelled Numeral 7 cancelledCopy the code

As you can see, our results are not displayed in numerical order, and internal wait() uses a set to hold the Task instance it creates. Because the set is unordered, this is why our tasks are not executed sequentially. The return value of wait is a tuple consisting of two collections representing completed and unfinished tasks. The second parameter of wait is a timeout value. After the timeout value is reached, the pending task status changes to pending. When the program exits, the following error message is displayed:

Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e0558>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e06d8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e0738>()]>>
Copy the code

At this point we can cancel the task by iterating and calling the Cancel method. That’s this code right here

If pending: print(" cancel pending task ") for p in pending: p.canel ()Copy the code

Gather the use of

Gather is similar to Wait in one way or another. 1. The Gather Task cannot be canceled. 2. The return value is a list of results. 3. Let’s change the code above to gather

Import asyncio async def num(n): try: await asyncio.sleep(n * 0.1) return n except asyncio.cancellederror: Print (f) raise async def main(): tasks = [num(i) for i in range(10)] complete = await asyncio.gather(*tasks) for i in complete: Print (" the current digital ", I) if __name__ = = "__main__ ': loop. = asyncio get_event_loop () try: loop.run_until_complete(main()) finally: loop.close()Copy the code

The output

Current digit 0 Current digit 1.... The middle part omits the current number 9Copy the code

Gather is usually used as a staged operation, with the first step complete before the second, as in this example

import asyncio import time async def step1(n, start): Sleep (n) print(" 时 间 ") print(" 时 间 ", time.time() -start) return async def step2(n, start): Await asyncio. Sleep (n) print (" completed the second phase ") print (" unavailable "at this time, time. Time () - start) return n async def main () : now = time.time() result = await asyncio.gather(step1(5, now), step2(2, now)) for i in result: Print (I) print(" total time ", time.time() -now) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: loop.close()Copy the code

The output

The second stage is completed with 5.0014898777008057 the first stage is completed with 5.002960920333862 52 the total time is 5.003103017807007Copy the code

The following conclusions can be drawn from the above results: 1. Step1 and step2 are run in parallel. 2. Gather to wait for the longest task to complete before returning a result. The total time spent depends on which task takes the longest.

Process tasks when they are complete

As_complete is a generator that manages a list of specified tasks and generates their results. Each coroutine finishes running one result at a time. As with wait, as_complete does not guarantee order, but there is no need to wait for the background operation to complete before performing any other action.

import asyncio
import time


async def foo(n):
    print('Waiting: ', n)
    await asyncio.sleep(n)
    return n


async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))


now = lambda : time.time()
start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)
Copy the code

The output

Waiting:  1
Waiting:  2
Waiting:  4
Task ret: 1
Task ret: 2
Task ret: 4
4.004292249679565
Copy the code

You can find the output one by one.

This concludes the first part, which is enough for introductory Asyncio learning. If you want to follow up on Asyncio, stay tuned.

The resources

  • The Python 3 Standard Library by Example

  • Docs.python.org/3/library/a…

    \

Hot recommended for your Python program encryption in Python development timer with AI facial recognition technology to realize the trill effects hardcore | in Python to send his girlfriend a eggs recommended Python’s Chinese community public \ several service class

Long press the qr code above the scan code or click below to read the original text

Free to become a community registered member, members can enjoy more rights and interests