Netty responsibility chain design pattern

Design patterns

The chain of responsibility pattern creates a chain of processing objects for the request

The process of initiating the request is decoupled from the process of processing the request: the handler on the responsibility chain is responsible for processing the request, and the customer only needs to send the request to the responsibility chain without caring about the processing details and delivery of the request.

Implement the chain of responsibility model

There are four elements to realize the chain of responsibility mode:

  1. Processor abstract class
  2. A concrete processor implementation class
  3. Save processor information
  4. Processing performed

Collection storage – examples of pseudocode

// Create a processor abstract class
class AbstractHandler{
    void doHandler(Object args); 
}

// Processor-specific implementation class
class Handler1 extends AbstractHandler {
    void doHandler(Object args){
        //handler}}class Handler2 extends AbstractHandler {
    void doHandler(Object args){
        //handler}}class Handler3 extends AbstractHandler {
    void doHandler(Object args){
        //handler}}// Create a collection and store all processor instance information
List handlers = new ArrayList<AbstractHandler>()
handlers.add(new Handler1())
handlers.add(new Handler2())
handlers.add(new Handler3())

// Call the handler to process the request
void process(request) {
    for(AbstractHandler handler:handlers) { handler.doHandler(request); }}// Initiate a request and process the request through the chain of responsibility
call.process(request)
Copy the code

Tomcat uses this chain of responsibility model

Linked list calls – pseudocode examples

// The processor abstract class
class AbstractHandler {
    AbstractHandler next;
    void doHandler(Object args);
}

// Processor-specific implementation class
class Handler1 extends AbstractHandler {
    void doHandler(Object args){
        //handler}}class Handler2 extends AbstractHandler {
    void doHandler(Object args){
        //handler}}class Handler3 extends AbstractHandler {
    void doHandler(Object args){
        //handler}}// Store the processor in a linked list
pipeline = start [new Handler1() -> new Handler2() -> new Handler3()] end

// Process the request, calling the handler from scratch
void process(request) {
    handler = pipeline.findOne;
    while(handler ! =null){ handler.doHandler(request); handler = handler.next(); }}Copy the code

Netty is the use of this chain of responsibility pattern, so according to the structure of the analysis of Netty chain call, and the last one except for the processor save form is not different.

Custom responsibility chain mode

Now that we have written the chain store pseudo-code and want to use it to clarify the netty call rules, we can copy this method in the simplest code for subsequent understanding, as follows:

/ * * *@author daniel
 * @version 1.0.0
 * @date2021/12/12 * /
public class PipelineDemo {

    public static void main(String[] args) {
        AbstractHandler handler1 = new Handler1();
        AbstractHandler handler2 = new Handler2();
        AbstractHandler handler3 = new Handler3();

        handler1.setNextHandler(handler2);
        handler2.setNextHandler(handler3);

        HandlerChainContext handlerChainContext = new HandlerChainContext(handler1);
        handlerChainContext.startRun("Ding Dai Guang"); }}/** * processor context, which is responsible for maintaining the list and the execution of the processors in the list */
class HandlerChainContext {

    AbstractHandler currentHandler;

    /** * Initializes the responsibility header *@paramCurrentHandler Responsibility header */
    public HandlerChainContext(AbstractHandler currentHandler) {
        this.currentHandler = currentHandler;
    }

    /** * responsibility chain call entry *@paramArgs requests information */
    public void startRun(Object args) {
        currentHandler.doHandler(this, args);
    }

    /** * executes the next processor *@paramThe args parameter * /
    public void runNext(Object args) {
        AbstractHandler nextHandler = currentHandler.getNextHandler();
        currentHandler = nextHandler;
        if (nextHandler == null) {
            System.out.println("The end");
            return;
        }
        nextHandler.doHandler(this, args); }}/** * defines the processor abstract class */
abstract class AbstractHandler {

    AbstractHandler nextHandler;

    public AbstractHandler getNextHandler(a) {
        return nextHandler;
    }

    public void setNextHandler(AbstractHandler nextHandler) {
        this.nextHandler = nextHandler;
    }

    abstract void doHandler(HandlerChainContext handlerChainContext, Object args);
}

/** * Processor implementation class 1 */
class Handler1 extends AbstractHandler {

    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler1";
        System.out.println("It has been processed by the Handler1 handler and the result is :" + args);
        // Execute the next processorhandlerChainContext.runNext(args); }}class Handler2 extends AbstractHandler {

    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler2";
        System.out.println("It has been processed by the Handler2 processor and the result is :" + args);
        // Execute the next processorhandlerChainContext.runNext(args); }}class Handler3 extends AbstractHandler {

    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler3";
        System.out.println("It has been processed by the Handler3 processor, and the result is :" + args);
        // Execute the next processorhandlerChainContext.runNext(args); }}Copy the code

The result is as follows:

Handler1 has been processed by the Handler1 processor. Result: Dimlight -- Handler1 has been processed by the Handler2 processor. Result: Dimlight -- Handler2 Has been processed by the Handler3 processor, processing result: DDL -- Handler1 -- Handler2 -- Handler3 endCopy the code

This is strictly in accordance with the above pseudo-code implementation of a simple list of storage responsibility chain model, of course, NetTY Chinese medicine is more complex than this is more secure, although simple but the understanding of Netty responsibility chain model must be of great benefit.

ChannelPipeline responsibility chain in Netty

  1. In Netty, the Pipeline pipe holds all processor information for the channel.

  2. This pipeline is a proprietary pipeline that is automatically created when a Channel is created using the initAndRegister method on the bind port as follows:

    channel = channelFactory.newChannel();
    Copy the code

    ++ Through debug and the above can be clearly known to create NioServerSocketChanel, so you can directly check its constructor ++, as follows:

    /** * Create a new instance */
    public NioServerSocketChannel(a) {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    Copy the code
    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    Copy the code

    < AbstractNioChannel > < AbstractNioChannel >

    /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);  // Set to non-blocking mode
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }
    
            throw new ChannelException("Failed to enter non-blocking mode.", e); }}Copy the code

    This method sets niO to non-blocking mode and is already on track to call the NIO framework provided by the JDK, continuing with the constructor ++ of its parent class AbstractChannel, as follows:

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline(); // create Pipeline
    }
    Copy the code

    The newChannelPipeline method ++ is the newChannelPipeline method:

    /**
     * Returns a new {@link DefaultChannelPipeline} instance.
     */
    protected DefaultChannelPipeline newChannelPipeline(a) {
        return new DefaultChannelPipeline(this); // Create a DefaultChannelPipeline object, which is the pipeline object, responsible for link management
    }
    Copy the code
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        tail = new TailContext(this);  // Create the end of the list
        head = new HeadContext(this);  // Create the linked list header
    
        // There is no processor in the middle, so far only end to end
        head.next = tail;
        tail.prev = head;
    }
    Copy the code

    Now it looks a lot like a simple custom demo, except DefaultChannelPipeline doesn’t have a single handler in it, and of course not,HeadContext and TailContext are both processors, just a special handler

    What other processors can be on this link let’s take a look at……

  3. Both inbound events (write) and outbound operations (read) invoke handlers on the pipeline.

The next thing we need to do is:

  1. What do inbound events and outbound operations mean?
  2. What does the Handler do?
  3. How does Pipeline maintain handlers?
  4. What is the execution flow of handler?

Inbound events and outbound events

An inbound event is typically an I/O thread that generates inbound data, such as EventLoop receiving selector OP_READ, and the inbound handler calling socketchannel.read (ByteBuffer) to receive the data, which results in the channel’s ChannelPipeline The channelRead method in the next contained is called. Basically, the socket has data coming in from the lower level and automatically ADAPTS the corresponding inbound processor to handle it.

NioData -> Netty YinHandler: The Netty layer processes data

An outbound event is usually when the I/O thread performs an actual output operation, such as the bind method, which is meant to request the server The socket is bound to the given SocketAddress, which causes the bind method in the next outbound handler contained in the ChannelPipeline to be called, basically manually calling the appropriate outbound handler to handle a data sent to the underlying socket logic.

NettyOutHandler -> NIO: The Netty layer sends data to the lower layer for processing

For these two types of events, Netty provides more specific events as follows:

  • Inbound events (inbound)
    • FireChannelRegistered: channel registration event
    • FireChannelUnregistered: channel in addition to register event
    • FireChannelActive: channel active events
    • FireChannelInactive: the channel is not active
    • FireExceptionCaught: indicates an abnormal event
    • FireUserEventTriggered: User-defined event
    • FireChannelRead: read event channel
    • Complete event fireChannelReadComplete: channel to read
    • FireChannelWritabilityChanged: channel state change events
  • Outbound events
    • Bind: indicates the port binding event
    • Connect: indicates the connection event
    • Disconnect: Indicates the disconnection event
    • Close: the event is closed
    • Derigister: Contact registration events
    • Flush: Flushes data to network events
    • Read: Read event, used to register OP_READ to selector
    • Write: write events
    • WriteAndFlush: Write data events

Event description so far, after viewing the source code to understand the event mechanism in detail, from the beginning to the end in addition to the event in this article is the most common is the processor, also know how to store the Netty processor (pipeline), so in addition to the head of the HeadContext and TailContext processor also What other processors are there?

What is a handler in a Pipeline?

All handlers are derived from a processor-level ChannelHandler interface that handles IO events and intercepts IO events and forwards them to the next handler in the ChannelPipeline, as follows:

package io.netty.channel;

import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
public interface ChannelHandler {
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value}}Copy the code

As you can see, the top-level interface definition is very weak. In practice, it implements two major subinterfaces:

  • ChannelInboundHandler: Processes inbound I/O events
  • ChannelOutboundHandler: Processes outbound I/O events

This corresponds to the event above, meaning that the event is used in the handler.

Netty provides some simple classes for implementing the inbound and outbound interfaces:

  • ChannelInboundHandlerAdapter: handle the inbound IO events
  • ChannelOutboundHandlerAdapter: handle the inbound IO events
  • ChannelDuplexHandler: Supports simultaneous processing of inbound and outbound events

== As with the custom chain of responsibility pattern, the actual object stored in the Pipeline in Netty is not ChannelHandler, but the context object ChannelHandlerContext, which wraps the handler in the context object and passes it through the Chann to which it belongs ElPipeline interactions, passing events up or down or modifying pipelines, are done through this context object.==

Now that you know what the handler is and what inbound and outbound events are executed in the handler, how does Netty maintain a whole ChannelHandler link?

How to maintain handlers in Pipeline

One premise is that a ChannelPipeline is thread-safe, which means that a ChannelHandler can be added, removed, or replaced at any time. The usual operation is to add a ChannelHandler to the pipeline during initialization Rich API for Pipeline management handlers (looking ahead in Defau… Pipeline), as follows:

  • AddFirst: Inserts first
  • AddLast: Insert at the end
  • AddBefore: Inserts before the specified handler
  • AddAfter: Inserts after the specified handler
  • Remove: Removes the specified processor
  • RemoveFirst: Removes the first processor
  • RemoveLast: Removes the last processor
  • Replace: replaces the specified processor

Said so much from the theory has solved three problems, first look at the source… Otherwise it’s all theoretical… Of course, read the source code also did not egg with so many API directly used, just to fool XX interviewer…

Processor + event source

The signature has already looked at the code of the top-level interface. There are very few codes and very few methods.

ChannelInboundHandler

* /package io.netty.channel;

/ * * * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    void channelActive(ChannelHandlerContext ctx) throws Exception;

    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
Copy the code

ChannelOutboundHandler

package io.netty.channel;

import java.net.SocketAddress;

/ * * * {@link ChannelHandler} which will get notified for IO-outbound-operations.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
    
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}
Copy the code

The two major subinterfaces that extend from the top-level interface ChannelHandler define methods for inbound and outbound events, respectively. There is no special requirement that these events cover all the events we use in our development.

None of these interfaces will do us much good unless we have to customize the processor, but we need to know what they can do. In most cases, we should use Netty’s easy-to-develop adapters, as follows:

ChannelInboundHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception { ctx.fireExceptionCaught(cause); }}Copy the code

ChannelOutboundHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;

import java.net.SocketAddress;

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }}Copy the code

