- Overview
- Project link
- What does a microservices framework include?
- How to implement an RPC remote call?
- Open-source RPC frameworks
- Single-language RPC frameworks
- Cross-language RPC frameworks
- Running ZooKeeper locally with Docker
- Download the image
- Start the container
- Viewing container logs
- The RPC interfaces
- Netty RPC server
- Interface implementation
- Service startup
- Registering the service
- ZooKeeper implementation
- Netty RPC Client
- Creating the proxy
- The remote invocation
- Codec
- RpcDecoder
- RpcEncoder
- RpcServerInboundHandler
- Server paths in ZooKeeper
- Notes
- Reference links
Overview
A microservices framework using Netty, ZooKeeper, and Spring Boot.
Project link
GitHub source address
What does a microservices framework include?
For details, see RPC Practice and Principles
The project is divided into a caller (client) and a provider (server). The client only calls an interface; the call information is transmitted to the server over the network. The server decodes the request, invokes the corresponding method via reflection, and returns the result to the client over the network. The client can therefore ignore the network entirely and invoke the RPC service as if it were a local method.
The model structure of the whole project is as follows:
How to implement an RPC remote call?
- How do the client and server establish a network connection: HTTP or raw sockets
- How the server handles requests: NIO (this project uses Netty)
- What protocol is used for data transmission
- How data is serialized and deserialized: JSON, Protocol Buffers, Thrift
Open-source RPC frameworks
Single-language RPC frameworks
- Dubbo: Java, Alibaba
- Motan: Java, Weibo (Sina)
- Tars: C++, Tencent (now with multi-language support)
- Spring Cloud: Java
  - Gateway: Zuul
  - Registry: Eureka
  - Circuit breaking and timeouts: Hystrix
  - Call-chain tracing: Sleuth
  - Log analysis: ELK
Cross-language RPC frameworks
- gRPC: HTTP/2
- Thrift: TCP
Running ZooKeeper locally with Docker
Download the image
Start Docker and pull the ZooKeeper image; see hub.docker.com/_/zookeeper
Start the container
The startup command is as follows; it names the container zookeeper-rpc-demo and maps ports 8080, 2181, 2888, and 3888 to the local machine:
docker run --name zookeeper-rpc-demo --restart always -p 8080:8080 -p 2181:2181 -p 2888:2888 -p 3888:3888 -d zookeeper
This image EXPOSEs ports 2181, 2888, 3888, and 8080 (the ZooKeeper client port, follower port, election port, and AdminServer port, respectively), so standard container linking makes them automatically available to linked containers. Since ZooKeeper "fails fast", it is better to always restart it, hence the --restart always flag.
Viewing container logs
You can view the container's logs with docker logs zookeeper-rpc-demo. To get a shell inside the container, run the following command, where fb6f95cde6ba is the container ID of my ZooKeeper container:
docker exec -it fb6f95cde6ba /bin/bash
Inside the container, go to the /apache-zookeeper-3.7.0-bin/bin directory and run zkCli.sh -server 0.0.0.0:2181 to connect to the ZooKeeper service.
The RPC interfaces
This example provides two interfaces, HelloService and HiService, each with a single method. The client only needs to depend on the rpc-sample-api module; it knows the interface definitions but not the concrete implementations.
public interface HelloService {
String hello(String msg);
}
public interface HiService {
String hi(String msg);
}
Netty RPC server
Start a server that implements the two RPC interfaces above and registers them with ZooKeeper.
Interface implementation
/**
 * @author Yano
 * GitHub project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
@RpcServer(cls = HelloService.class)
public class HelloServiceImpl implements HelloService {

    @Override
    public String hello(String msg) {
        return "hello echo: " + msg;
    }
}
/**
 * @author Yano
 * GitHub project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
@RpcServer(cls = HiService.class)
public class HiServiceImpl implements HiService {

    @Override
    public String hi(String msg) {
        return "hi echo: " + msg;
    }
}
There are two issues involved:
- How does the Server decide which interface implementations to register with ZooKeeper?
- What should the paths of HelloServiceImpl and HiServiceImpl look like in ZooKeeper?
Service startup
The example server uses Spring Boot, but it does not need to serve web requests; it only has to keep running in the background, so the web application type is set to WebApplicationType.NONE:
@SpringBootApplication
public class RpcServerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RpcServerApplication.class)
                .web(WebApplicationType.NONE)
                .run(args);
    }
}
Registering the service
NettyApplicationContextAware is an ApplicationContextAware implementation. At startup it collects every bean annotated with @RpcServer (explained below) and registers it with ZooKeeper. Note that, despite their names, zk.address and zk.port are the address and port the Netty server binds to and publishes in ZooKeeper; the ZooKeeper connection string itself is hard-coded in ZooKeeperOp further down.
@Component
public class NettyApplicationContextAware implements ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(NettyApplicationContextAware.class);

    @Value("${zk.address}")
    private String zookeeperAddress;

    @Value("${zk.port}")
    private int zookeeperPort;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Collect every bean annotated with @RpcServer, keyed by "/<interface name>"
        Map<String, Object> rpcBeanMap = new HashMap<>();
        for (Object object : applicationContext.getBeansWithAnnotation(RpcServer.class).values()) {
            rpcBeanMap.put("/" + object.getClass().getAnnotation(RpcServer.class).cls().getName(), object);
        }
        try {
            NettyServer.start(zookeeperAddress, zookeeperPort, rpcBeanMap);
        } catch (Exception e) {
            logger.error("register error!", e);
        }
    }
}
The @RpcServer annotation is defined as follows:
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.TYPE)
@Component
public @interface RpcServer {

    /**
     * The interface class, used for service registration.
     */
    Class<?> cls();
}
applicationContext.getBeansWithAnnotation(RpcServer.class).values() returns every bean in the project annotated with @RpcServer. Each one is put into rpcBeanMap, whose key is the path to be registered in ZooKeeper. Note that the path uses the name of the interface, not the name of the implementation class; for the two implementations above, the keys are /com.yano.service.HelloService and /com.yano.service.HiService, as seen in the registration log at the end of the article.
The advantage of using annotations is flexibility: server A can provide only HelloService while server B provides only HiService, and the two do not affect each other.
Service registration happens mainly in com.yano.server.NettyServer#start.
public class NettyServer {

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    public static void start(String ip, int port, Map<String, Object> params) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            socketChannel.pipeline()
                                    .addLast(new RpcDecoder(Request.class))
                                    .addLast(new RpcEncoder(Response.class))
                                    .addLast(new RpcServerInboundHandler(params));
                        }
                    });

            ChannelFuture future = serverBootstrap.bind(ip, port).sync();
            if (future.isSuccess()) {
                // Register every exposed interface path in ZooKeeper with this server's address
                params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
            }
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
This class does the following:
- Starts a socket server through Netty on the IP and port passed in as parameters
- Once the bind succeeds, registers the interface implementations above with ZooKeeper:
params.keySet().forEach(key -> ZooKeeperOp.register(key, ip + ":" + port));
ZooKeeper implementation
The main tasks are maintaining the ZooKeeper connection and registering each server's IP address and port under the corresponding path. Ephemeral nodes are used so that when a server goes offline and its connection is lost, ZooKeeper deletes the node automatically and clients no longer receive the address of the offline server.
public class ZooKeeperOp {

    private static final String zkAddress = "localhost:2181";
    private static final ZkClient zkClient = new ZkClient(zkAddress);

    public static void register(String serviceName, String serviceAddress) {
        // The service path is persistent; the address node underneath it is ephemeral
        if (!zkClient.exists(serviceName)) {
            zkClient.createPersistent(serviceName);
        }
        zkClient.createEphemeral(serviceName + "/" + serviceAddress);
        System.out.printf("create node %s \n", serviceName + "/" + serviceAddress);
    }

    public static String discover(String serviceName) {
        List<String> children = zkClient.getChildren(serviceName);
        if (CollectionUtils.isEmpty(children)) {
            return "";
        }
        // Pick one provider at random
        return children.get(ThreadLocalRandom.current().nextInt(children.size()));
    }
}
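For illustration, a minimal usage sketch (assuming a ZooKeeper instance at localhost:2181, as configured above); the path and address match the registration log shown at the end of the article:
public class ZooKeeperOpDemo {
    public static void main(String[] args) {
        // The provider publishes "interface path -> provider address"
        ZooKeeperOp.register("/com.yano.service.HelloService", "127.0.0.1:3000");

        // The client picks one provider address (randomly, if several are registered)
        String address = ZooKeeperOp.discover("/com.yano.service.HelloService");
        System.out.println(address);   // e.g. 127.0.0.1:3000
    }
}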
Netty RPC Client
The Netty RPC Client invokes the two interfaces in the same way as local methods.
public class RpcClientApplication {

    public static void main(String[] args) {
        HiService hiService = RpcProxy.create(HiService.class);
        String msg = hiService.hi("msg");
        System.out.println(msg);

        HelloService helloService = RpcProxy.create(HelloService.class);
        msg = helloService.hello("msg");
        System.out.println(msg);
    }
}
Run the code above and the console will output:
hi echo: msg
hello echo: msg
Creating the proxy
HiService hiService = RpcProxy.create(HiService.class);
String msg = hiService.hi("msg");
The Client needs to create the proxy with com.yano.RpcProxy#create, after which it can call the hi method of hiService.
public class RpcProxy {

    public static <T> T create(final Class<?> cls) {
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, (o, method, objects) -> {
            // Pack everything the server needs for the reflective call into a Request
            Request request = new Request();
            request.setInterfaceName("/" + cls.getName());
            request.setRequestId(UUID.randomUUID().toString());
            request.setParameter(objects);
            request.setMethodName(method.getName());
            request.setParameterTypes(method.getParameterTypes());

            Response response = new NettyClient().client(request);
            return response.getResult();
        });
    }
}
In order for the server to invoke the requested method via reflection, the request needs at least:
- The interface name: interfaceName
- The method name: methodName
- The parameter types: Class<?>[] parameterTypes
- The arguments: Object[] parameter
@Data
public class Request {
    private String requestId;
    private String interfaceName;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameter;
}
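The Response class is not shown in this article; judging from the calls used elsewhere (setRequestId, setResult, getResult), a minimal sketch might look like this:
@Data
public class Response {
    private String requestId;
    private Object result;
}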
The remote invocation
It is ultimately called remotely through the following code, where Request contains all the information about the method being called.
Response response = new NettyClient().client(request);
/**
 * @author Yano
 * GitHub project: https://github.com/LjyYano/Thinking_in_Java_MindMapping
 * @date 2021-05-07
 */
public class NettyClient extends SimpleChannelInboundHandler<Response> {

    private Response response;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) {
        this.response = response;
    }

    public Response client(Request request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Create and initialize the Netty client Bootstrap object
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new RpcDecoder(Response.class))
                                    .addLast(new RpcEncoder(Request.class))
                                    .addLast(NettyClient.this);
                        }
                    });

            // Look up the provider address in ZooKeeper and connect to it
            String[] discover = ZooKeeperOp.discover(request.getInterfaceName()).split(":");
            ChannelFuture future = bootstrap.connect(discover[0], Integer.parseInt(discover[1])).sync();

            // Write the RPC request and wait until the server closes the connection
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }
}
This code is the core and does two things:
- Queries ZooKeeper for the server address under the corresponding node; if there are multiple providers, ZooKeeperOp.discover returns one of them at random
- Opens a socket connection to that address, sends the request, and waits for the response
Note that every call creates a new NettyClient, event loop group, and connection, and tears them all down once the response arrives.
Codec
channel.pipeline()
.addLast(new RpcDecoder(Response.class))
.addLast(new RpcEncoder(Request.class))
.addLast(NettyClient.this);
Both the client and the server need to encode and decode Request and Response objects. This example uses the simplest approach: each message is serialized to JSON and framed with a 4-byte length prefix. The Netty codec handlers are shown below.
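As an illustration only (not part of the project code), here is a minimal sketch of that framing, using Fastjson and Netty's Unpooled buffer helper; it mirrors what RpcEncoder does:
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class FramingSketch {
    public static void main(String[] args) {
        Request request = new Request();
        request.setInterfaceName("/com.yano.service.HiService");
        request.setMethodName("hi");

        byte[] body = JSON.toJSONBytes(request);   // JSON body of the request
        ByteBuf frame = Unpooled.buffer();
        frame.writeInt(body.length);               // 4-byte length prefix
        frame.writeBytes(body);                    // JSON payload

        System.out.println("frame size = " + frame.readableBytes());
    }
}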
RpcDecoder
RpcDecoder is an inbound handler (a MessageToMessageDecoder) that turns incoming bytes into objects: Response on the client side and Request on the server side.
public class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {

    private final Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        // Wait until the 4-byte length prefix is available
        if (msg.readableBytes() < 4) {
            return;
        }
        msg.markReaderIndex();
        int dataLength = msg.readInt();
        // Not a complete frame yet: rewind and wait for more bytes
        if (msg.readableBytes() < dataLength) {
            msg.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        msg.readBytes(data);
        out.add(JSON.parseObject(data, genericClass));
    }
}
RpcEncoder
RpcEncoder is an outbound handler (a MessageToByteEncoder) that turns outgoing objects into bytes: Request on the client side and Response on the server side.
public class RpcEncoder extends MessageToByteEncoder {

    private final Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        if (genericClass.isInstance(msg)) {
            // Write the 4-byte length prefix followed by the JSON bytes
            byte[] data = JSON.toJSONBytes(msg);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
RpcServerInboundHandler
This is the core of the server-side reflective call, so it is explained separately. The Netty server adds RpcServerInboundHandler to the pipeline at startup:
socketChannel.pipeline()
.addLast(new RpcDecoder(Request.class))
.addLast(new RpcEncoder(Response.class))
.addLast(new RpcServerInboundHandler(params));
The article shows only channelRead; in the sketch below it is wrapped in a minimal class skeleton (the constructor and the handle map are inferred from how the handler is constructed in NettyServer) so that it reads as a complete handler.
public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(RpcServerInboundHandler.class);

    // Map from "/<interface name>" to the implementation bean (skeleton inferred from usage)
    private final Map<String, Object> handle;

    public RpcServerInboundHandler(Map<String, Object> handle) {
        this.handle = handle;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        logger.info("request data {}", JSON.toJSONString(request));
        // Look up the implementation bean and invoke the requested method via reflection
        Object bean = handle.get(request.getInterfaceName());
        Method method = bean.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
        method.setAccessible(true);
        Object result = method.invoke(bean, request.getParameter());

        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setResult(result);
        // Close the connection after the response is flushed; the client is blocked on closeFuture()
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}
Server paths in ZooKeeper
After the server starts, two lines appear in the log:
create node /com.yano.service.HelloService/127.0.0.1:3000
create node /com.yano.service.HiService/127.0.0.1:3000
Checking the nodes in ZooKeeper confirms that both services have been registered:
[zk: 0.0.0.0:2181(CONNECTED) 0] ls /com.yano.service.HelloService
[127.0.0.1:3000]
[zk: 0.0.0.0:2181(CONNECTED) 1] ls /com.yano.service.HiService
[127.0.0.1:3000]
Notes
A microservice RPC framework using Netty, ZooKeeper, and Spring Boot. This demo is intended only as an example: implementing it by hand is a good way to understand the principles, but it should not be used in production.
The full source code is available at the GitHub source address above. Stars and forks are welcome.
Reference links
Github.com/yanzhenyida…