
Netty is a high-performance network transport framework. As a basic communication component, it is widely used by RPC frameworks: for example, the Dubbo protocol uses it for inter-node communication, and Hadoop's Avro component uses it for data file sharing. So let's try to implement a simple RPC framework with Netty.

First, we abstract a service API interface. The service provider implements the methods in this interface, and the service consumer invokes the interface directly:

public interface TestService {
    String test(String message);
}

The server implements this interface for the consumer to invoke:

public class TestServiceImpl implements TestService {
    @Override
    public String test(String message) {
        System.out.println("Server has received: " + message);
        if (message != null) {
            return "hi client, Server has received: [" + message + "]";
        } else {
            return "empty message";
        }
    }
}

Then we start to create the Server side of the service using Netty:

public class NettyServer {
    public static void startServer(String hostname, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(hostname, port).sync();
            System.out.println("Server start");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

When creating the server, we add Netty's built-in string encoder and decoder to the ChannelPipeline, and finally add the handler that contains our business logic.

Just as Dubbo uses its own Dubbo protocol for invocations, we need to define a custom protocol before calling the service: if the server receives a message that does not follow our protocol, it simply ignores it. Here we define a very simple protocol that only specifies what a message must begin with:

public class Protocol {
    public static final String HEADER="My#Protolcol#Header#";
}
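The framing and parsing logic implied by this header can be sketched in isolation with plain string operations. This is an illustrative stand-in (the class and method names here are made up), mirroring how the server handler below will check the prefix and strip everything up to the last `#`:

```java
public class ProtocolDemo {
    static final String HEADER = "My#Protolcol#Header#";

    // Frame a message body the way the client will: header + body
    static String frame(String body) {
        return HEADER + body;
    }

    // Parse the way the server handler will: verify the header,
    // then keep everything after the last '#'
    static String parse(String msg) {
        if (!msg.startsWith(HEADER)) {
            return null; // not our protocol, ignore it
        }
        return msg.substring(msg.lastIndexOf('#') + 1);
    }

    public static void main(String[] args) {
        System.out.println(parse(frame("hello"))); // prints "hello"
        System.out.println(parse("random message")); // prints "null"
    }
}
```

Note one limitation this sketch shares with the article's code: if the message body itself contains a `#`, `lastIndexOf` would truncate it, so a real protocol would use a length field or delimiter escaping.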

Next, create a handler on the server to process the business logic. We write a class that extends ChannelInboundHandlerAdapter and receives the client's message in the channelRead method. There we check whether the message begins with our custom protocol header; if it does, we read the message body, invoke the local method, and finally return the result of the call with writeAndFlush.

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg=" + msg);
        if (msg.toString().startsWith(Protocol.HEADER)) {
            String result = new TestServiceImpl()
                    .test(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

At this point the server is finished and we can start writing the client. Because the client code is a little special, we first write the NettyClientHandler that handles the business logic, and then implement the client's Netty initialization method.

In the handler, we want to call the server from a pooled thread and receive the server's response in channelRead. So besides extending ChannelInboundHandlerAdapter, the class also implements the Callable interface and overrides its call method.

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    private ChannelHandlerContext context;
    // The result returned by the server
    private String result;
    // The parameter passed when the client calls the method
    private String param;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        // Wake up the waiting thread
        notify();
    }

    @Override
    public synchronized Object call() throws Exception {
        context.writeAndFlush(param);
        wait();
        return result;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public void setParam(String param) {
        this.param = param;
    }
}

In the above code, the variable context is created to store the ChannelHandlerContext of the current handler so that it can be used to send messages in the call method. When a connection to the server is created, the channelActive method is first executed to assign a value to the context.

Note that the synchronized keyword on both the call and channelRead methods is important. When wait() executes, it releases the lock, allowing channelRead to acquire it; after channelRead stores the message returned by the server, notify() wakes up the thread waiting in call, which then returns the result.
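This handshake can be sketched without Netty, using only Java's built-in monitor methods (the class and method names here are illustrative, not from the article's code):

```java
// Minimal sketch of the call()/channelRead() handshake:
// one thread blocks waiting for a result, another delivers it and calls notify().
public class WaitNotifyDemo {
    private String result;

    public synchronized String call() throws InterruptedException {
        while (result == null) {  // guard against spurious wakeups
            wait();               // releases the lock while waiting
        }
        return result;
    }

    public synchronized void deliver(String msg) {
        result = msg;
        notify();                 // wake the thread blocked in call()
    }

    public static void main(String[] args) throws Exception {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        new Thread(() -> demo.deliver("server response")).start();
        System.out.println(demo.call()); // prints "server response"
    }
}
```

Unlike the article's handler, this sketch wraps wait() in a loop guarded by a condition, which is the generally recommended way to protect against spurious wakeups; a CompletableFuture would be an even more robust alternative.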

With NettyClientHandler done, let's go back to writing the Netty client startup class, NettyClient. First, we create a thread pool that will later be used to execute the requests; its size is set to the number of processors available to the JVM.

private static ExecutorService executor 
      = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
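The key property the proxy relies on later is that submit(Callable).get() blocks the caller until the task's result is available. A stdlib-only illustration (the class name and result string are made up):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: submitting a Callable to a fixed pool and blocking on get()
// until the task completes, which is exactly how the RPC proxy will
// wait for the server's response.
public class SubmitGetDemo {
    static String fetch() throws Exception {
        ExecutorService executor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        Callable<String> task = () -> "result from worker thread";
        String result = executor.submit(task).get(); // blocks until the task is done
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetch()); // prints "result from worker thread"
    }
}
```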

Since the client calls an interface, we need the proxy pattern to create a proxy object. We write a getProxy method that returns the proxy and intercepts the invoked method:

public Object getProxy(final Class<?> serviceClass, final String protocolHead) {
    return Proxy.newProxyInstance(this.getClass().getClassLoader(),
            new Class<?>[]{serviceClass},
            new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    if (clientHandler == null) {
                        initClient();
                    }
                    clientHandler.setParam(protocolHead + args[0]);
                    return executor.submit(clientHandler).get();
                }
            });
}
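The JDK dynamic-proxy mechanics used here can be shown in isolation. In this hypothetical sketch (interface and names are made up, not part of the article's framework), every call on the proxy is routed through the InvocationHandler, which is where a real RPC client would serialize the arguments and send them over the wire:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Isolated sketch of JDK dynamic proxies: calls on the proxy object
// are dispatched to the InvocationHandler's invoke method.
public class ProxyDemo {
    interface Greeter {
        String greet(String name);
    }

    static Greeter newGreeterProxy() {
        return (Greeter) Proxy.newProxyInstance(
                ProxyDemo.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                (Object proxy, Method method, Object[] callArgs) ->
                        // stand-in for the remote call; a real client would
                        // send callArgs[0] to the server and await the reply
                        "hi " + callArgs[0]);
    }

    public static void main(String[] args) {
        System.out.println(newGreeterProxy().greet("client")); // prints "hi client"
    }
}
```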

Inside invoke, the thread pool's submit method submits the handler as a task, which triggers its call method to send the request; args[0] is the argument the caller passed to the proxied method. Next, initClient initializes the Netty client.

// The shared handler instance used by getProxy above
private static NettyClientHandler clientHandler;

private static void initClient() {
    clientHandler = new NettyClientHandler();
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(clientHandler);
                    }
                });
        bootstrap.connect("127.0.0.1", 7000).sync();
        System.out.println("Client Startup");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

NettyClient's ChannelPipeline also gets the string codec, plus our own business-logic handler.

At this point, the client and server functions are complete. We create a startup class to start the server first:

public class ProviderBootstrap {
    public static void main(String[] args) {
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

Then start the client:

public class ConsumerBootstrap {
    public static void main(String[] args) {
        NettyClient consumer = new NettyClient();
        TestService proxy = (TestService) consumer.getProxy(TestService.class, Protocol.HEADER);
        String result = proxy.test("hi,i am client");
        System.out.println("result: " + result);
    }
}

Finally, let's look at the results, starting with the service provider:

The received message starts with our protocol header; after stripping the header, the message body is passed as the parameter of the RPC target method. On the service consumer side:

The consumer receives the response from the service provider. With that, a simple RPC framework is complete.