ChannelDuplexHandler

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
Copy the code

As you can see, these three adapters have already helped us implement the interface, so we just need to use it. Even if we need to enhance events, we only need to inherit from these three adapters to enhance a single method, and there is no need to implement the top-level interface to implement all methods manually.

From the three adapters, you can see the ChannelHandlerContext class throughout this article, which acts as a context and maintains the entire processor link, which is simply an interface. We can analyze its a default implementation class DefaultChannelHandlerContext, as follows:

DefaultChannelHandlerContext

package io.netty.channel;

import io.netty.util.concurrent.EventExecutor;

/** * The default context implementation class */
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;  // The handler that needs to be executed is stored in the context

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler; // The handler passed in when the context object is created
    }

    @Override
    public ChannelHandler handler(a) {
        returnhandler; }}Copy the code

As with custom responsibility chains, store a handler in context, execute the handler in context, and suddenly see something wrong… Isn’t it an implementation class? Interface? What about the hierarchy of processor links? Don’t try so hard to see his father class AbstractChannelHandlerContext

AbstractChannelHandlerContext

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext.ResourceLeakHint {
    volatile AbstractChannelHandlerContext next; // Next processor
    volatile AbstractChannelHandlerContext prev; // The previous processor
}
Copy the code

It comes, you can see AbstractChannelHandlerContext ChannelHandlerContext interface is achieved, and there are two variables next and prev used to maintain link parent-child relationships

