In my previous article, I discussed the design ideas, design principles, and concrete implementation of a high-performance RPC server built with Netty (see: "How to use Netty to develop a high-performance RPC server"). At the end of that article I mentioned that the processing performance of the RPC server still had room for optimization. So over the weekend I optimized and refactored the original NettyRPC framework. The main changes are as follows:

1. NettyRPC originally encoded and decoded RPC messages with Netty's built-in ObjectEncoder and ObjectDecoder, which are based on Java's native serialization mechanism. According to existing articles and test data, Java native serialization is not very efficient and the binary stream it produces is large. This round of optimization therefore introduces the concept of an RPC message serialization protocol: a protocol customized for serializing and deserializing RPC messages, backed by third-party codec frameworks. The frameworks introduced this time are Kryo and Hessian. As a reminder, in an RPC remote service call the message objects have to travel over the network: serialization converts an object into a byte stream, and once the stream reaches the other end it is deserialized back into a message object.

2. Google's Guava concurrency library is introduced to re-wrap NettyRPC's NIO thread pool and business thread pool.

3. With third-party codec frameworks (Kryo and Hessian), frequently creating and destroying serialization objects under high concurrency consumes JVM memory and hurts the processing performance of the whole RPC server. Object pooling is therefore introduced. Creating and initializing new objects can be time-consuming, and when a large number of objects must be generated, performance may suffer. Short of upgrading the hardware, object pooling is an effective remedy, and the Apache Commons Pool framework is a good object pooling implementation (open source project path: commons.apache.org/proper/comm…). The Hessian pooling work in this article is built on top of Apache Commons Pool.
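To make the pooling idea in point 3 concrete, here is a minimal sketch of an object pool written against the JDK only. `SimplePool`, `borrow`, `restore`, and `render` are hypothetical names chosen for illustration; this is not the Apache Commons Pool or Kryo API, and it leaves out limits, eviction, and validation that a real pool provides.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical minimal pool for illustration; not the Apache Commons Pool or Kryo API.
final class SimplePoolDemo {

    static final class SimplePool<T> {
        private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
        private final Supplier<T> factory;
        private final AtomicInteger created = new AtomicInteger();

        SimplePool(Supplier<T> factory) {
            this.factory = factory;
        }

        // Reuse an idle instance if one exists, otherwise create a new one.
        T borrow() {
            T obj = idle.poll();
            if (obj == null) {
                created.incrementAndGet();
                obj = factory.get();
            }
            return obj;
        }

        // Hand the instance back so that a later borrow() can reuse it.
        void restore(T obj) {
            idle.offer(obj);
        }

        int createdCount() {
            return created.get();
        }
    }

    // Borrows a StringBuilder, uses it, and always returns it to the pool.
    static String render(SimplePool<StringBuilder> pool, String name) {
        StringBuilder sb = pool.borrow();
        try {
            sb.setLength(0); // clear state left over from the previous user
            return sb.append("hello ").append(name).toString();
        } finally {
            pool.restore(sb);
        }
    }

    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(StringBuilder::new);
        for (int i = 0; i < 1000; i++) {
            render(pool, "rpc-" + i);
        }
        // Single-threaded use needs only one instance for all 1000 calls.
        System.out.println("instances created: " + pool.createdCount());
    }
}
```

Returning the object in a `finally` block matters: if the work in between throws, the instance still goes back to the pool instead of being lost.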

This article walks through the implementation ideas and methods of the refactored NettyRPC server, following the three aspects above. First, take a brief look at the serialization protocols supported by the optimized NettyRPC server, as shown in the figure below:

  

As the figure shows, the optimized NettyRPC supports three message serialization methods: Kryo, Hessian, and Java native serialization. Java native serialization should already be familiar, so I will not repeat it here. Let's focus on the other two:

1. Kryo serialization. Kryo is an efficient object serialization framework built for Java. Compared with Java native serialization, Kryo is a big improvement in processing performance and stream size. Many well-known open source projects use it, for example Alibaba's open source Dubbo RPC. This article uses Kryo 3.0.3 by default, which can be downloaded at github.com/EsotericSof… . Why this version? As explained above, frequently creating and destroying serialization objects under high concurrency is costly in both JVM memory and time. This release of Kryo ships with a built-in object pool module (KryoFactory, KryoPool), so there is no need to wrap it with Apache Commons Pool.

2. Hessian serialization. Hessian is a serialization protocol that serializes and deserializes faster than Java native serialization and produces less data. It transmits data in a binary format and currently has implementations for a variety of languages. This article uses Hessian 4.0.37, which can be downloaded at hessian.caucho.com/#Java.
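The point about stream size can be illustrated with the JDK alone. The sketch below serializes the same small object once with `ObjectOutputStream` and once with a hand-written `DataOutputStream` layout. The hand-written layout is only a baseline for comparison, not what Kryo or Hessian emit, and `Ping` is a hypothetical message type. The native stream is larger because it also carries a description of the class and its fields.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializedSizeDemo {

    // Hypothetical message type used only for this comparison.
    static final class Ping implements Serializable {
        private static final long serialVersionUID = 1L;
        final String messageId;
        final int sequence;

        Ping(String messageId, int sequence) {
            this.messageId = messageId;
            this.sequence = sequence;
        }
    }

    // Java native serialization: writes class metadata plus the field values.
    static byte[] javaNative(Ping ping) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(ping);
        }
        return bytes.toByteArray();
    }

    // Hand-written baseline: writes only the field values.
    static byte[] handWritten(Ping ping) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(ping.messageId); // 2-byte length + UTF-8 bytes
            out.writeInt(ping.sequence);  // 4 bytes
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Ping ping = new Ping("msg-0001", 7);
        System.out.println("java native : " + javaNative(ping).length + " bytes");
        System.out.println("hand-written: " + handWritten(ping).length + " bytes");
    }
}
```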

Next, look at the package structure of the optimized NettyRPC message protocol codec (newlandframework.netty.rpc.serialize.support, newlandframework.netty.rpc.serialize.support.kryo, newlandframework.netty.rpc.serialize.support.hessian), as shown in the figure below:

     

The RPC request message structure code is as follows:

/**
 * @filename:MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC service request structure
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class<?>[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypeParameters() {
        return typeParameters;
    }

    public void setTypeParameters(Class<?>[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
    }
}

The RPC response message structure is as follows:

/**
 * @filename:MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC service response structure
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return ReflectionToStringBuilder.toString(this);
    }
}

Now let's design the codec framework for the RPC request and response messages above. NettyRPC currently supports three protocol types: Kryo serialization, Hessian serialization, and Java native serialization. For extensibility, the protocol type is abstracted into an RPC message serialization protocol type object (RpcSerializeProtocol), implemented as follows:

/**
 * @filename:RpcSerializeProtocol.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC message serialization protocol type
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public enum RpcSerializeProtocol {

    // No cross-language RPC communication mechanism has been introduced yet, so the
    // supported serialization methods are Java native serialization, Kryo, and Hessian
    JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");

    private String serializeProtocol;

    private RpcSerializeProtocol(String serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }

    public String toString() {
        ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
        return ReflectionToStringBuilder.toString(this);
    }

    public String getProtocol() {
        return serializeProtocol;
    }
}

To accommodate the different serialization frameworks (mainly Kryo and Hessian), an RPC message serialization/deserialization interface (RpcSerialize) and an RPC message codec interface (MessageCodecUtil) are extracted.

/**
 * @filename:RpcSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC message serialization/deserialization interface definition
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface RpcSerialize {

    void serialize(OutputStream output, Object object) throws IOException;

    Object deserialize(InputStream input) throws IOException;
}

/**
 * @filename:MessageCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC message codec interface
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import java.io.IOException;

public interface MessageCodecUtil {

    // The RPC message header is 4 bytes long
    final public static int MESSAGE_LENGTH = 4;

    public void encode(final ByteBuf out, final Object message) throws IOException;

    public Object decode(byte[] body) throws IOException;
}
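To show how the two interfaces fit together, the sketch below implements a stream serializer with Java native serialization and wraps its output in a frame that starts with a 4-byte length header. It is an illustration under assumptions: `StreamSerializer` and `JdkSerializer` are hypothetical names that mirror the shape of RpcSerialize, and `java.nio.ByteBuffer` stands in for Netty's ByteBuf so the example runs without Netty on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

final class LengthPrefixedCodecDemo {

    // Header size in bytes, same value as MessageCodecUtil.MESSAGE_LENGTH.
    static final int MESSAGE_LENGTH = 4;

    // Same shape as RpcSerialize, redeclared here to keep the sketch self-contained.
    interface StreamSerializer {
        void serialize(OutputStream output, Object object) throws IOException;
        Object deserialize(InputStream input) throws IOException;
    }

    // Hypothetical implementation backed by Java native serialization.
    static final class JdkSerializer implements StreamSerializer {
        public void serialize(OutputStream output, Object object) throws IOException {
            ObjectOutputStream out = new ObjectOutputStream(output);
            out.writeObject(object);
            out.flush();
        }

        public Object deserialize(InputStream input) throws IOException {
            try {
                return new ObjectInputStream(input).readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
    }

    // Frame layout: [4-byte body length][body bytes].
    static ByteBuffer encode(StreamSerializer serializer, Object message) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(bytes, message);
        byte[] body = bytes.toByteArray();
        ByteBuffer frame = ByteBuffer.allocate(MESSAGE_LENGTH + body.length);
        frame.putInt(body.length).put(body);
        frame.flip(); // switch the buffer from writing to reading
        return frame;
    }

    // Reads the header, then exactly that many body bytes, then deserializes.
    static Object decode(StreamSerializer serializer, ByteBuffer frame) throws IOException {
        byte[] body = new byte[frame.getInt()];
        frame.get(body);
        return serializer.deserialize(new ByteArrayInputStream(body));
    }

    public static void main(String[] args) throws IOException {
        StreamSerializer serializer = new JdkSerializer();
        ByteBuffer frame = encode(serializer, "hello rpc");
        System.out.println("frame size: " + frame.remaining() + " bytes");
        System.out.println("decoded   : " + decode(serializer, frame));
    }
}
```

Separating "object to bytes" (the serializer) from "bytes to frame" (the codec) is what lets Kryo and Hessian plug in behind the same framing.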

Finally, the NettyRPC framework should let both the server and the client freely choose which serialization method is used to transmit RPC message objects over the network. For that purpose an RPC message serialization protocol selector interface (RpcSerializeFrame) is abstracted as well. Its definition is as follows:

/**
 * @filename:RpcSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC message serialization protocol selector interface
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.channel.ChannelPipeline;

public interface RpcSerializeFrame {

    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
}
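One way such a selector might dispatch is a `switch` over the protocol enum. The sketch below is an assumption about the general shape, not the framework's actual selector: a plain list of handler names stands in for Netty's ChannelPipeline, and the exact handlers a real implementation installs may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class ProtocolSelectorDemo {

    // Mirrors the constant names of RpcSerializeProtocol.
    enum Protocol { JDKSERIALIZE, KRYOSERIALIZE, HESSIANSERIALIZE }

    // Returns the handler names that would be appended to the pipeline, in order.
    // The list is a hypothetical stand-in for Netty's ChannelPipeline.
    static List<String> select(Protocol protocol) {
        List<String> pipeline = new ArrayList<>();
        switch (protocol) {
            case JDKSERIALIZE:
                pipeline.add("ObjectEncoder");
                pipeline.add("ObjectDecoder");
                break;
            case KRYOSERIALIZE:
                pipeline.add("KryoEncoder");
                pipeline.add("KryoDecoder");
                break;
            case HESSIANSERIALIZE:
                pipeline.add("HessianEncoder");
                pipeline.add("HessianDecoder");
                break;
            default:
                throw new IllegalArgumentException("unsupported protocol: " + protocol);
        }
        return pipeline;
    }

    public static void main(String[] args) {
        // The protocol name would typically come from configuration.
        Protocol protocol = Protocol.valueOf("KRYOSERIALIZE");
        System.out.println(select(protocol)); // prints [KryoEncoder, KryoDecoder]
    }
}
```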

With the set of interfaces defined above, it is now possible to implement custom RPC message serialization and deserialization modules based on Kryo and Hessian methods. Let’s take a look at the overall class diagram structure:

The first is the RPC message encoder, MessageEncoder, which extends Netty's MessageToByteEncoder. Its job is to encode RPC message objects into a binary stream. The implementation is as follows:

/**
 * @filename:MessageEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC message encoder
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<Object> {

    private MessageCodecUtil util = null;

    public MessageEncoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
        util.encode(out, msg);
    }
}

Next is the RPC message decoder, MessageDecoder, which extends Netty's ByteToMessageDecoder. Its job is to deserialize the binary stream back into message objects. As I mentioned in the previous article, NettyRPC runs over TCP, and TCP data transmission exhibits the so-called "sticky packet" phenomenon, so MessageDecoder has to verify the length of the RPC message body. If the readable bytes fall short of the length declared in the RPC message header, the decoder resets the ByteBuf reader index and waits for more data. The following code shows how the RPC message protocol is parsed:

/**
 * @filename:MessageDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC message decoder
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageDecoder extends ByteToMessageDecoder {

    final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
    private MessageCodecUtil util = null;

    public MessageDecoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Not even a complete header yet: wait for more bytes
        if (in.readableBytes() < MessageDecoder.MESSAGE_LENGTH) {
            return;
        }

        in.markReaderIndex();
        // Read the length of the message body from the header
        int messageLength = in.readInt();

        if (messageLength < 0) {
            // A negative length is invalid: close the channel and stop decoding
            ctx.close();
            return;
        }

        // The readable bytes do not match the length declared in the header,
        // so reset the ByteBuf reader index and wait for the rest of the body
        if (in.readableBytes() < messageLength) {
            in.resetReaderIndex();
            return;
        } else {
            byte[] messageBody = new byte[messageLength];
            in.readBytes(messageBody);

            try {
                Object obj = util.decode(messageBody);
                out.add(obj);
            } catch (IOException ex) {
                Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}
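The mark-and-reset logic is easier to follow in isolation. The sketch below applies the same idea to a `java.nio.ByteBuffer`, used here as a stand-in for Netty's ByteBuf, and extracts every complete frame while leaving an incomplete trailing frame unread. `FrameDecoderDemo`, `decode`, and `stream` are hypothetical names for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FrameDecoderDemo {

    static final int MESSAGE_LENGTH = 4; // header size in bytes

    // Extracts every complete frame from `in` (which must be in read mode).
    // An incomplete trailing frame is left unread so the caller can retry
    // once more bytes have arrived.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= MESSAGE_LENGTH) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // body not complete yet: rewind to the start of the header
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            frames.add(body);
        }
        return frames;
    }

    // Builds a byte stream of length-prefixed frames, one per text.
    static byte[] stream(String... texts) {
        int size = 0;
        for (String text : texts) {
            size += MESSAGE_LENGTH + text.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (String text : texts) {
            byte[] body = text.getBytes(StandardCharsets.UTF_8);
            out.putInt(body.length).put(body);
        }
        return out.array();
    }

    public static void main(String[] args) {
        byte[] all = stream("ping", "pong"); // two frames of 8 bytes each

        // Only the first 14 bytes have "arrived": frame one plus part of frame two.
        ByteBuffer partial = ByteBuffer.wrap(all, 0, 14);
        System.out.println("frames decoded: " + decode(partial).size());  // prints 1
        System.out.println("bytes left    : " + partial.remaining());     // prints 6

        // Once everything has arrived, both frames decode.
        System.out.println("frames decoded: " + decode(ByteBuffer.wrap(all)).size()); // prints 2
    }
}
```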

Now let's implement the module that encodes and decodes RPC messages with Kryo serialization. First comes the implementation of the NettyRPC message serialization interface (RpcSerialize).

/**
 * @filename:KryoSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo serialization/deserialization implementation
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.RpcSerialize;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class KryoSerialize implements RpcSerialize {

    private KryoPool pool = null;

    public KryoSerialize(final KryoPool pool) {
        this.pool = pool;
    }

    public void serialize(OutputStream output, Object object) throws IOException {
        Kryo kryo = pool.borrow();
        Output out = new Output(output);
        kryo.writeClassAndObject(out, object);
        out.close();
        pool.release(kryo);
    }

    public Object deserialize(InputStream input) throws IOException {
        Kryo kryo = pool.borrow();
        Input in = new Input(input);
        Object result = kryo.readClassAndObject(in);
        in.close();
        pool.release(kryo);
        return result;
    }
}

Next, RPC message objects are encoded and decoded using the object pool that ships with the Kryo library. First is the Kryo object pool factory (KryoPoolFactory); the built-in pool is the reason I chose Kryo 3.0.3. The code is as follows:

/**
 * @filename:KryoPoolFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo object pool factory
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class KryoPoolFactory {

    // volatile is required for double-checked locking to be safe
    private static volatile KryoPoolFactory poolFactory = null;

    private KryoFactory factory = new KryoFactory() {
        public Kryo create() {
            Kryo kryo = new Kryo();
            kryo.setReferences(false);
            // Register the RPC message classes with Kryo
            kryo.register(MessageRequest.class);
            kryo.register(MessageResponse.class);
            kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
            return kryo;
        }
    };

    private KryoPool pool = new KryoPool.Builder(factory).build();

    private KryoPoolFactory() {
    }

    public static KryoPool getKryoPoolInstance() {
        if (poolFactory == null) {
            synchronized (KryoPoolFactory.class) {
                if (poolFactory == null) {
                    poolFactory = new KryoPoolFactory();
                }
            }
        }
        return poolFactory.getPool();
    }

    public KryoPool getPool() {
        return pool;
    }
}
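Double-checked locking of this kind is only safe under the Java memory model when the instance field is declared `volatile`. An alternative that needs no explicit locking is the initialization-on-demand holder idiom, sketched below. This is not the approach the framework uses; `PoolFactory` is a hypothetical stand-in for a factory such as KryoPoolFactory, and the counter exists only so the demo can show that a single instance is built.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyHolderDemo {

    // Counts constructor calls so the demo can show that only one instance is built.
    static final AtomicInteger CONSTRUCTIONS = new AtomicInteger();

    // Hypothetical stand-in for a singleton pool factory.
    static final class PoolFactory {

        private PoolFactory() {
            CONSTRUCTIONS.incrementAndGet();
        }

        // The JVM initializes Holder lazily and exactly once, on first access,
        // so no synchronized block or volatile field is needed.
        private static final class Holder {
            static final PoolFactory INSTANCE = new PoolFactory();
        }

        static PoolFactory getInstance() {
            return Holder.INSTANCE;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(PoolFactory::getInstance);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("constructions: " + CONSTRUCTIONS.get()); // prints 1
    }
}
```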

KryoCodecUtil, the Kryo utility class for encoding and decoding RPC messages, implements the RPC message codec interface (MessageCodecUtil). The implementation is as follows:

/**
 * @filename:KryoCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo codec utility class
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.pool.KryoPool;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import com.google.common.io.Closer;

public class KryoCodecUtil implements MessageCodecUtil {

    private KryoPool pool;
    private static Closer closer = Closer.create();

    public KryoCodecUtil(KryoPool pool) {
        this.pool = pool;
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            kryoSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            // Write the 4-byte length header, then the message body
            out.writeInt(dataLength);
            out.writeBytes(body);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            Object obj = kryoSerialization.deserialize(byteArrayInputStream);
            return obj;
        } finally {
            closer.close();
        }
    }
}

Finally, Kryo's own encoder and decoder only need to call the encode and decode methods of the Kryo codec utility class (KryoCodecUtil). The code is as follows:

/**
 * @filename:KryoDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo decoder
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class KryoDecoder extends MessageDecoder {

    public KryoDecoder(MessageCodecUtil util) {
        super(util);
    }
}

/**
 * @filename:KryoEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Kryo encoder
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class KryoEncoder extends MessageEncoder {

    public KryoEncoder(MessageCodecUtil util) {
        super(util);
    }
}

Finally, let's implement the module that encodes and decodes RPC messages with Hessian. First is the Hessian serialization/deserialization class, which also implements the RPC message serialization/deserialization interface (RpcSerialize). The code is as follows:

/**
 * @filename:HessianSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian serialization/deserialization implementation
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import newlandframework.netty.rpc.serialize.support.RpcSerialize;

public class HessianSerialize implements RpcSerialize {

    public void serialize(OutputStream output, Object object) {
        Hessian2Output ho = new Hessian2Output(output);
        try {
            ho.startMessage();
            ho.writeObject(object);
            ho.completeMessage();
            ho.close();
            output.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Object deserialize(InputStream input) {
        Object result = null;
        try {
            Hessian2Input hi = new Hessian2Input(input);
            hi.startMessage();
            result = hi.readObject();
            hi.completeMessage();
            hi.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
}

The Hessian serialization/deserialization class is pooled using object pooling, with the following code:

/**
 * @filename:HessianSerializeFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian serialization/deserialization object factory for the pool
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {

    public HessianSerialize create() throws Exception {
        return createHessian();
    }

    public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
        return new DefaultPooledObject<HessianSerialize>(hessian);
    }

    private HessianSerialize createHessian() {
        return new HessianSerialize();
    }
}

/**
 * @filename:HessianSerializePool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian serialization/deserialization pool
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HessianSerializePool {

    // When Netty uses Hessian serialization/deserialization, object pooling avoids
    // creating objects repeatedly and improves JVM memory utilization. In testing,
    // serialization efficiency improved significantly under highly concurrent
    // serialization/deserialization.
    private GenericObjectPool<HessianSerialize> hessianPool;
    // volatile is required for double-checked locking to be safe
    private static volatile HessianSerializePool poolFactory = null;

    private HessianSerializePool() {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
    }

    public static HessianSerializePool getHessianPoolInstance() {
        if (poolFactory == null) {
            synchronized (HessianSerializePool.class) {
                if (poolFactory == null) {
                    poolFactory = new HessianSerializePool();
                }
            }
        }
        return poolFactory;
    }

    // Reserved interface for supplying a custom pool configuration
    public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());

        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        // Maximum number of pooled objects
        config.setMaxTotal(maxTotal);
        // Minimum number of idle objects
        config.setMinIdle(minIdle);
        // Maximum time to wait when borrowing an object
        config.setMaxWaitMillis(maxWaitMillis);
        // Minimum idle time before an object may be evicted, 1800000 milliseconds by default
        config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);

        hessianPool.setConfig(config);
    }

    public HessianSerialize borrow() {
        try {
            return getHessianPool().borrowObject();
        } catch (final Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public void restore(final HessianSerialize object) {
        getHessianPool().returnObject(object);
    }

    public GenericObjectPool<HessianSerialize> getHessianPool() {
        return hessianPool;
    }
}

Once the Hessian serialization objects are pooled, the Hessian codec utility class "borrows" one from the pool. After borrowing a Hessian serialization object, be sure to return it. The Hessian codec utility class is implemented as follows:

/**
 * @filename:HessianCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian codec utility class
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;

public class HessianCodecUtil implements MessageCodecUtil {

    HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();
    private static Closer closer = Closer.create();

    public HessianCodecUtil() {
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            hessianSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            // Write the 4-byte length header, then the message body
            out.writeInt(dataLength);
            out.writeBytes(body);
            pool.restore(hessianSerialization);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            Object object = hessianSerialization.deserialize(byteArrayInputStream);
            pool.restore(hessianSerialization);
            return object;
        } finally {
            closer.close();
        }
    }
}

Finally, the reference code for Hessian's RPC message encoder and decoder is as follows:

/**
 * @filename:HessianDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian decoder
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class HessianDecoder extends MessageDecoder {

    public HessianDecoder(MessageCodecUtil util) {
        super(util);
    }
}

/**
 * @filename:HessianEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Hessian encoder
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class HessianEncoder extends MessageEncoder {

    public HessianEncoder(MessageCodecUtil util) {
        super(util);
    }
}

At this point the Kryo and Hessian serialization protocol modules of NettyRPC are designed and implemented. Now we embed these protocols into the NettyRPC core module package (newlandframework.netty.rpc.core). Only the optimized code is shown below; for the other modules, refer to my previous article, "How to use Netty to develop a high-performance RPC server". Here is the hierarchy of the NettyRPC core module package (newlandframework.netty.rpc.core):

     

Let's look at the NettyRPC server implementation first. The first piece is the RPC server pipeline initializer (MessageRecvChannelInitializer). Compared with the previous version, it mainly introduces the message serialization protocol (RpcSerializeProtocol). The implementation is as follows:

/**
 * @filename:MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC server pipeline initializer
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private RpcRecvSerializeFrame frame = null;

    MessageRecvChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.protocol = protocol;
        return this;
    }

    MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
        frame = new RpcRecvSerializeFrame(handlerMap);
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        frame.select(protocol, pipeline);
    }
}

In the RPC server execution module (MessageRecvExecutor), the default serialization is Java native serialization, and the hierarchy of asynchronous calls to the thread pool has been optimized. The code is as follows:

/**
 * @filename:MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC server execution module
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    // Default: JDK native serialization protocol
    private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

    private final static String DELIMITER = ":";

    private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();

    private static ListeningExecutorService threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress, String serializeProtocol) {
        this.serverAddress = serverAddress;
        this.serializeProtocol = Enum.valueOf(RpcSerializeProtocol.class, serializeProtocol);
    }

    public static void submit(Callable<Boolean> task, ChannelHandlerContext ctx, MessageRequest request, MessageResponse response) {
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
                }
            }
        }

        ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(task);
        // The Netty server returns the computed result asynchronously
        Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
            public void onSuccess(Boolean result) {
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        System.out.println("RPC Server Send message-id response:" + request.getMessageId());
                    }
                });
            }

            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, threadPoolExecutor);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();

            // Register every configured service bean under its interface name
            handlerMap.putAll(rpcServiceObject);
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
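        // Configure Netty with the boss/worker (master-slave) thread pool model to cope with highly concurrent requests.
        // Netty also supports single-threaded and multi-threaded network I/O models, which can be configured according to business needs.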
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");

        // Use twice the number of processors available to the JVM
        int parallel = Runtime.getRuntime().availableProcessors() * 2;

        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap).buildRpcSerializeProtocol(serializeProtocol))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success!\nip:%s\nport:%d\nprotocol:%s\n\n", host, port, serializeProtocol);
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
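The `submit` method above creates the business thread pool lazily with double-checked locking. Under the Java memory model that idiom is only safe when the shared field is `volatile`. The following is a minimal sketch of the same lazy initialization using JDK classes only. `LazyExecutorHolder` is an illustrative name that is not part of NettyRPC, and a plain `ExecutorService` stands in for Guava's `ListeningExecutorService`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative holder, not part of NettyRPC.
final class LazyExecutorHolder {
    // volatile guarantees that other threads see a fully constructed pool
    private static volatile ExecutorService executor;

    private LazyExecutorHolder() {
    }

    static ExecutorService get() {
        ExecutorService local = executor; // one volatile read on the fast path
        if (local == null) {
            synchronized (LazyExecutorHolder.class) {
                local = executor; // re-check under the lock
                if (local == null) {
                    // Pool size 16 mirrors the listing above; adjust as needed
                    local = Executors.newFixedThreadPool(16);
                    executor = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService first = get();
        ExecutorService second = get();
        System.out.println(first == second);          // both calls return the same pool
        System.out.println(first.submit(() -> 1 + 1).get());
        first.shutdown();
    }
}
```

Copying the field into a local variable keeps the common path to a single volatile read, which matters when every inbound RPC message goes through this method.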

The RPC server message handler (MessageRecvHandler) is adjusted accordingly:

/**
 * @filename:MessageRecvHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc server message processing
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

    private final Map<String, Object> handlerMap;

    public MessageRecvHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRequest request = (MessageRequest) msg;
        MessageResponse response = new MessageResponse();
        MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap);
        // Do not block the NIO thread; hand complex business logic to the dedicated thread pool
        MessageRecvExecutor.submit(recvTask, ctx, request, response);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

The RPC server message task (MessageRecvInitializeTask) now has a simpler job: given an RPC request message, it invokes the target method by reflection and writes the result into the RPC response message. The code is as follows:

/**
 * @filename:MessageRecvInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc server message thread task processing
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.lang.reflect.MethodUtils;

public class MessageRecvInitializeTask implements Callable<Boolean> {

    private MessageRequest request = null;
    private MessageResponse response = null;
    private Map<String, Object> handlerMap = null;
    private ChannelHandlerContext ctx = null;

    public MessageResponse getResponse() {
        return response;
    }

    public MessageRequest getRequest() {
        return request;
    }

    public void setRequest(MessageRequest request) {
        this.request = request;
    }

    MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap) {
        this.request = request;
        this.response = response;
        this.handlerMap = handlerMap;
        this.ctx = ctx;
    }

    public Boolean call() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
            return Boolean.TRUE;
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
            return Boolean.FALSE;
        }
    }

    private Object reflect(MessageRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }
}
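The `reflect` step above looks up the service bean by class name and invokes the requested method on it. As a rough illustration of that lookup-and-invoke flow, here is a sketch that uses plain `java.lang.reflect` in place of Commons Lang `MethodUtils`. The `Adder` interface and the `ReflectiveInvoker` class are hypothetical and exist only for this example. Unlike the listing, the sketch takes explicit parameter types.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface, used only by this sketch.
interface Adder {
    int add(int a, int b);
}

final class ReflectiveInvoker {
    // Pairs a service interface with the bean that implements it.
    private static final class Registration {
        final Class<?> type;
        final Object bean;

        Registration(Class<?> type, Object bean) {
            this.type = type;
            this.bean = bean;
        }
    }

    // Keyed by interface name, like handlerMap in the listing
    private final Map<String, Registration> handlerMap = new ConcurrentHashMap<>();

    <T> void register(Class<T> type, T bean) {
        handlerMap.put(type.getName(), new Registration(type, bean));
    }

    Object invoke(String className, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Registration reg = handlerMap.get(className);
        if (reg == null) {
            throw new IllegalArgumentException("No service registered for " + className);
        }
        try {
            // Resolve the method on the interface, then call it on the bean
            Method method = reg.type.getMethod(methodName, parameterTypes);
            return method.invoke(reg.bean, args);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException("Service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + className + "." + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveInvoker invoker = new ReflectiveInvoker();
        invoker.register(Adder.class, (a, b) -> a + b);
        Object sum = invoker.invoke(Adder.class.getName(), "add",
                new Class<?>[] {int.class, int.class}, new Object[] {2, 3});
        System.out.println(sum);
    }
}
```

Failing fast when no bean is registered gives a clearer error than the `NullPointerException` a missing map entry would otherwise cause.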

As mentioned earlier, the NettyRPC server can choose a specific serialization protocol. The selection is currently hard-coded; a later version could inject it through Spring IoC instead. The corresponding code is as follows:

/**
 * @filename:RpcRecvSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC server message serialization protocol framework
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;

public class RpcRecvSerializeFrame implements RpcSerializeFrame {

    private Map<String, Object> handlerMap = null;

    public RpcRecvSerializeFrame(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
        switch (protocol) {
            case JDKSERIALIZE: {
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
            case KRYOSERIALIZE: {
                KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                pipeline.addLast(new KryoEncoder(util));
                pipeline.addLast(new KryoDecoder(util));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
            case HESSIANSERIALIZE: {
                HessianCodecUtil util = new HessianCodecUtil();
                pipeline.addLast(new HessianEncoder(util));
                pipeline.addLast(new HessianDecoder(util));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
        }
    }
}
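In the JDKSERIALIZE branch above, `LengthFieldPrepender` writes a length header in front of each message and `LengthFieldBasedFrameDecoder` uses that header to cut the byte stream back into whole messages. The sketch below shows the same idea with JDK classes only, so it can run without Netty. It assumes a 4-byte header, since the value of `MessageCodecUtil.MESSAGE_LENGTH` is not shown in this section, and `LengthPrefixedCodec` is an illustrative name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Illustrative codec, not the NettyRPC implementation.
final class LengthPrefixedCodec {
    static final int HEADER_LENGTH = 4; // assumption: 4-byte big-endian length header

    // Serializes the message with Java native serialization and prepends its length.
    static byte[] encode(Serializable message) {
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(body)) {
                out.writeObject(message);
            }
            byte[] payload = body.toByteArray();
            return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
                    .putInt(payload.length)
                    .put(payload)
                    .array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the next message, or null when the buffer does not yet hold a whole frame.
    static Object decode(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            return null;
        }
        in.mark();
        int length = in.getInt();
        if (in.remaining() < length) {
            in.reset(); // leave the partial frame untouched until more bytes arrive
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello");
        System.out.println(decode(ByteBuffer.wrap(frame)));
    }
}
```

The header is what lets the receiver cope with TCP delivering half a message or several messages in one read, which is why a framing step sits in front of the object decoder in the pipeline.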

So far, the design and implementation of the server side of NettyRPC has come to an end.

Next comes the client module of NettyRPC. The RPC client pipeline initializer (MessageSendChannelInitializer) likewise supports selecting a concrete message serialization protocol (RpcSerializeProtocol). The code is as follows:

/**
 * @filename:MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc client pipeline initialization
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private RpcSendSerializeFrame frame = new RpcSendSerializeFrame();

    MessageSendChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.protocol = protocol;
        return this;
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        frame.select(protocol, pipeline);
    }
}

The RPC client execution module (MessageSendExecutor) is implemented as follows:

/**
 * @filename:MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc client execution module
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import com.google.common.reflect.Reflection;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor() {
    }

    public MessageSendExecutor(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        loader.load(serverAddress, serializeProtocol);
    }

    public void setRpcServerLoader(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        loader.load(serverAddress, serializeProtocol);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Reflection.newProxy(rpcInterface, new MessageSendProxy<T>());
    }
}

The RPC client thread task (MessageSendInitializeTask) now takes the protocol type (RpcSerializeProtocol) as an additional parameter. The code is as follows:

/**
 * @filename:MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc client thread task processing
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendInitializeTask implements Callable<Boolean> {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcSerializeProtocol protocol;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcSerializeProtocol protocol) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.protocol = protocol;
    }

    public Boolean call() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer().buildRpcSerializeProtocol(protocol));

        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    RpcServerLoader.getInstance().setMessageSendHandler(handler);
                }
            }
        });
        return Boolean.TRUE;
    }
}

The RPC client message proxy (MessageSendProxy) has been refactored as follows:

/**
 * @filename:MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:Rpc client message processing
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;
import com.google.common.reflect.AbstractInvocationHandler;

public class MessageSendProxy<T> extends AbstractInvocationHandler {

    public Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}
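The proxy above turns every call on a service interface into a request object that carries the class name, method name, and arguments. As a sketch of that mechanism without Guava or Netty, the following uses the JDK's `java.lang.reflect.Proxy`. `SimpleRequest`, `Calculator`, and `RpcProxySketch` are hypothetical names, and the `transport` function stands in for sending the request over the network and waiting for the reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical service interface for this sketch.
interface Calculator {
    int add(int a, int b);
}

// Hypothetical, simplified stand-in for MessageRequest.
final class SimpleRequest {
    final String messageId = UUID.randomUUID().toString();
    final String className;
    final String methodName;
    final Object[] parameters;

    SimpleRequest(String className, String methodName, Object[] parameters) {
        this.className = className;
        this.methodName = methodName;
        this.parameters = parameters;
    }
}

final class RpcProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> rpcInterface, Function<SimpleRequest, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // equals/hashCode/toString are answered locally, never sent remotely
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return rpcInterface.getSimpleName() + "Proxy";
                }
            }
            SimpleRequest request = new SimpleRequest(
                    method.getDeclaringClass().getName(), method.getName(), args);
            return transport.apply(request);
        };
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(), new Class<?>[] {rpcInterface}, handler);
    }

    public static void main(String[] args) {
        // The "transport" here just computes the sum locally
        Calculator calc = create(Calculator.class,
                req -> (Integer) req.parameters[0] + (Integer) req.parameters[1]);
        System.out.println(calc.add(2, 3));
    }
}
```

Guava's `AbstractInvocationHandler`, used in the listing, takes care of the `Object` methods automatically; with a raw `InvocationHandler` they have to be handled by hand as shown.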

Similarly, the NettyRPC client can also select the protocol type. Note that the client and the server must use the same protocol type in order to communicate. The client-side message serialization protocol framework of NettyRPC is implemented as follows:

/**
 * @filename:RpcSendSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC client message serialization protocol framework
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcSendSerializeFrame implements RpcSerializeFrame {

    // Could later be optimized to be injected through Spring IoC
    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
        switch (protocol) {
            case JDKSERIALIZE: {
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
            case KRYOSERIALIZE: {
                KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                pipeline.addLast(new KryoEncoder(util));
                pipeline.addLast(new KryoDecoder(util));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
            case HESSIANSERIALIZE: {
                HessianCodecUtil util = new HessianCodecUtil();
                pipeline.addLast(new HessianEncoder(util));
                pipeline.addLast(new HessianDecoder(util));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
        }
    }
}

Finally, the NettyRPC client needs to load some context information about the NettyRPC server. The RPC server loader (RpcServerLoader) has therefore been refactored as follows:

/**
 * @filename:RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC server configuration loading
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";
    // By default, RPC messages are transmitted with Java native serialization
    private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
    private MessageSendHandler messageSendHandler = null;

    private Lock lock = new ReentrantLock();
    private Condition connectStatus = lock.newCondition();
    private Condition handlerStatus = lock.newCondition();

    private RpcServerLoader() {
    }

    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);

            ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, serializeProtocol));

            // Futures callback: wake up all waiting RPC threads
            Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
                public void onSuccess(Boolean result) {
                    try {
                        lock.lock();

                        if (messageSendHandler == null) {
                            handlerStatus.await();
                        }

                        if (result == Boolean.TRUE && messageSendHandler != null) {
                            connectStatus.signalAll();
                        }
                    } catch (InterruptedException ex) {
                        Logger.getLogger(RpcServerLoader.class.getName()).log(Level.SEVERE, null, ex);
                    } finally {
                        lock.unlock();
                    }
                }

                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            }, threadPoolExecutor);
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        try {
            lock.lock();
            this.messageSendHandler = messageInHandler;
            handlerStatus.signal();
        } finally {
            lock.unlock();
        }
    }

    public MessageSendHandler getMessageSendHandler() throws InterruptedException {
        try {
            lock.lock();
            if (messageSendHandler == null) {
                connectStatus.await();
            }
            return messageSendHandler;
        } finally {
            lock.unlock();
        }
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }

    public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }
}
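The loader above uses a `Lock` with `Condition` objects so that client threads calling `getMessageSendHandler` block until the connection handler has been set. The following is a reduced sketch of that hand-off using one condition instead of two. `HandlerSlot` is a hypothetical name. It differs from the listing in one respect: it waits in a `while` loop, which the `Condition` documentation recommends because `await` may return spuriously.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative one-shot hand-off, not the NettyRPC implementation.
final class HandlerSlot<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private T handler;

    // Called once the connection is established.
    void set(T value) {
        lock.lock();
        try {
            handler = value;
            ready.signalAll(); // wake every caller blocked in get()
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a handler is available.
    T get() throws InterruptedException {
        lock.lock();
        try {
            while (handler == null) { // re-check after every wake-up
                ready.await();
            }
            return handler;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        HandlerSlot<String> slot = new HandlerSlot<>();
        new Thread(() -> slot.set("handler-1")).start();
        System.out.println(slot.get());
    }
}
```

Calling `lock.lock()` before the `try` block, rather than inside it, avoids running `unlock()` in `finally` when the lock was never acquired.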

At this point, the code for all the core modules of NettyRPC has been presented. How does the NettyRPC server perform after this round of improvement and refactoring? Practice is the sole criterion for testing truth, so let’s start three NettyRPC servers and verify. The server parameters are as follows:

1. NettyRPC server using Java native serialization, address: 127.0.0.1:18887.

2. NettyRPC server using Kryo serialization, address: 127.0.0.1:18888.

3. NettyRPC server using Hessian serialization, address: 127.0.0.1:18889.

The specific Spring configuration file structure is as follows:

 

The parameter settings are as follows:

rpc-server-jdknative.properties

# RPC server's IP address config
rpc.server.addr=127.0.0.1:18887

rpc-server-kryo.properties

# RPC server's IP address config
rpc.server.addr=127.0.0.1:18888

rpc-server-hessian.properties

# RPC server's IP address config
rpc.server.addr=127.0.0.1:18889
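Each properties file holds a single `rpc.server.addr` key in `host:port` form, which the server later splits on ":". As a small sketch of how such a value can be loaded and validated with JDK classes only, consider the following. `ServerAddressParser` is a hypothetical helper that is not part of NettyRPC. Unlike the listings, it rejects malformed values with an exception instead of printing a failure message.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Properties;

// Hypothetical helper for illustration only.
final class ServerAddressParser {
    static final String KEY = "rpc.server.addr";

    // Parses properties text and returns the configured host and port.
    static InetSocketAddress parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String addr = props.getProperty(KEY);
        if (addr == null) {
            throw new IllegalArgumentException(KEY + " is missing");
        }
        String[] parts = addr.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected host:port but got: " + addr);
        }
        int port = Integer.parseInt(parts[1]);
        // createUnresolved validates the port range and skips the DNS lookup
        return InetSocketAddress.createUnresolved(parts[0], port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = parse(
                "# RPC server's IP address config\nrpc.server.addr=127.0.0.1:18887\n");
        System.out.println(address.getHostString() + " " + address.getPort());
    }
}
```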

rpc-invoke-config-jdknative.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="newlandframework.netty.rpc.core"/>
    <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server-jdknative.properties"/>
    <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
        <property name="messageKeyVal">
            <map>
                <entry key="newlandframework.netty.rpc.servicebean.Calculate">
                    <ref bean="calc"/>
                </entry>
            </map>
        </property>
    </bean>
    <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
    <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
        <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
        <constructor-arg name="serializeProtocol" value="JDKSERIALIZE"/>
    </bean>
</beans>

rpc-invoke-config-kryo.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="newlandframework.netty.rpc.core"/>
    <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server-kryo.properties"/>
    <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
        <property name="messageKeyVal">
            <map>
                <entry key="newlandframework.netty.rpc.servicebean.Calculate">
                    <ref bean="calc"/>
                </entry>
            </map>
        </property>
    </bean>
    <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
    <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
        <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
        <constructor-arg name="serializeProtocol" value="KRYOSERIALIZE"/>
    </bean>
</beans>

rpc-invoke-config-hessian.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="newlandframework.netty.rpc.core"/>
    <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server-hessian.properties"/>
    <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
        <property name="messageKeyVal">
            <map>
                <entry key="newlandframework.netty.rpc.servicebean.Calculate">
                    <ref bean="calc"/>
                </entry>
            </map>
        </property>
    </bean>
    <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
    <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
        <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
        <constructor-arg name="serializeProtocol" value="HESSIANSERIALIZE"/>
    </bean>
</beans>
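In each configuration file the `serializeProtocol` constructor argument is a plain string, which the `MessageRecvExecutor` constructor converts with `Enum.valueOf`. A mistyped value therefore fails at startup with an `IllegalArgumentException`. The sketch below shows that conversion with a slightly more helpful error message. The `SerializeProtocol` enum is a stand-in for `RpcSerializeProtocol`, whose source is not shown in this section. Only its three constant names are taken from the article, and `fromConfig` is a hypothetical helper.

```java
import java.util.Arrays;

// Stand-in for RpcSerializeProtocol; constant names taken from the configuration above.
enum SerializeProtocol {
    JDKSERIALIZE, KRYOSERIALIZE, HESSIANSERIALIZE;

    // Converts a configuration string into a protocol constant (hypothetical helper).
    static SerializeProtocol fromConfig(String value) {
        try {
            return Enum.valueOf(SerializeProtocol.class, value.trim());
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unknown serialize protocol '" + value + "', expected one of "
                            + Arrays.toString(values()), e);
        }
    }
}

final class SerializeProtocolDemo {
    public static void main(String[] args) {
        System.out.println(SerializeProtocol.fromConfig("KRYOSERIALIZE"));
        try {
            SerializeProtocol.fromConfig("kryo");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

`Enum.valueOf` is case-sensitive, so the value in the XML has to match the constant name exactly.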

The NettyRPC servers are started as follows:

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-jdknative.xml");

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-kryo.xml");

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-hessian.xml");

If all goes well, each NettyRPC server (Java native serialization, Kryo serialization, and Hessian serialization) prints its startup information to the console, as shown in the screenshots below:

Java native serialization NettyRPC startup success screenshot:

     

Kryo serialization NettyRPC startup success screenshot:

     

Hessian serialization NettyRPC startup success screenshot:

     

Now, as in the concurrency test case from my last article, I construct a burst of 10,000 concurrent RPC requests, each performing a sum calculation, and then observe the elapsed time (ms) of encoding and decoding the RPC messages under each protocol (Java native serialization, Kryo, Hessian).

The test code is shown below:

/**
 * @filename:RpcParallelTest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description:RPC concurrent test code
 * @author tangjie
 * @version 1.0
 *
 */
package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

    public static void parallelTask(MessageSendExecutor executor, int parallel, String serverAddress, RpcSerializeProtocol protocol) throws InterruptedException {
        // Start timing
        StopWatch sw = new StopWatch();
        sw.start();

        CountDownLatch signal = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(parallel);

        for (int index = 0; index < parallel; index++) {
            CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
            new Thread(client).start();
        }

        // The 10000 concurrent threads initiate their requests at the same moment
        signal.countDown();
        finish.await();
        sw.stop();

        String tip = String.format("[%s] Total RPC call time: [%s] ms", protocol, sw.getTime());
        System.out.println(tip);
    }

    public static void JdkNativeParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18887";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.JDKSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    // Kryo serialization protocol
    public static void KryoParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18888";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.KRYOSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    // Hessian serialization protocol
    public static void HessianParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18889";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.HESSIANSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    public static void main(String[] args) throws Exception {
        // Parallelism of 10000
        int parallel = 10000;
        MessageSendExecutor executor = new MessageSendExecutor();

        for (int i = 0; i < 10; i++) {
            JdkNativeParallelTask(executor, parallel);
            KryoParallelTask(executor, parallel);
            HessianParallelTask(executor, parallel);
            System.out.printf("[author tangjie] Netty RPC Server message serialization round [%d] concurrent verification end!\n\n", i);
        }

        executor.stop();
    }
}
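The test above relies on two `CountDownLatch` objects: `signal` holds all worker threads back until they can start together, and `finish` lets the main thread wait until every worker is done. Here is a self-contained sketch of that start-gate and finish-gate pattern, with a generic `Runnable` standing in for the RPC call. `LatchBurst` is an illustrative name, and `System.nanoTime` replaces the Commons Lang `StopWatch`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative harness, not part of NettyRPC.
final class LatchBurst {
    // Runs the task on `parallel` threads released at the same moment; returns elapsed ms.
    static long run(int parallel, Runnable task) throws InterruptedException {
        CountDownLatch signal = new CountDownLatch(1);        // start gate
        CountDownLatch finish = new CountDownLatch(parallel); // finish gate

        for (int i = 0; i < parallel; i++) {
            new Thread(() -> {
                try {
                    signal.await(); // every worker parks here until the gate opens
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finish.countDown(); // always report completion, even on failure
                }
            }).start();
        }

        long start = System.nanoTime();
        signal.countDown(); // open the gate: all workers proceed together
        finish.await();     // wait for the last worker
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        long elapsedMs = run(100, calls::incrementAndGet);
        System.out.println(calls.get() + " calls in " + elapsedMs + " ms");
    }
}
```

Counting down `finish` inside `finally` matters: if a worker fails before counting down, `finish.await()` would otherwise block forever.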

The operation screenshot is as follows:

Next, I collect and summarize the test data and compare the performance of each protocol for RPC message serialization/deserialization (note: because machine configurations differ, your results may differ from the ones below; the test results are for learning and discussion only).

After 10 rounds of stress testing, the specific data are as follows:

 

Clearly, after the framework optimizations described above, the processing performance of Java native serialization has improved considerably compared with the implementation in the previous blog post (RPC message serialization/deserialization takes less time). In the 10 rounds of stress testing, Java native serialization, Kryo serialization, and Hessian serialization each had one round that took more than 10 s. The statistical results are as follows:

     

The performance of Kryo serialization and Hessian serialization is comparable and generally superior to that of Java native serialization.

Now look at how the elapsed time of Java native serialization, Kryo serialization, and Hessian serialization fluctuates over the 10 rounds of stress testing, as shown in the figure below:

    

Each of the three serialization methods clearly shows one “inflection point”. Apart from this “inflection point”, the three methods are relatively stable in terms of elapsed time. In general, though, the elapsed time of Kryo and Hessian fluctuates more, with noticeable oscillation. Java native serialization is more stable, without frequent oscillation, but it takes longer overall.

Closing remarks: this article covers the performance optimization of the server from my previous article, “How to Use Netty to develop and implement high-performance RPC Server”. It optimizes and refactors the previously designed Netty-based RPC server framework from three angles: the RPC message serialization mechanism, object pooling, and multi-threading. For now the RPC server still runs standalone. Can a cluster of several RPC servers be coordinated and managed in a unified, distributed way, with service scheduling, through some mechanism? The answer is yes. One feasible solution is to introduce Zookeeper for service governance. When I have time, I will continue to optimize and improve the framework and present the results in a later blog post. Given the limits of my knowledge and technical ability, the technical views, test data, and test conclusions in this article are intended only for learning and discussion among friends in the blog garden. If I have made mistakes, criticism and corrections are welcome!

I have written a lot; thank you for reading patiently. I hope that after reading this article you have a better understanding of, and more confidence in, using Java to develop high-performance server applications. The way ahead is so long without ending, yet high and low I’ll search with my will unbending. There is no end to the study and exploration of software knowledge, and I would like to share this thought with you!

PS: Since I published the two articles on developing a high-performance RPC server based on Netty in the blog garden, many readers have asked for the source code to study and discuss. For everyone’s convenience, I have open-sourced the NettyRPC code and hosted it on Github. Interested friends are welcome to study and explore it together!

Attached is the download path of the NettyRPC project: github.com/tang-jie/Ne…