Introduction to mailboxes
An Akka mailbox holds the messages that are destined for an actor. Typically each actor has its own mailbox, but there are exceptions, such as BalancingPool, where all routees share a single mailbox instance.
The default mailbox
If no mailbox is specified, the default mailbox is used. By default it is an unbounded mailbox backed by a java.util.concurrent.ConcurrentLinkedQueue.
SingleConsumerOnlyUnboundedMailbox is an even more efficient mailbox. It can be used as the default mailbox, but it cannot be used with a BalancingDispatcher.
To configure SingleConsumerOnlyUnboundedMailbox as the default mailbox:
akka.actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
Actor specific mailbox
A particular type of actor can require a particular type of mailbox by implementing the parameterized interface RequiresMessageQueue. Here is an example:
import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;
public class MyBoundedActor extends MyActor
implements RequiresMessageQueue<BoundedMessageQueueSemantics> {}
The type parameter of the RequiresMessageQueue interface needs to be mapped to a mailbox in the configuration, as follows:
bounded-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1000
}
akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
Now, every time an actor of type MyBoundedActor is created, it tries to get a bounded mailbox. If the actor has a different mailbox configured in its deployment, either directly or through a dispatcher with a specified mailbox type, that setting overrides this mapping.
Note: The type of the queue in the mailbox created for an actor is checked against the required type in the interface. If the queue does not implement the required type, actor creation fails.
Dispatcher specific mailboxes
A dispatcher may also have a requirement for the mailbox type used by the actors running on it. An example is BalancingDispatcher, which requires a message queue that is thread-safe for multiple concurrent consumers. Such a requirement can be stated in the dispatcher configuration, as follows:
my-dispatcher {
mailbox-requirement = org.example.MyInterface
}
The given requirement names a class or interface that is then guaranteed to be a supertype of the message queue implementation. In case of a conflict, for example if the actor requires a mailbox type that does not satisfy this requirement, actor creation fails.
How the mailbox type is selected
When an actor is created, the ActorRefProvider first determines the dispatcher that will execute it. The mailbox type is then determined in the following order:
- If the actor's deployment configuration section contains a mailbox key, then that key specifies the mailbox type to use.
- If the actor's Props contains a mailbox selection (that is, withMailbox was called on it), then that selection specifies the mailbox type to use.
- If the dispatcher's configuration section contains a mailbox-type key, then that same section is used to configure the mailbox type.
- If the actor requires a mailbox type as described above, then the mapping for that requirement is used to determine the mailbox type. If that fails, the dispatcher's requirement, if any, is tried instead.
- If the dispatcher requires a mailbox type as described above, then the mapping for that requirement is used to determine the mailbox type.
- Otherwise the default mailbox akka.actor.default-mailbox is used.
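The lookup order above can be pictured as a chain of fall-backs. The following is only a minimal sketch in plain Java: MailboxSelection, select and its parameters are hypothetical names made up for this illustration and are not part of the Akka API, and the real lookup additionally verifies the requirements against the queue type.

```java
import java.util.Optional;

// Hypothetical sketch of the lookup order; none of these names are Akka APIs.
final class MailboxSelection {
  private MailboxSelection() {}

  /** Returns the first mailbox source that is present, in the order listed above. */
  static String select(
      Optional<String> deploymentMailbox, // "mailbox" key in the actor's deployment section
      Optional<String> propsMailbox, // selection made through Props.withMailbox
      Optional<String> dispatcherMailboxType, // "mailbox-type" key in the dispatcher section
      Optional<String> actorRequirementMapping, // mapping for the actor's required queue type
      Optional<String> dispatcherRequirementMapping) { // mapping for "mailbox-requirement"
    return deploymentMailbox
        .or(() -> propsMailbox)
        .or(() -> dispatcherMailboxType)
        .or(() -> actorRequirementMapping)
        .or(() -> dispatcherRequirementMapping)
        .orElse("akka.actor.default-mailbox"); // last resort: the default mailbox
  }
}
```

With every argument empty, the sketch falls through to akka.actor.default-mailbox; as soon as an earlier source is present, the later ones are ignored.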
Which configuration is passed to the mailbox type
Each mailbox type is implemented by a class that extends MailboxType and takes two constructor arguments: an ActorSystem.Settings object and a Config section. The latter is computed by taking the named configuration section from the actor system's configuration, overriding its id key with the configuration path of the mailbox type, and adding a fall-back to the default mailbox configuration section.
Built-in mailbox implementations
Akka comes with a number of mailbox implementations:
UnboundedMailbox (the default)
- The default mailbox
- Backed by a java.util.concurrent.ConcurrentLinkedQueue
- Blocking: No
- Bounded: No
- Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
SingleConsumerOnlyUnboundedMailbox
- Depending on your use case, this queue may or may not be faster than the default one; be sure to benchmark properly!
- Backed by a multiple-producer single-consumer queue; cannot be used with BalancingDispatcher
- Blocking: No
- Bounded: No
- Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
NonBlockingBoundedMailbox
- Backed by a very efficient multiple-producer single-consumer queue
- Blocking: No (overflowing messages are discarded into deadLetters)
- Bounded: Yes
- Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
UnboundedControlAwareMailbox
- Delivers messages that extend akka.dispatch.ControlMessage with higher priority
- Backed by two java.util.concurrent.ConcurrentLinkedQueue
- Blocking: No
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedControlAwareMailbox"
UnboundedPriorityMailbox
- Backed by a java.util.concurrent.PriorityBlockingQueue
- The delivery order of messages with equal priority is undefined, in contrast to UnboundedStablePriorityMailbox
- Blocking: No
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
UnboundedStablePriorityMailbox
- Backed by a java.util.concurrent.PriorityBlockingQueue wrapped in an akka.util.PriorityQueueStabilizer
- FIFO order is preserved for messages with equal priority, in contrast to UnboundedPriorityMailbox
- Blocking: No
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
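To see why a plain priority queue gives an undefined order for equal priorities, and how a stabilizing wrapper can restore FIFO order, here is a minimal plain-Java sketch. It is not Akka's PriorityQueueStabilizer; StablePriorityQueue and its members are hypothetical names, and the approach shown (tagging each element with an arrival sequence number that breaks ties) is only one common way to get a stable queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToIntFunction;

// Illustration only: a stable priority queue in plain Java, not Akka's implementation.
final class StablePriorityQueue<T> {
  // Pairs each element with its priority and its arrival sequence number.
  private static final class Entry<T> {
    final T value;
    final int priority;
    final long seq;

    Entry(T value, int priority, long seq) {
      this.value = value;
      this.priority = priority;
      this.seq = seq;
    }
  }

  private final AtomicLong counter = new AtomicLong();
  private final ToIntFunction<T> priorityOf;
  // Order by priority first (lower is more important), then by arrival order.
  private final PriorityBlockingQueue<Entry<T>> queue =
      new PriorityBlockingQueue<>(
          11, Comparator.<Entry<T>>comparingInt(e -> e.priority).thenComparingLong(e -> e.seq));

  StablePriorityQueue(ToIntFunction<T> priorityOf) {
    this.priorityOf = priorityOf;
  }

  void enqueue(T value) {
    queue.add(new Entry<>(value, priorityOf.applyAsInt(value), counter.getAndIncrement()));
  }

  // Returns null when the queue is empty.
  T dequeue() {
    Entry<T> entry = queue.poll();
    return entry == null ? null : entry.value;
  }

  // Enqueues two priorities interleaved and returns the dequeue order.
  static List<String> demo() {
    StablePriorityQueue<String> q =
        new StablePriorityQueue<String>(m -> m.startsWith("high") ? 0 : 1);
    for (String m : new String[] {"low-1", "high-1", "low-2", "high-2"}) {
      q.enqueue(m);
    }
    List<String> out = new ArrayList<>();
    for (String m = q.dequeue(); m != null; m = q.dequeue()) {
      out.add(m);
    }
    return out; // [high-1, high-2, low-1, low-2]
  }
}
```

Without the sequence number as a tie-breaker, the relative order of "low-1" and "low-2" would depend on the heap layout, which is the behaviour described for the non-stable priority mailboxes.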
The other bounded mailbox implementations block the sender if the capacity is reached and they are configured with a non-zero mailbox-push-timeout-time. Note: the following mailboxes should only be used with a zero mailbox-push-timeout-time.
BoundedMailbox
- Backed by a java.util.concurrent.LinkedBlockingQueue
- Blocking: Yes if used with a non-zero mailbox-push-timeout-time, otherwise No
- Bounded: Yes
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
BoundedPriorityMailbox
- Backed by a java.util.PriorityQueue wrapped in an akka.util.BoundedBlockingQueue
- The delivery order of messages with equal priority is undefined, in contrast to BoundedStablePriorityMailbox
- Blocking: Yes if used with a non-zero mailbox-push-timeout-time, otherwise No
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
BoundedStablePriorityMailbox
- Backed by a java.util.PriorityQueue wrapped in an akka.util.PriorityQueueStabilizer and an akka.util.BoundedBlockingQueue
- FIFO order is preserved for messages with equal priority, in contrast to BoundedPriorityMailbox
- Blocking: Yes if used with a non-zero mailbox-push-timeout-time, otherwise No
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
BoundedControlAwareMailbox
- Delivers messages that extend akka.dispatch.ControlMessage with higher priority
- Backed by two java.util.concurrent.ConcurrentLinkedQueue, blocking on enqueue if the capacity has been reached
- Blocking: Yes if used with a non-zero mailbox-push-timeout-time, otherwise No
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedControlAwareMailbox"
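The effect of mailbox-push-timeout-time on a bounded queue can be sketched with a plain LinkedBlockingQueue. This is an illustration under assumptions, not Akka's BoundedMailbox: BoundedQueueSketch and its fields are hypothetical names, and routing rejected messages to a deadLetters list is modelled on the overflow behaviour described for NonBlockingBoundedMailbox above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: mimics the documented behaviour with a plain queue.
final class BoundedQueueSketch {
  final LinkedBlockingQueue<String> queue;
  final List<String> deadLetters = new ArrayList<>(); // stand-in for the dead letters mailbox
  final long pushTimeoutMillis; // plays the role of mailbox-push-timeout-time

  BoundedQueueSketch(int capacity, long pushTimeoutMillis) {
    this.queue = new LinkedBlockingQueue<>(capacity);
    this.pushTimeoutMillis = pushTimeoutMillis;
  }

  // Returns true if the message was accepted, false if it ended up in deadLetters.
  boolean enqueue(String message) {
    boolean accepted;
    try {
      accepted =
          pushTimeoutMillis > 0
              // Non-zero timeout: the sender may block for up to the timeout.
              ? queue.offer(message, pushTimeoutMillis, TimeUnit.MILLISECONDS)
              // Zero timeout: the sender never blocks.
              : queue.offer(message);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      accepted = false;
    }
    if (!accepted) {
      deadLetters.add(message);
    }
    return accepted;
  }
}
```

With a capacity of 1 and a zero timeout, the second enqueue returns immediately with false; with a non-zero timeout, the same call holds up the sending thread first, which is why blocking inside a sender is usually something to avoid.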
Mailbox configuration examples
PriorityMailbox
How to create a PriorityMailbox:
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedStablePriorityMailbox;
import com.typesafe.config.Config;
static class MyPrioMailbox extends UnboundedStablePriorityMailbox {
  // needed for reflective instantiation
  public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
    // Create a new PriorityGenerator, lower prio means more important
    super(
        new PriorityGenerator() {
          @Override
          public int gen(Object message) {
            if (message.equals("highpriority"))
              return 0; // 'highpriority messages should be treated first if possible
            else if (message.equals("lowpriority"))
              return 2; // 'lowpriority messages should be treated last if possible
            else if (message.equals(PoisonPill.getInstance()))
              return 3; // PoisonPill when no other left
            else return 1; // By default they go between high and low prio
          }
        });
  }
}
Then add it to the configuration:
prio-dispatcher {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other dispatcher configuration goes here
}
Here’s an example of how to use it:
class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  {
    // Send all messages to self before any of them is processed
    for (Object msg :
        new Object[] {
          "lowpriority", "lowpriority", "highpriority", "pigdog",
          "pigdog2", "pigdog3", "highpriority", PoisonPill.getInstance()
        }) {
      getSelf().tell(msg, getSelf());
    }
  }
  @Override
  public Receive createReceive() {
    return receiveBuilder().matchAny(message -> log.info(message.toString())).build();
  }
}
// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("prio-dispatcher"));
/* Logs: 'highpriority 'highpriority 'pigdog 'pigdog2 'pigdog3 'lowpriority 'lowpriority */
You can also configure a mailbox type directly like this (this is a top-level configuration entry):
prio-mailbox {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other mailbox configuration goes here
}
akka.actor.deployment {
/priomailboxactor {
mailbox = prio-mailbox
}
}
Then use it either from the deployment like this:
ActorRef myActor = system.actorOf(Props.create(MyActor.class), "priomailboxactor");
Or from code like this:
ActorRef myActor = system.actorOf(Props.create(MyActor.class).withMailbox("prio-mailbox"));
ControlAwareMailbox
A ControlAwareMailbox is useful if an actor needs to receive control messages immediately, no matter how many other messages are already in its mailbox.
It can be configured like this:
control-aware-dispatcher {
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
//Other dispatcher configuration goes here
}
Control messages need to implement the ControlMessage interface:
static class MyControlMessage implements ControlMessage {}
Here’s an example of how to use it:
class Demo extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  {
    // The control message is sent last but is expected to be processed first
    for (Object msg :
        new Object[] {"foo", "bar", new MyControlMessage(), PoisonPill.getInstance()}) {
      getSelf().tell(msg, getSelf());
    }
  }
  @Override
  public Receive createReceive() {
    return receiveBuilder().matchAny(message -> log.info(message.toString())).build();
  }
}
// We create a new Actor that just prints out what it processes
ActorRef myActor =
    system.actorOf(Props.create(Demo.class, this).withDispatcher("control-aware-dispatcher"));
/* Logs: 'MyControlMessage 'foo 'bar */
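The reason the control message overtakes "foo" and "bar" can be shown with two plain queues, as the built-in list describes for the control-aware mailboxes. The following is a simplified sketch and not Akka's code: ControlAwareQueueSketch, Control and Stop are hypothetical stand-ins (Control plays the role of akka.dispatch.ControlMessage).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustration only: the two-queue idea in plain Java, not Akka's implementation.
final class ControlAwareQueueSketch {
  interface Control {} // hypothetical stand-in for akka.dispatch.ControlMessage

  static final class Stop implements Control {
    @Override
    public String toString() {
      return "Stop";
    }
  }

  private final Queue<Object> controlQueue = new ConcurrentLinkedQueue<>();
  private final Queue<Object> normalQueue = new ConcurrentLinkedQueue<>();

  // Control messages go to their own queue; everything else to the normal queue.
  void enqueue(Object message) {
    if (message instanceof Control) {
      controlQueue.offer(message);
    } else {
      normalQueue.offer(message);
    }
  }

  // The control queue is always drained first; returns null when both queues are empty.
  Object dequeue() {
    Object control = controlQueue.poll();
    return control != null ? control : normalQueue.poll();
  }

  // Enqueues the control message last and returns the dequeue order.
  static List<String> demo() {
    ControlAwareQueueSketch q = new ControlAwareQueueSketch();
    q.enqueue("foo");
    q.enqueue("bar");
    q.enqueue(new Stop());
    List<String> out = new ArrayList<>();
    for (Object m = q.dequeue(); m != null; m = q.dequeue()) {
      out.add(m.toString());
    }
    return out; // [Stop, foo, bar]
  }
}
```

Within each queue the order stays FIFO, so several control messages are still processed in the order in which they were sent.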
Create your own mailbox type
The following is an example:
// Marker interface used for mailbox requirements mapping
public interface MyUnboundedMessageQueueSemantics {}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Queue;
import scala.Option;
public class MyUnboundedMailbox
    implements MailboxType, ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {
  // This is the MessageQueue implementation
  public static class MyMessageQueue implements MessageQueue, MyUnboundedMessageQueueSemantics {
    private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
    // these must be implemented; queue used as example
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
    }
    public Envelope dequeue() {
      return queue.poll();
    }
    public int numberOfMessages() {
      return queue.size();
    }
    public boolean hasMessages() {
      return !queue.isEmpty();
    }
    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      // Hand any remaining messages over to dead letters
      for (Envelope handle : queue) {
        deadLetters.enqueue(owner, handle);
      }
    }
  }
  // This constructor signature must exist, it will be called by Akka
  public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    // put your initialization code here
  }
  // The create method is called to create the MessageQueue
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    return new MyMessageQueue();
  }
}
You then specify the FQCN of your MailboxType as the value of mailbox-type in the dispatcher configuration or the mailbox configuration.
Note: Make sure to include a constructor that takes akka.actor.ActorSystem.Settings and com.typesafe.config.Config arguments, because this constructor is invoked reflectively to construct your mailbox type. The configuration passed in as the second argument is the section of the configuration that describes the dispatcher or mailbox setting using this mailbox type; the mailbox type is instantiated once for each dispatcher or mailbox setting that uses it.
You can also use the mailbox as a requirement on the dispatcher, as shown below:
custom-dispatcher {
mailbox-requirement =
"jdocs.dispatcher.MyUnboundedMessageQueueSemantics"
}
akka.actor.mailbox.requirements {
"jdocs.dispatcher.MyUnboundedMessageQueueSemantics" =
custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "jdocs.dispatcher.MyUnboundedMailbox"
}
Or by defining the requirement on your actor class like this:
static class MySpecialActor extends AbstractActor
implements RequiresMessageQueue<MyUnboundedMessageQueueSemantics> {
// ...
}
Special semantics of system.actorOf
To make system.actorOf both synchronous and non-blocking while keeping the return type ActorRef (and the guarantee that the returned ref is fully functional), this case gets special handling. Behind the scenes, a hollow actor reference is constructed and sent to the system's guardian actor, which actually creates the actor and its context and puts them inside the reference. Until that has happened, messages sent to the ActorRef are queued locally, and only once the real contents have been swapped in are they transferred to the real mailbox. As a result,
final Props props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")).tell("bang", sender);
assert(MyCustomMailbox.getInstance().getLastEnqueued().equals("bang"));
may fail; you have to allow some time to pass and retry the check, in the manner of TestKit.awaitCond.
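The retry idea can be sketched without any test library. The helper below is a hand-rolled polling loop in plain Java and not the TestKit implementation; Await, awaitCond and its millisecond parameters are hypothetical names chosen for this illustration.

```java
import java.util.function.BooleanSupplier;

// Illustration only: a hand-rolled polling helper, not the TestKit implementation.
final class Await {
  private Await() {}

  /** Polls the condition until it holds or maxMillis has elapsed; returns whether it held. */
  static boolean awaitCond(BooleanSupplier condition, long maxMillis, long intervalMillis) {
    long deadline = System.nanoTime() + maxMillis * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false; // time is up and the condition never held
      }
      try {
        Thread.sleep(intervalMillis); // give the system time to make progress
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```

In the example above, the assertion on getLastEnqueued() would become the condition passed to such a helper, so the check is repeated until the message has been moved from the local queue to the real mailbox.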