== raises a question, shouldn’t a single context object be able to maintain an entire link? Why is the context object store used here? = =

+ + behind a guess wrong words again to correct, we are in the custom in processor storage under the summary using the next one processor, and the processor in the netty, each processor is not any other processor design, so you need to use multiple context object to deal with the relationship between before and after a link, this is a guess, not necessarily for… ++

The default Pipeline implementation class DefaultChannelPipeline implements the ChannelPipeline interface, so there must be API implementation of the processor link operation, as follows:

/** * Default pipe implementation class */
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head; / / object
    final AbstractChannelHandlerContext tail; / / object
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);  // Create the end of the list
        head = new HeadContext(this);  // Create the linked list header

        // There is no processor in the middle, so far only end to end
        head.next = tail;
        tail.prev = head;
    }
    
    @Override
    public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
        return addFirst(null, name, handler);
    }
    
    @Override
    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            name = filterName(name, handler);

            newCtx = newContext(group, name, handler); // Create a new context object

            addFirst0(newCtx); // Actually handle the context procedure

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if(! registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx,true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if(! executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor);return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    
    /** * The actual process of adding context to the first link *@paramNewCtx Newly created processor context object */
    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;  // The current first
        newCtx.prev = head; // Start with the first one
        newCtx.next = nextCtx; // The last one is the current first one
        head.next = newCtx; // Next to the header is the new context object
        nextCtx.prev = newCtx; // The current first is preceded by the newly created context object}}Copy the code

It can be seen that the pipeline initializes the header and tail, but it is not a processor but a context object. In THE API, the processor is also encapsulated as a context object for processing, so our guess is correct, i.e A context pipeline object stores two special context objects. These two context objects can be used to string together a string of context objects (prev and next), which are actually connected to the processor. This has the advantage of any processor Object can be used anywhere, because the handler object does not contain any information designed for other processors, it is pure, and encapsulates a context object when used elsewhere, wonderful……

Put the previous three questions together and draw a picture as follows:

So the first three problems are solved both theoretically and from the source side, then analyze the handler execution process is what

Handler execution analysis

As shown in the figure above, even if there are multiple processors in the Pipeline (both inbound and outbound), the Pipeline will automatically choose not to use an outbound handler for an inbound event (125) and not to use an inbound handler for an outbound event (521) when executing a specific event.

In the context object that encapsulates the processor, methods starting with fire represent the propagation and processing of inbound events, and the remaining methods represent the propagation and processing of outbound events.

Analyze processing of registered inbound events

Registered events represent the binding of channels and selectors in NIO, and in NetTY represent the binding of channels and eventloops, such as the bind method

First of all, a Channel is a proprietary pipeline that is automatically created when a Channel is created. It is used in the initAndRegister method. After the Channel is created, the Channel is initialized using the init method in the ServerBootStrap as follows:

ServerBootstrap

@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline(); // Get the pipeline object in the channel

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    finalEntry<ChannelOption<? >, Object>[] currentChildOptions = newOptionsArray(childOptions);finalEntry<AttributeKey<? >, Object>[] currentChildAttrs = newAttributesArray(childAttrs);ChannelInitializer is a special handler that initializes the channel, executes it once, and then destroys it
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) { // When ChannelInitializer is triggered, the initChannel method is executed after receiving a successful registration event
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if(handler ! =null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run(a) {
                    pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code

Before adding the ChannelInitializer handler, == Pipeline only had two handlers in the responsibility chain,HeadContext and TailContext (HeadContext->TailContext), which was later added via p.addlast () A processor ChannelInitializer== looks like this:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {}
Copy the code

First can see ChannelInitializer is an inbound event handler, we focus on initChannel from the processor, channelRegistered and handlerAdded, as follows:

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    / / if handlerAdded (...). The channelRegistered method is executed, and the channelRegistered method should theoretically not be called again
    if (initChannel(ctx)) { // This handler is removed after the initChannel method is called
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();

        // We are done with init the Channel, removing all the state for the Channel now.
        removeState(ctx);
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.ctx.fireChannelRegistered(); }}/ * * * {@inheritDoc} If override this method ensure you call super! * /
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) { // Call the initialization method if the current channel is already registered
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx); // Remove the handler}}}/** * Channel initialization */
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel()); // This init method is usually the same initChannel method that was implemented when the channel was created
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...) .
            // We do so to prevent multiple calls to initChannel(...) .
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this); //ChannelInitializer removes itself from the pipeline after execution to avoid repeated initialization}}return true;
    }
    return false;
}
Copy the code

