1. Background
This is a lightweight distributed RPC framework built with Zookeeper, Netty, and Spring. I spent some time reading the original author's code; it is clean and simple, and the framework can be considered a simplified version of Dubbo. It is small, but as the saying goes, the sparrow may be small, yet it has all the vital organs. Anyone interested can learn from it.
I added the following features to this simplified version of RPC:
- Support for asynchronous service invocation, with callback functions
- Clients use long-lived connections (one connection is shared across multiple calls)
- The server handles RPC requests asynchronously on multiple threads
The address of the project: https://github.com/luxiaoxun/NettyRpc
2. Introduction
RPC, or Remote Procedure Call, invokes a service on a remote computer as if it were local. RPC decouples systems well; WebService, for example, is a kind of RPC based on the HTTP protocol.
The overall RPC framework is as follows:
The framework uses the following techniques to address these concerns:
Service publishing and subscription: The server uses Zookeeper to register service addresses, and the client obtains available service addresses from Zookeeper.
Communication: Use Netty as the communication framework.
Spring: Use Spring to configure services, load beans, and scan annotations.
Dynamic proxy: Clients use proxy mode to transparently invoke services.
Message codec: Serialize and deserialize messages using Protostuff.
3. The server publishes the service
Services to be published are marked with the @RpcService annotation:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
    Class<?> value();
}
A service interface:
public interface HelloService {
String hello(String name);
String hello(Person person);
}
A service implementation, marked with the annotation:
@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String name) {
        return "Hello! " + name;
    }

    @Override
    public String hello(Person person) {
        return "Hello! " + person.getFirstName() + " " + person.getLastName();
    }
}
At startup, the server scans all service interfaces and their implementations:
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
    Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
    if (MapUtils.isNotEmpty(serviceBeanMap)) {
        for (Object serviceBean : serviceBeanMap.values()) {
            String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
            handlerMap.put(interfaceName, serviceBean);
        }
    }
}
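The excerpt above does not show how the server later uses handlerMap to serve a request; presumably its handle(request) looks up the bean by interface name and invokes the target method reflectively. A hedged sketch of that dispatch, using only the JDK (class and method names here are illustrative, not the project's actual code):

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Sketch: reflective dispatch from an interface-name -> bean map,
// mirroring what the framework's handle(RpcRequest) likely does.
public class ReflectiveDispatch {
    private final Map<String, Object> handlerMap = new HashMap<>();

    public void register(String interfaceName, Object bean) {
        handlerMap.put(interfaceName, bean);
    }

    public Object invoke(String interfaceName, String methodName,
                         Class<?>[] parameterTypes, Object[] parameters) throws Exception {
        Object bean = handlerMap.get(interfaceName);
        if (bean == null) {
            throw new IllegalArgumentException("No handler for " + interfaceName);
        }
        // Resolve the method on the implementation class and invoke it
        Method method = bean.getClass().getMethod(methodName, parameterTypes);
        return method.invoke(bean, parameters);
    }
}
```

The request's className, methodName, parameterTypes, and parameters fields map directly onto this lookup, which is why RpcRequest carries exactly those fields.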
Register service address on Zookeeper cluster:
public class ServiceRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
    private CountDownLatch latch = new CountDownLatch(1);
    private String registryAddress;

    public ServiceRegistry(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    public void register(String data) {
        if (data != null) {
            ZooKeeper zk = connectServer();
            if (zk != null) {
                AddRootNode(zk); // Add root node if not exist
                createNode(zk, data);
            }
        }
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException e) {
            LOGGER.error("", e);
        } catch (InterruptedException ex) {
            LOGGER.error("", ex);
        }
        return zk;
    }

    private void AddRootNode(ZooKeeper zk) {
        try {
            Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
            if (s == null) {
                zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            LOGGER.error(e.toString());
        } catch (InterruptedException e) {
            LOGGER.error(e.toString());
        }
    }

    private void createNode(ZooKeeper zk, String data) {
        try {
            byte[] bytes = data.getBytes();
            String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            LOGGER.debug("create zookeeper node ({} => {})", path, data);
        } catch (KeeperException e) {
            LOGGER.error("", e);
        } catch (InterruptedException ex) {
            LOGGER.error("", ex);
        }
    }
}
Here, AddRootNode() is added on top of the original article's code to check whether the service parent node exists; if it does not, a PERSISTENT parent node is created. This adds a small check at startup, but removes the need to create the parent node manually from the command line.
For details about how to use Zookeeper, see Basic Zookeeper Principles.
4. The client invokes the service
The client invokes services through a dynamic proxy:
public class RpcProxy {
    private String serverAddress;
    private ServiceDiscovery serviceDiscovery;

    public RpcProxy(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public RpcProxy(ServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(
            interfaceClass.getClassLoader(),
            new Class<?>[]{interfaceClass},
            new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    RpcRequest request = new RpcRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(args);
                    if (serviceDiscovery != null) {
                        serverAddress = serviceDiscovery.discover();
                    }
                    if (serverAddress != null) {
                        String[] array = serverAddress.split(":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        RpcClient client = new RpcClient(host, port);
                        RpcResponse response = client.send(request);
                        if (response.isError()) {
                            throw new RuntimeException("Response error.", new Throwable(response.getError()));
                        } else {
                            return response.getResult();
                        }
                    } else {
                        throw new RuntimeException("No server address found!");
                    }
                }
            });
    }
}
Here the proxy performs the remote invocation: each call obtains an available service address from Zookeeper, sends a Request through RpcClient, and waits for the matching Response. The original article used obj.wait() and obj.notifyAll() to wait for the Response, which can hang ("suspended animation"): the Response may come back, and channelRead0 may call obj.notifyAll(), before the caller reaches obj.wait(); the caller then waits forever on a notification that has already been delivered. CountDownLatch solves this, because a countDown() that happens before await() is not lost.
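A minimal sketch of the CountDownLatch pattern, independent of the framework's actual classes (the class and method names below are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: waiting for a response with CountDownLatch instead of wait/notify.
// countDown() may run before await() and the signal is still not lost, which
// is exactly the race that obj.wait()/obj.notifyAll() cannot survive.
public class ResponseWaiter<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    // Called from the Netty I/O thread (e.g. channelRead0) when the response arrives.
    public void complete(T response) {
        this.response = response;
        latch.countDown();
    }

    // Called from the caller thread after sending the request.
    public T get(long timeoutMillis) throws InterruptedException {
        if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("RPC timeout");
        }
        return response;
    }
}
```

Creating the latch before the request is sent is what makes the pattern safe: even if the response arrives first, the later await() returns immediately.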
Note: the connection to the server is established only when send is called, i.e. short connections are used. Under high concurrency this creates many connections and hurts performance.
Obtain the service address from Zookeeper:
public class ServiceDiscovery {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);
    private CountDownLatch latch = new CountDownLatch(1);
    private volatile List<String> dataList = new ArrayList<>();
    private String registryAddress;

    public ServiceDiscovery(String registryAddress) {
        this.registryAddress = registryAddress;
        ZooKeeper zk = connectServer();
        if (zk != null) {
            watchNode(zk);
        }
    }

    public String discover() {
        String data = null;
        int size = dataList.size();
        if (size > 0) {
            if (size == 1) {
                data = dataList.get(0);
                LOGGER.debug("using only data: {}", data);
            } else {
                data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("using random data: {}", data);
            }
        }
        return data;
    }

    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }

    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk);
                    }
                }
            });
            List<String> dataList = new ArrayList<>();
            for (String node : nodeList) {
                byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                dataList.add(new String(bytes));
            }
            LOGGER.debug("node data: {}", dataList);
            this.dataList = dataList;
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}
Each time the set of service address nodes changes, watchNode is called again to fetch the new address list (Zookeeper watches are one-shot, so the watch must be re-registered).
5. Message encoding
Request message:
public class RpcRequest {
    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;

    public String getRequestId() { return requestId; }
    public void setRequestId(String requestId) { this.requestId = requestId; }
    public String getClassName() { return className; }
    public void setClassName(String className) { this.className = className; }
    public String getMethodName() { return methodName; }
    public void setMethodName(String methodName) { this.methodName = methodName; }
    public Class<?>[] getParameterTypes() { return parameterTypes; }
    public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; }
    public Object[] getParameters() { return parameters; }
    public void setParameters(Object[] parameters) { this.parameters = parameters; }
}
Response message:
public class RpcResponse {
    private String requestId;
    private String error;
    private Object result;

    public boolean isError() { return error != null; }
    public String getRequestId() { return requestId; }
    public void setRequestId(String requestId) { this.requestId = requestId; }
    public String getError() { return error; }
    public void setError(String error) { this.error = error; }
    public Object getResult() { return result; }
    public void setResult(Object result) { this.result = result; }
}
Message serialization and deserialization utility (implemented with Protostuff):
public class SerializationUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
private static Objenesis objenesis = new ObjenesisStd(true);
private SerializationUtil() {
}
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
if (schema != null) {
cachedSchema.put(cls, schema);
}
}
return schema;
}
/**
 * Serialization (object -> byte array)
 */
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
Class<T> cls = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
 * Deserialization (byte array -> object)
 */
public static <T> T deserialize(byte[] data, Class<T> cls) {
try {
T message = (T) objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data, message, schema);
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
Because messages travel over TCP, I added a handler for the TCP "sticky packet" problem (message framing):
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));
The first four bytes carry the message length: when a message is encoded, its length is written before the message body, and the decoder uses that field to split the byte stream back into whole messages.
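Outside Netty, the same length-prefix framing can be sketched with a plain ByteBuffer (illustrative only; the project relies on LengthFieldBasedFrameDecoder and a matching encoder):

```java
import java.nio.ByteBuffer;

// Sketch of 4-byte length-prefix framing: the encoder writes the body length
// first, then the body; the decoder reads the length and then exactly that
// many bytes, so message boundaries survive TCP's stream semantics.
public class LengthPrefixFraming {
    public static byte[] encode(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length); // length field first...
        buf.put(body);           // ...then the message body
        return buf.array();
    }

    public static byte[] decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int length = buf.getInt();
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }
}
```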
6. Performance improvement
1) Asynchronous request handling on the server
Netty itself is a high-performance network framework, so network I/O is not the bottleneck.
Within the RPC framework itself, the server-side request handling was changed (relative to the original article) to asynchronous, multithreaded processing:
public void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception {
    RpcServer.submit(new Runnable() {
        @Override
        public void run() {
            LOGGER.debug("Receive request " + request.getRequestId());
            RpcResponse response = new RpcResponse();
            response.setRequestId(request.getRequestId());
            try {
                Object result = handle(request);
                response.setResult(result);
            } catch (Throwable t) {
                response.setError(t.toString());
                LOGGER.error("RPC Server handle request error", t);
            }
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    LOGGER.debug("Send response for request " + request.getRequestId());
                }
            });
        }
    });
}
In Netty 4, handlers run on the I/O thread. If a handler performs a time-consuming operation such as a database call, the I/O thread blocks waiting for it and overall throughput suffers, which is why the request is handed off to worker threads.
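The original article does not show RpcServer.submit; presumably it delegates to a shared worker thread pool so the I/O thread returns immediately. A minimal sketch under that assumption (the class name, pool size, and shutdown policy are illustrative, not the project's actual values):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of offloading request handling from the I/O thread to a worker pool.
public class RpcWorkerPool {
    private static final ExecutorService POOL =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    public static Future<?> submit(Runnable task) {
        return POOL.submit(task); // the calling (I/O) thread does not block
    }

    public static void shutdown() {
        POOL.shutdown(); // finish queued tasks, then stop worker threads
    }
}
```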
2) Long-lived connection management
The client keeps a long-lived connection to each service, so it does not need to reconnect on every call; the connections are managed against the valid addresses obtained from Zookeeper.
By watching changes to the Zookeeper service nodes, the long-lived connections between client and servers are updated dynamically. This is currently done on the client side, which keeps a connection to every available server; that puts pressure on both client and server, and the implementation could be better decoupled.
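One possible shape for such per-address connection caching, sketched with the JDK alone (the real framework would cache Netty channels rather than the placeholder strings used here; all names are illustrative):

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: one lazily created, long-lived "connection" per server address,
// refreshed whenever the registry's address list changes.
public class ConnectionManager {
    private final ConcurrentMap<String, String> connections = new ConcurrentHashMap<>();

    public String getConnection(String address) {
        // computeIfAbsent opens the connection on first use, then reuses it
        return connections.computeIfAbsent(address, addr -> "connection-to-" + addr);
    }

    public void updateServers(List<String> liveAddresses) {
        // Drop connections to servers that have left the registry
        connections.keySet().retainAll(liveAddresses);
    }

    public int size() {
        return connections.size();
    }
}
```

updateServers would be driven by the Zookeeper watch callback, so connection churn follows registry changes rather than individual calls.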
3) Asynchronous request handling on the client
The client supports asynchronous calls without synchronous waiting: it sends a request, immediately gets back a Future, and retrieves the result through the Future's callback mechanism.
IAsyncObjectProxy client = rpcClient.createAsync(HelloService.class);
RPCFuture helloFuture = client.call("hello", Integer.toString(i));
String result = (String) helloFuture.get(3000, TimeUnit.MILLISECONDS);
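The project implements its own RPCFuture; to illustrate the same idea with only the JDK, here is a sketch built on CompletableFuture (all names below are illustrative, not the project's API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Sketch of an async RPC call: the caller gets a future immediately, may
// register a callback, and can still block with a timeout if it wants.
public class AsyncCallSketch {
    public static CompletableFuture<String> callAsync(String method, String arg) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // In the real framework the Netty response handler completes the future;
        // here a worker thread stands in for the network round trip.
        new Thread(() -> future.complete(method + "(" + arg + ")")).start();
        return future;
    }

    public static void onResult(CompletableFuture<String> f, Consumer<String> callback) {
        f.thenAccept(callback); // callback runs when the response arrives
    }
}
```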
In my opinion, the main area where this RPC framework could still improve is multi-protocol support for message encoding and serialization.
The project is continuously updated.
The address of the project: https://github.com/luxiaoxun/NettyRpc
Reference:
- Lightweight distributed RPC framework: http://my.oschina.net/huangyong/blog/361751
- You should know the principle of RPC: http://www.cnblogs.com/LBSer/p/4853234.html