It’s not easy to understand coroutines. If you are overwhelmed by the number of articles and have no hands-on experience, or just use async/await mindlessly, you will have a hard time understanding their advantages and how they really work underneath.

This article assumes that the reader already understands the basic concepts of blocking/non-blocking, synchronous and asynchronous, processes, threads, and so on.

What coroutines are for

The coroutine mechanism was born to resolve the tension between the developer’s linear processing logic (do this first, then that) and the reality that execution has to block (waiting on IO, waiting on computation, waiting on user input).

Hardware CPU thread resources are limited; which program gets to run, which must wait, and which must yield to others is for the OS scheduler to decide, and that scheduling is either process-level or thread-level. But this scheduler doesn’t understand what our program is really doing.

Here’s the problem: as mentioned above, the processing logic is linear, but execution is not. We want a way to do non-linear things with linear code.

The essence of the coroutine mechanism is user-mode, single-threaded, Task-level scheduling. It does away with “native code”, the functions we run line by line, so to speak; our execution logic is still there, and it still looks very much like blocking synchronous code, but inside it has become asynchronous.

At its heart are iterators; if you’re not familiar with what an iterator is, start with Python. In general, the first time you yield from an iterator, it returns a result from an internal yield; the next time you yield from it, it picks up where it left off, runs to the next yield or to a return, and hands the result back to the outer caller.
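
A minimal Python sketch of this pause-and-resume behavior:

def inner():
    yield 1            # first resumption pauses here
    yield 2            # second resumption pauses here
    return "done"      # becomes the value of the 'yield from' expression

def outer():
    result = yield from inner()  # drives inner(), forwarding its yields
    print("inner returned:", result)

gen = outer()
print(next(gen))  # 1 -- runs until the first yield
print(next(gen))  # 2 -- resumes exactly where it left off
try:
    next(gen)     # inner() returns, outer() prints, then finishes
except StopIteration:
    pass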

All code becomes iterators that can be “paused” somewhere and resumed later. Outside this “new native code” sits an EventLoop that schedules them. And at the very bottom of this “new native code”, the immediately returned result is replaced by a “future” object, one with no substance until the pause is over, and with the result inside it once execution resumes.

The ability to “suspend” here is provided partly by the iterator and partly by the future/promise kind of object. What the function gives back is an early return, but that return is not meaningless: suspending means the program is ready and waiting for a future result. When the result arrives and you want to hand it back, how do you do that? Through the very future it returned.
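
To make this concrete, here is a minimal, hypothetical future sketch (the name MiniFuture is mine, not asyncio’s): an empty placeholder that is filled in later, plus callbacks that fire when the result arrives.

class MiniFuture:
    _PENDING = object()

    def __init__(self):
        self._result = self._PENDING
        self._callbacks = []

    def done(self):
        return self._result is not self._PENDING

    def add_done_callback(self, fn):
        # Fire immediately if the result is already there,
        # otherwise remember the callback for later.
        if self.done():
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        # Fill the placeholder and wake everyone waiting on it.
        self._result = result
        for fn in self._callbacks:
            fn(self)

    def result(self):
        return self._result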

Some people think that writing more JavaScript, and more async/await, will make coroutines easier to understand. But all you see is async/await turning what looks like synchronous code into asynchronous code; what goes on underneath is very complicated.

What sits at the bottom of the call stack below await? Why do async programs start with functions like loop.run_until_complete? Why do asynchronous functions become state machines? Let’s first look at what the goal of the coroutine mechanism is.

The ultimate goal of the coroutine mechanism

The fundamental goal: the EventLoop, as the outermost program driving the asynchronous flow of all tasks, wants this: the EventLoop itself essentially never blocks; it just keeps running callbacks from the ready queue until all tasks terminate. Anything that would block it for even 1ms while doing its proper job (executing ready callbacks), it chooses not to wait for, outsourcing it instead to an Executor that manages child processes and threads. For network IO, the EventLoop only does non-blocking polling when necessary, throwing the waiting to the kernel and operating system.

Of course, if the loop reaches a point with no ready callbacks, and every task has to wait for the “outsourced” work to return results before continuing, the system is inherently blocked; this is as far as the concurrency-optimization layer can go.

Coroutine mechanism design

Before you hear my explanation, forget the earlier phrases: “pauseable and resumable”, “suspend”, “execute and return a future”…

Iterator: a function that contains a yield inside. If the outside calls it with yield from, it executes up to the yield statement, saves its execution state, and pauses. Call it again from the outside, and execution continues from the line below the yield.

Coroutine/Task/Future: in code design these are basically the same thing: iterators that, each time they are driven, advance their logic one step, and return a result after a finite number of steps. They are just packaged in different ways to do different jobs.
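
You can see this “iterator that advances one step at a time” nature by driving a coroutine by hand with send(); the final result rides out on StopIteration. A sketch, using asyncio.sleep(0) as the single suspension point:

import asyncio

async def add(a, b):
    await asyncio.sleep(0)   # exactly one suspension point
    return a + b

coro = add(1, 2)
try:
    coro.send(None)          # step 1: run up to the first suspension
    coro.send(None)          # step 2: resume and run to completion
except StopIteration as stop:
    print(stop.value)        # 3 -- the return value rides on StopIteration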

async wraps a function into an asynchronous coroutine function: calling it once returns a coroutine/future, not the final result.
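
A quick check of this in Python:

async def f():
    return 42

obj = f()           # nothing inside f has run yet
print(type(obj))    # <class 'coroutine'> -- a coroutine object, not 42
obj.close()         # discard it without running (avoids a warning)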

await: await is yield from, and Future implements the __await__ function. It must be followed by a future or a call that returns a future. The implementation is very simple: if the awaited future is incomplete, the future itself is yielded outward. This is why a coroutine suspends when it encounters await: at that moment the future is returned straight up the stack. If the future is done, future.result() is returned instead, which is how the coroutine resumes execution. The first step (immediate and non-blocking, if the called function is well designed) yields a Future and returns control to the upper layer; that is the secret of coroutine non-blocking.
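
This is, lightly abridged, how CPython’s asyncio.Future implements __await__:

def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # This tells Task to wait for completion.
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too.

__iter__ = __await__  # make compatible with 'yield from'.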

What role does the EventLoop play in the coroutine mechanism? We can see by examining its _run_once function.

def _run_once(self):
    """Run one full iteration of the event loop.

    This calls all currently ready callbacks, polls for I/O,
    schedules the resulting callbacks, and finally schedules
    'call_later' callbacks.
    """
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.

In each iteration it does three things:

  1. Check the ready and stopping queues.
  2. Check the scheduled (timer) queue.
  3. Do one network select() poll and put the corresponding completion callbacks into the ready queue.
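
A small sketch showing the ready queue versus the scheduled timer heap in action:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(print, "ready queue: runs on the next _run_once pass")
    loop.call_later(0.5, print, "timer heap: becomes ready ~0.5s later")
    await asyncio.sleep(1)   # keep the loop iterating long enough

asyncio.run(main())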

Note: the reason it dares to set a blocking timeout on select() here is that the EventLoop itself is completely sequential and synchronous. If the ready and stopping queues are empty at some point, then even if the loop kept spinning without blocking, those queues would not change at all unless network data arrived or the first timer task fired. All tasks would still be pending, so the loop would just come back and select() again without blocking, paying extra syscall overhead to enter kernel space each time. It might as well block for a while: as soon as anything is ready, select() returns, and nothing is actually being blocked.

The main task of EventLoop is to execute the ready callback functions collected from these places.

When the EventLoop drives a Task one step, the Task’s coroutine may yield a Future back up, which signals that the Task has suspended. A future.add_done_callback callback is then registered to resume the Task’s execution once the future completes.
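
A deliberately simplified sketch of this suspend/resume dance (MiniTask is hypothetical; the real logic lives in asyncio.Task’s internal step and wakeup methods):

class MiniTask:
    def __init__(self, coro, loop):
        self.coro = coro
        self.loop = loop
        loop.call_soon(self.step)            # schedule the first step

    def step(self, value=None):
        try:
            future = self.coro.send(value)   # run until the next await
        except StopIteration:
            return                           # coroutine finished
        # The coroutine yielded a pending future: resume when it completes.
        future.add_done_callback(
            lambda fut: self.step(fut.result()))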

Who puts these callbacks on the ready queue? Other callbacks, or other threads of the program!

The future.set_result function calls the loop method that moves the future’s completion callbacks onto the ready queue.

def set_result(self, result):
    """Mark the future done and set its result.

    If the future is already done when this method is called, raises
    InvalidStateError.
    """
    if self._state != _PENDING:
        raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
    self._result = result
    self._state = _FINISHED
    self.__schedule_callbacks()
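
For example, another thread can complete a future and thereby wake a coroutine; call_soon_threadsafe hands the set_result call over to the loop’s own thread:

import asyncio
import threading
import time

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def worker():
        time.sleep(0.2)  # pretend to do blocking work off the loop
        # set_result must run on the loop's thread; hand it over safely.
        loop.call_soon_threadsafe(fut.set_result, 42)

    threading.Thread(target=worker).start()
    print(await fut)     # main() suspends here until the worker delivers 42

asyncio.run(main())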

Limitations of coroutines

Coroutines cannot:

  1. Schedule at the statement level. The EventLoop only runs async functions concurrently when you explicitly declare them concurrent, e.g. with the gather function; within an async function, as soon as an await appears anywhere down the call stack, the whole async function is suspended as a single Task (see the sketch after this list).
  2. Resolve thread synchronization issues. Coroutines themselves run on a single thread, and with coroutines alone there is no thread synchronization problem to begin with.
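
The first limitation in action, sequential awaits versus gather:

import asyncio

async def fetch(n):
    await asyncio.sleep(1)   # stand-in for real IO
    return n * n

async def main():
    # Sequential: each await suspends main() until fetch() finishes,
    # so these two lines take ~2 seconds total.
    a = await fetch(1)
    b = await fetch(2)
    # Declared concurrent: gather() wraps each coroutine in a Task,
    # so the two sleeps overlap and this line takes ~1 second.
    c, d = await asyncio.gather(fetch(3), fetch(4))
    print(a, b, c, d)

asyncio.run(main())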

Coroutine + multi-threaded/multi-process

In this process, if I want to do large file reads and writes, or computationally intensive operations, the EventLoop cannot perform these tasks on its own; it must outsource them to threads or processes and then handle them asynchronously. Yet we see no handling for this in the _run_once function.

Look at the run_in_executor function:

def run_in_executor(self, executor, func, *args):
    self._check_closed()
    if self._debug:
        self._check_callback(func, 'run_in_executor')
    if executor is None:
        executor = self._default_executor
        # Only check when the default executor is being used
        self._check_default_executor()
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(
                thread_name_prefix='asyncio'
            )
            self._default_executor = executor
    return futures.wrap_future(
        executor.submit(func, *args), loop=self)

The executor.submit() call hands the task to an executor (thread- or process-based).
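
A typical usage sketch; the heavy() function here is made up for illustration:

import asyncio
import concurrent.futures
import hashlib

def heavy(data: bytes) -> str:
    # CPU-bound work that would stall the event loop if run inline.
    return hashlib.sha256(data * 10_000_000).hexdigest()

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # submit() runs heavy() in a worker process; awaiting the
        # wrapped future suspends this coroutine, not the loop.
        digest = await loop.run_in_executor(pool, heavy, b"x")
        print(digest)

if __name__ == "__main__":
    asyncio.run(main())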

The ExecutorManagerThread, while running its own loop, calls wait_result_broken_or_wakeup and obtains the result returned by the worker through a connection object:

def run(self):
    # Main loop for the executor manager thread.

    while True:
        self.add_call_item_to_queue()

        result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

        if is_broken:
            self.terminate_broken(cause)
            return
        if result_item is not None:
            self.process_result_item(result_item)
            # Delete reference to result_item to avoid keeping references
            # while waiting on new results.
            del result_item

            # attempt to increment idle process count
            executor = self.executor_reference()
            if executor is not None:
                executor._idle_worker_semaphore.release()
            del executor

        if self.is_shutting_down():
            self.flag_executor_shutting_down()

            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not self.pending_work_items:
                self.join_executor_internals()
                return

def wait_result_broken_or_wakeup(self):
    # Wait for a result to be ready in the result_queue while checking
    # that all worker processes are still running, or for a wake up
    # signal send. The wake up signals come either from new tasks being
    # submitted, from the executor being shutdown/gc-ed, or from the
    # shutdown of the python interpreter.
    result_reader = self.result_queue._reader
    assert not self.thread_wakeup._closed
    wakeup_reader = self.thread_wakeup._reader
    readers = [result_reader, wakeup_reader]
    worker_sentinels = [p.sentinel for p in self.processes.values()]
    ready = mp.connection.wait(readers + worker_sentinels)

    cause = None
    is_broken = True
    result_item = None
    if result_reader in ready:
        try:
            result_item = result_reader.recv()
            is_broken = False
        except BaseException as e:
            cause = traceback.format_exception(type(e), e, e.__traceback__)

    elif wakeup_reader in ready:
        is_broken = False

    with self.shutdown_lock:
        self.thread_wakeup.clear()

    return result_item, is_broken, cause
    
def process_result_item(self, result_item):
    # Process the received a result_item. This can be either the PID of a
    # worker that exited gracefully or a _ResultItem

    if isinstance(result_item, int):
        # Clean shutdown of a worker using its PID
        # (avoids marking the executor broken)
        assert self.is_shutting_down()
        p = self.processes.pop(result_item)
        p.join()
        if not self.processes:
            self.join_executor_internals()
            return
    else:
        # Received a _ResultItem so mark the future as completed.
        work_item = self.pending_work_items.pop(result_item.work_id, None)
        # work_item can be None if another process terminated (see above)
        if work_item is not None:
            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)

The set_result operation is then performed on the future bound to the work item; this fires the future’s callbacks and adds the function that resumes the corresponding coroutine to the ready queue. Because the ExecutorManagerThread and the EventLoop thread live in the same process, the manager thread can reach over and modify the future directly.

Everything registered on the EventLoop is non-blocking. The key is that the executor gets hold of the future created in the async function, so it can change that future’s state and thereby resume the coroutine running on the main thread.