The channelRegistered method has a comment worth noting if handlerAdd(…) Once executed, the channelRegistered method should theoretically never be called again. Why?

If you look at the handlerAdd method, you’ll see that if the channel has been initialized it removes the handler from the chain of responsibility, and the channelRegistered method above really won’t be called again.

Either channelRegistered or handlerAdd executes initChannel(ChannelHandlerContext), which calls initChannel((C) ctx.channel()); This method is the code that we override when we execute p.addlast (), and can also be found in initChannel(ChannelHandlerContext) to remove the current handler. Pipeline.remove (this)

From the analysis of ChannelInitializer, we can also realize the magic of dynamic addition and deletion of processors in the responsibility chain

The handler in the responsibility chain has changed since the p.addlast () method was executed. That is, HeadContext->ChannelInitializer ->TailContext . = =

This completes the init() method in initAndRegister() issuance

After we check the register method, we already know the register method in front end of the actual call is AbstractChannel. Register, as follows:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //eventLoop --> selector
    / / promise - > channel
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if(! isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // Determine whether the thread calling the bound method is a selector thread, if not submitted as a thread
    if (eventLoop.inEventLoop()) {
        // The actual binding process
        register0(promise);
    } else {
        try {
            // Once a task submission is triggered,eventLoop will start polling
            eventLoop.execute(new Runnable() {
                @Override
                public void run(a) { register0(promise); }}); }catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}}Copy the code

