In Akka, actors are often in a state where they cannot process certain messages. For example, if a database client is offline, it cannot process requests until it is back online. One option is to keep retrying the connection and discard every message received in the meantime. Another option is to set aside the messages that cannot be processed and handle them once the connection is restored.
Akka provides a mechanism called stash for setting such messages aside and bringing them back later:
- Stash: temporarily moves a message that cannot be processed right now into a separate queue.
- Unstash: takes stashed messages out of that queue and puts them back into the mailbox, where the actor can process them. In the classic actor API, the call that returns every stashed message is unstashAll().
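The two operations above can be pictured with a small plain-Java model. This is only a sketch of the idea, not Akka code: the class names StashingMailbox and StashDemo and the string messages are made up for illustration, and a real actor gets this behavior from Akka's stash support rather than from hand-written queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a mailbox with a stash; this is NOT the Akka API.
class StashingMailbox {
    private final Deque<String> mailbox = new ArrayDeque<>();
    private final Deque<String> stash = new ArrayDeque<>();

    void enqueue(String msg) { mailbox.addLast(msg); }

    // Returns the next message, or null when the mailbox is empty.
    String next() { return mailbox.pollFirst(); }

    // Set a message aside because it cannot be handled right now.
    void stash(String msg) { stash.addLast(msg); }

    // Move every stashed message to the front of the mailbox, oldest first.
    void unstashAll() {
        while (!stash.isEmpty()) {
            mailbox.addFirst(stash.pollLast());
        }
    }
}

class StashDemo {
    // Processes a fixed message sequence and returns what was handled, in order.
    static List<String> run() {
        StashingMailbox box = new StashingMailbox();
        for (String m : Arrays.asList("get-1", "get-2", "connected", "get-3")) {
            box.enqueue(m);
        }
        List<String> processed = new ArrayList<>();
        boolean online = false;
        String msg;
        while ((msg = box.next()) != null) {
            if (msg.equals("connected")) {
                online = true;
                box.unstashAll();
            } else if (online) {
                processed.add(msg);
            } else {
                box.stash(msg);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [get-1, get-2, get-3]
    }
}
```

The stashed requests are handled before get-3 because they are put back at the front of the mailbox, so the original arrival order is preserved.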
Note that while stash() and unstashAll() are handy when you want to change state quickly, stashed messages should be bound to a time limit; otherwise they can pile up and fill the mailbox. To avoid this, schedule a check message to the actor itself from the actor constructor or the preStart method:
context().system().scheduler().scheduleOnce(
    Duration.create(1000, TimeUnit.MILLISECONDS), // delay before the check fires
    self(),                                       // receiver: this actor
    CheckConnected,                               // the check message, as named in the original
    context().system().dispatcher(),
    ActorRef.noSender());
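What the actor does when that check message arrives is not shown in the original. One possible policy, sketched here as a plain-Java model rather than Akka code, is to give up on everything still stashed if the client is offline when the check fires. The class TimeBoundedClient, its string messages, and the timedOut list are assumptions made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model, NOT Akka code: a client that gives up on stashed
// requests if it is still offline when a CheckConnected tick arrives.
class TimeBoundedClient {
    static final String CHECK_CONNECTED = "CheckConnected"; // hypothetical tick message
    static final String CONNECTED = "Connected";            // hypothetical connect message

    private final Deque<String> stash = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();
    final List<String> timedOut = new ArrayList<>();
    private boolean online = false;

    void receive(String msg) {
        if (msg.equals(CONNECTED)) {
            online = true;
            processed.addAll(stash); // replay stashed requests in arrival order
            stash.clear();
        } else if (msg.equals(CHECK_CONNECTED)) {
            if (!online) {
                timedOut.addAll(stash); // time limit hit: stop holding these requests
                stash.clear();
            }
        } else if (online) {
            processed.add(msg);
        } else {
            stash.addLast(msg);
        }
    }

    int stashSize() { return stash.size(); }
}
```

In a real actor the timed-out requests would more likely be answered with a failure message to their senders than silently collected in a list; the list only makes the outcome easy to inspect here.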
Conditional statements
The most intuitive approach is to store the state in the Actor and then use a conditional statement to determine what the Actor should do.
public PartialFunction<Object, BoxedUnit> receive() {
    return ReceiveBuilder
        .match(GetRequest.class, x -> {
            if (online) {
                processMessage(x);
            } else {
                stash(); // not connected yet: set the request aside
            }
        })
        .match(Tcp.Connected.class, x -> {
            online = true;
            unstashAll(); // put every stashed request back into the mailbox
        })
        .match(Disconnected.class, x -> online = false)
        .build();
}
Because the requests are stashed while the actor is offline and unstashed when it connects, all of the stashed messages are processed once the actor is online.
Many times, actors store state and then behave differently depending on the value of that state. Using conditional statements is a very procedural way of dealing with behavior and state.
Hotswap: Become/Unbecome
- become(PartialFunction behavior): replaces the behavior defined in the receive block with a new PartialFunction.
- unbecome(): reverts the actor to its previous behavior, which by default is the original one.
private ActorSelection remoteDb;
private PartialFunction<Object, BoxedUnit> disconnected;
private PartialFunction<Object, BoxedUnit> online;

public HotswapClientActor(String dbPath) {
    remoteDb = context().actorSelection(dbPath);

    // Disconnected behavior: requests are stashed until a Connected message arrives.
    disconnected = ReceiveBuilder.
        match(Request.class, x -> { // can't handle until we know the remote system is responding
            remoteDb.tell(new Connected(), self()); // see if the remote actor is up
            stash(); // set the request aside
        }).
        match(Connected.class, x -> { // okay to start processing messages
            context().become(online);
            unstashAll(); // put the stashed requests back into the mailbox
        }).build();

    // Online behavior: once become(online) has been called, the unstashed
    // requests and all later ones are handled here.
    online = ReceiveBuilder.
        match(Request.class, x -> {
            remoteDb.forward(x, context()); // forward instead of tell to preserve the sender
        }).
        build();

    // Any number of receive blocks can be defined and swapped with each other.
    receive(disconnected); // initial state
}
In contrast to conditional statements, the behavior of each state is defined in its own separate PartialFunction.
The actor starts out offline and cannot respond to requests, so it stashes them. These messages are set aside until a Connected message is received. Once the Connected message arrives, the actor calls become, changing its behavior to online, and then unstashes the stored messages back into the mailbox. All of them are then processed with the behavior defined for the online state. If the actor later receives a Disconnected message, it can call unbecome to restore its default behavior.
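The flow just described can be traced with a plain-Java model in which each behavior is a separate handler and become simply swaps the active one. This is a sketch under simplifying assumptions, not Akka code: HotswapModel and its string messages are hypothetical, and stashed messages are replayed synchronously here, whereas Akka puts them back into the mailbox.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of hot-swapping; it imitates become/unbecome but is NOT Akka.
class HotswapModel {
    final List<String> forwarded = new ArrayList<>();
    private final Deque<String> stash = new ArrayDeque<>();

    // One handler per state, mirroring the two PartialFunctions in the article.
    private final Consumer<String> disconnected = this::whenDisconnected;
    private final Consumer<String> online = this::whenOnline;
    private Consumer<String> behavior = disconnected; // initial state

    void receive(String msg) { behavior.accept(msg); }

    private void become(Consumer<String> next) { behavior = next; }

    // Simplification: go back to the initial behavior.
    private void unbecome() { behavior = disconnected; }

    private void whenDisconnected(String msg) {
        if (msg.equals("Connected")) {
            become(online);
            // Replay stashed requests, oldest first, with the new behavior.
            while (!stash.isEmpty()) {
                receive(stash.pollFirst());
            }
        } else if (!msg.equals("Disconnected")) {
            stash.addLast(msg);
        }
    }

    private void whenOnline(String msg) {
        if (msg.equals("Disconnected")) {
            unbecome();
        } else if (!msg.equals("Connected")) {
            forwarded.add(msg);
        }
    }

    String state() { return behavior == online ? "online" : "disconnected"; }
}
```

Neither handler contains an if (online) check: which code runs is decided by the currently installed behavior, which is the point of hot swapping.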
Finite State Machine (FSM)
For more on FSM, see these two articles: www.jianshu.com/p/41905206b… www.jianshu.com/p/c5b0559f4…
FSM also models states and state-based behavior changes. It is a heavier abstraction than hot swapping, requiring more code and types to get up and running, so in general hot swapping is the simpler, more readable option.
The FSM type takes two type parameters: the state and the state container.
Define state
- Disconnected: offline, with no messages in the queue;
- Disconnected and Pending: offline, with messages in the queue;
- Connected: online, with no messages in the queue;
- Connected and Pending: online, with messages in the queue.
enum State{
DISCONNECTED,
CONNECTED,
CONNECTED_AND_PENDING,
}
State container
The state container is where the messages are stored. FSM allows us to define state containers and modify them when switching states.
public class EventQueue extends LinkedList<Request> {}
- Extend akka.actor.AbstractFSM<S, D>
public class BunchingAkkademyClient extends AbstractFSM<State, EventQueue> {
    {
        // init block
    }
}
- The initialization block defines the behavior
{
    // init block: start disconnected, with an empty container so requests can be added to it
    startWith(DISCONNECTED, new EventQueue());
}
- Define how different states respond to different messages and how to switch states based on the messages received.
/*
 * when(S state, PartialFunction pf) defines how a state responds to messages
 * and which state it moves to next.
 * matchEvent creates an akka.japi.pf.FSMStateFunctionBuilder and sets its first
 * case; cases can match on event type, data type, and predicates.
 */
// DISCONNECTED stores requests or transitions to a connected state.
// It ignores all messages except Connected and Request.
when(DISCONNECTED,
    matchEvent(FlushMsg.class, (msg, container) -> stay())
        .event(Request.class, (msg, container) -> {
            remoteDb.tell(new Connected(), self());
            container.add(msg);
            return stay();
        })
        .event(Connected.class, (msg, container) -> {
            if (container.size() == 0) {
                return goTo(CONNECTED);
            } else {
                return goTo(CONNECTED_AND_PENDING);
            }
        }));

// CONNECTED only cares about requests, which move it to CONNECTED_AND_PENDING.
when(CONNECTED,
    matchEvent(FlushMsg.class, (msg, container) -> stay())
        .event(Request.class, (msg, container) -> {
            container.add(msg);
            return goTo(CONNECTED_AND_PENDING);
        }));

// CONNECTED_AND_PENDING either sends all requests in the container
// or adds another request to it.
when(CONNECTED_AND_PENDING,
    matchEvent(FlushMsg.class, (msg, container) -> {
            remoteDb.tell(container, self());
            // Reassigning the lambda parameter would not change the FSM's data,
            // so the empty container is installed with using(...).
            return goTo(CONNECTED).using(new EventQueue());
        })
        .event(Request.class, (msg, container) -> {
            container.add(msg);
            return goTo(CONNECTED_AND_PENDING);
        }));
Some messages are ignored in some states and processed in others. Every handler must return a description of the next state: either stay() to remain in the current state or goTo(...) to move to another one.
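The transition rules can also be written out as an ordinary switch, which makes it easy to check which messages each state ignores. The following is a plain-Java model, not AbstractFSM: BunchingModel, the string messages "Connected" and "Flush", and the flushed list are hypothetical stand-ins for the actor, its message classes, and the remote database.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the three-state machine described above; NOT AbstractFSM.
class BunchingModel {
    enum State { DISCONNECTED, CONNECTED, CONNECTED_AND_PENDING }

    State state = State.DISCONNECTED;
    List<String> container = new ArrayList<>();           // the state container
    final List<List<String>> flushed = new ArrayList<>(); // batches "sent" so far

    void handle(String msg) {
        switch (state) {
            case DISCONNECTED:
                if (msg.equals("Connected")) {
                    state = container.isEmpty() ? State.CONNECTED : State.CONNECTED_AND_PENDING;
                } else if (!msg.equals("Flush")) {
                    container.add(msg); // a request: store it and stay
                }
                break;
            case CONNECTED:
                if (!msg.equals("Flush") && !msg.equals("Connected")) {
                    container.add(msg);
                    state = State.CONNECTED_AND_PENDING;
                }
                break;
            case CONNECTED_AND_PENDING:
                if (msg.equals("Flush")) {
                    flushed.add(container);         // send the whole batch
                    container = new ArrayList<>();  // start over with an empty container
                    state = State.CONNECTED;
                } else if (!msg.equals("Connected")) {
                    container.add(msg);
                }
                break;
        }
    }
}
```

Compared with this switch, the FSM DSL keeps each state's rules in its own when(...) block and carries the container along with the state instead of mutating a field.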
- initialize()
The last thing to do in the init block is to call initialize();
Reference
- Introduction and Practice of Akka