Akka is a natural fit for building distributed applications, and it ships with Akka Remote for communication between distributed components. Let’s look at how to use Akka Remote and Akka Serialization to build distributed applications.

Background

Many developers eventually run into the same problem: as business requirements grow more complex, a single stand-alone server is no longer enough to handle the incoming requests. We then deploy services to different servers, and those servers may need to call each other, so the system must expose communication interfaces for exchanging data. At this point a good remote call scheme is a powerful weapon. The mainstream options for remote communication are:

  • Remote Procedure Call Protocol (RPC)
  • Web Service
  • Java Messaging Service (JMS)

All of these are widely used communication schemes, and they are worth a look if you are interested. Here I will focus on RPC in Java, namely RMI (Remote Method Invocation), and on JMS.

Java Remote Calls

I believe many people who have written Java programs know that RMI and JMS are both used for remote communication in Java. So what is the difference between them?

1. RMI

i. Features:
  • Synchronous communication: when a remote method is called through RMI, the calling thread waits until the result is returned, so the call is a synchronous, blocking operation;
  • Strong coupling: the requesting system must declare the interface of the RMI service it uses, and the returned data types are subject to certain constraints;
ii. Advantages:
  • The implementation is relatively simple, the method-call form is easy to understand, and the interface declaration makes the service's functions clear.
iii. Disadvantages:
  • It supports only the JVM platform;
  • It does not work with languages that are not compatible with Java.

2. JMS

i. Features:
  • Asynchronous communication: JMS communicates by sending messages. The sending thread is not blocked and does not have to wait for a response, so the operation is asynchronous.
  • Loose coupling: no interface declaration is required, and the payload can take various forms, such as JSON or XML.
ii. Communication models:

(1) Point-to-point messaging model

As the name implies, point-to-point can be understood as fixed-point communication between two servers, where both sender and receiver can clearly know who the other is. The general model is as follows:

jms-point-to-point

(2) Publish/subscribe messaging model

The point-to-point model does not fit every scenario. Suppose a master server produces a message that every worker server must receive. With the point-to-point model, the master has to send the message to each worker individually, and whenever a new worker is added, the master's configuration has to change, which causes unnecessary trouble. So what does the publish/subscribe model look like? It is very similar to the observer pattern from design patterns, which many readers will already know. Its biggest strengths are loose coupling and easy extension. The general structure of the publish/subscribe model is as follows:

jms-topic
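The publish/subscribe idea can be sketched in plain Java. This is only an in-memory illustration, not the JMS API: `PubSubDemo`, `Topic`, and `broadcast` are hypothetical names, and a real broker would deliver messages across processes and threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy in-memory topic (hypothetical names, JDK only).
class PubSubDemo {

    static class Topic<T> {
        private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

        // A new receiver registers itself; the publisher's code does not change.
        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        // The publisher sends once; the topic fans the message out to every subscriber.
        void publish(T message) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Publishes one message to subscriberCount subscribers and returns what each received.
    static List<String> broadcast(String message, int subscriberCount) {
        Topic<String> topic = new Topic<>();
        List<String> received = new ArrayList<>();
        for (int i = 0; i < subscriberCount; i++) {
            final int id = i;
            topic.subscribe(m -> received.add("worker-" + id + " got " + m));
        }
        topic.publish(message);
        return received;
    }

    public static void main(String[] args) {
        broadcast("config updated", 3).forEach(System.out::println);
    }
}
```

Adding a fourth worker only requires one more `subscribe` call; the publishing side stays untouched, which is the loose coupling described above.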

iii. Advantages:
  • Because communication is asynchronous, threads do not have to pause and wait, so performance is relatively high.
iv. Disadvantages:
  • The implementation is relatively complex and requires maintaining the associated message queues.

In more general terms:

RMI can be thought of as communication over the phone, whereas JMS is more like texting.

In general, neither approach is strictly better than the other, and there is little point in ranking them. Each exists for a reason; what matters is which one suits your system.
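The phone-versus-texting contrast can be sketched with the JDK alone. This is a minimal illustration under stated assumptions: `CallStyles` and its methods are hypothetical names, a `BlockingQueue` stands in for a message queue, and neither RMI nor JMS is actually involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; JDK only.
class CallStyles {

    // "Phone call": the caller is blocked until the callee has produced the answer.
    static String blockingCall(String request) {
        return "reply to " + request;
    }

    // "Text message": the sender drops the message into a queue and returns at once.
    static void fireAndForget(BlockingQueue<String> queue, String message) {
        queue.offer(message);
    }

    // Sends one message asynchronously and returns what the consumer thread handled.
    static String demoAsync(String message) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BlockingQueue<String> handled = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                handled.offer("handled " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        fireAndForget(queue, message); // does not wait for the consumer
        try {
            String result = handled.poll(5, TimeUnit.SECONDS);
            consumer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(blockingCall("ping"));
        System.out.println(demoAsync("ping"));
    }
}
```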

RMI Example

Here is an RMI example, partly to show how RMI is used and partly to compare it with Akka Remote later:

First, we write the transfer object and the communication interface:

1. JoinRmiEvt:

import java.io.Serializable;
import java.rmi.Remote;

// Transfer object sent from the client to the server.
public class JoinRmiEvt implements Remote, Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String name;

    public JoinRmiEvt(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

2. RemoteRmi:

import java.rmi.Remote;
import java.rmi.RemoteException;

// Every remote method must declare RemoteException.
public interface RemoteRmi extends Remote {
    public void sendNoReturn(String message) throws RemoteException, InterruptedException;
    public String sendHasReturn(JoinRmiEvt joinRmiEvt) throws RemoteException;
}

The interface is then implemented on the server side:

3. RemoteRmiImpl:

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class RemoteRmiImpl extends UnicastRemoteObject implements RemoteRmi {

    private static final long serialVersionUID = 1L;

    // The UnicastRemoteObject constructor exports the object and may throw RemoteException.
    public RemoteRmiImpl() throws RemoteException {
    }

    @Override
    public void sendNoReturn(String message) throws RemoteException, InterruptedException {
        // Simulate slow processing; the caller blocks for the full two seconds.
        Thread.sleep(2000);
        //throw new RemoteException();
    }

    @Override
    public String sendHasReturn(JoinRmiEvt joinRmiEvt) throws RemoteException {
        if (joinRmiEvt.getId() >= 0) {
            return new StringBuilder("the").append(joinRmiEvt.getName()).append("has join").toString();
        } else {
            return null;
        }
    }
}

Next we bind the corresponding port on the Server side and publish the service, then start:

import java.net.MalformedURLException;
import java.rmi.AlreadyBoundException;
import java.rmi.Naming;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;

public class RemoteRMIServer {
    public static void main(String[] args) throws RemoteException, AlreadyBoundException, MalformedURLException, InterruptedException {
        System.out.println("the RemoteRMIServer is Starting ...");
        RemoteRmiImpl remoteRmi = new RemoteRmiImpl();
        System.out.println("Binding server implementation to registry");
        // Start a registry on port 2553 and publish the service under a name.
        LocateRegistry.createRegistry(2553);
        Naming.bind("rmi://127.0.0.1:2553/remote_rmi", remoteRmi);
        System.out.println("the RemoteRMIServer is Started");
        Thread.sleep(10000000);
    }
}

Here we call the Server on the Client side:

import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;

public class RemoteRmiClient {
    public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException, InterruptedException {
        System.out.println("the client has started");
        String url = "rmi://127.0.0.1:2553/remote_rmi";
        RemoteRmi remoteRmi = (RemoteRmi) Naming.lookup(url);
        System.out.println("the client has running");
        // Blocks until the server-side method returns.
        remoteRmi.sendNoReturn("send no return");
        System.out.println(remoteRmi.sendHasReturn(new JoinRmiEvt(1L, "godpan")));
        System.out.println("the client has end");
    }
}

Running results:

java-rmi-result

From the output and the code we can conclude:

  • Java RMI calls are blocking, which causes a problem: if the service on the server hangs or crashes, the client is left waiting for a response.
  • Java RMI uses Java’s default serialization, which performs poorly, and it offers no interface for plugging in another serialization mechanism. This can become a bottleneck in performance-critical systems.
  • The interfaces and objects used with RMI must implement specific interfaces and must declare the exceptions they throw, so the code ends up cluttered with exception declarations.
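One common way to soften the blocking problem is to run the call on a worker thread and stop waiting after a deadline. The sketch below assumes the remote call can be wrapped in a `Callable`; `BoundedCall` and `callWithTimeout` are hypothetical names, and the lambdas in `main` merely stand in for calls such as `remoteRmi.sendNoReturn(...)`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; JDK only.
class BoundedCall {

    // Runs a blocking call on a worker thread and gives up after timeoutMillis.
    static String callWithTimeout(Callable<String> call, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(call);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // ask the worker thread to stop
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a remote method that answers quickly.
        System.out.println(callWithTimeout(() -> "ok", 1000));
        // Stand-in for a remote method whose server has hung.
        System.out.println(callWithTimeout(() -> {
            Thread.sleep(5000);
            return "too late";
        }, 200));
    }
}
```

The caller regains control after the deadline, but a thread stuck in real network I/O may not react to the interrupt, so this limits the waiting rather than removing the underlying blocking.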

Akka Remote

We have covered remote communication in Java. Akka also runs on the JVM, so how does its approach differ?

In my opinion, Akka’s remote communication is like a combination of RMI and JMS, though closer to JMS. Why? Let’s look at an example:

Let’s start by creating a remote Actor:

import akka.actor.Actor

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

Now let’s launch this Actor on the remote server:

import akka.actor.{ActorSystem, Props}

val system = ActorSystem("RemoteDemoSystem")
val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")

Now what if we have a system that needs to send messages to this Actor?

First, the remote system needs to expose an interface through which other systems can reach its actors, just as RMI publishes its service. In Akka this setup is very simple and non-intrusive: it takes only a few lines in the configuration file:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = $localIp  // for example, 127.0.0.1
      port = $port         // for example, 2552
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

We only need to configure the provider, the transport, the IP address, the port, and a few related attributes to finish setting up Akka Remote.

The local server needs the same kind of configuration, because the two Akka systems must communicate with each other. The settings can be identical except for the hostname. In this example both systems run on the same machine, so the hostname is the same and the two systems must listen on different ports.

Now we can send messages to this Actor from the local server. First we can create a local Actor:

import akka.actor.Actor
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

case object Init
case object SendNoReturn

class LocalActor extends Actor {

  // Path of the remote actor, read from application.conf.
  val path = ConfigFactory.defaultApplication().getString("remote.actor.name.test")
  implicit val timeout = Timeout(4.seconds)
  val remoteActor = context.actorSelection(path)

  def receive: Receive = {
    case Init => "init local actor"
    case SendNoReturn => remoteActor ! "hello remote actor"
  }
}

The value of remote.actor.name.test has the form akka.tcp://RemoteDemoSystem@<hostname>:4444/user/RemoteActor, where <hostname> is the address configured for the remote system. Note that we use context.actorSelection(path) to obtain an ActorSelection object. If we need an ActorRef, we can call resolveOne() on it, which returns a Future[ActorRef]. This should feel familiar, because it is the same way we look up local actors in Akka.
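A remote actor path has the shape protocol://system@host:port/user/actorName, so it can be taken apart like any URI. The sketch below uses `java.net.URI` purely to show the parts; `ActorPathParts` is a hypothetical helper, and the host 127.0.0.1 in `main` is an assumed value for illustration, not one taken from the project's configuration.

```java
import java.net.URI;

// Hypothetical helper that splits an actor path into its parts; JDK only.
class ActorPathParts {

    static String describe(String actorPath) {
        URI uri = URI.create(actorPath);
        return "protocol=" + uri.getScheme()   // transport, e.g. akka.tcp
            + " system=" + uri.getUserInfo()   // name of the remote ActorSystem
            + " host=" + uri.getHost()
            + " port=" + uri.getPort()
            + " actor=" + uri.getPath();       // /user/<name given to actorOf>
    }

    public static void main(String[] args) {
        // The host below is an assumption for illustration only.
        System.out.println(describe("akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor"));
    }
}
```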

Finally, we start the remote Actor system first:

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
}

We then launch the LocalActor on our local system and send it a message:

object LocalDemo extends App {

  implicit val system = ActorSystem("LocalDemoSystem")
  val localActor = system.actorOf(Props[LocalActor], name = "LocalActor")

  localActor ! Init
  localActor ! SendNoReturn
}

We can see that RemoteActor receives a message:

send-no-return

From these steps and results we can see that Akka’s remote communication resembles JMS’s point-to-point model, except that we do not have to maintain message queues ourselves: each Actor’s own mailbox plays that role. In addition, the ActorSelection returned by context.actorSelection can be regarded as a local handle to the remote Actor, which is similar to the remote reference (stub) in RMI. That is why Akka’s remote communication looks like a combination of RMI and JMS. Underneath, data is still transferred over network protocols such as TCP and UDP, as the transport settings in the configuration file show.
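To make the mailbox idea concrete, here is a toy actor built from a queue and a single thread. It is a sketch of the concept only, not how Akka implements mailboxes; `MailboxDemo`, `ToyActor`, `tell`, and the stop marker are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy illustration of an actor mailbox (hypothetical names, JDK only).
class MailboxDemo {

    static class ToyActor implements Runnable {
        private static final String STOP = "__stop__"; // hypothetical stop marker
        private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        private final List<String> handled = new ArrayList<>();

        // "tell": enqueue and return immediately, like `!` in the Scala examples.
        void tell(String message) {
            mailbox.offer(message);
        }

        void stop() {
            mailbox.offer(STOP);
        }

        // One thread drains the mailbox, so messages are handled one at a time, in order.
        @Override
        public void run() {
            try {
                while (true) {
                    String message = mailbox.take();
                    if (STOP.equals(message)) {
                        return;
                    }
                    handled.add("received '" + message + "'");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Sends all messages to a fresh actor and returns what it handled.
    static List<String> sendAll(String... messages) {
        ToyActor actor = new ToyActor();
        Thread thread = new Thread(actor);
        thread.start();
        for (String message : messages) {
            actor.tell(message);
        }
        actor.stop();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return actor.handled;
    }

    public static void main(String[] args) {
        sendAll("hello remote actor", "second message").forEach(System.out::println);
    }
}
```

The sender never touches a shared message queue it has to administer; the queue belongs to the receiver, which is the difference from JMS noted above.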

The example above demonstrates the sendNoReturn model. What if we need the remote Actor to send us a reply?

First we create a message and handle it in LocalActor:

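case object SendHasReturn

// In LocalActor; requires `import akka.pattern.ask` and an implicit
// ExecutionContext such as `import context.dispatcher`.
def receive: Receive = {
  case SendHasReturn =>
    for {
      r <- remoteActor.ask("hello remote actor")
    } yield r
}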

Let’s rerun LocalActor and send this message to RemoteActor:

send-has-return

You can see that LocalActor sends the message and receives the reply from RemoteActor. We have also set a timeout: if no reply arrives within that time, the Future returned by ask fails with an AskTimeoutException.
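The ask-with-timeout behaviour can be imitated with a `CompletableFuture`. This sketch assumes Java 9 or later for `orTimeout`; `AskDemo`, `ask`, and `askAndWait` are hypothetical names, and the simulated delay stands in for the remote Actor's processing time. It is not Akka's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical imitation of the ask pattern; JDK 9+ only.
class AskDemo {

    // Returns a future at once; it fails if no reply arrives within timeoutMillis.
    static CompletableFuture<String> ask(String message, long replyDelayMillis, long timeoutMillis) {
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> {
            sleep(replyDelayMillis); // simulated remote processing time
            return "reply to '" + message + "'";
        });
        return reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Waits for the outcome so that the demo can print it.
    static String askAndWait(long replyDelayMillis, long timeoutMillis) {
        try {
            return ask("hello remote actor", replyDelayMillis, timeoutMillis).join();
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException ? "ask timed out" : "ask failed";
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(askAndWait(10, 2000));  // reply arrives in time
        System.out.println(askAndWait(2000, 100)); // reply arrives too late
    }
}
```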

Akka Serialization

This section could have been a separate article, but I assume you already know the basics of serialization, so I will not go into them and will focus on serialization in Akka.

Continuing with the example above: suppose we send a custom object, such as a case class instance, to RemoteActor. Since the message travels over the network, how do we preserve the object’s type and values? Within a single JVM we do not need to worry about this, because the object lives on the heap and we only pass a reference to it. Across different environments that is impossible: only bytes can travel over the network. So the object must be converted into a specific sequence of bytes for transmission, and the receiver must be able to rebuild the corresponding object from those bytes. This is serialization.
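The round trip from object to bytes and back can be shown with Java's built-in serialization. This is a minimal sketch: `RoundTrip` and its nested `JoinEvt` are hypothetical Java stand-ins for the kind of event used in this article, and the byte array plays the role of the network.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical names; JDK only.
class RoundTrip {

    static class JoinEvt implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String name;

        JoinEvt(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Sender side: turn the object into bytes that could travel over a socket.
    static byte[] toBytes(JoinEvt evt) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(evt);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Receiver side: rebuild an equivalent object from the bytes.
    static JoinEvt fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (JoinEvt) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        JoinEvt copy = fromBytes(toBytes(new JoinEvt(1L, "godpan")));
        System.out.println(copy.id + " " + copy.name);
    }
}
```

The receiver gets a new object with the same field values, not the original instance, which is why both sides must agree on the class and the format.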

Let’s define a case class for a join event and change the statement that sends the message:

case object SendSerialization
case class JoinEvt(
    id: Long,
    name: String
)
def receive: Receive = {
    case SendSerialization =>
      for {
        r <- remoteActor.ask(JoinEvt(1L,"godpan"))
      } yield println(r)
  }

At this point we restart the system where RemoteActor and LocalActor are located and send this message:

send-serialization

Some readers may find this strange: we never marked JoinEvt as serializable or handled its serialization, so why does the program run successfully?

In fact, someone handled it for us: the thoughtful Akka provides a default serialization strategy. That default is the familiar java.io.Serializable, loved for its ease of use and hated for its performance, especially in a distributed system that transfers a lot of objects. For a small system it does not matter much; it exists for a reason.

Since Akka is a naturally distributed toolkit, why does it use the inefficient java.io.Serializable? I don’t know. From Akka 2.5.x onward, Akka’s own internal messages have moved away from java.io.Serializable. Because it is both inefficient and vulnerable to attacks, I also recommend that you avoid java.io.Serializable in Akka.
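The size overhead of java.io.Serializable is easy to observe. The sketch below compares it with a hand-written binary encoding of the same two fields. The hand-written format is an assumption that only stands in for a compact serializer; it is not Kryo's actual wire format, and `SizeComparison` and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical names; JDK only.
class SizeComparison {

    static class JoinEvt implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String name;

        JoinEvt(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Default Java serialization: writes class metadata along with the field values.
    static int javaSerializedSize(JoinEvt evt) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(evt);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // Hand-written encoding: 8 bytes for the id, then a length-prefixed UTF string.
    static int compactSize(JoinEvt evt) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(evt.id);
            out.writeUTF(evt.name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        JoinEvt evt = new JoinEvt(1L, "godpan");
        System.out.println("java.io.Serializable: " + javaSerializedSize(evt) + " bytes");
        System.out.println("hand-written encoding: " + compactSize(evt) + " bytes");
    }
}
```

For this event the compact form is 16 bytes (8 for the id, 2 for the string length, 6 for the characters), while the default format also carries the class name and field descriptors with every top-level object.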

So how do we use third-party serialization tools in Akka?

Here I recommend Kryo, a serialization tool that has long been well known in the Java community. If you are interested, take a look at Kryo. An integration package for Akka is also available, and we use it in this example:

Here is the build.sbt for the entire project, including the Kryo dependency:


import sbt._
import sbt.Keys._

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.5.3",
    "com.typesafe.akka" %% "akka-remote" % "2.5.3",
    "com.twitter" %% "chill-akka" % "0.8.4"
  )

lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.11",
  libraryDependencies := AllLibraryDependencies
)

lazy val remote = (project in file("remote"))
  .settings(commonSettings: _*)
  .settings(
    // other settings
  )

lazy val local = (project in file("local"))
  .settings(commonSettings: _*)
  .settings(
    // other settings
  )

Then we simply replace the actor configuration in application.conf with the following:

actor {
    provider = "akka.remote.RemoteActorRefProvider"
    serializers {
      kryo = "com.twitter.chill.akka.AkkaSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = none
      "scala.Product" = kryo
    }
  }

The line “java.io.Serializable” = none can be omitted, because the default java.io.Serializable strategy is replaced once another serialization strategy is bound; it is written out here only to be explicit.

Now we can use Kryo. Isn’t the whole process easy? If you can’t wait, go ahead and start writing your own demo.

From the examples and the code, we can conclude:

  • Akka Remote ships with a built-in serialization mechanism and also lets you configure the serializer of your choice, so it can be tuned on demand.
  • Akka Remote communicates in an asynchronous, non-blocking way, which minimizes the client’s dependence on the server.
  • An Akka Remote implementation takes much less code than the equivalent Java RMI implementation.

The source code for the entire example has been uploaded to Akka-demo: source link