Since the garden, published two articles on how to build RPC server based on Netty: Talk about how to use Netty development to achieve high-performance RPC server, Netty to achieve high-performance RPC server optimization of the message serialization, received a lot of peers, garden friends enthusiastic feedback and several optimization suggestions, so use leisure time, planning to the original NettyRPC unreasonable module for reconstruction, Some features have been enhanced. The main optimization points are as follows:
- Protostuff has been added to codecs: JDK native object serialization, Kryo, and Hessian.
- Optimized the thread pool model of NettyRPC server to support LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue, and extended multiple thread pool task processing strategies.
- RPC service startup, registration, and uninstallation support are centrally managed through Spring’s custom NettyRPC tags.
Now focus on refactoring ideas, experience, record. Corresponding source code code, you can view my open source github: github.com/tang-jie/Ne… The NettyRPC 2.0 directory in the project.
In the original NettyRPC message codec plug-in, I used: JDK native object serialization (ObjectOutputStream/ObjectInputStream), Kryo, Hessian these three ways, the subsequent have a friend to my proposal, can introduce Protostuff serialized way. Protostuff is based on Google Protobuf, but has more features and is easier to use. A protobuff is a precompilation process that requires a data structure, which involves writing configuration files in.proto format and translating them into target language code using tools provided by Protobuf, whereas Protostuff omits this precompilation process. Here are the results of a performance test of the main Java serialization framework (image from the web) :
It can be found that Protostuff serialization is indeed a very efficient serialization framework, compared with other mainstream serialization and deserialization frameworks, its serialization performance can be seen. If it is used to encode and decode RPC messages, it is most suitable. Now post the implementation code for the Protostuff serialization codec.
The first is to define the Schema because Protostuff-Runtime implements protobuf serialization/deserialization of Java beans without precompilation. We can cache the Schema at runtime to improve serialization performance. The SchemaCache class is as follows:
package com.newlandframework.rpc.serialize.protostuff; import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.runtime.RuntimeSchema; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.concurrent.ExecutionException; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; / * * * @ author tangjie < https://github.com/tang-jie > * @ filename: SchemaCache. Java * @ description: * @ blogs SchemaCache function module http://www.cnblogs.com/jietang/ * @since 2016/10/7 */ public class SchemaCache { private static class SchemaCacheHolder { private static SchemaCache cache = new SchemaCache(); } public static SchemaCache getInstance() { return SchemaCacheHolder.cache; } private Cache<Class<? >, Schema<? >> cache = CacheBuilder.newBuilder() .maximumSize(1024).expireAfterWrite(1, TimeUnit.HOURS) .build(); private Schema<? > get(final Class<? > cls, Cache<Class<? >, Schema<? >> cache) { try { return cache.get(cls, new Callable<RuntimeSchema<? >>() { public RuntimeSchema<? > call() throws Exception { return RuntimeSchema.createFrom(cls); }}); } catch (ExecutionException e) { return null; } } public Schema<? > get(final Class<? > cls) { return get(cls, cache); }}Copy the code
The real Protostuff serialization and deserialization classes are then defined, which implement the RpcSerialize method of the RpcSerialize interface:
package com.newlandframework.rpc.serialize.protostuff; import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.Schema; import java.io.InputStream; import java.io.OutputStream; import com.newlandframework.rpc.model.MessageRequest; import com.newlandframework.rpc.model.MessageResponse; import com.newlandframework.rpc.serialize.RpcSerialize; import org.objenesis.Objenesis; import org.objenesis.ObjenesisStd; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffSerialize.java * @ description: ProtostuffSerialize function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class ProtostuffSerialize implements RpcSerialize { private static SchemaCache cachedSchema = SchemaCache.getInstance(); private static Objenesis objenesis = new ObjenesisStd(true); private boolean rpcDirect = false; public boolean isRpcDirect() { return rpcDirect; } public void setRpcDirect(boolean rpcDirect) { this.rpcDirect = rpcDirect; } private static <T> Schema<T> getSchema(Class<T> cls) { return (Schema<T>) cachedSchema.get(cls); } public Object deserialize(InputStream input) { try { Class cls = isRpcDirect() ? MessageRequest.class : MessageResponse.class; Object message = (Object) objenesis.newInstance(cls); Schema<Object> schema = getSchema(cls); ProtostuffIOUtil.mergeFrom(input, message, schema); return message; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } public void serialize(OutputStream output, Object object) { Class cls = (Class) object.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema schema = getSchema(cls); ProtostuffIOUtil.writeTo(output, object, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); }}}Copy the code
Also to improve the efficiency of using Protostuff serialization/deserialization classes, we can pool them instead of creating and destroying objects frequently. Now give Protostuff pooling processing categories: ProtostuffSerializeFactory, ProtostuffSerializePool implementation code:
package com.newlandframework.rpc.serialize.protostuff; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffSerializeFactory.java * @ description: ProtostuffSerializeFactory function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class ProtostuffSerializeFactory extends BasePooledObjectFactory<ProtostuffSerialize> { public ProtostuffSerialize create() throws Exception { return createProtostuff(); } public PooledObject<ProtostuffSerialize> wrap(ProtostuffSerialize hessian) { return new DefaultPooledObject<ProtostuffSerialize>(hessian); } private ProtostuffSerialize createProtostuff() { return new ProtostuffSerialize(); }}Copy the code
package com.newlandframework.rpc.serialize.protostuff; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffSerializePool.java * @ description: ProtostuffSerializePool function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class ProtostuffSerializePool { private GenericObjectPool<ProtostuffSerialize> ProtostuffPool; volatile private static ProtostuffSerializePool poolFactory = null; private ProtostuffSerializePool() { ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory()); } public static ProtostuffSerializePool getProtostuffPoolInstance() { if (poolFactory == null) { synchronized (ProtostuffSerializePool.class) { if (poolFactory == null) { poolFactory = new ProtostuffSerializePool(); } } } return poolFactory; } public ProtostuffSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) { ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory()); GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(maxTotal); config.setMinIdle(minIdle); config.setMaxWaitMillis(maxWaitMillis); config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); ProtostuffPool.setConfig(config); } public ProtostuffSerialize borrow() { try { return getProtostuffPool().borrowObject(); } catch (final Exception ex) { ex.printStackTrace(); return null; } } public void restore(final ProtostuffSerialize object) { getProtostuffPool().returnObject(object); } public GenericObjectPool<ProtostuffSerialize> getProtostuffPool() { return ProtostuffPool; }}Copy the code
Now with Protostuff pooled processing class, we will use it to achieve the coding and decoding interface of NettyRPC, to achieve the purpose of coding and decoding RPC messages. The first is the RPC decoder code implemented by Protostuff:
package com.newlandframework.rpc.serialize.protostuff; import com.newlandframework.rpc.serialize.MessageCodecUtil; import com.newlandframework.rpc.serialize.MessageDecoder; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffDecoder.java * @ description: ProtostuffDecoder function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class ProtostuffDecoder extends MessageDecoder { public ProtostuffDecoder(MessageCodecUtil util) { super(util); }}Copy the code
Then the RPC encoder code implemented by Protostuff:
package com.newlandframework.rpc.serialize.protostuff; import com.newlandframework.rpc.serialize.MessageCodecUtil; import com.newlandframework.rpc.serialize.MessageEncoder; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffEncoder.java * @ description: ProtostuffEncoder function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class ProtostuffEncoder extends MessageEncoder { public ProtostuffEncoder(MessageCodecUtil util) { super(util); }}Copy the code
Finally, the implementation code of Protostuff RPC code and decoder tool class ProtostuffCodecUtil is reconstructed:
package com.newlandframework.rpc.serialize.protostuff; import com.google.common.io.Closer; import com.newlandframework.rpc.serialize.MessageCodecUtil; import io.netty.buffer.ByteBuf; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffCodecUtil.java * @ description: ProtostuffCodecUtil function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class ProtostuffCodecUtil implements MessageCodecUtil { private static Closer closer = Closer.create(); private ProtostuffSerializePool pool = ProtostuffSerializePool.getProtostuffPoolInstance(); private boolean rpcDirect = false; public boolean isRpcDirect() { return rpcDirect; } public void setRpcDirect(boolean rpcDirect) { this.rpcDirect = rpcDirect; } public void encode(final ByteBuf out, final Object message) throws IOException { try { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); closer.register(byteArrayOutputStream); ProtostuffSerialize protostuffSerialization = pool.borrow(); protostuffSerialization.serialize(byteArrayOutputStream, message); byte[] body = byteArrayOutputStream.toByteArray(); int dataLength = body.length; out.writeInt(dataLength); out.writeBytes(body); pool.restore(protostuffSerialization); } finally { closer.close(); } } public Object decode(byte[] body) throws IOException { try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body); closer.register(byteArrayInputStream); ProtostuffSerialize protostuffSerialization = pool.borrow(); protostuffSerialization.setRpcDirect(rpcDirect); Object obj = protostuffSerialization.deserialize(byteArrayInputStream); pool.restore(protostuffSerialization); return obj; } finally { closer.close(); }}}Copy the code
In this way, NettyRPC has another way of message serialization, which further enhances its ability of RPC message network transmission.
Secondly, the thread model of NettyRPC server is optimized to make RPC message processing thread pool support more diverse queue containers for tasks. The code for RPC asynchronous processing thread pool RpcThreadPool is as follows:
package com.newlandframework.rpc.parallel; import com.newlandframework.rpc.core.RpcSystemConfig; import com.newlandframework.rpc.parallel.policy.AbortPolicy; import com.newlandframework.rpc.parallel.policy.BlockingPolicy; import com.newlandframework.rpc.parallel.policy.CallerRunsPolicy; import com.newlandframework.rpc.parallel.policy.DiscardedPolicy; import com.newlandframework.rpc.parallel.policy.RejectedPolicy; import com.newlandframework.rpc.parallel.policy.RejectedPolicyType; import java.util.concurrent.Executor; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.RejectedExecutionHandler; / * * * @ author tangjie < https://github.com/tang-jie > * @ filename: RpcThreadPool. Java * @ description: * RpcThreadPool module @blogs http://www.cnblogs.com/jietang/ * @since 2016/10/7 */ public class RpcThreadPool { private static RejectedExecutionHandler createPolicy() { RejectedPolicyType rejectedPolicyType = RejectedPolicyType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolRejectedPolicyAttr, "AbortPolicy")); switch (rejectedPolicyType) { case BLOCKING_POLICY: return new BlockingPolicy(); case CALLER_RUNS_POLICY: return new CallerRunsPolicy(); case ABORT_POLICY: return new AbortPolicy(); case REJECTED_POLICY: return new RejectedPolicy(); case DISCARDED_POLICY: return new DiscardedPolicy(); } return null; } private static BlockingQueue<Runnable> createBlockingQueue(int queues) { BlockingQueueType queueType = BlockingQueueType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolQueueNameAttr, "LinkedBlockingQueue")); switch (queueType) { case LINKED_BLOCKING_QUEUE: return new LinkedBlockingQueue<Runnable>(); case ARRAY_BLOCKING_QUEUE: return new ArrayBlockingQueue<Runnable>(RpcSystemConfig.PARALLEL * queues); case SYNCHRONOUS_QUEUE: return new SynchronousQueue<Runnable>(); } return null; } public static Executor getExecutor(int threads, int queues) { String name = "RpcThreadPool"; return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, createBlockingQueue(queues), new NamedThreadFactory(name, true), createPolicy()); }}Copy the code
ThreadPoolExecutor () ¶ getExecutor () ¶ ThreadPoolExecutor () ¶
The specific meanings of parameters are as follows:
- CorePoolSize is the thread pool reserve size.
- MaximumPoolSize is the maximum thread size of the thread pool.
- KeepAliveTime Indicates the timeout period for an idle thread to terminate.
- Unit Is used to specify the unit of keepAliveTime, such as milliseconds, seconds, minutes, hours, days, and so on.
- WorkQueue Stores the queue of tasks to be processed.
- Handler is used to specify a thread pool response strategy when the task queue fills up and the maximum thread pool size is reached.
The thread pool of NettyRPC supports the following three task queue types:
- LinkedBlockingQueue: An unbounded queue of tasks implemented as a linked list, although you can specify an additional capacity to make it bounded.
- ArrayBlockingQueue: a bounded array of task queues.
- SynchronousQueue: The task queue has a fixed capacity of 1. When a client submits a task, it blocks. It blocks until a processing thread picks up the pending task.
In the thread pool model of NettyRPC, when the thread pool cannot handle the situation, the specific countermeasures are as follows:
- AbortPolicy: Directly rejects the execution and throws rejectedExecution.
- DiscardedPolicy: “reduces the load” of a task queue by discarding half of the queue elements starting at the head of the task queue.
- CallerRunsPolicy: The task is not discarded and exceptions are not thrown, but the caller runs it himself. This is mainly because too many parallel requests will increase the load of the system, and the operating system will frequently perform context switching between scheduling threads. When the thread pool is full, it frequently switches and interrupts. Let’s serialize all parallel requests to ensure as little delay as possible, which is probably Doug Lea’s design idea.
After a detailed introduction to the details of the thread pool parameters, I will elaborate on the following, NettyRPC thread pool RpcThreadPool workflow:
- The thread pool of NettyRPC receives RPC data processing requests and continues to generate execution tasks if it determines that the number of active threads is smaller than the corePoolSize set by the thread pool.
- When corePoolSize is reached, the thread pool will put the tasks to be executed into the task queue.
- If the number of active threads is still smaller than the maximumPoolSize parameter in the thread pool after the task queue is full, the thread pool will continue to allocate the task thread for emergency processing, which will be executed immediately.
- If the maximum number of threads specified in the maximumPoolSize parameter of the thread pool is reached, the thread pool will invoke RejectedExecutionHandler to handle this issue.
The default thread pool setting in NettyRPC is to set both corePoolSize and maximumPoolSize to 16, and the task queue to an unbounded block queue. In the application, the thread pool parameters of NettyRPC should be reasonably planned according to the actual pressure and throughput. Currently, NettyRPC exposes a JMX interface, which is short for “Java Management Extensions”, a J2Ee-like specification, so that you can flexibly extend the monitoring and Management functions of the system. Real-time monitoring of the execution of RPC server thread pool tasks, specific JMX monitoring and measurement of key indicators of thread pool code implementation is as follows:
package com.newlandframework.rpc.parallel.jmx; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; / * * * @ author tangjie < https://github.com/tang-jie > * @ filename: ThreadPoolStatus. Java * @ description: ThreadPoolStatus function module * @blogs http://www.cnblogs.com/jietang/ * @since 2016/10/13 */ @ManagedResource public class ThreadPoolStatus { private int poolSize; private int activeCount; private int corePoolSize; private int maximumPoolSize; private int largestPoolSize; private long taskCount; private long completedTaskCount; @ManagedOperation public int getPoolSize() { return poolSize; } @ManagedOperation public void setPoolSize(int poolSize) { this.poolSize = poolSize; } @ManagedOperation public int getActiveCount() { return activeCount; } @ManagedOperation public void setActiveCount(int activeCount) { this.activeCount = activeCount; } @ManagedOperation public int getCorePoolSize() { return corePoolSize; } @ManagedOperation public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } @ManagedOperation public int getMaximumPoolSize() { return maximumPoolSize; } @ManagedOperation public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } @ManagedOperation public int getLargestPoolSize() { return largestPoolSize; } @ManagedOperation public void setLargestPoolSize(int largestPoolSize) { this.largestPoolSize = largestPoolSize; } @ManagedOperation public long getTaskCount() { return taskCount; } @ManagedOperation public void setTaskCount(long taskCount) { this.taskCount = taskCount; } @ManagedOperation public long getCompletedTaskCount() { return completedTaskCount; } @ManagedOperation public void setCompletedTaskCount(long completedTaskCount) { this.completedTaskCount = completedTaskCount; }}Copy the code
ThreadPoolStatus monitoring class: ThreadPoolStatus. The monitoring indicators are as follows:
- PoolSize: specifies the number of current threads in the pool
- ActiveCount: Approximate number of threads that actively execute a task
- CorePoolSize: number of core threads
- MaximumPoolSize: specifies the maximum number of threads allowed
- LargestPoolSize: indicates the maximum number of historical threads
- TaskCount: Approximate total number of tasks that are scheduled to be executed
- CompletedTaskCount: Approximate total number of completed tasks executed
CorePoolSize and maximumPoolSize have been described in detail above.
NettyRPC thread pool monitoring the JMX interface: ThreadPoolMonitorProvider, JMX through JNDI – RMI remote connection in the form of communication, specific implementation ways are as follows:
package com.newlandframework.rpc.parallel.jmx; import com.newlandframework.rpc.netty.MessageRecvExecutor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.EnableMBeanExport; import org.springframework.jmx.support.ConnectorServerFactoryBean; import org.springframework.jmx.support.MBeanServerConnectionFactoryBean; import org.springframework.jmx.support.MBeanServerFactoryBean; import org.springframework.remoting.rmi.RmiRegistryFactoryBean; import org.apache.commons.lang3.StringUtils; import javax.management.MBeanServerConnection; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.ReflectionException; import javax.management.MBeanException; import javax.management.InstanceNotFoundException; import java.io.IOException; /** * @author tangjie<https://github.com/tang-jie> * @filename:ThreadPoolMonitorProvider.java * @ description: ThreadPoolMonitorProvider function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/13 * / @Configuration @EnableMBeanExport @ComponentScan("com.newlandframework.rpc.parallel.jmx") public class ThreadPoolMonitorProvider { public final static String DELIMITER = ":"; public static String url; public static String jmxPoolSizeMethod = "setPoolSize"; public static String jmxActiveCountMethod = "setActiveCount"; public static String jmxCorePoolSizeMethod = "setCorePoolSize"; public static String jmxMaximumPoolSizeMethod = "setMaximumPoolSize"; public static String jmxLargestPoolSizeMethod = "setLargestPoolSize"; public static String jmxTaskCountMethod = "setTaskCount"; public static String jmxCompletedTaskCountMethod = "setCompletedTaskCount"; @Bean public ThreadPoolStatus threadPoolStatus() { return new ThreadPoolStatus(); } @Bean public MBeanServerFactoryBean mbeanServer() { return new MBeanServerFactoryBean(); } @Bean public RmiRegistryFactoryBean registry() { return new RmiRegistryFactoryBean(); } @Bean @DependsOn("registry") public ConnectorServerFactoryBean connectorServer() throws MalformedObjectNameException { MessageRecvExecutor ref = MessageRecvExecutor.getInstance(); String ipAddr = StringUtils.isNotEmpty(ref.getServerAddress()) ? StringUtils.substringBeforeLast(ref.getServerAddress(), DELIMITER) : "localhost"; url = "service:jmx:rmi://" + ipAddr + "/jndi/rmi://" + ipAddr + ":1099/nettyrpcstatus"; System.out.println("NettyRPC JMX MonitorURL : [" + url + "]"); ConnectorServerFactoryBean connectorServerFactoryBean = new ConnectorServerFactoryBean(); connectorServerFactoryBean.setObjectName("connector:name=rmi"); connectorServerFactoryBean.setServiceUrl(url); return connectorServerFactoryBean; } public static void monitor(ThreadPoolStatus status) throws IOException, MalformedObjectNameException, ReflectionException, MBeanException, InstanceNotFoundException { MBeanServerConnectionFactoryBean mBeanServerConnectionFactoryBean = new MBeanServerConnectionFactoryBean(); mBeanServerConnectionFactoryBean.setServiceUrl(url); mBeanServerConnectionFactoryBean.afterPropertiesSet(); MBeanServerConnection connection = mBeanServerConnectionFactoryBean.getObject(); ObjectName objectName = new ObjectName("com.newlandframework.rpc.parallel.jmx:name=threadPoolStatus,type=ThreadPoolStatus"); connection.invoke(objectName, jmxPoolSizeMethod, new Object[]{status.getPoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxActiveCountMethod, new Object[]{status.getActiveCount()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxCorePoolSizeMethod, new Object[]{status.getCorePoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxMaximumPoolSizeMethod, new Object[]{status.getMaximumPoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxLargestPoolSizeMethod, new Object[]{status.getLargestPoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxTaskCountMethod, new Object[]{status.getTaskCount()}, new String[]{long.class.getName()}); connection.invoke(objectName, jmxCompletedTaskCountMethod, new Object[]{status.getCompletedTaskCount()}, new String[]{long.class.getName()}); }}Copy the code
Once the NettyRPC server is successfully started, it can be monitored through the JMX interface by opening JConsole and typing the URL: Service: JMX: rmi: / / 127.0.0.1 / jndi/rmi: / / 127.0.0.1:1099 / nettyrpcstatus, username, password, the default is empty, click the connect button.
When a client makes an RPC request, you can view the following monitoring page through JMX:
At this time, click the button of each monitoring indicator of NettyRPC thread pool, and you can intuitively see the real-time monitoring of main parameters of the thread pool in the actual operation of NettyRPC. For example, click getCompletedTaskCount to see the total number of thread tasks that have been completed so far. The specific situation is shown in the figure below:
As you can see, 40,280 RPC requests have been processed so far. In this way, we can quasi-real-time monitor whether NettyRPC thread pool parameter Settings and capacity planning are reasonable, so as to make timely adjustments and make reasonable and maximum use of software and hardware resources.
After the final refactoring, the Spring configuration of the NettyRPC server (NettyRPC/NettyRPC 2.0/main/resources/rpc-invoke-config-server.xml) is as follows:
<? The XML version = "1.0" encoding = "utf-8"? > <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd"> <! <context:property-placeholder location="classpath:rpc-server.properties"/> <! - defines the RPC service interface - > < nettyrpc: service id = "demoAddService interfaceName" = "com. Newlandframework. RPC. Services. AddCalculate" ref="calcAddService"></nettyrpc:service> <nettyrpc:service id="demoMultiService" interfaceName="com.newlandframework.rpc.services.MultiCalculate" ref="calcMultiService"></nettyrpc:service> <! -- Register RPC server, <nettyrpc:registry id="rpcRegistry" ipAddr="${rpc.server.addr}" protocol="PROTOSTUFFSERIALIZE"></nettyrpc:registry> <! - the RPC service implementation class declaration - > < bean id = "calcAddService" class = "com. Newlandframework. RPC. Services. Impl. AddCalculateImpl" > < / bean > < bean id="calcMultiService" class="com.newlandframework.rpc.services.impl.MultiCalculateImpl"></bean> </beans>Copy the code
The nettyRPC :service tag is used to define the service interface supported by the RPC server. This example declares that the current RPC server provides the addition and multiplication services for the client to call. For details about Spring custom tags, you can refer to Github: NettyRPC / 2.0 / main/Java/com/NettyRPC newlandframework/RPC/spring/package (path) implementation of the code, the code is more use of the characteristics of the spring framework, hope you can understanding and analysis on its own.
Then with bean tag declares the addition, multiplication calculation interface corresponding implementation classes, are unified in com. Newlandframework. RPC. Services package.
Finally, the RPC server is registered through NettyRPC: Registry, and the ipAddr attribute defines the IP/port information corresponding to the RPC server. Protocol Specifies the message serialization protocol type supported by the current RPC server.
Has been implemented types are: the JDK native object serialization (ObjectOutputStream/ObjectInputStream), Kryo, Hessian, Protostuff four kind of serialization.
Configured RPC invoke – config – server. After the XML, you can start the RPC server Main function entry: com. Newlandframework. RPC. Boot. RpcServerStarter. Maven is packaged and deployed on (Red Hat Enterprise Linux Server Release 5.7 (Tikanga) 64-bit systems with the kernel version number: Kernel 2.6.18-274.el5 on an x86_64), you can start NettyRPC. If all is normal, the CRT terminal will display the following output:
At this point, Spring configuration is performed on the client side (NettyRPC/NettyRPC 2.0/test/resources/rpc-invoke-config-client.xml).
<? The XML version = "1.0" encoding = "utf-8"? > <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd"> <! <context:property-placeholder location="classpath:rpc-server.properties"/> <! < nettyRPC :reference id="addCalc" -- < nettyRPC :reference ID ="addCalc" interfaceName="com.newlandframework.rpc.services.AddCalculate" protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/> <nettyrpc:reference id="multiCalc" interfaceName="com.newlandframework.rpc.services.MultiCalculate" protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/> </beans>Copy the code
The demo code of addition calculation and multiplication calculation is as follows:
package com.newlandframework.rpc.services; /** * @author Tangjie <https://github.com/tang-jie> * @filename:Calculate. Java * @description:Calculate function module * @blogs http://www.cnblogs.com/jietang/ * @ since 2016/10/7 * / public interface AddCalculate {/ / two together int the add (int a, int b); }Copy the code
package com.newlandframework.rpc.services.impl; import com.newlandframework.rpc.services.AddCalculate; / * * * @ author tangjie < https://github.com/tang-jie > * @ filename: CalculateImpl. Java * @ description: * CalculateImpl module @blogs http://www.cnblogs.com/jietang/ * @since 2016/10/7 */ public class AddCalculateImpl implements AddCalculate { Public int add(int a, int b) {return a + b; }}Copy the code
package com.newlandframework.rpc.services; /** * @author Tangjie <https://github.com/tang-jie> * @filename:Calculate. Java * @description:Calculate function module * @blogs http://www.cnblogs.com/jietang/ * @ since 2016/10/7 * / public interface MultiCalculate {/ / two Numbers multiplication int multi (int a, int b); }Copy the code
package com.newlandframework.rpc.services.impl; import com.newlandframework.rpc.services.MultiCalculate; / * * * @ author tangjie < https://github.com/tang-jie > * @ filename: CalculateImpl. Java * @ description: * CalculateImpl module @blogs http://www.cnblogs.com/jietang/ * @since 2016/10/7 */ public class MultiCalculateImpl implements MultiCalculate { Public int multi(int a, int b) {return a * b; }}Copy the code
It is worth noting that the Spring configuration of the client NettyRPC must not only specify the service information for invoking remote RPC, but also configure the CORRESPONDING IP address, port information, and protocol type of the remote RPC server, which must be consistent with that of the RPC server, so that the message encoding and decoding can be carried out normally.
Now we simulate 1W instantaneous concurrent addition and multiplication calculation requests, with a total of 2W pen request operations, and call the calculation module on the remote RPC server. By default, we use protostuff serialization to encode and decode RPC messages. Note that the sample test code is based on 1W pen instantaneous concurrent computation requests, not 1W pen cyclic computation requests, which is an important measure of RPC server throughput, so the test sample here is written based on CountDownLatch. Class Java. Util. Concurrent CountDownLatch is a synchronous auxiliary class, at the completion of a group are other threads performed before operation, and it allows one or more threads have been waiting for. Here, the RPC request is calculated by addition and the RPC request is calculated by multiplication. 1W threads are started on the RPC client respectively. At this time, they are suspended first and then wait for the request signal to initiate the RPC request instantaneously. The specific code is as follows:
First is the addition AddCalcParallelRequestThread concurrent requests class:
package com.newlandframework.test; import com.newlandframework.rpc.services.AddCalculate; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger; /** * @author tangjie<https://github.com/tang-jie> * @filename:AddCalcParallelRequestThread.java * @ description: AddCalcParallelRequestThread function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class AddCalcParallelRequestThread implements Runnable { private CountDownLatch signal; private CountDownLatch finish; private int taskNumber = 0; private AddCalculate calc; public AddCalcParallelRequestThread(AddCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) { this.signal = signal; this.finish = finish; this.taskNumber = taskNumber; this.calc = calc; } public void run() {try {// Add the thread to wait for signal.await(); Int add = calc.add(taskNumber, taskNumber); System.out.println("calc add result:[" + add + "]"); finish.countDown(); } catch (InterruptedException ex) { Logger.getLogger(AddCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex); }}}Copy the code
The second is the multiplication calculation MultiCalcParallelRequestThread concurrent requests class:
package com.newlandframework.test; import com.newlandframework.rpc.services.MultiCalculate; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger; /** * @author tangjie<https://github.com/tang-jie> * @filename:MultiCalcParallelRequestThread.java * @ description: MultiCalcParallelRequestThread function module * * @ @ blogs http://www.cnblogs.com/jietang/ since 2016/10/7 * / public class MultiCalcParallelRequestThread implements Runnable { private CountDownLatch signal; private CountDownLatch finish; private int taskNumber = 0; private MultiCalculate calc; public MultiCalcParallelRequestThread(MultiCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) { this.signal = signal; this.finish = finish; this.taskNumber = taskNumber; this.calc = calc; } public void run() {try {// wait for signal. Await (); Int multi = calc.multi(taskNumber, taskNumber); System.out.println("calc multi result:[" + multi + "]"); finish.countDown(); } catch (InterruptedException ex) { Logger.getLogger(MultiCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex); }}}Copy the code
Now write a test client called RpcParallelTest to test the performance of the RPC server and to see if the final result is calculated correctly. Test client RpcParallelTest (RpcParallelTest)
package com.newlandframework.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.newlandframework.rpc.services.AddCalculate; import com.newlandframework.rpc.services.MultiCalculate; import org.apache.commons.lang3.time.StopWatch; import org.springframework.context.support.ClassPathXmlApplicationContext; / * * * @ author tangjie < https://github.com/tang-jie > * @ filename: RpcParallelTest. Java * @ description: * RpcParallelTest module @blogs http://www.cnblogs.com/jietang/ * @since 2016/10/7 */ public class RpcParallelTest { public static void parallelAddCalcTask(AddCalculate calc, Int parallel) throws InterruptedException {// Start StopWatch sw = new StopWatch(); sw.start(); CountDownLatch signal = new CountDownLatch(1); CountDownLatch finish = new CountDownLatch(parallel); for (int index = 0; index < parallel; index++) { AddCalcParallelRequestThread client = new AddCalcParallelRequestThread(calc, signal, finish, index); new Thread(client).start(); } signal.countDown(); finish.await(); sw.stop(); String tip = string. format(" Add total RPC call time: [%s] ms ", sw.getTime()); System.out.println(tip); } public static void parallelMultiCalcTask(MultiCalculate calc, Int parallel) throws InterruptedException {// Start StopWatch sw = new StopWatch(); sw.start(); CountDownLatch signal = new CountDownLatch(1); CountDownLatch finish = new CountDownLatch(parallel); for (int index = 0; index < parallel; index++) { MultiCalcParallelRequestThread client = new MultiCalcParallelRequestThread(calc, signal, finish, index); new Thread(client).start(); } signal.countDown(); finish.await(); sw.stop(); String tip = string. format(" Multiply RPC call total time: [%s] ms ", sw.getTime()); System.out.println(tip); } public static void addTask(AddCalculate calc, int parallel) throws InterruptedException { RpcParallelTest.parallelAddCalcTask(calc, parallel); TimeUnit.MILLISECONDS.sleep(30); } public static void multiTask(MultiCalculate calc, int parallel) throws InterruptedException { RpcParallelTest.parallelMultiCalcTask(calc, parallel); TimeUnit.MILLISECONDS.sleep(30); } public static void main(String[] args) throws Exception {// parallel 10000 int parallel = 10000; / / load the Spring configuration information ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:rpc-invoke-config-client.xml"); AddTask ((AddCalculate) context.getBean("addCalc"), parallel); multiTask((MultiCalculate) context.getBean("multiCalc"), parallel); System.out.printf("[author tangjie] Netty RPC Server Message protocol serialization and concurrent authentication end! \n\n"); context.destroy(); }}Copy the code
The running status of the Netty RPC client is as follows: The following is a screenshot of the result of adding the RPC server.
Ok, the addition RPC request is calculated, and the console prints out the request time.
This is followed by a call to RPC parallel multiplication to calculate the request, and again, the console prints out the request time.
Then the CLIENT of RPC finishes running and exits. Let’s continue to see the screenshot of the operation of NettyRPC server:
It can be found that the server of NettyRPC did receive the RPC calculation request initiated by the client, identified a unique message code for each RPC message, and successfully responded to the client after the RPC calculation.
After a series of module reconstruction, NettyRPC was finally upgraded again. After this reconstruction work, I feel I have a deeper understanding of Netty, Spring and Java thread model. A journey of a thousand miles begins with a single step. Learning depends on such a bit by bit of repeated accumulation, in order to improve their ability to a step.
Original article, coupled with my talent shallow, writing is limited, this article has said wrong place, I hope you don’t hesitate to give advice. I hope readers can supplement the places ignored and correct the mistakes.
Finally, attach the open source project address of NettyRPC: github.com/tang-jie/Ne… NettyRPC 2.0 project in.
Thank you for reading this series of NettyRPC articles. If this article is helpful to you, please click the recommendation below.