The AbstractChannel. This. EventLoop = eventLoop; Eventloop.ineventloop () = EventLoop(); EventLoop() = EventLoop();

eventLoop.execute(new Runnable() {
    @Override
    public void run(a) { register0(promise); }});Copy the code

To be clear, EventLoop does not have threads. Threads are created through the thread creator only when the task is executed. We won’t go into details, but we’ll focus on register0(Promise) as follows:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();  // In NioChannel, bind a Channel to a NioEventLoop -- essentially a code call like NIO

        // Registration is complete


        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded(); // Trigger the HandlerAdd method -- here he comes

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();  // Propagate channel registration completion event
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805beginRead(); }}}catch (Throwable t) {
        // Close the channel directly to avoid FD leak.closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}Copy the code

Registration completed before code don’t need to elaborate, now we study pipeline. InvokeHandlerAddedIfNeeded (); , start from here channel began to formal execution in a processor events, this is a HandlerAdd events, currently we have 3 processors, only the second processor have corresponding execution method, so it must be in the execution of the second processor method, it’s a way to go back, the bottom layer of the code we don’t need to care about, the debug in dot points You can find it.

We’ll focus on the execution of the second handler, initChannel(CTX) with the handlerAdded method, and removeState(CTX) after execution. , so the execution of the pipeline. InvokeHandlerAddedIfNeeded (); Then there are only two processors left in the pipeline, HeadContext and TailContext, but is this really the case?

Executing initChannel(CTX) in the handlerAdded method executes the abstract method we implemented when we added the handler. We seem to have been forgetting what the abstract method of the handler we added is. Take a closer look:

@Override
public void initChannel(final Channel ch) { // When ChannelInitializer is triggered, the initChannel method is executed after receiving a successful registration event
    final ChannelPipeline pipeline = ch.pipeline(); / / object of pipeline
    ChannelHandler handler = config.handler(); // Read the processor from the configuration
    if(handler ! =null) {
        pipeline.addLast(handler); // The processor in the configuration is not empty
    }

    ch.eventLoop().execute(new Runnable() {  // Finally add a task to the ServerBootstrapAcceptor processor
        @Override
        public void run(a) {
            pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }Copy the code

What is config.handler()? . This is the configuration we added before executing the Bing method, as follows:

b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(serverHandler); }});Copy the code

A handler LoggingHandler must be present in the responsibility chain after executing the initChannel method, and LoggingHandler inherits from the ChannelDuplexHandler, which is an aggregation handler, either for outbound events or for outbound events Inbound events execute methods in this handler, no doubt, this is a logging.

Finally, a processor, ServerBootstrapAcceptor, was added.

. = = that is performed pipeline invokeHandlerAddedIfNeeded (); After that, there are currently 4 handlers in the chain, namely HeadContext->LoggingHandler->ServerBootstrapAcceptor->TailContext, of course there should be 5 handlers before removing the original handler.==

However, the ServerBootstrapAcceptor handler is not found in the chain of responsibility when the method execution completes… This is… Embarrassed, keep looking no ==

After the execution of the method is pipeline fireChannelRegistered (); Used to propagate the channel registration completion event, this event is simple, traversing all handlers to execute the Register event until the handler does not call the next handler, which is TailContext.

The notification of the registration event is asynchronous. After the task is submitted, another thread is processing the event

At this point the Register inbound event is done, followed by the BIND outbound event.

A handler that parses bind outbound events

The code that actually performs port binding is as follows:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run(a) {  // Submit a task to NioEventLoop for port binding
            // The registration is complete and successful
            if (regFuture.isSuccess()) {
                / / binding
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else{ promise.setFailure(regFuture.cause()); }}}); }Copy the code

