
One, Foreword

Previously, we analyzed how the Netty server starts up.

To review the Netty server startup process, see the following figure:

Once the Netty server has fully started, it can serve external clients.

This article analyzes the following aspects:

  1. How does the server establish a new connection to the client?
  2. How does the server read messages from the client?
  3. How does the server send the response message to the client after processing the request?




Two, Straight to the Source Code

The source code walkthrough follows the same order:

  1. How does the server establish a new connection to the client?
  2. How does the server read messages from the client?
  3. How does the server send the response message to the client after processing the request?

(1) How does the server establish a new connection with the client?

The process is shown in the figure:

The Netty server handles a new connection from a client in four main steps:

  1. The Boss NioEventLoop thread polls for the OP_ACCEPT event of a new client connection
  2. Construct the Netty client NioSocketChannel
  3. Register the Netty client NioSocketChannel with a Worker thread
  4. Register the OP_READ event in the NioSocketChannel's event set

In Netty, the Boss NioEventLoop is responsible for accepting new connections. When a client opens a new connection to the server, the Boss NioEventLoop detects the OP_ACCEPT event.

// Location: io.netty.channel.nio
public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            // ...
            processSelectedKeys();
        }
    }

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            // ...
            processSelectedKey(k, (AbstractNioChannel) a);
            // ...
        }
    }

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            // Important: OP_READ and OP_ACCEPT both end up in unsafe.read()
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
}
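To make the bitmask test in processSelectedKey() concrete, here is a minimal sketch that uses only the JDK's SelectionKey constants. The class ReadyOpsDemo and the helper shouldRead are hypothetical names introduced for illustration; they are not part of Netty.

```java
import java.nio.channels.SelectionKey;

public class ReadyOpsDemo {
    // Mirrors the condition in processSelectedKey(): call read() when OP_READ or
    // OP_ACCEPT is ready, or when readyOps is 0.
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT)); // boss loop: new connection
        System.out.println(shouldRead(SelectionKey.OP_READ));   // worker loop: data available
        System.out.println(shouldRead(SelectionKey.OP_WRITE));  // write readiness only
        System.out.println(shouldRead(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```

Because each operation is a distinct bit, one mask test covers both the accept case (boss thread) and the read case (worker thread).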

How does unsafe.read() handle the event?

As you can see, the core logic of read() is a do-while loop that keeps reading data and putting it into a List. For the server channel, the "data" being read is actually new connections.

// Location: io.netty.channel.nio
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

    // Inner class
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            // ...
            try {
                try {
                    do {
                        // The do-while loop keeps reading new connections into readBuf
                        int localRead = doReadMessages(readBuf);
                        // ...
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                // Trigger channelRead event propagation for each item that was read
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                // ...
            } finally {
                // ...
            }
        }
    }
}
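The shape of this loop can be modeled without Netty. The sketch below is a simplified, hypothetical model: MessageReadLoop, the pending queue, and the maxMessagesPerRead parameter are all names made up here, and Netty's real continueReading() checks more conditions than a simple counter. It drains a bounded number of pending items into a list and then fires a callback for each one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class MessageReadLoop {
    // Stand-in for doReadMessages(): moves at most one pending item into buf.
    static int doReadMessages(Queue<String> pending, List<Object> buf) {
        String next = pending.poll();
        if (next == null) {
            return 0; // nothing to accept right now
        }
        buf.add(next);
        return 1;
    }

    // Collects up to maxMessagesPerRead items, then hands each one to the callback.
    static int read(Queue<String> pending, int maxMessagesPerRead, Consumer<Object> fireChannelRead) {
        List<Object> readBuf = new ArrayList<>();
        int total = 0;
        do {
            int localRead = doReadMessages(pending, readBuf);
            if (localRead == 0) {
                break;
            }
            total += localRead;
        } while (total < maxMessagesPerRead);

        // Propagation happens only after the loop has finished collecting.
        for (Object msg : readBuf) {
            fireChannelRead.accept(msg);
        }
        return total;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>(Arrays.asList("conn-1", "conn-2", "conn-3"));
        int n = read(pending, 2, msg -> System.out.println("channelRead: " + msg));
        System.out.println(n + " read, " + pending.size() + " left for a later pass");
    }
}
```

Bounding the number of items per pass keeps one busy channel from monopolizing the event loop thread.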

Take a closer look at the doReadMessages() method for NioServerSocketChannel.

// Location: io.netty.channel.socket.nio
public class NioServerSocketChannel extends AbstractNioMessageChannel
    implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // 1. Get the JDK native SocketChannel from the underlying JDK accept()
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // 2. Wrap it as Netty's own NioSocketChannel
                // 2.1. Create the core member variables id, unsafe, pipeline
                // 2.2. Set SelectionKey.OP_READ as the event of interest
                // 2.3. Set the Channel to non-blocking mode; create the client Channel configuration
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
}

After the client NioSocketChannel is successfully constructed, channelRead event propagation is triggered via pipeline.fireChannelRead().

A special handler, ServerBootstrapAcceptor, comes into play here.

The channelRead event propagates to ServerBootstrapAcceptor.channelRead(), which assigns the client Channel to a thread in the worker group.

The concrete implementation is as follows:

// Location: io.netty.bootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // Inner class
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            // Add childHandler to the client Channel's pipeline
            // childHandler is specified by the user in the startup class with the childHandler() method
            child.pipeline().addLast(childHandler);

            // ...

            try {
                // Register the client Channel with the worker group
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    }
}
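How a group picks which worker receives the next channel can be illustrated with a round-robin chooser. This is a minimal sketch, assuming plain round-robin selection over a fixed list; RoundRobinChooser is a hypothetical class written for this article, not Netty's own implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinChooser<T> {
    private final List<T> workers;
    private final AtomicInteger idx = new AtomicInteger();

    public RoundRobinChooser(List<T> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        this.workers = workers;
    }

    // Returns the workers in order, wrapping around at the end of the list.
    public T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return workers.get(Math.floorMod(idx.getAndIncrement(), workers.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("worker-0", "worker-1", "worker-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println("channel-" + i + " -> " + chooser.next());
        }
    }
}
```

Once a channel is bound to a worker, all of its later I/O events are handled on that same thread, so the choice is made only once per connection.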

ServerBootstrapAcceptor completes the third and fourth steps: it registers the NioSocketChannel with a Worker thread via the childGroup.register() method, and the OP_READ event is then registered in the NioSocketChannel's event set.

Interestingly, during registration it calls pipeline.fireChannelRegistered() to propagate the channelRegistered event, and then calls pipeline.fireChannelActive() to propagate the channelActive event.

Handling channelActive leads to the readIfIsAutoRead() method, which registers the SelectionKey.OP_READ event in the Channel's event set.
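The "register first, add OP_READ later" pattern can be reproduced with the plain JDK. The sketch below is an illustration under assumptions: InterestOpsDemo and its methods are hypothetical names, and the channel is never connected, since only the interest set matters here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class InterestOpsDemo {
    // Adds OP_READ to the key's interest set if it is not already present.
    static void beginRead(SelectionKey key) {
        int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_READ) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_READ);
        }
    }

    // Registers a channel with an empty interest set, then enables OP_READ.
    static int registerThenBeginRead() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);                 // selectors require non-blocking channels
            SelectionKey key = ch.register(selector, 0); // registered, interested in nothing yet
            beginRead(key);                              // now interested in OP_READ
            return key.interestOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerThenBeginRead() == SelectionKey.OP_READ);
    }
}
```

Separating the two steps lets the channelRegistered and channelActive events fire before any data can arrive on the channel.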




(2) How does the server read the message from the client?

The process is shown as follows:

This code is convoluted and hard to locate, so what can we do?

Set a breakpoint in a handler you wrote yourself and have a client connect. Once the breakpoint is hit, step through the code in IDEA (F8).

Tip:

  1. Connections are accepted by NioMessageUnsafe; data is read by NioByteUnsafe
  2. Connections are accepted on the boss Reactor thread; data is read on the worker Reactor thread

The server reads the client message in two main steps:

  1. Read the client request data
  2. Deliver it to the pipeline

The code is as follows:

// Location: io.netty.channel.nio
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    // Inner class
    protected class NioByteUnsafe extends AbstractNioUnsafe {
        @Override
        public final void read() {
            // ...
            try {
                do {
                    // 1. Read data
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    // ...

                    // 2. Important: deliver the data to the pipeline
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                // ...
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // ...
            }
        }
    }
}
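The read-then-deliver loop can be sketched with a ByteBuffer standing in for the socket. This is a simplified, hypothetical model: ByteReadLoop, bufferSize, and maxReads are names invented here, and the stop condition (the last read did not fill the buffer, or a read limit was reached) is only an approximation of what continueReading() decides.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ByteReadLoop {
    // Reads from 'socket' in chunks of at most bufferSize bytes. Each chunk is added to
    // the returned list, which stands in for pipeline.fireChannelRead(byteBuf).
    static List<String> read(ByteBuffer socket, int bufferSize, int maxReads) {
        List<String> fired = new ArrayList<>();
        int reads = 0;
        int lastBytesRead;
        do {
            byte[] chunk = new byte[Math.min(bufferSize, socket.remaining())];
            socket.get(chunk);
            lastBytesRead = chunk.length;
            if (lastBytesRead == 0) {
                break; // nothing was readable
            }
            fired.add(new String(chunk, StandardCharsets.US_ASCII));
            reads++;
            // Continue only if the buffer was filled completely (more data is likely)
            // and the per-pass read limit has not been reached.
        } while (lastBytesRead == bufferSize && reads < maxReads);
        return fired;
    }

    public static void main(String[] args) {
        ByteBuffer socket = ByteBuffer.wrap("hello netty".getBytes(StandardCharsets.US_ASCII));
        System.out.println(read(socket, 4, 16)); // prints [hell, o ne, tty]
    }
}
```

Note that the handler sees three separate chunks here, which is why real protocols need a decoder to reassemble complete messages.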

The details of how the read propagates are as follows:

// Location: io.netty.channel
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
    implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // Pass the message to the next inbound handler in the pipeline
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        next.invokeRead();

        // ...
        return this;
    }

    // Implementation:
    private void invokeRead() {
        if (invokeHandler()) {
            try {
                // A user-defined handler is called here
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }
}
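The difference between findContextInbound() and findContextOutbound() is easier to see on a toy pipeline. The sketch below is a hypothetical model (MiniPipeline and Ctx are made-up names) of a doubly linked list of handler contexts. As a simplification, head is treated as outbound-only and tail as inbound-only.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniPipeline {
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final Ctx head = new Ctx("head", false);
    private final Ctx tail = new Ctx("tail", true);

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a handler context just before tail.
    MiniPipeline addLast(String name, boolean inbound) {
        Ctx ctx = new Ctx(name, inbound);
        ctx.prev = tail.prev;
        ctx.next = tail;
        tail.prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Inbound events travel head -> tail and visit only inbound handlers.
    List<String> fireChannelRead() {
        List<String> visited = new ArrayList<>();
        for (Ctx c = head.next; c != null; c = c.next) {
            if (c.inbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }

    // Outbound operations travel tail -> head and visit only outbound handlers.
    List<String> write() {
        List<String> visited = new ArrayList<>();
        for (Ctx c = tail.prev; c != null; c = c.prev) {
            if (!c.inbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline()
                .addLast("decoder", true)
                .addLast("encoder", false)
                .addLast("business", true);
        System.out.println("inbound:  " + p.fireChannelRead());
        System.out.println("outbound: " + p.write());
    }
}
```

Inbound traversal skips the encoder and outbound traversal skips the decoder and the business handler, which is the filtering that the two find methods perform.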




(3) How does the server send the response message to the client after processing the request?

The process is shown as follows:

This part is relatively easy: start from the code in the handler that writes the response back:

// This is written in the handler
ChannelHandlerContext ctx;
ctx.write(responseBuffer);

It can be divided into four steps:

  1. Process the response data ByteBuf
  2. Put the response data into the response cache
  3. NioEventLoop read
  4. Choose the SocketChannel


  1. Process the response data ByteBuf

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
    implements ChannelHandlerContext, ResourceLeakHint {

    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        // ...
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // Important: find the next outbound handler context
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // ...
        // In the full source this call is used when flush is true; otherwise invokeWrite() is used
        next.invokeWriteAndFlush(m, promise);
        // ...
    }

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
}
  2. Put the response data into the response cache

// Custom handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // After the read completes, execution arrives here
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

// Location: io.netty.channel
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelHandlerContext flush() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        next.invokeFlush();
        // ...
    }
}

// Keep stepping in, and you will find:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // Inner class
    protected abstract class AbstractUnsafe implements Unsafe {
        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
            // Mark the buffered messages as ready to be flushed
            outboundBuffer.addFlush();
            flush0();
        }

        protected void flush0() {
            // Perform the real write
            // doWrite() is an abstract method; the call actually goes to NioSocketChannel
            doWrite(outboundBuffer);
        }
    }
}
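The split between write() and flush() can be modeled as a two-stage queue. The following is a simplified sketch under assumptions: MiniOutboundBuffer is a hypothetical class, strings stand in for ByteBuf messages, and a plain list stands in for the socket. It only illustrates that written data is not sent until a flush happens.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class MiniOutboundBuffer {
    private final Deque<String> unflushed = new ArrayDeque<>(); // written, not yet flushed
    private final Deque<String> flushed = new ArrayDeque<>();   // marked as ready to send
    private final List<String> socket = new ArrayList<>();      // stand-in for the real socket

    // Like ctx.write(): the message is only queued.
    void write(String msg) {
        unflushed.add(msg);
    }

    // Like ctx.flush(): mark everything queued so far, then actually send it.
    void flush() {
        // Stage 1, comparable to addFlush(): move pending messages to the flushed queue.
        flushed.addAll(unflushed);
        unflushed.clear();
        // Stage 2, comparable to flush0()/doWrite(): drain the flushed queue to the socket.
        while (!flushed.isEmpty()) {
            socket.add(flushed.poll());
        }
    }

    List<String> sent() {
        return socket;
    }

    public static void main(String[] args) {
        MiniOutboundBuffer buffer = new MiniOutboundBuffer();
        buffer.write("part-1");
        buffer.write("part-2");
        System.out.println("before flush: " + buffer.sent());
        buffer.flush();
        System.out.println("after flush:  " + buffer.sent());
    }
}
```

Batching several writes behind one flush reduces the number of system calls, which is why the handler above flushes in channelReadComplete() rather than after every write.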
  3. NioEventLoop read

  4. Choose the SocketChannel

// Location: io.netty.channel.socket.nio
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        // ...

        // Find the corresponding client channel
        SocketChannel ch = javaChannel();
    }
}
        // Find the corresponding client channelSocketChannel ch = javaChannel(); }}Copy the code