This is a simple demo of an RPC remote call: the Consumer remotely calls the Provider's service method sayHelloWorld(String name, String content) via RPC, and the Provider returns the resulting greeting string (for example "jinyue say:hello world!") to the Consumer.

Netty provides the remote communication underneath the RPC call, and the consumer calls the remote service through a proxy. This article covers the proxy pattern, JDK dynamic proxies, and Netty communication. The demo caches the provider's service registrations locally in the JVM; registering them with a ZooKeeper registry is left for later.

The demo is implemented in four parts. The first is the common base layer, which holds the API and the message class that the Consumer and Provider share and exchange over Netty. The second is the Provider's local service registration. The third is the Provider's service implementation, and the fourth is the Consumer. Without further ado, let's go straight to the code:

Github Project address:

Github.com/jinyue233/j…

1. Common base layer

1.1 Call Message: RpcMessage

package com.jinyue.common.message;

import java.io.Serializable;

/**
 * The message sent from the consumer to the provider. It carries the interface
 * name, the method name, the parameter types, and the parameter values of the call.
 */
public class RpcMessage implements Serializable {
    private String className;
    private String methodName;
    private Class<?>[] parameterType;
    private Object[] parameterValues;

    public RpcMessage(String className, String methodName, Class<?>[] parameterType, Object[] parameterValues) {
        this.className = className;
        this.methodName = methodName;
        this.parameterType = parameterType;
        this.parameterValues = parameterValues;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public void setParameterType(Class<?>[] parameterType) {
        this.parameterType = parameterType;
    }

    // The parameter must be an Object[] with the same name as the field being assigned
    public void setParameterValues(Object[] parameterValues) {
        this.parameterValues = parameterValues;
    }

    public String getClassName() {
        return className;
    }

    public String getMethodName() {
        return methodName;
    }

    public Class<?>[] getParameterType() {
        return parameterType;
    }

    public Object[] getParameterValues() {
        return parameterValues;
    }
}
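The message class implements Serializable because the demo's pipeline relies on JDK serialization, so the Class<?>[] and Object[] fields have to survive a trip through an object stream. A minimal sketch of that round trip, independent of Netty, might look like the following. DemoMessage and SerializationSketch are hypothetical stand-ins written for this illustration; they are not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for RpcMessage, reduced to the same four fields.
class DemoMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String className;
    final String methodName;
    final Class<?>[] parameterType;
    final Object[] parameterValues;

    DemoMessage(String className, String methodName, Class<?>[] parameterType, Object[] parameterValues) {
        this.className = className;
        this.methodName = methodName;
        this.parameterType = parameterType;
        this.parameterValues = parameterValues;
    }
}

class SerializationSketch {
    // Serialize any Serializable object to a byte array with JDK serialization.
    static byte[] toBytes(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Rebuild the object from the bytes produced by toBytes.
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Serialize and deserialize in one step, as the sender and receiver would do together.
    static DemoMessage roundTrip(DemoMessage message) {
        try {
            return (DemoMessage) fromBytes(toBytes(message));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

Every value placed in parameterValues must itself be serializable, otherwise writeObject throws a NotSerializableException.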

1.2 Interface API: IHelloWorld

package com.jinyue.common.api;

public interface IHelloWorld {
    String sayHelloWorld(String name, String content);
}

2. Provider implements the local registration service

2.1 Provider server startup class: LocalRegistryMain

package com.jinyue.registry;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.apache.log4j.Logger;

/**
 * Starts the provider's Netty server and adds ProviderNettyHandler
 * to the channel pipeline.
 */
public class LocalRegistryMain {
    private static final Logger logger = Logger.getLogger(LocalRegistryMain.class);
    private static final int SERVER_PORT = 8888;

    public static void main(String[] args) {
        // Create the boss and worker EventLoopGroups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Length-field decoder and encoder, to handle TCP packet splitting and sticking
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // JDK serialization for the message objects
                            pipeline.addLast("jdkencoder", new ObjectEncoder());
                            pipeline.addLast("jdkdecoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // Business logic: the handler that looks up and invokes the registered service
                            pipeline.addLast(new ProviderNettyHandler());
                        }
                    });
            logger.info("server start,the port is " + SERVER_PORT);
            ChannelFuture future = serverBootstrap.bind(SERVER_PORT).sync();
            // Block until the server channel is closed
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Finally, shut down the boss and worker groups gracefully
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
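The LengthFieldPrepender(4) and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) pair puts a 4-byte length in front of every outgoing message and uses that length on the receiving side to cut the byte stream back into whole messages. The idea can be sketched with plain java.nio, as below. This is only an illustration of length-prefixed framing, not Netty's implementation, and FramingSketch with its frame, unframe, and concat methods is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FramingSketch {
    // Prefix the payload with its length as a 4-byte big-endian integer.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Split a byte stream that may hold several frames back to back
    // (the "sticky packet" case) into the individual payloads.
    static List<byte[]> unframe(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.wrap(stream);
        while (buffer.remaining() >= 4) {
            buffer.mark();
            int length = buffer.getInt();
            if (length < 0 || buffer.remaining() < length) {
                // Incomplete frame (the "split packet" case): a real decoder
                // would keep the bytes and wait for more data to arrive.
                buffer.reset();
                break;
            }
            byte[] payload = new byte[length];
            buffer.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    // Join two byte arrays, simulating two messages arriving in one read.
    static byte[] concat(byte[] a, byte[] b) {
        byte[] joined = new byte[a.length + b.length];
        System.arraycopy(a, 0, joined, 0, a.length);
        System.arraycopy(b, 0, joined, a.length, b.length);
        return joined;
    }
}
```

Two framed messages concatenated into one array come back out of unframe as two separate payloads, while a stream cut off in the middle of a frame yields nothing for that frame.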

2.2 Provider service handler: ProviderNettyHandler

package com.jinyue.registry;

import com.jinyue.common.message.RpcMessage;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;

/**
 * Looks up the implementation instance in the ProviderLocalRegistry cache by the
 * interface name the consumer passed in, invokes the requested method through
 * reflection, and returns the result of the call to the consumer.
 */
public class ProviderNettyHandler extends ChannelInboundHandlerAdapter {

    /**
     * Called when the Netty server receives a request from the consumer. It extracts
     * the call parameters, fetches the instance from the registry cache
     * (instanceCacheMap), and makes the method call.
     *
     * @param ctx the channel handler context
     * @param msg the decoded request, expected to be an RpcMessage
     * @throws Exception if the lookup or the reflective call fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcMessage rpcMessage = (RpcMessage) msg;
        String interfaceName = rpcMessage.getClassName();
        String methodName = rpcMessage.getMethodName();
        Class<?>[] parameterType = rpcMessage.getParameterType();
        Object[] parameterValues = rpcMessage.getParameterValues();
        // Fetch the provider instance registered in instanceCacheMap, then call it through reflection
        Object instance = ProviderLocalRegistry.getInstanceCacheMap().get(interfaceName);
        Method method = instance.getClass().getMethod(methodName, parameterType);
        Object res = method.invoke(instance, parameterValues);
        // Write the result back to the consumer and close the channel only after the write has completed
        ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
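The handler's core step, looking up an instance by interface name and invoking a method on it by name and parameter types, can be tried without any networking. Below is a minimal sketch of that dispatch. Greeter, GreeterImpl, and DispatchSketch are hypothetical names made up for the example, and the map stands in for the registry cache.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service interface and implementation, used only for this sketch.
interface Greeter {
    String greet(String name, String content);
}

class GreeterImpl implements Greeter {
    public String greet(String name, String content) {
        return name + " say:" + content;
    }
}

class DispatchSketch {
    // Stand-in for the registry cache: interface name -> implementation instance.
    static final Map<String, Object> registry = new ConcurrentHashMap<>();

    // Resolve the instance by interface name, the method by name and
    // parameter types, then invoke it with the supplied arguments.
    static Object dispatch(String interfaceName, String methodName,
                           Class<?>[] parameterTypes, Object[] args) {
        Object instance = registry.get(interfaceName);
        if (instance == null) {
            throw new IllegalStateException("No provider registered for " + interfaceName);
        }
        try {
            Method method = instance.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(instance, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Reflective call failed", e);
        }
    }
}
```

Unlike the demo handler, the sketch checks for a missing registration explicitly, since a null instance would otherwise surface as a NullPointerException.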

2.3 Provider local registry class: ProviderLocalRegistry

package com.jinyue.registry;

import org.apache.log4j.Logger;
import java.io.File;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Registers the provider's service implementation classes in a local cache
 * backed by a ConcurrentHashMap.
 */
public class ProviderLocalRegistry {
    private static final Logger logger = Logger.getLogger(ProviderLocalRegistry.class);
    // The package that holds the provider implementation classes
    private static final String PROVIDER_PACKAGE_NAME = "com.jinyue.provider";
    // Cache of provider instances, keyed by the fully qualified interface name
    private static Map<String, Object> instanceCacheMap = new ConcurrentHashMap<>();
    // Fully qualified names of the implementation classes found by the scan
    private static List<String> providerClassList = new ArrayList<>();

    static {
        // Scan the provider package and register its implementation classes
        loadProviderInstance(PROVIDER_PACKAGE_NAME);
    }

    /**
     * Scans the implementation classes under the provider package and caches an instance of each.
     *
     * @param packageName the package to scan
     */
    private static void loadProviderInstance(String packageName) {
        findProviderClass(packageName);
        putProviderInstance();
    }

    /**
     * Finds all implementation class names in the provider package and adds them to providerClassList.
     */
    private static void findProviderClass(final String packageName) {
        // A static method has no "this", so reach the class loader through the class literal
        String packageDir = packageName.replace(".", "/");
        URL url = ProviderLocalRegistry.class.getClassLoader().getResource(packageDir);
        // Treat the package as a directory: recurse into subdirectories, record class files
        File dir = new File(url.getFile());
        File[] fileArr = dir.listFiles();
        if (fileArr == null) {
            return;
        }
        for (File file : fileArr) {
            if (file.isDirectory()) {
                findProviderClass(packageName + "." + file.getName());
            } else {
                providerClassList.add(packageName + "." + file.getName().replace(".class", ""));
            }
        }
    }

    /**
     * Iterates over providerClassList and caches one instance per implementation class,
     * using the fully qualified name of the class's interface as the key.
     */
    private static void putProviderInstance() {
        for (String providerClassName : providerClassList) {
            // The class name is known, so the instance can be created through reflection
            try {
                Class<?> providerClass = Class.forName(providerClassName);
                // The key is the interface's fully qualified name, because that is what the consumer sends
                String providerClassInterfaceName = providerClass.getInterfaces()[0].getName();
                // Create the provider implementation instance
                Object instance = providerClass.getDeclaredConstructor().newInstance();
                instanceCacheMap.put(providerClassInterfaceName, instance);
                logger.info("Registered service " + providerClassInterfaceName);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static Map<String, Object> getInstanceCacheMap() {
        return instanceCacheMap;
    }
}
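The registry keys each instance by the name of the first interface its class implements, which is the same name the consumer sends over the wire. The sketch below isolates that registration rule without the package scan. EchoService, EchoServiceImpl, NoInterfaceHelper, and RegistrySketch are hypothetical names, and the explicit check for classes without interfaces is an addition of this sketch rather than something the demo does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical types used only for this sketch.
interface EchoService {
    String echo(String text);
}

class EchoServiceImpl implements EchoService {
    public String echo(String text) {
        return text;
    }
}

class NoInterfaceHelper {
}

class RegistrySketch {
    private final Map<String, Object> instances = new ConcurrentHashMap<>();

    // Instantiate the class and cache it under the name of its first interface.
    // Returns false when the class implements no interface and so has no key.
    boolean register(Class<?> implClass) {
        Class<?>[] interfaces = implClass.getInterfaces();
        if (interfaces.length == 0) {
            return false;
        }
        try {
            Object instance = implClass.getDeclaredConstructor().newInstance();
            instances.put(interfaces[0].getName(), instance);
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + implClass.getName(), e);
        }
    }

    // Look up the cached instance by fully qualified interface name.
    Object lookup(String interfaceName) {
        return instances.get(interfaceName);
    }
}
```

Keying on getInterfaces()[0] means a class that implements several interfaces is reachable only through the first one, and two implementations of the same interface overwrite each other.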

3. Provider service implementation class: HelloWorldImpl

package com.jinyue.provider;

import com.jinyue.common.api.IHelloWorld;

/**
 * The provider's implementation of the IHelloWorld interface.
 */
public class HelloWorldImpl implements IHelloWorld {
    public String sayHelloWorld(String name, String content) {
        return name + " say:" + content;
    }
}

4. Service consumer

4.1 Consumer test class: ConsumerTest

package com.jinyue.consumer;

import com.jinyue.common.api.IHelloWorld;
import com.jinyue.consumer.proxy.RpcProxyFactory;

public class ConsumerTest {
    public static void main(String[] args) {
        IHelloWorld helloWorld = (IHelloWorld) new RpcProxyFactory(IHelloWorld.class).getProxyInstance();
        System.out.println(helloWorld.sayHelloWorld("jinyue", "hello world!"));
    }
}

4.2 Proxy generation factory class: RpcProxyFactory

package com.jinyue.consumer.proxy;

import com.jinyue.consumer.request.ConsumerNettyRequest;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * Dynamic proxy factory that generates a proxy for the target interface. Each call on
 * the proxy goes through InvocationHandler.invoke, which sends the interface name, the
 * method name, the parameter types, and the parameter values to the Netty server. The
 * server then fetches the provider's implementation from the cache map (the mock
 * registry) and invokes the target method through reflection.
 */
public class RpcProxyFactory {
    private Class<?> target;

    public RpcProxyFactory(Class<?> target) {
        this.target = target;
    }

    public Object getProxyInstance() {
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                return new ConsumerNettyRequest().sendRequest(target.getName(), method.getName(), method.getParameterTypes(), args);
            }
        });
    }
}
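To see what the InvocationHandler receives on each call, the network request can be swapped for a handler that simply describes the call. The sketch below does that with the same Proxy.newProxyInstance API. RemoteGreeter and ProxySketch are hypothetical names, and returning a description string is an assumption made so the example runs on its own; it only works for interface methods that return String.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface used only for this sketch.
interface RemoteGreeter {
    String greet(String name, String content);
}

class ProxySketch {
    // Build a proxy whose methods return a description of the call
    // instead of sending it to a server. Suitable only for methods
    // that return String, since the handler always returns a String.
    static <T> T describeCalls(Class<T> serviceInterface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // args is null for methods that take no parameters
            Object[] safeArgs = (args == null) ? new Object[0] : args;
            return serviceInterface.getName() + "#" + method.getName() + Arrays.toString(safeArgs);
        };
        Object proxyInstance = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                handler);
        return serviceInterface.cast(proxyInstance);
    }
}
```

Calling ProxySketch.describeCalls(RemoteGreeter.class).greet("jinyue", "hello world!") returns the interface name followed by "#greet[jinyue, hello world!]", which is the same information the demo packs into an RpcMessage, minus the parameter types.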

4.3 Consumer Netty client and request class: ConsumerNettyRequest

package com.jinyue.consumer.request;

import com.jinyue.common.message.RpcMessage;
import com.jinyue.consumer.handler.ConsumerNettyHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class ConsumerNettyRequest {
    public Object sendRequest(String interfaceName, String methodName, Class<?>[] parameterType, Object[] parameterValues) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        ConsumerNettyHandler consumerNettyHandler = new ConsumerNettyHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Length-field decoder and encoder, to handle TCP packet splitting and sticking
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // JDK serialization for the message objects
                            pipeline.addLast("jdkencoder", new ObjectEncoder());
                            pipeline.addLast("jdkdecoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            // Business logic: the handler that receives the result of the call
                            pipeline.addLast(consumerNettyHandler);
                        }
                    });
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
            future.channel().writeAndFlush(new RpcMessage(interfaceName, methodName, parameterType, parameterValues)).sync();
            // Block until the provider closes the connection after sending the result
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
        return consumerNettyHandler.getRes();
    }
}

4.4 Consumer response handler: ConsumerNettyHandler

package com.jinyue.consumer.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Used on the client side. When the Netty server returns the result of the request,
 * channelRead is called back and stores the result of the RPC call.
 */
public class ConsumerNettyHandler extends ChannelInboundHandlerAdapter {
    private Object res;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.res = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    public Object getRes() {
        return res;
    }
}

Finally, with the provider's LocalRegistryMain already running, run the ConsumerTest class shown earlier to test the consumer calling the provider's sayHelloWorld method through the Netty-based RPC:

public class ConsumerTest {
    public static void main(String[] args) {
        IHelloWorld helloWorld = (IHelloWorld)new RpcProxyFactory(IHelloWorld.class).getProxyInstance();
        System.out.println(helloWorld.sayHelloWorld("jinyue", "hello world!"));

    }
}

Final test results:
