No matter you are a course of study or a career change, so excellent you must have been in primary school Chinese, so you must be no stranger to expanding and contracting sentences. Contractions are the process of removing all the trimmings to extract the core of a sentence without losing the basic semantics. Let’s implement a simple RPC program and explore its essence to understand the complex RPC framework. The so-called complex framework is to enrich the functions of RPC by adding some design decorations in the simple process, such as Dubbo filter, router, loadblance, cluster fault tolerance, various Invokers, communication protocol, etc., which is a process of sentence enlargement. Article welfare, attached a photo of Liu Mimei
RPC refers to remote procedure call, that is, two servers A and B, one application deployed on server A, want to call the function/method provided by the application on server B, because the memory space is not the same, cannot call directly, you need to initiate A call request to obtain the result through the network.
Both the mainstream and the minority RPC frameworks in the market have implemented the semantics of RPC. Service governance: Dubbo, Dubbox, Motan; Multi-language: GRPC, Thrift, Avro, Protocol buffers
【 Blogger recently wrote a Java implementation of RPC framework Bridge welcome to pay attention, consider Mesh 】
A, principle
First of all, a graph is used to briefly describe the call process of RPC, which is taken from the official website of Dubbo. It is not the simplest graph, but it is very simple. The simplest RPC call is left after removing the Registry and Monitor, which is simply a network request.
Process Description:
- Start the server provider and register your exposed service address and service details with the registry
- Then start the consumer side and subscribe to the contents of the registry, the subscription service, to get the details of the service
- If the service changes, the registry will notify the consumer to update the subscription content and update the service details.
- The client obtains the service details and initiates a network request to the server to obtain the result
- The monitor can obtain, but not limited to, service invocation details and consumption details
OK, the principle is so simple, next according to the above description step by step implementation.
Second, hands-on practice
The following is based on SpringBoot to achieve the above process.
2.1 Building Modules
Construction project and sub-module, engineering structure is as follows:
2.2 Implementing the Server
Take a look at the server side
Define the interface in the API module, and reference both the Consumer and Provider modules. The interface HelloService code is as follows
package com.glmapper.simple.api;
/**
* service interface
*
* @author: Jerry
*/
public interface HelloService {
/**
* service function
*
* @param name
* @return* /
String hello(String name);
}
Copy the code
Then implement the interface in the Provider module with a custom annotation @simpleProvider
package com.glmapper.simple.provider.annotation;
/** * Custom service annotation **@author Jerry
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
// indicates that Spring can scan
@Component
public @interfaceSimpleProvider { Class<? > value(); }Copy the code
The annotation uses the @Component flag, so it can be scanned by Spring. Let’s look at the implementation class HelloServiceImpl:
package com.glmapper.simple.provider.service;
/**
* service implement class
*
* @author: Jerry
*/
@SimpleProvider(HelloService.class)
public class HelloServiceImpl implements HelloService {
/**
* service function
*
* @param name
* @return* /
@Override
public String hello(String name) {
return "Hello! "+ name; }}Copy the code
In defining a service configuration class, SimpleProviderProperties, easy to configure through the application.yml file,
package com.glmapper.simple.provider.property;
/**
* provider properties
*
* @author: Jerry
*/
public class SimpleProviderProperties {
/** * exposes the service port */
private Integer port;
public Integer getPort(a) {
return port;
}
public void setPort(Integer port) {
this.port = port; }}Copy the code
At this point, the base class file is done, and the service initialization, the entry ProviderInitializer, is started
package com.glmapper.simple.provider;
/** * Start and register the service **@author Jerry
*/
public class ProviderInitializer implements ApplicationContextAware.InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);
private SimpleProviderProperties providerProperties;
/** * service registry */
private ServiceRegistry serviceRegistry;
/** * store interface and service implement mapping */
private Map<String, Object> handlerMap = new HashMap<>();
public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {
this.providerProperties = providerProperties;
this.serviceRegistry = serviceRegistry;
}
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
// Get the Bean annotated by SimpleProvider
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for(Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName(); handlerMap.put(interfaceName, serviceBean); }}}@Override
public void afterPropertiesSet(a) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new SimpleDecoder(SimpleRequest.class))
.addLast(new SimpleEncoder(SimpleResponse.class))
.addLast(newSimpleHandler(handlerMap)); }}; bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(channelHandler) .option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String host = getLocalHost();
if (null == host) {
LOGGER.error("can't get service address,because address is null");
throw new SimpleException("can't get service address,because address is null");
}
int port = providerProperties.getPort();
ChannelFuture future = bootstrap.bind(host, port).sync();
LOGGER.debug("server started on port {}", port);
if(serviceRegistry ! =null) {
String serverAddress = host + ":" + port;
serviceRegistry.register(serverAddress);
}
future.channel().closeFuture().sync();
} finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }}/**
* get service host
*
* @return* /
private String getLocalHost(a) {
Enumeration<NetworkInterface> allNetInterfaces;
try {
allNetInterfaces = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
LOGGER.error("get local address error,cause:", e);
return null;
}
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress ip = addresses.nextElement();
if (ip instanceofInet4Address && ! ip.isLoopbackAddress() && ! ip.getHostAddress().contains(":")) {
returnip.getHostAddress(); }}}return null; }}Copy the code
Describe what this class does:
- First he made it happen
ApplicationContextAware, InitializingBean
The twospring
Interface, according toIOC
The order in which the container is initialized is returned to the calling interfacesetApplicationContext
和afterPropertiesSet
Methods.setApplicationContext
Method is used to obtain the container@SimpleProvider
Annotated class and store the service interface name and service implementation class bound tohandlerMap
, in the@SimpleProvider
The value attribute is used to specify which service interface a class can implement. Of course, it can also be defined as an array to handle multiple interfacesafterPropertiesSet
The method does two things:- On the server side, a thread pool is enabled to handle socket requests, listen for and process requests received on the service exposure port, and specify a handler
SimpleHandler
- call
ServiceRegistry
Of the classregistry
Methods tozookeeper
Register the address and port of the service. No protocol is used here, only IP :port is registered
- On the server side, a thread pool is enabled to handle socket requests, listen for and process requests received on the service exposure port, and specify a handler
SimpleHandler is an implementation of a netty SimpleChannelInboundHandler request handler class
package com.glmapper.simple.provider.handler;
/**
* request handler
*
* @author Jerry
*/
public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);
private final Map<String, Object> handlerMap;
public SimpleHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {
SimpleResponse response = new SimpleResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private Object handle(SimpleRequest request) throws Throwable { String className = request.getClassName(); Object serviceBean = handlerMap.get(className); Class<? > serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<? >[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);return serviceFastMethod.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("server caught exception", cause); ctx.close(); }}Copy the code
SimpleHandler’s Netty-based event-driven model triggers the corresponding method. When a request event is received, the channelRead0 method is called. The function of this method is to find the corresponding implementation class and call the specified method based on the interface name in the request parameters, and then return the result.
Again, look at ServiceRegistry. The entry is ProviderInitializer, which calls the registry method of ServiceRegistry
package com.glmapper.simple.provider.registry;
/**
* connect zookeeper to registry service
*
* @author Jerry
*/
public class ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private ZookeeperProperties zookeeperProperties;
public ServiceRegistry(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
}
public void register(String data) {
if(data ! =null) {
ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());
if(zk ! =null) { addRootNode(zk); createNode(zk, data); }}}/**
* add one zookeeper root node
*
* @param zk
*/
private void addRootNode(ZooKeeper zk) {
try {
String registryPath = zookeeperProperties.getRootPath();
Stat s = zk.exists(registryPath, false);
if (s == null) {
zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }}catch (KeeperException | InterruptedException e) {
LOGGER.error("zookeeper add root node error,cause:", e); }}private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();
String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
LOGGER.error("create zookeeper node error,cause:", e); }}}Copy the code
The ServiceRegistry class does a simple job of registering the service IP address :port in the specified directory in zK
- Create the root node, which is a permanent node
- Create a temporary child node under the root node. The child node stores the IP address of the service :port. If the service fails, the child node will be killed
2.3 the consumer end
Content at the consumer end:
There is little content on the consumer side, with only three core classes: ServiceDiscovery, ConsumerHandler, and ConsumerProxy
Take a look at ServiceDiscovery:
package com.glmapper.simple.consumer.discovery;
/** * Service discovery: connect to ZK, add watch event **@author Jerry
*/
public class ServiceDiscovery {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
private volatile List<String> nodes = new ArrayList<>();
private ZookeeperProperties zookeeperProperties;
public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
String address = zookeeperProperties.getAddress();
int timeout = zookeeperProperties.getTimeout();
ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);
if(zk ! =null) { watchNode(zk); }}public String discover(a) {
String data = null;
int size = nodes.size();
if (size > 0) {
if (size == 1) {
data = nodes.get(0);
LOGGER.debug("using only node: {}", data);
} else {
data = nodes.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("using random node: {}", data); }}return data;
}
private void watchNode(final ZooKeeper zk) {
try {
Watcher childrenNodeChangeWatcher = event -> {
if(event.getType() == Watcher.Event.EventType.NodeChildrenChanged) { watchNode(zk); }}; String rootPath = zookeeperProperties.getRootPath(); List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher); List<String> nodes =new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(rootPath + "/" + node, false.null);
nodes.add(new String(bytes, Charset.forName("UTF-8")));
}
LOGGER.info("node data: {}", nodes);
this.nodes = nodes;
} catch (KeeperException | InterruptedException e) {
LOGGER.error("Node monitoring error, cause:", e); }}}Copy the code
The entry of this class is the constructor, which is used to get the address of ZK, and then get the node information of ZK. There is no service subscription here, that is, if there are two services on ZK and one fails, the client will not remove the information of the failed service, resulting in the call failure.
Then there’s ConsumerProxy, which is a proxy factory:
package com.glmapper.simple.consumer.proxy;
/**
* ConsumerProxy
*
* @author Jerry
*/
public class ConsumerProxy {
private ServiceDiscovery serviceDiscovery;
public ConsumerProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
@SuppressWarnings("unchecked")
public <T> T create(Class
interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
newClass<? >[]{interfaceClass},new SimpleInvocationHandler());
}
private class SimpleInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SimpleRequest request = buildRequest(method, args);
String serverAddress = getServerAddress();
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
if(response.getError() ! =null) {
throw new SimpleException("service invoker error,cause:", response.getError());
} else {
returnresponse.getResult(); }}private SimpleRequest buildRequest(Method method, Object[] args) {
SimpleRequest request = new SimpleRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
return request;
}
private String getServerAddress(a) {
String serverAddress = null;
if(serviceDiscovery ! =null) {
serverAddress = serviceDiscovery.discover();
}
if (null == serverAddress) {
throw new SimpleException("no server address available");
}
returnserverAddress; }}}Copy the code
There is an inner class SimpleInvocationHandler is the core of production agent, at the heart of the method is in SimpleInvocationHandler. Invoke () is called the two lines of code
ConsumerHandler consumerHandler = new ConsumerHandler(host, port);
SimpleResponse response = consumerHandler.send(request);
Copy the code
To initiate a web request, look at the ConsumerHandler class
package com.glmapper.simple.consumer.handler;
/** * RPC actually calls the client **@author Jerry
*/
public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);
private int port;
private String host;
private SimpleResponse response;
private CountDownLatch latch = new CountDownLatch(1);
public ConsumerHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {
this.response = response;
latch.countDown();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("client caught exception", cause);
ctx.close();
}
public SimpleResponse send(SimpleRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
// Encode RPC requests (to send requests)
.addLast(new SimpleEncoder(SimpleRequest.class))
// Decode the RPC response (to process the response)
.addLast(new SimpleDecoder(SimpleResponse.class))
// Use RpcClient to send RPC requests
.addLast(ConsumerHandler.this); }}; bootstrap.group(group).channel(NioSocketChannel.class) .handler(channelHandler) .option(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
latch.await();
if(response ! =null) {
future.channel().closeFuture().sync();
}
return response;
} finally{ group.shutdownGracefully(); }}}Copy the code
This class is similar to the ProviderHandler code on the server side and is also a Netty communication class
GitHub address simple- RPC