Translator: Matrixtang

Original text: ryhl.io/blog/actors…

This article is about building actors directly with Tokio, without using any actor library (such as Actix). This turns out to be rather easy, but there are a few details to be aware of:

  1. Where to put the tokio::spawn call.
  2. A struct with a run method versus a bare function.
  3. Handles to the actor.
  4. Backpressure and bounded channels.
  5. Graceful shutdown.

The techniques outlined in this article should work with any executor, but for the sake of simplicity we will only discuss Tokio. There is some overlap with the spawning and channels chapters of the Tokio tutorial, which I recommend reading as well.

Before we discuss how to write an actor, we need to know what an actor is. The basic idea behind an actor is to spawn a self-contained task that performs some job independently of the rest of the program. Typically, an actor communicates with the rest of the program through message-passing channels. Because each actor runs independently, programs designed around them are naturally parallel. A common use of actors is to give the actor exclusive ownership of some resource you want to share, and then let other tasks access that resource indirectly by talking to the actor. For example, if you are implementing a chat server, you could spawn one task per connection, plus a main task that routes chat messages between the connection tasks. This is useful because the main task never has to deal with network IO, while the connection tasks can focus exclusively on network IO.

Implementation

An actor is split into two parts: the task and the handle. The task is the independently spawned Tokio task that actually performs the actor's duties, and the handle is a struct that lets you communicate with the task.

Let's consider a simple actor. The actor internally stores a counter that is used to hand out unique IDs. The basic structure of the actor is as follows:

use tokio::sync::{oneshot, mpsc};

struct MyActor {
    receiver: mpsc::Receiver<ActorMessage>,
    next_id: u32,
}

enum ActorMessage {
    GetUniqueId {
        respond_to: oneshot::Sender<u32>,
    },
}

impl MyActor {
    fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
        MyActor {
            receiver,
            next_id: 0,
        }
    }

    fn handle_message(&mut self, msg: ActorMessage) {
        match msg {
            ActorMessage::GetUniqueId { respond_to } => {
                self.next_id += 1;

                // The `let _ =` ignores any errors when sending.
                //
                // This can happen if the `select!` macro is used
                // to cancel waiting for the response.
                let _ = respond_to.send(self.next_id);
            },
        }
    }
}

async fn run_my_actor(mut actor: MyActor) {
    while let Some(msg) = actor.receiver.recv().await {
        actor.handle_message(msg);
    }
}

Now that we have the actor itself, we also need a handle to the actor. A handle is an object that other pieces of code can use to talk to the actor, and it is also what keeps the actor alive.

Here is the implementation of Handle:

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender<ActorMessage>,
}

impl MyActorHandle {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let actor = MyActor::new(receiver);
        // Note where tokio::spawn is called.
        tokio::spawn(run_my_actor(actor));

        Self { sender }
    }

    pub async fn get_unique_id(&self) -> u32 {
        let (send, recv) = oneshot::channel();
        let msg = ActorMessage::GetUniqueId {
            respond_to: send,
        };

        // Ignore send errors. If this send fails, so does the
        // recv.await below. There's no reason to check for the
        // same failure twice.
        let _ = self.sender.send(msg).await;
        recv.await.expect("Actor task has been killed")
    }
}

full example

Let’s take a closer look at the different parts of this example.

The ActorMessage enum defines the kinds of messages we can send to the actor. By using an enum, we can have many different message types, and each message type can have its own set of arguments. We return a value to the sender through a oneshot channel, which is a channel that allows sending exactly one message.

In the example above, we match on the enum inside a handle_message method on the actor struct, but that is not the only way to structure this. You could also match on the enum in the run_my_actor function itself. Each branch of that match could then call various methods on the actor object, such as get_unique_id.

Errors when sending messages

When dealing with channels, not all errors are fatal. Because of this, the example sometimes uses let _ = to ignore errors. Generally, a send operation on a channel fails if the receiver has been dropped. The first instance of this in our example is the line in the actor where we respond to the message we were sent:

let _ = respond_to.send(self.next_id);

This send fails when the receiver is no longer interested in the result of the operation, for example because the task that sent the message has been killed.

Shutting down actors

We can decide when to shut down the actor by checking whether receiving a message fails. In our example, this happens in the following while loop:

while let Some(msg) = actor.receiver.recv().await {
    actor.handle_message(msg);
}

When all senders for the receiver have been dropped, we know that no further messages will ever arrive, so we can shut down the actor. When this happens, the call to .recv() returns None, and since that does not match the pattern Some(msg), the while loop exits and the function returns.

The run method of a structure

The example above spawns a top-level function that is not defined on any struct as the Tokio task, but many people find it more natural to define a run method directly on the MyActor struct and spawn that. This certainly works too, but the reason I use a top-level function in the example is that it naturally steers you away from a lot of lifetime issues. To illustrate the problem, I have prepared an example of what people unfamiliar with the pattern often come up with.

// Problematic version: this does not compile.
impl MyActor {
    fn run(&mut self) {
        tokio::spawn(async move {
            while let Some(msg) = self.receiver.recv().await {
                self.handle_message(msg);
            }
        });
    }

    pub async fn get_unique_id(&self) -> u32 {
        let (send, recv) = oneshot::channel();
        let msg = ActorMessage::GetUniqueId {
            respond_to: send,
        };

        // Ignore send errors. If this send fails, so does the
        // recv.await below. There's no reason to check for the
        // same failure twice.
        let _ = self.sender.send(msg).await;
        recv.await.expect("Actor task has been killed")
    }
}