This method will publish the registration task asynchronously to the EventLoop, focusing on channel.bind(…). Call, actually call abtractchannel.bind as follows:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    // Trigger a binding event in the Netty responsibility chain, which is initiated by the application layer code to the bottom layer and belongs to outBound
    return pipeline.bind(localAddress, promise);
}
Copy the code

Pipeline. Bind (localAddress, promise) : bind(localAddress, promise) :

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
Copy the code

Whether you can see through tail began to call the bind of the outbound event, this is one of the characteristics of the outbound event, began to call in from the tail, if not rewrite the bind event method will call AbstractChannelHandlerContext. The bind method, as follows:

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); // Find the next outbound event handler
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise); // Event execution
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run(a) {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null.false);
    }
    return promise;
}
Copy the code

Look for the next outbound handler, actually look for the previous outbound handler, this is the other way around, and if it finds it it calls next. InvokeBind (…). Method to execute, as follows:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); // Actually call the bind method in the handler
        } catch(Throwable t) { notifyOutboundHandlerException(t, promise); }}else{ bind(localAddress, promise); }}Copy the code

In the chain of responsibility above, the bind method in LoggingHandler must be called. It is up to the code in the handler to continue the search until no bind outbound event is propagated, in this case the bind method in HeadContext, as follows:

