Introduction to the

Akka’s mailbox holds messages for actors. Typically, each Actor will have its own mailbox, but there are exceptions, such as BalancingPool, where all routers (routees) will share an instance of a mailbox.

email

The default mailbox

If no mailbox is specified, the default mailbox is used. By default, it is a borderless mailbox, by Java. Util. Concurrent. ConcurrentLinkedQueue support.

SingleConsumerOnlyUnboundedMailbox mailbox is a more efficient, it can be used as the default mailbox, but cannot be used with BalancingDispatcher.

Will SingleConsumerOnlyUnboundedMailbox configuration as the default email address:

akka.actor.default-mailbox {
  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
Copy the code

Actor specific mailbox

An actor of a particular type can use a mailbox of a particular type, as long as the actor implements the parameterized interface RequiresMessageQueue. Here’s an example:

import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;

public class MyBoundedActor extends MyActor
    implements RequiresMessageQueue<BoundedMessageQueueSemantics> {}
Copy the code

RequiresMessageQueue interface type parameters need to be mapped to the configured mailbox, as follows:

bounded-mailbox {
  mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
  mailbox-capacity = 1000 
}

akka.actor.mailbox.requirements {
  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
Copy the code

Now, every time an Actor of type MyBoundedActor is created, it tries to get a bounded mailbox. This overrides this mapping if the Actor has different mailboxes configured in the deployment, either directly or through a Dispatcher with the specified mailbox type.

  • annotation: in order toActorThe queue type in the mailbox created is checked against the required type in the interface, if the queue does not implement the required type, thenactorThe creation will fail.

Dispatcher specific mailboxes

The Dispatcher also needs a mailbox type for running actors. An example is BalancingDispatcher, which requires a concurrent, thread-safe message queue. Such requirements can be planned in the dispenser configuration, as follows:

my-dispatcher {
  mailbox-requirement = org.example.MyInterface
}
Copy the code

A given requirement names a class or interface that must be guaranteed to be a supertype of the message queue implementation. In case of a conflict, for example, if an actor needs a mailbox type, but it does not satisfy that requirement, the actor creation will fail.

How do I select an email type

When creating the Actor, the ActorRefProvider first determines that its Dispatcher will execute. Then determine the email type in the following order:

  1. ifactorThe deployment configuration section contains onemailboxKeyword, then thismailboxThe keyword specifies the type of mailbox to use;
  2. ifactorthePropsInclude the Mailbox selection – that’s calledwithMailboxMethod – then this method specifies the type of mailbox to use;
  3. If the configuration part of the dispenser contains onemailbox-typeKeyword, then this section will also be used to configure the mailbox type;
  4. If the actor needs the mailbox type described above, the mapping of this requirement will be used to determine the mailbox type; If that fails, the dispenser’s requirement – if it exists – will be tried;
  5. If the dispenser requires the mailbox type described below, the mapping of this requirement will be used to determine the mailbox type;
  6. The default mailbox akka.actor. Default-mailbox will be used.

Which configurations are passed to the Mailbox type

Each MailboxType is implemented by a class that extends MailboxType and takes two constructor arguments: the actorsystem.settings object and the Config section. The latter is obtained from the actor system configuration, overwriting its ID keyword with the mailbox type configuration path, and adding a callback to the default mailbox configuration.

Built-in mailbox implementation

Akka comes with a number of mailbox implementations:

  • UnboundedMailbox(the default)
    • The default mailbox
    • byjava.util.concurrent.ConcurrentLinkedQueuesupport
    • Blocked or not:No
    • Bounded or not:No
    • Configuration name:unboundedorakka.dispatch.UnboundedMailbox
  • SingleConsumerOnlyUnboundedMailboxDepending on your use case, this queue may or may not be faster than the default queue, be sure to benchmark correctly!
    • Supported by a single consumer queue from multiple manufacturers and cannot be used with BalancingDispatcher
    • Blocked or not:No
    • Bounded or not:No
    • Configuration name:akka.dispatch.SingleConsumerOnlyUnboundedMailbox
  • NonBlockingBoundedMailbox
    • Supported by a very efficient “multi-producer, single-consumer” queue
    • Blocked or not:No(Discard the overflow message asdeadLetters)
    • Bounded or not:Yes
    • Configuration name:akka.dispatch.NonBlockingBoundedMailbox
  • UnboundedControlAwareMailbox
    • Pass extends with higher priorityakka.dispatch.ControlMessageThe news of the
    • By twojava.util.concurrent.ConcurrentLinkedQueuesupport
    • Blocked or not:No
    • Bounded or not:No
    • Configuration name:akka.dispatch.UnboundedControlAwareMailbox
  • UnboundedPriorityMailbox
    • byjava.util.concurrent.PriorityBlockingQueuesupport
    • The delivery order of equal priority messages is undefined, andUnboundedStablePriorityMailboxOn the contrary
    • Blocked or not:No
    • Bounded or not:No
    • Configuration name:akka.dispatch.UnboundedPriorityMailbox
  • UnboundedStablePriorityMailbox
  • By the packagingakka.util.PriorityQueueStabilizerIn thejava.util.concurrent.PriorityBlockingQueueTo provide support
  • Reserved for messages with the same priorityFIFOOrder, andUnboundedPriorityMailboxOn the contrary
  • Blocked or not:No
  • Bounded or not:No
  • Configuration name:akka.dispatch.UnboundedStablePriorityMailbox

Other bounded mailbox implementations block senders if the capacity is reached and a non-zero mailbox push-timeout-time timeout is configured. In particular, the following mailbox can only be used with zero mailbox-push-timeout-time.

  • BoundedMailbox
    • byjava.util.concurrent.LinkedBlockingQueuesupport
    • Block or not: If and non-zeromailbox-push-timeout-timeIf used together, isYes, or forNO
    • Bounded or not:Yes
    • Configuration name:boundedorakka.dispatch.BoundedMailbox
  • BoundedPriorityMailbox
    • By the packagingakka.util.BoundedBlockingQueueIn thejava.util.PriorityQueueTo provide support
    • The delivery order of messages with the same priority is undefined, andBoundedStablePriorityMailboxOn the contrary
    • Block or not: If and non-zeromailbox-push-timeout-timeIf used together, isYes, or forNO
    • Bounded or not:Yes
    • Configuration name:akka.dispatch.BoundedPriorityMailbox
  • BoundedStablePriorityMailbox
    • By the packagingakka.util.PriorityQueueStabilizerandakka.util.BoundedBlockingQueueIn thejava.util.PriorityQueueTo provide support
    • Reserved for messages with the same priorityFIFOOrder, andBoundedPriorityMailboxOn the contrary
    • Block or not: If and non-zeromailbox-push-timeout-timeIf used together, isYes, or forNO
    • Bounded or not:Yes
    • Configuration name:akka.dispatch.BoundedStablePriorityMailbox
  • BoundedControlAwareMailbox
    • Pass extends with higher priorityakka.dispatch.ControlMessageThe news of the
    • By twojava.util.concurrent.ConcurrentLinkedQueueSupport, if capacity is reached, blocking during queuing
    • Block or not: If and non-zeromailbox-push-timeout-timeIf used together, isYes, or forNO
    • Bounded or not:Yes
    • Configuration name:akka.dispatch.BoundedControlAwareMailbox

Email Configuration Example

PriorityMailbox

How to create PriorityMailbox

static class MyPrioMailbox extends UnboundedStablePriorityMailbox {
  // needed for reflective instantiation
  public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
    // Create a new PriorityGenerator, lower prio means more important
    super(
        new PriorityGenerator() {
          @Override
          public int gen(Object message) {
            if (message.equals("highpriority"))
              return 0; // 'highpriority messages should be treated first if possible
            else if (message.equals("lowpriority"))
              return 2; // 'lowpriority messages should be treated last if possible
            else if (message.equals(PoisonPill.getInstance()))
              return 3; // PoisonPill when no other left
            else return 1; // By default they go between high and low prio}}); }}Copy the code

Then add it to the configuration:

prio-dispatcher {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other dispatcher configuration goes here
}
Copy the code

Here’s an example of how to use it:

class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  {
    for (Object msg :
        new Object[] {
          "lowpriority"."lowpriority"."highpriority"."pigdog"."pigdog2"."pigdog3"."highpriority", PoisonPill.getInstance() }) { getSelf().tell(msg, getSelf()); }}@Override
  public Receive createReceive(a) {
    returnreceiveBuilder() .matchAny( message -> { log.info(message.toString()); }) .build(); }}// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("prio-dispatcher"));

/* Logs: 'highpriority 'highpriority 'pigdog 'pigdog2 'pigdog3 'lowpriority 'lowpriority */
Copy the code

You can also configure the mailbox type directly like this (this is the top-level configuration item) :

prio-mailbox {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other mailbox configuration goes here
}

akka.actor.deployment {
  /priomailboxactor {
    mailbox = prio-mailbox
  }
}
Copy the code

Then use it from a deployment like this:

ActorRef myActor = system.actorOf(Props.create(MyActor.class), "priomailboxactor");
Copy the code

Or something like this:

ActorRef myActor = system.actorOf(Props.create(MyActor.class).withMailbox("prio-mailbox"));
Copy the code

ControlAwareMailbox

ControlAwareMailbox is useful if the Actor needs to receive control messages immediately, no matter how many other messages are already in the mailbox.

It can be configured like this:

control-aware-dispatcher {
  mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
  //Other dispatcher configuration goes here
}
Copy the code

Control messages need to extend the ControlMessage feature:

static class MyControlMessage implements ControlMessage {}
Copy the code

Here’s an example of how to use it:

class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  {
    for (Object msg :
        new Object[] {"foo"."bar".newMyControlMessage(), PoisonPill.getInstance()}) { getSelf().tell(msg, getSelf()); }}@Override
  public Receive createReceive(a) {
    returnreceiveBuilder() .matchAny( message -> { log.info(message.toString()); }) .build(); }}// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("control-aware-dispatcher"));

/* Logs: 'MyControlMessage 'foo 'bar */
Copy the code

Create your own mailbox type

The following is an example:

// Marker interface used for mailbox requirements mapping
public interface MyUnboundedMessageQueueSemantics {}
Copy the code
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;
import scala.Option;

public class MyUnboundedMailbox
    implements MailboxType.ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {

  // This is the MessageQueue implementation
  public static class MyMessageQueue implements MessageQueue.MyUnboundedMessageQueueSemantics {
    private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();

    // these must be implemented; queue used as example
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
    }

    public Envelope dequeue(a) {
      return queue.poll();
    }

    public int numberOfMessages(a) {
      return queue.size();
    }

    public boolean hasMessages(a) {
      return! queue.isEmpty(); }public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      for(Envelope handle : queue) { deadLetters.enqueue(owner, handle); }}}// This constructor signature must exist, it will be called by Akka
  public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    // put your initialization code here
  }

  // The create method is called to create the MessageQueue
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    return newMyMessageQueue(); }}Copy the code