... and no separate MyActorHandle

There are two problems with this example:

  1. The tokio::spawn call is inside the run method.
  2. The actor and the handle are one and the same struct.

The first issue is a problem because the tokio::spawn function requires its argument to be 'static. This means the new task must own everything inside it, but the run method only borrows self, so it cannot hand ownership of self over to the new task.

The second issue is a problem because Rust enforces the single-ownership principle. If you combine the actor and the handle into one struct, then (at least from the compiler's point of view) you give every handle access to the fields owned by the actor's task. For example, next_id should be owned only by the actor's task and should not be directly accessible from any handle.

That said, there is a version that works. Fixing the two problems above gives the following:

impl MyActor {
    async fn run(&mut self) {
        while let Some(msg) = self.receiver.recv().await {
            self.handle_message(msg);
        }
    }
}

impl MyActorHandle {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let mut actor = MyActor::new(receiver);
        tokio::spawn(async move { actor.run().await });

        Self { sender }
    }
}

This works identically to the top-level function. Note that, strictly speaking, it is possible to write a version where tokio::spawn is called inside run, but I don't recommend it.

Other variants of actor

The actor I used as an example in this article follows a request-response model for its messages, but this is not required. In this section, I'll go through some other ways of using actors to give you some inspiration.

Not responding to messages

The example I used to introduce the concept sends a response to each message over a oneshot channel, but a response is not always needed. In those cases, it is fine to simply leave the oneshot channel out of the message enum. When there is space in the channel, this even lets the sender return before the message has been processed. You should still use a bounded channel, so that the number of messages waiting in the channel cannot grow without limit. In some cases, this means that sending still has to be an async function, to cover the case where the send operation must wait for space in the channel.

However, there is an alternative to making send async: use the try_send method and handle a failed send by simply killing the actor. This is useful when the actor manages a TcpStream and forwards any messages you send to the connection. In that case, if writing to the TcpStream cannot keep up, you may just want to close the connection.

Multiple Handles share an Actor

If an actor needs to receive messages from different places, you can use multiple handle structs to enforce that certain messages can only be sent from certain places. When doing this, you can still reuse the same mpsc channel internally, with an enum that contains all the possible message types. If you do want separate channels for this purpose, the actor can use tokio::select! to receive from multiple channels at once.

loop {
    tokio::select! {
        Some(msg) = chan1.recv() => {
            // handle msg
        },
        Some(msg) = chan2.recv() => {
            // handle msg
        },
        else => break,
    }
}

You need to be careful about what happens when the channels are closed, because in that case their recv methods return None immediately. Luckily, the tokio::select! macro lets you handle this by providing the pattern Some(msg). If only one channel is closed, that branch is disabled and the other channel is still received from. When both are closed, the else branch runs and uses break to exit the loop.

Actors send messages to each other

There is nothing wrong with having actors send messages to other actors. To do this, simply give one actor the handle of another actor. You need to be careful when actors form a cycle, because by holding on to each other's handles, the last sender is never dropped, which prevents shutdown. To handle this case, you can give one of the actors two handle structs with separate mpsc channels, and use a tokio::select! that looks like this:

loop {
    tokio::select! {
        opt_msg = chan1.recv() => {
            let msg = match opt_msg {
                Some(msg) => msg,
                None => break,
            };
            // handle msg
        },
        Some(msg) = chan2.recv() => {
            // handle msg
        },
    }
}

The loop above always exits when chan1 is closed, even if chan2 is still open. If chan2 is the channel that is part of the actor cycle, this breaks the cycle and lets the actors shut down.

An alternative is to simply call abort on one of the actors in the cycle.

Multiple Actors share a handle

Just as you can have multiple handles per actor, you can also have multiple actors per handle. The most common example is handling a connection such as a TcpStream, where you typically spawn two tasks: one for reading and one for writing. When using this pattern, you make the reading and writing tasks as simple as possible: their only job is to do IO. The reader task sends every message it receives to some other task, usually another actor, while the writer task forwards every message it receives to the connection. This pattern is useful because it isolates the complexity associated with performing IO, which means the rest of the program can pretend that writing to the connection happens instantly, even though the actual write takes place later, when the actor processes the message.

Beware of cycles

I already touched on cycles under the heading "Actors send messages to each other", where I discussed how to shut down actors that form a cycle. However, shutdown is not the only problem cycles can cause, because a cycle can also produce a deadlock in which each actor in the cycle is waiting for the next actor to receive a message, but that next actor won't receive it until its own next actor receives a message, and so on. To avoid such a deadlock, you must make sure there are no cycles of channels with bounded capacity. The reason is that the send method on a bounded channel does not always return immediately. Channels whose send method always returns immediately do not count in this kind of cycle, because you cannot deadlock on such a send. Note that this means a oneshot channel cannot be part of a deadlocked cycle, since its send method always returns immediately. Note also that if you send the message with try_send rather than send, that send cannot be part of a deadlocked cycle either.

Thanks to matklad for pointing out the issues with cycles and deadlocks.


Translator introduction:

Matrixtang, Rust/ CPP programmer, is a security enthusiast with an interest in compiler related fields and not PWN.