@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
Copy the code

Unsafe is an unsafe word. Its constructor is as follows:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, HeadContext.class);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}
Copy the code

Unsafe is the unsafe object of a channel, and the abstractchannel. bind method is called as follows:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceofInetSocketAddress && ! ((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && ! PlatformDependent.isWindows() && ! PlatformDependent.maybeSuperUser()) {// Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if(! wasActive && isActive()) { invokeLater(new Runnable() {
            @Override
            public void run(a) { pipeline.fireChannelActive(); }}); } safeSetSuccess(promise); }Copy the code

We just need to focus on doBind(localAddress); We know that NioServerSocketChannel is the channel, so we can directly find the corresponding doBind method, as follows:

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else{ javaChannel().socket().bind(localAddress, config.getBacklog()); }}Copy the code

At last, we called the underlying logic of JDK-NIO for port binding.

At this point, the server is ready to use.

ServerBootstrapAcceptor (accept) : accept handler (accept handler) : accept handler (accept handler) : accept handler (accept handler) : accept handler (accept handler)

Analyze the processing of the accept inbound event

By now, the server has been started, and the EventLoop has polling events, and when the EventLoop polls for an Accept event, it will start accepting the propagation of inbound events. This is a pre-condition. It should be noted that the selector is exactly the same as the NIO selector Just encapsulation.

NioEventLoop: run (); NioEventLoop: run ();

