Netty is an event-driven, asynchronous NIO network application framework that can be used to develop high performance, high reliability network IO programs. In the market, many RPC framework IO communication is realized by Netty. The main purpose of this project is to strengthen the grasp of Netty application through practice, and also to deepen the understanding of RPC through this project.
What is the RPC
RPC stands for Remote Procedure Call. Is a solution to cross-application invocation by establishing a TCP link between the client and server, sending requests to serialize and deserialize exchanged data, making it as easy for the caller as using a native API. People often wonder why they don’t use HTTP, which is actually a way to implement RPC.
The implementation process
Protocols Before we talk about the server and client implementations, we should define the relevant RPC protocols. SwiftMessage, message POJOs that flow in pipeline:
public class SwiftMessage {
private int length;
private byte[] content;
public SwiftMessage(int length, byte[] content) {
this.length = length;
this.content = content;
}
// omit the setter and getter
}
Copy the code
RpcRequest, RpcResponse, request response object processed in handler:
public class RpcRequest implements Serializable {
private static final long serialVersionUID = 2104861261275175620L;
private String id;
private String className;
private String methodName;
private Object[] parameters;
privateClass<? >[] parameterTypes; }Copy the code
public class RpcResponse implements Serializable {
private static final long serialVersionUID = -1921327887856337850L;
private String requestId;
private int code;
private String errorMsg;
private Object data;
}
Copy the code
ByteBuf and SwiftMessage transform encoders and decoders:
public class SwiftMessageEncoder extends MessageToByteEncoder<SwiftMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, SwiftMessage msg, ByteBuf out) throws Exception { out.writeInt(msg.getLength()); out.writeBytes(msg.getContent()); }}Copy the code
public class SwiftMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
out.add(newSwiftMessage(length, content)); }}Copy the code
In fact, the Server RPC Server is simpler, only need to do the following things:
- Start the service and register yourself with the RPC service center (so that clients can find and establish connections)
@Component
public class SwiftServerRunner implements ApplicationRunner.ApplicationContextAware {
private void start(int port, String serverName) {
if (isRunning.get()) {
LOGGER.info("SwiftRPC service is already running");
return;
}
final NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
final NioEventLoopGroup workerGroup = new NioEventLoopGroup(16);
try {
SwiftServerHandler handler = new SwiftServerHandler(rpcServiceMap);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new SwiftMessageDecoder());
pipeline.addLast(newSwiftMessageEncoder()); pipeline.addLast(handler); }}); ChannelFuture channelFuture = bootstrap.bind(port).sync(); isRunning.set(true);
String rpcServer = "127.0.0.1:" + port;
registerRpcServer(serverName, rpcServer);
LOGGER.info("SwiftRPC service started, port: {}", port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
isRunning.set(false);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
throw new RuntimeException("Failed to start SwiftRPC service", e); }}private void registerRpcServer(String serverName, String rpcServer) throws Exception {
// Simply save the RPC Server list to Redis
redisTemplate.opsForHash().put("rpc-server", serverName, rpcServer); }}Copy the code
- Listen to the client connection, receive the request data, deserialize into RpcRequest
@Override
protected void channelRead0(ChannelHandlerContext ctx, SwiftMessage msg) throws Exception {
byte[] content = msg.getContent();
RpcRequest request = Kryos.deserialize(content, RpcRequest.class);
}
Copy the code
- Use the parameters in RpcRequest to perform the specific business implementation method through reflection, and write the results back into the connection to the client
@ChannelHandler.Sharable
public class SwiftServerHandler extends SimpleChannelInboundHandler<SwiftMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SwiftMessage msg) throws Exception {
byte[] content = msg.getContent();
RpcRequest request = Kryos.deserialize(content, RpcRequest.class);
RpcResponse response = new RpcResponse();
response.setRequestId(request.getId());
try {
Object result = handler(request);
response.setData(result);
} catch (Exception e) {
response.setCode(-1);
response.setErrorMsg(e.toString());
}
byte[] resContent = Kryos.serialize(response);
SwiftMessage message = new SwiftMessage(resContent.length, resContent);
ctx.writeAndFlush(message);
}
private Object handler(RpcRequest request) throws Exception {
String className = request.getClassName();
Object rpcService = rpcServiceMap.get(className);
if (rpcService == null) {
throw new RuntimeException("RPC service class not found:" + className);
}
// Invoke the business layer through reflectionString methodName = request.getMethodName(); Object[] parameters = request.getParameters(); Class<? >[] parameterTypes = request.getParameterTypes(); Class<? > clazz = rpcService.getClass(); Method method = clazz.getMethod(methodName, parameterTypes);returnmethod.invoke(rpcService, parameters); }}Copy the code
The client side is a little more complicated because the client needs to receive external invocation requests (such as those from a REST interface), select a different RPC server to send the request based on the request, and wait for the result to return.
- Register an RPC interface. To call a method of a Service by injection, you need to register the service as a Spring bean
@Autowired
private UserService userService;
Copy the code
However, the specific implementation class of UserService is on the server side, so you need to define a ComponentScan, scan all RPC interfaces, register into Spring beans, and set the corresponding FactoryBean to complete the instantiation work, you can refer to the implementation process of Mybatis. Dynamic proxies are used in FactoryBeans to generate proxy classes for each interface that execute each method. So the final RPC call is made in the invoke method of this proxy class.
public class SwiftRpcServiceRegister implements ImportBeanDefinitionRegistrar {
private static final Logger LOGGER = LoggerFactory.getLogger(SwiftRpcServiceRegister.class);
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(SwiftRpcServiceScan.class.getName());
if(annotationAttributes ! =null) {
String[] packages = (String[]) annotationAttributes.get("packages");
SwiftRpcClassPathBeanDefinitionScanner scanner = new SwiftRpcClassPathBeanDefinitionScanner(registry);
scanner.addIncludeFilter(new AnnotationTypeFilter(SwiftRpcService.class));
if (packages == null || packages.length == 0) {
LOGGER.warn("Scanned RPC directory is empty");
return;
}
Set<BeanDefinitionHolder> definitionHolders = scanner.doScan(packages);
for (BeanDefinitionHolder holder : definitionHolders) {
GenericBeanDefinition beanDefinition = (GenericBeanDefinition) holder.getBeanDefinition();
String className = beanDefinition.getBeanClassName();
if (className == null) {
continue;
}
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(className);
beanDefinition.setBeanClass(SwiftRpcFactoryBean.class);
beanDefinition.getPropertyValues().add("rpcInterface", className); beanDefinition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE); }}}static class SwiftRpcClassPathBeanDefinitionScanner extends ClassPathBeanDefinitionScanner {
SwiftRpcClassPathBeanDefinitionScanner(BeanDefinitionRegistry registry) {
super(registry, false);
}
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
AnnotationMetadata metadata = beanDefinition.getMetadata();
return metadata.isInterface() && metadata.isIndependent();
}
@Override
protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
return super.doScan(basePackages); }}}Copy the code
The requested invocation is performed through the proxy of the rpcInterface.
public class SwiftRpcFactory<T> implements InvocationHandler {
private Class<T> rpcInterface;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String className = rpcInterface.getName();
String methodName = method.getName();
SwiftRpcService rpcService = rpcInterface.getAnnotation(SwiftRpcService.class);
String rpcServer = rpcService.server();
RpcRequest request = new RpcRequest();
request.setId(UUID.randomUUID().toString());
request.setClassName(rpcInterface.getName());
request.setMethodName(methodName);
request.setParameters(args);
request.setParameterTypes(method.getParameterTypes());
SwiftClientRunner swiftClient = ApplicationContextHolder.getApplicationContext().getBean(SwiftClientRunner.class);
returnswiftClient.sendRequest(request, rpcServer); }}Copy the code
- Select the connection to the RPC Server, send the request, and wait for the result to return
public Object sendRequest(RpcRequest request, String rpcServer) throws InterruptedException, IOException {
Channel channel = getChannel(rpcServer);
if(channel ! =null && channel.isActive()) {
// Block after sending the request and wait for the result to return
SynchronousQueue<Object> queue = swiftClientHandler.sendRequest(request, channel);
return queue.poll(60, TimeUnit.SECONDS);
}
return null;
}
Copy the code
@Component
@ChannelHandler.Sharable
public class SwiftClientHandler extends SimpleChannelInboundHandler<SwiftMessage> {
private final ConcurrentHashMap<String, SynchronousQueue<Object>> requests = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SwiftMessage swiftMessage) throws Exception {
byte[] content = swiftMessage.getContent();
RpcResponse response = Kryos.deserialize(content, RpcResponse.class);
String requestId = response.getRequestId();
SynchronousQueue<Object> queue = requests.get(requestId);
queue.put(response.getData());
}
SynchronousQueue<Object> sendRequest(RpcRequest request, Channel channel) throws IOException {
SynchronousQueue<Object> queue = new SynchronousQueue<>();
requests.put(request.getId(), queue);
byte[] content = Kryos.serialize(request);
SwiftMessage message = new SwiftMessage(content.length, content);
channel.writeAndFlush(message);
returnqueue; }}Copy the code
conclusion
This is just a very simple RPC call, which is handled in many places, such as using redis hash storage instead of the service registry. However, this project will help you gain a comprehensive understanding of THE workflow of RPC and will be helpful in learning and using other RPC frameworks. The following is a summary of the workflow of the server and client: The server: starts the Netty service, registers the service information to the registry, receives the request and deserializes it into RpcRequest, executes the local business code through reflection, and writes the result back to the Socket. Client: scan the LIST of RPCInterfaces, register with the Spring container center, start the service, read the RPC Server address of the registry, establish a connection, make a remote call through the proxy, and wait for the result to return.
Project address: github.com/Phantom0103… Reference & Thanks: github.com/mybatis/spr… Juejin. Cn/post / 684490…