Preface

This article describes how asynchrony works in Rust; async/await was only stabilized in the most recent release (1.39). I hope it helps readers clear up their doubts and deepen their understanding (it grew out of my own bruises from being beaten up by async Rust).

Reading this article requires some familiarity with operating systems, IO multiplexing, and common data structures.

Future

A Future literally means something that will happen in the future. In a program, it represents a computation whose result is not yet available. To obtain the final result, the program must actively poll the Future.

When poll returns Ready, the Future is complete and the code after it can continue; when it returns Pending, the Future is not finished yet, so execution cannot move past it. So when can the Future make progress again? The key is that the executor in the runtime keeps polling the Future until it returns Ready and execution can continue. Readers familiar with Linux may notice how similar this feels to the epoll model, and it is indeed similar (with one subtle difference: a Future can avoid empty polling); good designs show up everywhere. Several optimizations, described below, are what deliver the high performance and zero-cost abstraction that Rust claims.

The Future trait

pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}


The poll method is the key to driving a Future. It returns a Poll, an enum that wraps the two states, Ready and Pending.
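To make this concrete, below is a minimal hand-written Future, a hypothetical CountDown that stays Pending for a few polls and then completes. It is only a sketch: a real leaf future would store the Waker and call it when the underlying event fires, rather than requesting an immediate re-poll.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical example: completes after being polled `remaining` more times.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        if this.remaining == 0 {
            Poll::Ready("done")
        } else {
            this.remaining -= 1;
            // Ask to be polled again right away; a real leaf future would
            // hand this waker to the Reactor instead.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}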

Runtime

A Runtime consists of two parts: the Executor and the Reactor.

The Executor, as its name says, executes: it loops over a set of ready futures without any blocking waits. When a Future returns Pending, it is handed over to the Reactor to be woken up later.

The Reactor reacts: it polls the events mounted on it and, when one fires, calls the corresponding wake method. Typically, wake marks the Future as ready and puts it back on an Executor queue for execution.

Execution flow

The sequence diagram below briefly depicts how a Future flows back and forth between the Executor and the Reactor, and how its state changes.

sequenceDiagram
    participant Executor
    participant Reactor
    activate Executor
    Executor->>Reactor: Pending Future
    deactivate Executor
    Note left of Executor: Execute other Future
    activate Reactor
    Reactor->>Executor: Ready Future
    deactivate Reactor
    activate Executor
    deactivate Executor


What happens when a Future is more complex, for example when it contains several IO operations in the middle? Consider the following code (a demo only; it is not meant to be used as-is):

// Demo only: error handling and buffer management are simplified.
async fn read_and_write(s: TcpStream) {
    let (mut r, mut w) = s.split();
    let mut buffer = vec![0u8; 1024];
    let n = r.read(&mut buffer).await.unwrap(); // first await point
    buffer.truncate(n);
    buffer.extend_from_slice(b"Hello,world");
    w.write_all(&buffer).await.unwrap(); // second await point
}


The corresponding execution flow is as follows:

sequenceDiagram
    participant Executor
    participant Reactor
    activate Executor
    Executor->>Reactor: Pending on r.read()
    deactivate Executor
    Note left of Executor: Execute other Future
    activate Reactor
    Reactor->>Executor: r.read() is ready
    deactivate Reactor
    Note left of Executor: Execute current Future
    Executor->>Reactor: Pending on w.write_all()
    Note left of Executor: Execute other Future
    activate Reactor
    Reactor->>Executor: w.write_all() is ready
    deactivate Reactor


The examples above show only a single Future being executed. In a real production environment there may be hundreds of thousands of futures executing concurrently, and the scheduling between Executor and Reactor becomes correspondingly more complex.

Summary

A Future that cannot return a value immediately is handed to the Reactor. When the event it waits on becomes ready, wake is called and the Future is polled again; this repeats until the entire Future returns Ready.

Executor

In general, an Executor can be implemented on a single thread or on a thread pool, and each approach has its trade-offs. A single-threaded implementation has less contention over data, but tends to hit throughput bottlenecks. Let's analyze a thread-pool-based implementation, using async-std as the example (in simplified pseudocode):

fn main_loop() {
    loop {
        match find_runnable() {
            Some(task) => task.run(),
            None => {
                // Depending on how many empty loops have occurred, either go
                // to sleep or keep burning CPU until a new task wakes this
                // thread up.
            }
        }
    }
}

fn find_runnable() -> Option<Task> {
    // Prefer tasks from this thread's local queue.
    let task = get_local();
    if task.is_some() {
        return task;
    }
    // Then check the shared global queue (this sketch assumes a
    // get_global() helper).
    let task = get_global();
    if task.is_some() {
        return task;
    }
    // Finally, try to steal work from another thread's queue.
    steal_other()
}


The whole Executor is a thread pool in which every thread loops forever: find a runnable task, execute it, find the next one, execute that too.

As main_loop shows, the CPU is not left idling meaninglessly; there are strategies in place to optimize CPU usage.
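What such a strategy might look like, as a hypothetical sketch (the thresholds and the parker structure are made up for illustration): spin for a while, then yield, then fall asleep on a condvar until new work arrives.

use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Hypothetical idle strategy for a worker thread.
fn idle_backoff(empty_polls: u32, parker: &(Mutex<bool>, Condvar)) {
    if empty_polls < 64 {
        // Cheap busy-wait: stay hot on the CPU for a short while.
        std::hint::spin_loop();
    } else if empty_polls < 128 {
        // Give the core up to other threads, but stay runnable.
        std::thread::yield_now();
    } else {
        // Fall asleep; the thread that enqueues new work notifies the condvar.
        let (lock, cvar) = parker;
        let guard = lock.lock().unwrap();
        let _ = cvar.wait_timeout(guard, Duration::from_millis(10)).unwrap();
    }
}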

Reactor

The Reactor may have thousands of events mounted on it at once, all waiting to be woken. It uses mio to wrap the operating system's IO-multiplexing APIs: epoll on Linux and kqueue on macOS. The exact implementation is not covered here.
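To give a flavor of that layer, here is a minimal sketch against mio's own API (0.7+ style; the address, token, capacity, and timeout are arbitrary), independent of any async runtime:

use std::time::Duration;
use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token};

// Minimal sketch: register one IO source with the OS poller and wait once.
fn poll_once() -> std::io::Result<()> {
    let mut poll = Poll::new()?; // epoll on Linux, kqueue on macOS
    let mut events = Events::with_capacity(1024);
    let mut listener = TcpListener::bind("127.0.0.1:0".parse().unwrap())?;
    poll.registry()
        .register(&mut listener, Token(0), Interest::READABLE)?;
    poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    for event in events.iter() {
        if event.token() == Token(0) && event.is_readable() {
            // A connection is ready to be accepted.
        }
    }
    Ok(())
}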

On top of Future, the AsyncRead/AsyncWrite/AsyncSeek abstractions describe IO operations, mirroring the synchronous Read/Write/Seek. In their implementations, if the underlying data is not ready yet, the waker of the current Future is registered with the Reactor. The Reactor's flow looks roughly like this:

loop {
    // Block until at least one event is ready, or the timeout expires.
    poll.poll(&mut events, timeout);
    for event in events.iter() {
        if event.is_readable() {
            for waker in event.readers.wakers {
                waker.wake();
            }
        }
        if event.is_writable() {
            for waker in event.writers.wakers {
                waker.wake();
            }
        }
    }
}


The Reactor keeps polling for ready events and wakes each Waker bound to them in turn. When a Waker fires, it moves its task onto the Executor's ready queue for execution.
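As a sketch of what such a Waker can look like (using the futures crate's ArcWake; the task handle and ready-queue channel here are hypothetical, and a real task would also own the future it drives):

use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use futures::task::ArcWake;

// Hypothetical task handle.
struct Task {
    id: usize,
    ready_queue: SyncSender<usize>, // channel into the Executor's ready queue
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Re-enqueue the task so that a worker thread polls it again.
        let _ = arc_self.ready_queue.send(arc_self.id);
    }
}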

Combined with how the Executor works, it is easy to see that the Executor never polls tasks that are not ready, because only ready tasks are placed on its run queue. This raises the Executor's resource utilization yet again, and is the ingenious part of the whole asynchronous system.

Stream

A Future is the most fundamental concept in asynchronous development. Where a Future represents a one-off asynchronous value, a Stream represents a series of asynchronous values: a Future is 1, a Stream is 0, 1, or N. Its signature is as follows:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}


Stream corresponds to Iterator among the synchronous primitives; if you look closely, even the signatures are similar.

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}


A Stream abstracts a continuous source of data. It can model a WebSocket connection, where the server keeps receiving values from the client and processing them until the client disconnects. Pushing the abstraction further, a consumer in a message queue, or the stream of business packets over TCP, can all be regarded as Streams, which is why the Stream abstraction matters for asynchronous programming.
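A minimal hand-written Stream, as a sketch (the Counter type is made up for illustration), yields a bounded sequence of values and then signals the end with None:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::Stream;

// Hypothetical example: yields 0..limit, then ends.
struct Counter {
    next: u64,
    limit: u64,
}

impl Stream for Counter {
    type Item = u64;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.next < this.limit {
            this.next += 1;
            Poll::Ready(Some(this.next - 1)) // a value is available immediately
        } else {
            Poll::Ready(None) // the stream is exhausted
        }
    }
}

With futures::StreamExt in scope, such a stream is consumed naturally with while let Some(v) = counter.next().await.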

Sink

A Future represents receiving a one-off asynchronous value, and a Stream represents receiving a repeated series of them. Symmetrically, something is needed for asynchronously sending one or more values, and that is the Sink.

pub trait Sink<Item> {
    type Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

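In practice one rarely calls these four methods by hand. As a sketch using the futures crate: the sender half of futures::channel::mpsc implements Sink, and SinkExt::send drives poll_ready/start_send/poll_flush in one call:

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

async fn produce_and_consume() {
    let (mut tx, mut rx) = mpsc::channel::<u32>(8);
    tx.send(1).await.unwrap(); // one send drives the whole poll_* protocol
    tx.send(2).await.unwrap();
    drop(tx); // closing the Sink lets the Stream side terminate
    while let Some(v) = rx.next().await {
        println!("got {}", v);
    }
}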

Timer

Unlike TCP/UDP/UDS, mio provides no timer wrapper.

In general, timers are implemented with a time wheel, a heap, or a red-black tree (the latter two with an average time complexity of O(log N)). Kafka is the classic user of the time wheel, while the Go runtime uses heaps. Red-black-tree and heap implementations work in much the same way.

  1. Time wheel. A time wheel can be pictured as a clock face: each slot stores the timers that expire within it, so the wheel's minimum precision is the span of time one slot represents. What about a timer that exceeds the range the wheel covers? That is also simple, and can be handled in two ways.
  • Use a multi-level time wheel. Picture the clock again: each full turn of the second hand advances the minute hand one slot, and each full turn of the minute hand advances the hour hand one slot. In the same way, the first-level wheel is the most precise, followed by the second level, the third level, and so on; when a timer exceeds the range a level can represent, it is placed into the next level's wheel.
  • Keep a single wheel and insert the timer at its expiry time modulo the wheel size, recording alongside it the number of rounds remaining before it fires. A heap can be used to keep the timers within each slot sorted as new ones are inserted.
  2. Heap timer (or red-black-tree timer)

A min-heap maintains all the timers. A worker thread keeps looking at the nearest timer in the heap: if its deadline is earlier than the current time, the thread wakes the task bound to that timer; if the deadline has not arrived yet, the thread performs thread::park_timeout(deadline - now) to give up the CPU for that span of time, as the sketch below shows.
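Here is that loop in miniature (names are hypothetical; locking, wakers, and re-parking when a nearer timer is inserted are all omitted):

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Instant;

// Hypothetical sketch: (deadline, task_id) pairs kept in a min-heap.
fn timer_loop(mut heap: BinaryHeap<Reverse<(Instant, usize)>>, wake: impl Fn(usize)) {
    loop {
        let now = Instant::now();
        match heap.peek().copied() {
            // The nearest timer is due: wake the task bound to it.
            Some(Reverse((deadline, task_id))) if deadline <= now => {
                heap.pop();
                wake(task_id);
            }
            // Not due yet: release the CPU until the deadline.
            Some(Reverse((deadline, _))) => std::thread::park_timeout(deadline - now),
            // No timers at all: sleep until another thread unparks us.
            None => std::thread::park(),
        }
    }
}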

futures-timer, the implementation currently in play, uses just a single heap; there is still room for optimization…

Combinators

The basic concepts for implementing asynchrony, Future, Stream, and Sink, are all defined above.

In many cases, though, they are awkward to build an application with directly. For example, given several competing futures, we may want only the first one that completes. The obvious way to think about it is to iterate over all of them until one returns Ready:

loop {
    for f in futures {
        if f.is_ready() {
            return f.output();
        }
    }
}


We can wrap this logic up and provide it as select!(futures…), a combinator. There are many similar combinators, for example join!(futures…), which waits for all the futures to complete.
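A short sketch of both, using the futures crate's macros (the async blocks are placeholders for real work):

use futures::{join, pin_mut, select, FutureExt};

async fn race_and_gather() -> u32 {
    // select!: poll several futures concurrently, take the first to finish.
    let fast = async { 1u32 }.fuse();
    let slow = async { 2u32 }.fuse();
    pin_mut!(fast, slow);
    let first = select! {
        x = fast => x,
        y = slow => y,
    };

    // join!: wait until all the futures have completed.
    let (a, b) = join!(async { 10u32 }, async { 100u32 });
    first + a + b
}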

See futures-util for more information.

Async/Await

All of the concepts above together make up Rust's asynchronous ecosystem. Now imagine how to obtain the result of running a Future. One possible approach is the following:

loop {
    match f.poll(cx) {
        Poll::Ready(x) => return x,
        Poll::Pending => {}
    }
}


How painful it would be if users had to write this out by hand every time; it would hardly be better than callback-based asynchrony!

Is there a more elegant way to get a Future's value? This is where async/await comes in. In essence, async/await is syntactic sugar for the code above, and far more natural for users. The code above can be replaced with:

let x = f.await;


Isn’t that a huge simplification?
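Putting it all together, a sketch that drives the sugar with the futures crate's block_on (a minimal single-threaded executor):

use futures::executor::block_on;

async fn compute() -> u32 {
    let x = async { 21 }.await; // each .await expands into a poll loop
    x * 2
}

fn main() {
    // block_on drives the outermost Future to completion on this thread.
    println!("{}", block_on(compute()));
}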

Conclusion

Many concepts have been covered above, but a closer look shows that asynchrony splits into three layers:

  1. The bottom layer: Future/Stream/Sink, together with the Reactor/Executor that act directly on those three types. Ordinary users rarely touch this layer; library developers deal with it far more often.
  2. The combinator layer: to support more complex operations, a series of asynchronous combinators has been built that makes asynchrony easier to use; users compose these combinators to express all kinds of logic.
  3. async/await: strictly speaking, this layer matters far less than the two above, yet it remains indispensable, and it makes asynchronous development a breeze.