Author: Rachelint
background
Our team used it extensively in the project in futures 0.1 release, but with futures growing to 0.3 and rust introducing the async/await keyword, it became significantly more readable and maintainable. Updating Futures Crate is a matter of time. However, upgrading all the relevant code in a project at once would obviously be unacceptable and difficult to ensure stability. So we decided to take advantage of the compatibility layer provided by Futures 0.3 and gradually upgrade the Code related to Futures. In this article, I want to share some of my understanding of compatibility layer implementation, as well as some of my experience in practice. The article can be roughly divided into three parts:
- General usage of compatibility layer
- Compatibility layer principle analysis
- Practical experience
Some of my views may be too subjective or wrong. I welcome your comments.
1. Differences between Future01 and Future03
Let’s start with the declarations for traits in both versions (in this context, the Future trait for Futures 0.1 is called Future01 and futures 0.3 is called Future03, similarly below) :
/ / 0.1
pub trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
/ / 0.3
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
Copy the code
As you can see, there are three main differences:
- Future01 has two association types, Item and Error, because Poll is actually a special Result type.
Future03 has only one association type Output, where Poll becomes an enumeration type. 2. One of the parameters in Future03 poll changed from &mut self in Future01 to Pin<&mut self >; 3. Future03 poll cx: &mut Context; Here is a brief introduction to the difference between point 2 and Point 3:
- Why does Future03 need Pin<&mut Self>? First Pin will only limit the implementation! Unpin type movement, whereas Rust only implements some of the following exceptions! Unpin:
- PhantomPinned;
- The compiler is an impL Future structure generated after async/await desugar;
Obviously, Pin is designed to solve the problem of async/await Future generation automatically, which is self-referential, and moving the self-referential structure will invalidate the pointer. Due to limited space, I will not expand here. For more detailed knowledge about Pin, I recommend reading the following articles:
- Rust’s Pin and Unpin
- Rust Async: Pin concept parsing
-
Why does Future03 need cX: &mut Context? Waker: & waker in the poll(waker: & waker in the poll(waker: & waker in the poll)) How to get it in.
The general flow of notify in Future01:
The general process of passing and getting notify in Future 0.3 is as follows:
Obviously, the transition between Future01 and 03 is just a matter of resolving the later differences, and the actual implementation is basically the same idea.
2. Usage
Our intention was to have futures 0.1 and 0.3 co-exist in the project and convert them as needed.
Cargo. Toml configuration
- First, resolve the issue that crate 0.1 and Crate 0.3 are both called futures. Cargo provides the renaming function. Since the project is still futures 0.1 for the time being, rename Futures 0.3 to Future03.
[dependencies]
futures = "0.1"
futures03 = { package = "futures", version = "0.3" }
Copy the code
- Then you need to solve the problem of converting them to each other. The Compat feature provided by Futures 0.3 can be enabled here, which can be easily resolved using combinators, with a simple setup in Cargo. Toml.
[dependencies]
futures = "0.1"
futures03 = { package = "futures", version = "0.3", features = ["compat"]}Copy the code
Conversion method
So how do you convert to each other? This is relatively simple and can be implemented directly using compat combinators.
use futures::future::Ok;
use futures03::compat::Future01CompatExt;
use futures03::executor::block_on;
fn main() {
let fut01 = futures::future::ok::<i32(a) > (42);
let fut03 = fut01.compat();
let res = block_on(fut03);
println!("the fut03 res:{}", res.unwrap());
}
Copy the code
Future03 Future01. Future03, which can convert Future01, needs to meet the following two limitations (why will be explained later in this article) :
- This Future03 needs to be a TryFuture. What kind of Future03 is TryFuture? Output = Result; TryFuture = Result; Output = Result; Output = Result;
impl<F, T, E> TryFuture for F
where
F: ?Sized + Future<Output = Result<T, E>>,
{
...
}
Copy the code
- This Future03 needs to be Unpin. Pin parsing is complicated, so I will leave it for later. So if the current Future03 is neither TryFuture nor Unpin. So how do I get to Future01? We can just use a combinator.
The unit_error combinator can solve problem 1 by converting Future03<Ouput=T> to Future03<Output=Result<T, ()>>; The Boxed combinator solves problem 2 by making it Unpin by putting it in the heap.
use futures03::future::{FutureExt, TryFutureExt};
use futures::Future;
async fn make_fut03() - >i32 {
42
}
fn main() {
// This is a Future that neither TryFuture nor Unpin
let fut03 = make_fut03();
// let fut01 = fut03.compat(); Error: an error that does not satisfy the trait limit is reported at compile time
let fut01 = fut03.unit_error().boxed().compat();
let res = fut01.wait();
println!("the fut01 res:{}", res.unwrap());
}
Copy the code
3. Principle analysis
Compat01As03 analysis
Future01 to Future03 can be converted to compat01AS03.rs on compat/ compat01AS03.rs.
- Conversion of association types and polls:
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat01As03<T> {
pub(crate) inner: Spawn01<T>,
}
impl<Fut: Future01> Future03 for Compat01As03<Fut> {
type Output = Result<Fut::Item, Fut::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> task03::Poll<Self::Output> {
poll_01_to_03(self.in_notify(cx, Future01::poll))
}
}
fn poll_01_to_03<T, E>(x: Result<Async01<T>, E>) -> task03::Poll<Result<T, E>> {
match x? {
Async01::Ready(t) => task03::Poll::Ready(Ok(t)),
Async01::NotReady => task03::Poll::Pending,
}
}
Copy the code
As can be seen, Compat01As03 is implemented as Future03<Output = Result<Fut::Item, Fut::Error>>, and the conversion logic of poll return value is quite simple.
- For Pin, Unpin is implemented directly for Compat01As03. Addressing unsafe, there is no constraint on T being Unpin in this source file, perhaps because currently, if you want to implement self-referent structures, you must only use broadening.
impl<T> Unpin for Compat01As03<T> {}
Copy the code
- Convert Waker to NotifyHandle:
struct NotifyWaker(task03::Waker);
#[allow(missing_debug_implementations)]
#[derive(Clone)]
struct WakerToHandle<'a> (&'a task03::Waker);
impl From<WakerToHandle<'_>> for NotifyHandle01 {
fn from(handle: WakerToHandle<'_- > >)Self {
// Place waker in the heap
let ptr = Box::new(NotifyWaker(handle.0.clone()));
// Get the pointer and put it in the NotifyHandle
unsafe { Self::new(Box::into_raw(ptr)) }
}
}
// Notify waker
Waker.wake_by_ref ()
impl Notify01 for NotifyWaker {
fn notify(&self, _ :usize) {
self.0.wake_by_ref(); }}// In the source code listed in the previous section, you can see in_notify.
// In_notify passes the Waker NotifyHanle to Spawn::poll_fn_notify.
// The following process can refer to the previous flow chart
fn in_notify<R>(&mut self, cx: &mut Context<'_>, f: impl FnOnce(&mut T) -> R) -> R {
let notify = &WakerToHandle(cx.waker());
self.inner.poll_fn_notify(notify, 0, f)
}
Copy the code
Put the Waker in the heap, get its pointer, put it in the NotifyHanle, and pass the constructed NotifyHanle to Spawn::poll_fn_notify. So while calling task.notify() to wake up the inner Futures01, it can instead call waker.wake_by_ref() of the outer Compat01As03 to wake it up, as well as other related functions.
Compat analysis
Future03 through 01 is similar, the main logic is on compat/ compat03AS01.rs.
- Conversion of association types and polls:
#[derive(Debug, Clone, Copy)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Compat<T> {
pub(crate) inner: T,
}
fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
match x? {
task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
task03::Poll::Pending => Ok(Async01::NotReady),
}
}
impl<Fut> Future01 for Compat<Fut>
where
// Notice that Fut is restricted, and you need to implement TryFuture03 and Unpin
Fut: TryFuture03 + Unpin,
{
type Item = Fut::Ok;
type Error = Fut::Error;
fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
}
}
Copy the code
As you can see, Compat is implemented as Future01<Item = Fut::Ok, Error = Fut::Error>. The conversion logic for poll returns is also relatively simple. As you can see from the source code, Fut needs to implement TryFuture03 and Unpin, which explains why Future03 to 01 requires unit_error and boxed combinators (see Chapter 1 for details).
- With regard to Pin, Future03 is generally automatically generated by async/await, and as shown in section 1 of this chapter, this automatically generated Future is one of the few automatically implemented by Rust! One of the types of Unpin, so there is a restriction here that there will only be an association type T that meets the Unpin of Compat implementation Future01.
When you want to convert an async/await automatically generated Future03(in most cases) to Future01, you must first call the Boxed combinator to rebuild a new Future03 that satisfies the Unpin.
- Convert NotifyHandle to Waker:
#[derive(Clone)]
struct Current(task01::Task);
impl Current {
fn new() - >Self {
Task ::current() retrieves the current task structure,
// Then use it to build the Current structure and convert it to a null pointer passed in when building RawWaker.
// Call to wake via vtable and pass parameters to the familiar task.notify() call.
Self(task01::current())
}
fn as_waker(&self) -> WakerRef<'_> {
// Dereference the pointer and then borrow it
unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
&*(ptr as *const Current)
}
// Convert self to void*
fn current_to_ptr(current: &Current) -> *const () {
current as *const Current as *const()}/ / need to implement in the vtable way / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / / /
unsafe fn clone(ptr: *const ()) -> RawWaker {
...
}
unsafe fn drop(_ : *const() {}unsafe fn wake(ptr: *const ()) {
ptr_to_current(ptr).0.notify()
}
////////////////////////////////////////////////////////
// Convert self to a null pointer, mainly for passing in later when building RawWaker
let ptr = current_to_ptr(self);
/ / build vtable
let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
// Build RawWaker, build Waker
WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
task03::Waker::from_raw(RawWaker::new(ptr, vtable))
}))
}
}
Copy the code
Compat01As03 is similar to Compat01As03, in order to call the inner Future03 waker.wake()/waker.wake_by_ref(), It can then be woken up by calling task.notify() of the outer Compat, as well as other related functions.
4. Some practical experience
How to do a local upgrade well
In this task, I need to upgrade futures 0.1 to 0.3 for the specific path for Crate who wants to join the project. I wanted to take the opportunity to introduce async/await on relevant paths to improve code readability and maintainability. In the process of implementation, there are two main problems:
- The current Trait does not support Async FN.
- You also need to keep the old Runtime, because it would be too disruptive to upgrade the runtime (almost all relevant parts of the project);
For now, the Trait’s lack of async FN support can be resolved by using the third-party library Async-Trait, which is simple to use. The general idea is that the async_trait macro converts code to a synchronous method that returns Pin<Box<dyn Future + Send + ‘async>>.
use futures03::executor::block_on;
use async_trait::async_trait;
#[async_trait]
trait TestTrait {
async fn method1(&self);
async fn method2(&self);
}
struct TestStruct;
#[async_trait]
impl TestTrait for TestStruct {
async fn method1(&self) {
println!("I am async method1");
}
async fn method2(&self) {
println!("I am async method2"); }}fn main() {
let ts = TestStruct;
block_on(ts.method1());
block_on(ts.method2());
}
Copy the code
So how do you effectively upgrade the call path to async/await while keeping the old version Runtime? My personal recommendation is to convert Future01 returned from bottom to 03 with compat combinator, then keep async/await in the middle path, and then convert Future01 again with compat and other combinator at top to old Runtime. The following is an example:
use futures::Future;
use futures03::compat::Future01CompatExt;
use futures03::{FutureExt, TryFutureExt};
use futures_cpupool::CpuPool;
type BoxedFuture<T> = Box<dyn Future<Item = T, Error = ()> + Send>;
fn func1() -> BoxedFuture<i32> {
Box::new(futures::future::ok(42))}async fn func2() - >Result<i32, () > {// convert to Future 0.3
let res1_compat_03 = func1().compat().await? ;Ok(res1_compat_03 + 42)}async fn func3() - >Result<i32, () > {let res2_03 = func2().await? ;Ok(res2_03 + 42)}fn main() {
let pool = CpuPool::new(4);
let fut03 = func3();
// convert back to Future 0.1
let fut01 = fut03.boxed().compat();
// spawn to the old runtime
let res = pool.spawn(fut01).wait();
println!("res:{}", res.unwrap());
}
Copy the code
The difference between a function that returns a Future and async fn
In futures 0.1, since there is no async fn, it is common to write a function that returns the Future, for example:
type BoxedFuture<T> = Box<dyn Future<Item = T, Error = ()> + Send>;
fn func1() -> BoxedFuture<i32> {
Box::new(futures::future::ok(42))}Copy the code
Async FN has a subtle difference from Async FN, using Minitrace-rust Crate for example.
use futures03::{Future as Future03, FutureExt};
use minitrace::{CollectArgs, FutureExt as MiniFutureExt, Span};
use std::pin::Pin;
type BoxedFuture<T> = Pin<Box<dyn Future03<Output=T> + Send> >;async fn func1() - >i32 {
42
}
// async fn func2() -> i32 {
// func1().in_span(Span::from_local_parent("func1")).await
// }
fn func2() -> BoxedFuture<i32> {
func1().in_span(Span::from_local_parent("func1")).boxed()
}
#[tokio::main]
async fn main() {
let (span, collector) = Span::root("func2");
let f = func2().in_span(span);
tokio::spawn(f).await.unwrap();
let spans = collector.collect_with_args(CollectArgs::default().sync(true));
for span in spans {
println!("span: event:{}, id:{}, pid:{}", span.event, span.id, span.parent_id); Span: event:func2, id: func21, pid:0Comment func2 out, and removeasyncFunc2 span: event:func1, id:65537, pid:1
span: event:func2, id:1, pid:0
Copy the code
When async fn func2() -> i32 is changed to func2() -> BoxedFuture, the span associated with func1 fails (SPAN ::from_local_parent(“func1”)). Why is that?
I was also confused when I first encountered this problem, but after tracking it with the debugger, I found that the problem was mainly in the order of Future generation and poll() execution at various levels.
- The approximate order of execution in the case of async FN, GenFutureFuncx denotes the Future generated or returned by funcX:
- Return the possible order of execution in the case of fn for the Future:
As shown in the flow chart, it is the order in which Span::from_local_parent() and outermost InSpan::poll() are executed that makes the difference in the final results (the two steps are annotated in the flow chart above).
Because after doing something in InSpan::poll(), a SPAN_id bound to the current thread is called local_parent, and in the case of returning the Future, Span::from_local_parent() is executed before the outermost InSpan::poll(), which causes the former to fail because local_parent has not been built at the time of execution.
This difference is subtle and generally not significant, since Future::poll() is executed in the correct order, but it still makes a difference in the above example and is worth knowing a little bit about.
5. To summarize
The above is my understanding and experience of the Futures compatibility layer. I have read a lot of related materials before due to the work in this field, but I have not had time to sort them out. This review and summary, feeling or harvest quite big.
reference
- Several ways to write asynchronous code
- Compatibility Layer
- Rust-future and asynchrony
- TiKV Rust Client Migration – Futures 0.1 to 0.3
- Rust’s Pin and Unpin
- Async/Await
- Rust Async: Pin concept parsing
- rfcs – 2349