Then, the FQCN of MailboxType is specified as the value of mailbox-type in the scheduler configuration or mailbox configuration.

  • annotation: Be sure to include an adoptionakka.actor.ActorSystem.Settingsandcom.typesafe.config.ConfigParameter constructor, because this constructor builds the mailbox type through a reflection call. The configuration passed in as the second parameter is the part of the configuration that describes the scheduler or mailbox Settings that use this mailbox type; The mailbox type is instantiated once for each scheduler or mailbox setting that uses it.

You can also use a mailbox as a scheduler requirement, as shown below:

custom-dispatcher {
  mailbox-requirement =
  "jdocs.dispatcher.MyUnboundedMessageQueueSemantics"
}

akka.actor.mailbox.requirements {
  "jdocs.dispatcher.MyUnboundedMessageQueueSemantics" =
  custom-dispatcher-mailbox
}

custom-dispatcher-mailbox {
  mailbox-type = "jdocs.dispatcher.MyUnboundedMailbox"
}
Copy the code

Or define the Actor class requirements like this:

static class MySpecialActor extends AbstractActor
    implements RequiresMessageQueue<MyUnboundedMessageQueueSemantics> {
  // ...
}
Copy the code

Special semantics for system.actorof

Special treatment is made for this case so that System.actorof is both synchronous and non-blocking, while preserving the return type ActorRef (and the semantics of the returned ref being fully functional). Behind the scenes, an empty Actor reference is built and sent to the system’s guardian Actor, which actually creates the Actor and its context and places it in the reference. Until then, messages sent to ActorRef will be queued locally, and only after the real population is exchanged will they be transferred to the real mailbox. As a result,

final Props props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox").tell("bang", sender);
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));
Copy the code

May fail; You must allow some time to pass and retry the testKit.awaitCond check.

Follow public accountData craftsman notes, focus on big data field offline, real-time technology dry goods regular sharing! Personal websitewww.lllpan.top