Hands-on implementation of RPC
Recently I used my spare time to build a simple RPC demo. Along the way I ran into some problems and learned a few things, and I would like to share the process.
Preparation
RPC is common and familiar in enterprise development, but to develop an RPC framework ourselves we need to master some basic theory:
- RPC is network communication (communication between processes), which is slower than communication within a single process.
- What the network transmits is bytes, rather than characters or some other medium.
- Understand, or better master, the use of dynamic proxies.
The essence of RPC is transmitting information: I (the consumer) tell you (the server) what I know through some communication channel, and you then give me back the final result.
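To make the "bytes, not characters" point concrete, here is a minimal sketch of what happens to a piece of text before it is sent and after it is received. The class name `BytesOnTheWire` is hypothetical and only serves this illustration; the key assumption is that both sides agree on the charset (UTF-8 here).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: text must be encoded into bytes before it can be sent.
class BytesOnTheWire {
    // Encode a message into the bytes that would actually travel over a socket.
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Decode received bytes back into text, using the same charset as the sender.
    static String decode(byte[] wire) {
        return new String(wire, StandardCharsets.UTF_8);
    }
}
```

The number of bytes is not necessarily the number of characters, which is one reason a protocol has to state lengths in bytes.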
Implementation
With that in mind, let's look at how RPC is used. The service provider needs to expose its services, and the service consumer needs to reference the services it requires. The process is shown below, and the rest of this article revolves around it.
Server exposes service –> consumer references service –> dynamic proxy –> serialization –> network request –> server processing –> serialized response –> consumer returns result.
Service exposure
Service exposure means publishing the service to a registry (ZooKeeper here) so that it is available to consumers. Once consumers obtain this information, they can connect to the server and initiate requests. Exposing a service requires publishing its basic information. The main items listed here are the service machine's host, the listening port, the service interface, the serialization method, the weight, and the version. The code is as follows.
1. Information exposed by the service
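    import java.io.Serializable;

    public class Provider implements Serializable, Cloneable {
        private String serviceName;    // service interface name
        private String host;           // host of the service machine
        private Integer port;          // listening port
        private String version;        // service version
        private Integer weight;        // weight, used for load balancing
        private String serialization;  // serialization method
        // getters and setters omitted
    }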
2. The process of exposing a service. Taking ZooKeeper as an example, each service is added to ZK as a node.
    @Override
    public void registerService(List<Provider> providerList) {
        assert zkClient != null;
        providerList.parallelStream().forEach(provider -> {
            String host = provider.getHost();
            Integer port = provider.getPort();
            String serviceName = provider.getServiceName();
            String version = provider.getVersion();
            Integer weight = provider.getWeight();
            // Persistent parent node for all providers of this service
            String serverPath = root_path + "/" + serviceName + root_provider;
            if (!zkClient.exists(serverPath)) {
                zkClient.createPersistent(serverPath, true);
            }
            // One ephemeral child node per provider; it disappears when the ZK session ends
            String finalInfo = host + split + port + split + serviceName + split + version + split + weight + split + provider.getSerialization();
            String path = serverPath + "/" + finalInfo;
            if (!zkClient.exists(path)) {
                log.info("Register service :{} to ZooKeeper", serverPath);
                zkClient.createEphemeral(path);
            } else {
                log.warn("Service :{} has been registered", serverPath);
            }
        });
    }
You can then use the zkCli command to check the services registered in ZK.
[zk: localhost:2181(CONNECTED) 0] ls /fuck/top.huzhurong.fuck.UserService/provider
[]
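Because all of a provider's information is packed into the node name, a consumer that lists the children of the provider node has to parse each name back into its fields. The following is a minimal sketch of that round trip. `ProviderInfo` and `ProviderNodeCodec` are hypothetical names, and the `:` delimiter is an assumption, since the value of `split` is not shown in the article.

```java
import java.util.regex.Pattern;

// Hypothetical value object mirroring the Provider fields from the article.
class ProviderInfo {
    final String host;
    final int port;
    final String serviceName;
    final String version;
    final int weight;
    final String serialization;

    ProviderInfo(String host, int port, String serviceName,
                 String version, int weight, String serialization) {
        this.host = host;
        this.port = port;
        this.serviceName = serviceName;
        this.version = version;
        this.weight = weight;
        this.serialization = serialization;
    }
}

// Hypothetical helper that builds and parses the ZooKeeper node name.
class ProviderNodeCodec {
    // Assumption: the article does not show its delimiter; ":" is used for illustration.
    static final String SPLIT = ":";

    // Same field order as finalInfo in registerService.
    static String toNodeName(ProviderInfo p) {
        return String.join(SPLIT, p.host, String.valueOf(p.port), p.serviceName,
                p.version, String.valueOf(p.weight), p.serialization);
    }

    // Parse a child node name back into its fields; rejects malformed names.
    static ProviderInfo fromNodeName(String nodeName) {
        String[] parts = nodeName.split(Pattern.quote(SPLIT));
        if (parts.length != 6) {
            throw new IllegalArgumentException("unexpected node name: " + nodeName);
        }
        return new ProviderInfo(parts[0], Integer.parseInt(parts[1]), parts[2],
                parts[3], Integer.parseInt(parts[4]), parts[5]);
    }
}
```

A delimiter that can also appear inside a field (for example `:` in an IPv6 address) would break this simple parsing, so the delimiter has to be chosen with the field contents in mind.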
Consumer-side service references and dynamic proxies
On the consumer side, a service is usually referenced through a Spring custom tag. The usage is as follows.
<fuck:reference id="test" interface="top.huzhurong.fuck.UserService" version="0.0.1"/>
This references version 0.0.1 of the top.huzhurong.fuck.UserService service. With Spring, the reference has to be a dynamic proxy that goes over the network and invokes the interface on the server, so it cannot be configured like an ordinary bean. Instead a FactoryBean is usually configured, because the bean it returns can be customized in getObject (most frameworks integrate with Spring through a FactoryBean as well).
A FactoryBean can be used as follows:
    import java.lang.reflect.Proxy;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.InitializingBean;

    public class ProxyBean implements FactoryBean<Object>, InitializingBean {
        private Class<?> name;   // the service interface to proxy (setter omitted)
        private Object object;   // the generated proxy instance

        @Override
        public Object getObject() {
            return object;
        }

        @Override
        public Class<?> getObjectType() {
            return name;
        }

        @Override
        public void afterPropertiesSet() {
            this.build();
        }

        private void build() {
            // The dynamic proxy generates a proxy bean
            this.object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[]{this.name}, (proxy, method, args) -> {
                // Customize the call inside invoke: go over TCP, HTTP, etc.
                if (method.getName().equalsIgnoreCase("name")) {
                    return "Call the name method";
                }
                if (method.getName().equalsIgnoreCase("toString")) {
                    return "Call toString method";
                }
                return "111";
            });
        }
    }
The above is only a simple illustration. The actual service reference procedure is as follows:
- Register the consumer node with ZooKeeper
- Get the list of servers
- Subscribe to the service interface
- Set up the dynamic proxy
Space is limited, so I will only cover setting up the dynamic proxy. The other three steps are fairly simple.
    Object object = Proxy.newProxyInstance(this.getClass().getClassLoader(),
            new Class<?>[]{Class.forName(this.interfaceName)},
            new FuckRpcInvocationHandler(this));

    class FuckRpcInvocationHandler implements InvocationHandler {
        // fields
        // constructor

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Get the list of providers for this service
            List<Provider> all = ProviderSet.getAll(this.className);
            // Soft load balancing
            Provider provider = loadBalance.getProvider(all);
            String info = provider.buildIfno();
            SocketChannel channel = ChannelMap.get(info);
            if (channel == null) {
                // Establish a TCP connection
                Client client = new NettyClient(provider, this.serialization);
                client.connect(provider.getHost(), provider.getPort());
                channel = ChannelMap.get(info);
            }
            SocketChannel finalChannel = channel;
            // Building `request` from the method and args is omitted in this snippet
            Future<Response> submit = TempResultSet.executorService.submit(() -> {
                // Write the request to the TCP channel
                finalChannel.writeAndFlush(request);
                // A CountDownLatch could be used here; this demo simply polls in a loop
                for (; ; ) {
                    Response response = TempResultSet.get(request.getRequestId());
                    if (response != null) {
                        return response;
                    }
                }
            });
            Response response = submit.get(this.timeout, TimeUnit.SECONDS);
            // Assumed getter matching the setObject call on the server side
            return response.getObject();
        }
    }
The dynamic proxy mainly takes care of these steps: obtaining the provider list, soft load balancing, establishing the TCP connection, and communicating.
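The article does not show how the load balancer picks a provider. Since every provider carries a weight, one common option is weighted random selection, sketched below. `WeightedNode` and `WeightedRandomBalancer` are hypothetical names, and this is not necessarily the strategy the demo project uses.

```java
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for a provider: an address plus its configured weight.
class WeightedNode {
    final String address;
    final int weight;

    WeightedNode(String address, int weight) {
        this.address = address;
        this.weight = weight;
    }
}

// Picks a node with probability proportional to its weight.
class WeightedRandomBalancer {
    private final Random random;

    WeightedRandomBalancer(Random random) {
        this.random = random;
    }

    WeightedNode select(List<WeightedNode> nodes) {
        if (nodes == null || nodes.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        int total = 0;
        for (WeightedNode node : nodes) {
            total += Math.max(node.weight, 0); // negative weights count as zero
        }
        if (total == 0) {
            // No usable weights: fall back to a uniform choice.
            return nodes.get(random.nextInt(nodes.size()));
        }
        // Pick a point in [0, total) and walk the list until it falls inside a node's share.
        int point = random.nextInt(total);
        for (WeightedNode node : nodes) {
            point -= Math.max(node.weight, 0);
            if (point < 0) {
                return node;
            }
        }
        throw new IllegalStateException("unreachable: point is always below total");
    }
}
```

A node with weight 0 is never chosen as long as some other node has a positive weight, which gives a simple way to drain traffic from one machine.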
Note that the proxy enters the invoke method only when a method is actually called, not when the proxy is instantiated, and every call on the proxy goes through invoke.
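This behavior is easy to observe with a plain JDK proxy and a counter. In the sketch below, `Greeter` and `ProxyInvokeDemo` are hypothetical names used only for illustration; the handler counts how often `invoke` is entered.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical service interface used only for this demonstration.
interface Greeter {
    String greet(String name);
}

class ProxyInvokeDemo {
    // Creates a proxy whose handler increments `calls` each time invoke is entered.
    static Greeter createProxy(AtomicInteger calls) {
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                (proxy, method, args) -> {
                    calls.incrementAndGet();
                    // toString, hashCode and equals are routed through invoke as well.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "toString":
                                return "GreeterProxy";
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default:
                                return proxy == args[0]; // equals
                        }
                    }
                    // A real RPC handler would send a request over the network here.
                    return "hello " + args[0];
                });
    }
}
```

Creating the proxy leaves the counter at zero, and each call to `greet` increases it by one. This is why the connection setup and the request can be deferred until the first real invocation.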
Serialization and network requests
There are plenty of articles on serialization, and this demo only supports Protostuff and JDK serialization. Serialization is manageable once it is encapsulated behind a common interface, so the focus here is on handling the network transmission.
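As an illustration of the JDK option, here is a minimal sketch of a byte-array round trip built on `ObjectOutputStream` and `ObjectInputStream`. The class name `JdkSerializer` and its method names are assumptions for this example and are not taken from the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper wrapping JDK serialization behind two simple methods.
class JdkSerializer {
    // Turn a Serializable object into the bytes that will be sent over the network.
    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush(); // make sure buffered data reaches the byte array
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuild an object of the expected type from received bytes.
    static <T> T deserialize(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class not on the classpath", e);
        }
    }
}
```

JDK deserialization should only be applied to data from trusted peers, since deserializing untrusted bytes is a well-known security risk.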
Once serialization is solved, the next step is network transmission. We all know the network transmits bytes, but doing the transmission ourselves is a real headache. Netty lowers the barrier to entry: its simple API is quick to pick up, and if you have not used it before, Netty's examples are a good way to get started.
Two things matter in the network transmission part: handling the protocol, and delivering the result of a processed request to the correct request object.
1. Protocol handling is simple: a conventional length + data format serves as the custom protocol. This lets us process messages quickly without agonizing over the correctness of the protocol.
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
        if (byteBuf.readableBytes() <= HEAD_LENGTH) {
            return;
        }
        byteBuf.markReaderIndex(); // Mark the current position
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex(); // Not enough data yet, rewind and wait for more
            return;
        }
        byte[] dataArray = new byte[dataLength];
        byteBuf.readBytes(dataArray);
        Request request = serialization.deSerialize(dataArray, Request.class);
        if (log.isDebugEnabled()) {
            log.debug("Received consumer request :{}, request content :{}", ctx.channel().toString(), request);
        }
        list.add(request);
    }
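The same length + data framing can be tried out without Netty. The sketch below uses `java.nio.ByteBuffer` to show both directions: the encoder writes a 4-byte length followed by the payload, and the decoder extracts every complete frame while leaving a partial one untouched. `LengthPrefixedFrames` is a hypothetical class name, and the 4-byte header is an assumption that matches the `readInt` call above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Netty-free illustration of length + data framing.
class LengthPrefixedFrames {
    static final int HEAD_LENGTH = 4; // one int holding the payload length

    // Prefix the payload with its length.
    static byte[] encode(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(HEAD_LENGTH + payload.length);
        buffer.putInt(payload.length);
        buffer.put(payload);
        return buffer.array();
    }

    // Read all complete frames from a buffer in read mode.
    // A trailing partial frame stays unread so it can be completed by later data.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= HEAD_LENGTH) {
            in.mark(); // remember where this frame starts
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // not enough data yet, rewind to the frame start
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```

This is the reason for the mark and reset calls in the Netty decoder: TCP delivers a byte stream, so one read may contain several messages or only part of one.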
2. Returning the result to the correct request object. As we all know, when executing a request we need a requestId to mark it. The same holds in our RPC, so that a request and its response can be matched as a pair.
    public class Request implements Serializable {
        private String requestId;       // Request identifier
        private String serviceName;     // Service name
        private String methodName;      // Method name; java.lang.reflect.Method cannot be serialized
        private Class<?>[] parameters;  // Parameter types, used to look up the method to execute
        private Object[] args;          // Actual arguments
    }
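Since a `Method` object cannot be sent, the server has to find the method again from the method name and parameter types. The sketch below shows why both are needed: with overloaded methods, the name alone is ambiguous. `EchoService` and `MethodLookup` are hypothetical names, and `getMethod` is used here for public methods, whereas the article's server code uses `getDeclaredMethod`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical service with two overloads of the same method name.
class EchoService {
    public String echo(String text) {
        return "text:" + text;
    }

    public String echo(Integer number) {
        return "number:" + number;
    }
}

class MethodLookup {
    // Resolve the target method from its name and parameter types, then invoke it.
    static Object call(Object service, String methodName,
                       Class<?>[] parameterTypes, Object[] args) {
        try {
            Method method = service.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(service, args);
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalArgumentException("cannot call " + methodName, e);
        } catch (InvocationTargetException e) {
            // Surface the exception thrown by the service method itself.
            throw new IllegalStateException(e.getTargetException());
        }
    }
}
```

Calling `MethodLookup.call(new EchoService(), "echo", new Class<?>[]{Integer.class}, new Object[]{7})` selects the `Integer` overload, while passing `String.class` selects the other one.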
3. Service processing. You can create a separate thread pool for this rather than running the business logic directly on Netty's worker I/O threads.
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
        if (serializable instanceof Request) {
            Request request = (Request) serializable;
            // Hand the request over to the business thread pool
            responseTask.execute(new ResponseTask(request, channelHandlerContext, this.applicationContext));
        }
    }

    // The run method of ResponseTask
    @Override
    public void run() {
        String serviceName = request.getServiceName();
        String methodName = request.getMethodName();
        Class<?>[] parameters = request.getParameters();
        Object[] args = request.getArgs();
        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setSuccess(false);
        try {
            // Get the service
            Object service = ServiceCache.getService(serviceName);
            if (service == null) {
                Class<?> aClass = ClassUtils.forName(serviceName, ClassUtils.getDefaultClassLoader());
                service = applicationContext.getBean(aClass);
                ServiceCache.put(serviceName, service);
            }
            // Get the method
            Method method = service.getClass().getDeclaredMethod(methodName, parameters);
            Object invoke = method.invoke(service, args);
            System.out.println("invoke:" + invoke);
            response.setSuccess(true);
            response.setObject(invoke);
        } catch (ClassNotFoundException | IllegalAccessException e) {
            response.setException(e);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
            response.setException(e);
        } catch (InvocationTargetException e) {
            e.printStackTrace();
            response.setException(e.getTargetException());
        }
        // Write the response to the channel
        channelHandlerContext.writeAndFlush(response);
    }
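The article does not show how the business thread pool is created. One possible setup is sketched below: a fixed number of named threads with a bounded queue. `BusinessPool`, the thread name prefix, and the sizes passed in are all assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory for the pool that runs service methods off the I/O threads.
class BusinessPool {
    static ExecutorService create(int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        // Named daemon threads make thread dumps easier to read.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "rpc-business-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // bounded queue limits memory use
                factory,
                // When the queue is full, the submitting thread runs the task itself.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

With `CallerRunsPolicy`, a saturated pool pushes work back onto the submitting thread, which in this setting would be a Netty I/O thread. That slows down intake instead of dropping requests. Rejecting with an error response is another reasonable choice.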
The consumer returns the result
When the server writes the response, as long as the network is up it will reach the client. We simply put it into a Map, and the consuming thread keeps polling the Map for it.
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
        if (serializable instanceof Response) {
            Response response = (Response) serializable;
            // Put the response into the map, keyed by requestId
            TempResultSet.put(response.getRequestId(), response);
        }
    }

    // The consuming thread polls the map in a loop
    for (; ; ) {
        Response response = TempResultSet.get(request.getRequestId());
        if (response != null) {
            return response;
        }
    }
At this point a simple RPC call is ready to go.
Summary
This simple RPC is freshly finished, and there are in fact many points that could be improved, such as interception (chain of responsibility + SPI) and the use of the observer pattern. Still, I learned the basic workings of an RPC. The use of thread pools also deserves a better understanding, since the usage here is quite crude. But I was happy to write it in a few days. The project is here: simple implementation of fuck-rpc. You are welcome to try it out.
2018-12-05