private void processSelectedKeys(a) {
    if(selectedKeys ! =null) {
        processSelectedKeysOptimized();
    } else {
        // Polling with events in the selector generated when the NioEventLoop is initialized
        // But this selector will only work if it is bound to the channel (it must be bound when these events are executed)
        // Reads the event from the selector bound to the channelprocessSelectedKeysPlain(selector.selectedKeys()); }}Copy the code
/** * handle events *@paramSelectedKeys event collection */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) { // No events
        return;
    }

    // Iterate over the query result
    Iterator<SelectionKey> i = selectedKeys.iterator();
    / / polling
    for (;;) {
        // Encapsulated events
        final SelectionKey k = i.next();
        // Get the channel corresponding to the event
        final Object a = k.attachment();
        i.remove(); // Remove the event from the check

        if (a instanceof AbstractNioChannel) { // Check whether it is a Netty channel
            // Handle a single event
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if(! i.hasNext()) {break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else{ i = selectedKeys.iterator(); }}}}Copy the code
/** * handle a single event *@param k
 * @param ch
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //k --> encapsulated events
    Ch --> get the netty channel corresponding to the event
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

    if(! k.isValid()) {// Check whether the event is valid (if not, enter this logic to turn off unsafe).
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();  // Get niO event executor
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps(); // Get the event type
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if((readyOps & SelectionKey.OP_CONNECT) ! =0) { // The connect event is primarily used by the client
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if((readyOps & SelectionKey.OP_WRITE) ! =0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            // Write events are handled here
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! =0 || readyOps == 0) {
            // Accept and read are processed here (server focus)unsafe.read(); }}catch(CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}Copy the code

The unsafe.read() method is the most important for handling joins, since unsafe is the unsafe channel itself, so we’ll go straight to niomessage.read (). The methods are as follows:

@Override
public void read(a) {
    assert eventLoop(a).inEventLoop(a);
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if(exception ! =null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if(isOpen()) { close(voidPromise()); }}}finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
Copy the code

The first call we see from the method is int localRead = doReadMessages(readBuf); , we need to focus on the read method, this method only in AbstractNioMessageChannle abstract method is given, and then I can be directly to find its subclasses NioServerSocketChannel, as follows:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel()); // Get the SocketChannel object in the JDK from the connection

    try {
        if(ch ! =null) {
            buf.add(new NioSocketChannel(this, ch)); // encapsulate the SocketChannel object in the JDK as the NioSocketChannel used by netty
            return 1; }}catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2); }}return 0;
}
Copy the code

As you can see, the only thing this method does is transition the NIO in the JDK to Netty, encapsulating it as a NIO SocketChannel, and this is the first time that it involves a client channel, which was all around nio ServerSocketChannel.

Now that a new connection has been acquired, you can iterate over the connection to propagate the read inbound event as follows:

for (int i = 0; i < size; i ++) { // Iterate over the new connections read
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i)); // Start the responsibility chain call, scattering the read inbound event
}
Copy the code

ServerBootstrapAcceptor (); ServerBootstrapAcceptor ();

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg; MSG is the new SocketChannel received by accept

    child.pipeline().addLast(childHandler); // Add a new handler. This handler is the ChannelInitializer processor configured for our server startup

    setChannelOptions(child, childOptions, logger); // Channel Settings
    setAttributes(child, childAttrs);

    try { // Channel register, select a nioEventLoop from subEventLoopGroup to handle IO operation, bind channel to EventLoop, trigger register event and active event, then automatically register OP_READ(same as main)
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if(! future.isSuccess()) { forceClose(child, future.cause()); }}}); }catch(Throwable t) { forceClose(child, t); }}Copy the code

This method is used to distribute, and main receives the new connection (accept) and distributes it to sub to execute the subsequent logic (read).

By now, the handler in the server pipeline should understand how the server receives new connections and how they are distributed once they are received. Should a read event be polled by the EventLoop in childGroup? Let’s see

Analyze the handling of read inbound events

With the analysis of the above three events, we should summarize, what is the criterion of the whole link analysis?

  1. What event is this?
  2. What handlers are currently in the pipeline at the time the event is executed or dispatched?
  3. Which ones are triggered?
  4. What is the order of execution?
  5. What handlers are left after the event is executed, and are there any additions or deletions?

Unsafe.read (), the unsafe.read() event, the unsafe.read() event, the Eventloop in sub, uses AbstractNioByteChann to initialize itself El, which calls its read method, looks like this:

@Override
    public final void read(a) { // The client sends a request to read the specific data
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator); // Request a byte buffer
                allocHandle.lastBytesRead(doReadBytes(byteBuf)); // Read specific data into the byte buffer
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf); // Distribute the read contents to the responsibility chain for processing by the handler
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if(close) { closeOnRead(pipeline); }}catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if(! readPending && ! config.isAutoRead()) { removeReadOp(); }}}}Copy the code

Like the other read method, the other one is for reading the connection and this is for reading the content, and then the chain of responsibility takes care of the specific data it reads.

We now know that it is an inbound read event, and that the pipeline has three handlers (read the registration logic at the end of the Accept event if you want to know exactly when it was added). , respectively is HeadContext EchoServerHandler (custom) and TailContext, these three processors will be triggered, is performed by the HeadContext started, so we only need to pay attention to the custom EchoServerHandler went, as follows :

package io.netty.example.echo;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/** * Handler implementation for the echo server. */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.cause.printStackTrace(); ctx.close(); }}Copy the code

Write back when the read event is triggered, which triggers the write outbound event.

Custom processor

Now that we understand the call flow, we can customize three processors to execute the business logic in practice:

  • Protocol decoder: Converts byte arrays into Java objects for processing
  • Business logic processor: Processes Java objects
  • Protocol encoder: Returns Java objects encoded as arrays of bytes

Web address: http://175.24.172.160:3000/#/netty/responsibilityChainDesignMode