1. Responsive system design
Akka is described as a responsive platform, and more specifically, part of the Typesafe responsive platform. The responsive manifesto contains four principles that can be described as design goals: sensitivity, scalability, fault tolerance, and event-driven design.
1.1 sensitivity
Applications should respond to requests as quickly as possible. In order to return a response to the user as quickly as possible, given the choice between sequential or parallel data retrieval, parallel data retrieval should always be preferred. If an error is possible, return immediately to notify the user of the problem and do not make the user wait until time out.
1.2 scalability
Applications should be able to scale (especially by adding computing resources) for different workloads. If you are running an in-memory database on a virtual machine, adding another virtual node can spread all query requests across two virtual servers, doubling the possible throughput. Adding additional nodes should improve the performance of the system almost linearly. When you add a node for an in-memory database, you can also split the data in half and move half of it to the new node, doubling the memory capacity. Adding nodes should increase memory capacity almost linearly.
1.3 fault tolerance
If an error occurs in a component of the system, there should be no impact on requests unrelated to that component. Errors are inevitable, so you should limit your impact to the component where the error occurred.
1.4 Event driven/message driven
Using messages instead of making method calls directly provides a way to help us meet the other three reactive criteria. Message-driven systems focus on controlling when, where, and how requests are responded to, allowing routing and load balancing by responding components. Because asynchronous message-driven systems consume resources (such as threads) only when they are really needed, they make more efficient use of system resources. Messages can also be sent to remote machines (location transparent). Because the message to be sent is approved to be sent from and in a message queue outside the Actor, it is possible to make the system that made the error recover itself through a monitoring mechanism.
The four responsive criteria are not completely independent of each other. The approach taken to satisfy one criterion often helps to satisfy the other. For example, if a service is found to be slow to respond, we might stop sending requests to that service for a short time, wait for it to recover, and immediately return an error message to the user. This reduces the risk of a slow-responding service being overwhelmed and crashing directly, and therefore improves the system’s fault tolerance. In addition, we immediately informed the user of the system problems, which improved the system response speed, as shown in the figure:
2 analysis of the Actor
As a simple example, simply build an Actor that receives “Ping” and returns the string “Pong” in response.
package com.lp.akka.notes;
import akka.actor.AbstractActor;
import akka.actor.Status;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
/ * * *@author lipan
*/
public class JavaPongActor extends AbstractActor {
protected final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public PartialFunction receive(a) {
return ReceiveBuilder.matchEquals("Ping", message -> {
sender().tell("Pong", self());
log.info("message:" + message);
log.info("sender:" + sender().path());
log.info("self:" + self());
}).matchAny(other -> {
sender().tell(new Status.Failure(new Exception("unknown message")), self());
log.info("other:"+ other); }).build(); }}Copy the code
-
AbstractActor: This Java 8-specific API takes advantage of the Lamdba feature. UntypedActor can also be inherited as a base class, but this class is older. In the UntypeActor API, you get an object and you have to condition it with an if statement; But the Java8 API can be more expressive through pattern matching.
-
Receive: The AbstractActor class has a Receive method that its subclasses must implement or call in the constructor. The receive method returns a type called PartialFuncation, which is derived from Scala’s API. There are no native methods provided in Java to construct Scala’s PartialFunction, so Akka provides us with an abstract constructor class ReceiveBuilder that produces partialFunctions as return values.
-
ReceiveBuilder: Calls the ReceiveBuilder method consecutively to provide a description of the response method for all message input message types that need to be matched for processing. The build() method is then called to generate the required return value PartialFunction.
-
Match: Matches the message type. The match function matches from top to bottom, so you can define special cases first and general cases last.
match(final Class<? extends P> type, FI.UnitApply<? extends P> apply) Copy the code
Describes the response behavior for any unmatched instances of the type.
match(final Class<P> type, final FI.TypedPredicate<P> predicate,final FI.UnitApply<P> apply) Copy the code
Describes how to respond to a certain type of message for which the predicate condition function is true.
matchAny(final FI.UnitApply<Object> apply) Copy the code
This function matches all unmatched messages, and in general, the best event is either to return an error message or to log an error message.
-
Return message to sender() : After calling the sender() method, you can return the response to the received message. The response object can be either an Actor or a request from outside the Actor system. The first case is fairly straightforward: as shown in the code above, the returned message is sent directly to the Actor’s recipient mailbox.
-
The tell() : Sender () function returns an ActorRef. In sender().tell(), tell() is the most basic single-item message transfer mode. The first argument is the message to be sent to the other Actor’s mailbox, and the second argument is the sender you want the Actor to see. Actorref.nosender () indicates that there is noSender and therefore no return address.
In the current version, some of the AbstractActor methods have been tweaked. The createReceive method must be overridden and return receive instead of PartialFunction. The corresponding ReceiveBuilder for PartialFunction production has also been adjusted. Methods such as match in ReceiveBuilder have been changed from static to non-static. The build method has also been rewritten.
3 create Actor
Access to actors is different from access to ordinary objects. We never get an instance of an Actor, call methods on an Actor, or change the state of the Actor directly. Instead, we just send messages to the Actor. By using message-based mechanisms, actors can be wrapped fairly completely. If you communicate only through messages, you never need to get instances of actors, just a mechanism to send messages to actors and receive responses. – ActorRef
In Akka, this reference to an Actor instance is called ActorRef. An ActorRef is an untyped reference that wraps the Actor it points to, providing a higher level of abstraction and giving the user a mechanism to communicate with the Actor.
ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
Copy the code
The actorOf method generates a new Actor and returns a reference to that Actor.
3.1 Props
To be able to wrap an Actor instance away from direct external access. We pass all the constructor arguments to an instance of Props, which allows us to pass in the Actor type and a list of mutable arguments.
def create(clazz: Class[_], args: AnyRef*) :Props = new Props(deploy = Props.defaultDeploy, clazz = clazz, args = args.toList)
Copy the code
ActorOf creates an Actor and returns a reference to that Actor, ActorRef. In addition, you can use actorSelection to get the ActorRef of an Actor. Each Actor has a path when it is created. You can view the path through actorRef. path, for example:
ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
System.out.println(pingref.path());
Copy the code
Output: akka: / / PongPing/user/pingActor. The path is a URL that can even point to a remote Actor using the Akka.tcp protocol. If you know the path to an Actor, you can use actorSelection to get an actorSelection that points to a reference to that Actor, whether that Actor is local or remote.
3.2 ActorSelection
ActorSelection is also a reference to Actor. Like ActorRef, ActorSeletion can also be used to make actors communicate with each other. This is the best illustration of Akka’s transparency.
ActorRef pingref = system.actorOf(Props.create(JavaPongActor.class), "pingActor");
ActorSelection selection = system.actorSelection(pingref.path());
Copy the code
4 Promise, Future, and event-driven programming model
4.1 blocking IO
Blocking code is familiar to almost every developer. When I do IO, I write blocking code. When we call a synchronous API, the called method does not return immediately: the application waits for the call to complete. For example, if you make an HTTP request, you will receive a response object back only after the request completes. Because the calling thread pauses and waits, any code waiting for the IO operation to complete is blocked, and the calling thread cannot do anything else until the IO operation completes.
When using multiple threads or thread pools to handle blocking I/O, you need to consider that when running multiple threads in the same CPU core, the operating system needs to constantly switch thread contexts to ensure that all threads are allocated CPU time slices. You may encounter the following problems:
- The code does not explicitly indicate an error in the return type;
- The code does not explicitly indicate a delay in the return type;
- The throughput of the blocking model is limited by the size of the thread pool;
- Creating and using many threads can cost extra time for context switching, affecting system performance.
4.2 Event-driven
Event-driven describes: ** When certain events occur, some corresponding code is executed. ** Based on the event-driven model, we need to represent results in different ways in our code. Use a placeholder to indicate the result that will eventually be returned: Future/CompletableFuture.
For Future and CompletableFuture use, see: github.com/perkinls/ja…
4.3 Use and understanding of Future
After the method returns the Future/CompletableFuture, we only get a promise that the real value will eventually be returned to the Future. We don’t want the calling thread to wait for the result to return, but rather to perform a specific action (printing to the console) after the actual result is returned. In an event-driven system, all you need to do is describe the code that needs to be executed when an event occurs. In Actor, describes the action that takes place when a message is received. Similarly, in the Future, we describe what happens when the value of the Future is actually available. In Java 8, thenRun is used to register code that needs to be executed when the event completes successfully; In Scala, onComplete is used
/ / Java version
package lipan.top.notes.java.chapter01;
/ * * *@author li.pan
* @version 1.0.0
* @Description TODO
* @createTime23 December 2020 18:37:00 */
public class PongActorTest {
ActorSystem system = ActorSystem.create();
ActorRef actorRef =system.actorOf(Props.create(JavaPongActor.class), "BruceWillis");
/** * Success status **@throws Exception
*/
@Test
public void shouldReplyToPingWithPong(a) throws Exception {
/* Ask the Actor for its response to a message * param1: reference to the Actor to which the message was sent * param2: message to be sent to the Actor; * Parma3 :Future timeout parameter: how long to wait for results before the query is considered failed. * /
Future sFuture = ask(actorRef, "Ping".1000);
// Convert scala Future to CompletableFuture
final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);
final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
assertEquals("Pong", jFuture.get(1000, TimeUnit.MILLISECONDS));
}
/** * Failed status **@throws Exception
*/
@Test(expected = ExecutionException.class)
public void shouldReplyToUnknownMessageWithFailure(a) throws Exception {
/* Ask the Actor for its response to a message * param1: reference to the Actor to which the message was sent * param2: message to be sent to the Actor; * Parma3 :Future timeout parameter: how long to wait for results before the query is considered failed. * /
Future sFuture = ask(actorRef, "unknown".1000);
// Convert scala Future to CompletableFuture
final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);
final CompletableFuture<String> jFuture = (CompletableFuture<String>) cs;
jFuture.get(1000, TimeUnit.MILLISECONDS);
}
//Future Examples
@Test
public void shouldPrintToConsole(a) throws Exception {
askPong("Ping").thenAccept(x -> System.out.println("replied with: " + x));
Thread.sleep(100);
//no assertion - just prints to console. Try to complete a CompletableFuture instead.
}
@Test
public void shouldTransform(a) throws Exception {
char result = (char) get(askPong("Ping").thenApply(x -> x.charAt(0)));
assertEquals('P', result);
}
/** * There is a bug with the Scala - Java8 - Compat Library 0.3.0 - thenCompose throws exception * https://github.com/scala/scala-java8-compat/issues/26 * < p > * I confirmed fixed in 0.6.0 - the SNAPSHOT (10 up later). Just in time for publishing! * /
@Test
public void shouldTransformAsync(a) throws Exception {
CompletionStage cs = askPong("Ping").
thenCompose(x -> askPong("Ping"));
assertEquals(get(cs), "Pong");
}
@Test
public void shouldEffectOnError(a) throws Exception {
askPong("cause error").handle((x, t) -> {
if(t ! =null) {
System.out.println("Error: " + t);
}
return null;
});
}
@Test
public void shouldRecoverOnError(a) throws Exception {
CompletionStage<String> cs = askPong("cause error").exceptionally(t -> {
return "default";
});
String result = (String) get(cs);
}
// First check if exception is null, if it is, return the Future containing the result, otherwise return the retry Future. ThenCompose is finally called to flatten the nested CompletionStage
@Test
public void shouldRecoverOnErrorAsync(a) throws Exception {
CompletionStage<String> cf = askPong("cause error")
.handle((pong, ex) -> ex == null
? CompletableFuture.completedFuture(pong)
: askPong("Ping") // Retry in case of null
).thenCompose(x -> x);
assertEquals("Pong", get(cf));
}
@Test
public void shouldPrintErrorToConsole(a) throws Exception {
askPong("cause error").handle((x, t) -> {
if(t ! =null) {
System.out.println("Error: " + t);
}
return null;
});
Thread.sleep(100);
}
//Helpers
public Object get(CompletionStage cs) throws Exception {
return ((CompletableFuture<String>) cs).get(1000, TimeUnit.MILLISECONDS);
}
public CompletionStage<String> askPong(String message) {
Future sFuture = ask(actorRef, message, 1000);
final CompletionStage<String> cs = FutureConverters.<Object>toJava(sFuture);
returncs; }}Copy the code
This is asynchronous code. Future or CompletableFuture returns a value of type Object on success and Throwable on failure.
- Execute code on the returned result and execute an event once the result is returned. You can use thenAccept to manipulate the returned result,
- The most common use case for transforming a returned result is to asynchronously transform the response before processing it. ThenApply returns a new Future.
- Sometimes you make an asynchronous call, and when you get the result, you make another asynchronous call, using thenCompose.
- In the case of failure to use the handle, can shouldRecoverOnErrorAsync reference method.
/ / the Scala version
class ScalaAskExamplesTest extends FunSpecLike with Matchers {
val system: ActorSystem = ActorSystem(a)implicit val timeout: Timeout = Timeout(5 seconds)
val pongActor: ActorRef = system.actorOf(Props(classOf[ScalaPongActor]))
describe("Pong actor") {
it("should respond with Pong") {
// Request a response message from the Actor
val future = pongActor ? "Ping"
The return value of /* * Actor is untyped, so we receive Future[AnyRef] as the result. * So call future.mapTo[String] to convert the future's type to the desired result type. * /
val result = Await.result(future.mapTo[String].1 second)
assert(result == "Pong")
}
it("should fail on unknown message") {
val future = pongActor ? "unknown"
intercept[Exception] {
Await.result(future.mapTo[String].1 second)
}
}
}
describe("FutureExamples") {
import scala.concurrent.ExecutionContext.Implicits.global
it("should print to console") {
askPong("Ping").onSuccess({
case x: String => println("replied with: " + x)
})
Thread.sleep(100)
}
it("should transform") {
val f: Future[Char] = askPong("Ping").map(x => x.charAt(0))
val c = Await.result(f, 1 second)
c should equal('P')}/** * Sends "Ping". Gets back "Pong" * Sends "Ping" again when it gets "Pong" */
it("should transform async") {
val f: Future[String] = askPong("Ping").flatMap(x => {
assert(x == "Pong")
askPong("Ping")})val c = Await.result(f, 1 second)
c should equal("Pong")}//doesn't actually test anything - demonstrates an effect. next test shows assertion.
it("should effect on failure") {
askPong("causeError").onFailure {
case e: Exception => println("Got exception")}}/** * similar example to previous test, but w/ assertion */
it("should effect on failure (with assertion)") {
val res = Promise()
askPong("causeError").onFailure {
case e: Exception =>
res.failure(new Exception("failed!"))
}
intercept[Exception] {
Await.result(res.future, 1 second)
}
}
it("should recover on failure") {
val f = askPong("causeError").recover({
case t: Exception= >"default"
})
val result = Await.result(f, 1 second)
result should equal("default")
}
it("should recover on failure async") {
val f = askPong("causeError").recoverWith({
case t: Exception => askPong("Ping")})val result = Await.result(f, 1 second)
result should equal("Pong")
}
it("should chain together multiple operations") {
val f = askPong("Ping").flatMap(x => askPong("Ping" + x)).recover({
case_ :Exception= >"There was an error"
})
val result = Await.result(f, 1 second)
result should equal("There was an error")
}
it("should be handled with for comprehension") {
val f1 = Future {
4
}
val f2 = Future {
5
}
val futureAddition =
for {
res1 <- f1
res2 <- f2
} yield res1 + res2
val additionResult = Await.result(futureAddition, 1 second)
assert(additionResult == 9)
}
it("should handle a list of futures") {
val listOfFutures: List[Future[String]] = List("Pong"."Pong"."failure").map(x => askPong(x))
val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)
}
}
def askPong(message: String) :Future[String] = (pongActor ? message).mapTo[String]}Copy the code
4.4 Chain operation
Each of the above methods returns a new Future that can be combined in a functional style without handling exceptions. We can focus on successful cases and collect errors at the end of the chain operation.
askPong("Ping").thenCompose(x -> askPong("Ping" + x))
.handle((x, t) -> {
if(t ! =null) {
return "default";
} else {
returnx; }});Copy the code
Errors that occur when performing any operation in the chain of operations can be treated as errors that occur at the end of the chain. This creates an efficient pipeline for handling exceptions at the end of whatever operation caused the error. We can focus on describing success without doing extra error checking in the middle of the chain. The error can be handled separately at the end.
5 References
“Akka Introduction and Practice” pay attention to the public account data craftsmen to reply to “Akka” for receiving
Follow public accountData craftsman notes
, focus on big data field offline, real-time technology dry goods regular sharing! Personal websitewww.lllpan.top