Interpretation of the Pipeline

Let’s look at the interface design of Pipeline first:

The Invoker interfaces (ChannelInboundInvoker and ChannelOutboundInvoker) specify the inbound and outbound events that can be passed through the pipeline. The Pipeline itself is like a container: it provides methods to add, remove, replace and look up the elements stored in it, and it overrides the methods inherited from Invoker to change their return type.

ChannelPipeline provides a number of methods to add, remove, replace and look up a ChannelHandlerContext. As we already know from the startup process, the pipeline does not store the handler itself but a wrapper around it. That wrapper, DefaultChannelHandlerContext, is the real key.

It also overrides the methods inherited from Invoker, changing the return type to ChannelPipeline so that calls can be chained

    /**
     * We override the methods inherited from Invoker and change the return type to make them easier to use
     */
    @Override
    ChannelPipeline fireChannelRegistered();

    @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();
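
Changing only the return type works because Java allows covariant return types: an overriding method may return a subtype of what the overridden method returns. Here is a minimal sketch of the idea. SketchInvoker, SketchPipeline and CovariantReturnSketch are made-up names for illustration, not Netty types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Invoker: fire methods return the Invoker type.
interface SketchInvoker {
    SketchInvoker fireChannelActive();

    SketchInvoker fireChannelRead(Object msg);
}

// Hypothetical stand-in for the pipeline: the overrides narrow the return type.
interface SketchPipeline extends SketchInvoker {
    @Override
    SketchPipeline fireChannelActive();

    @Override
    SketchPipeline fireChannelRead(Object msg);

    SketchPipeline addLast(String handlerName);
}

final class CovariantReturnSketch implements SketchPipeline {
    final List<String> log = new ArrayList<>();

    @Override
    public SketchPipeline fireChannelActive() {
        log.add("active");
        return this;
    }

    @Override
    public SketchPipeline fireChannelRead(Object msg) {
        log.add("read:" + msg);
        return this;
    }

    @Override
    public SketchPipeline addLast(String handlerName) {
        log.add("add:" + handlerName);
        return this;
    }

    public static void main(String[] args) {
        CovariantReturnSketch p = new CovariantReturnSketch();
        // fireChannelActive() returns SketchPipeline, so addLast() can be chained after it.
        p.fireChannelActive().addLast("decoder").fireChannelRead("hello");
        System.out.println(p.log); // [active, add:decoder, read:hello]
    }
}
```

If fireChannelActive() still returned SketchInvoker, the chained addLast() call would not compile without a cast.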

Now let's look at the handlers stored in the pipeline

ChannelHandler

// The top-level interface defines two callback methods related to handler addition and removal
// The callbacks to these two methods are actually performed by the EventLoop corresponding to the Channel.
public interface ChannelHandler {

    /**
     * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    // This method can be used to do some operations related to the release of resources
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     *
     * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
     * implement the method there.
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * Indicates that the same instance of the annotated {@link ChannelHandler}
     * can be added to one or more {@link ChannelPipeline}s multiple times
     * without a race condition.
     * <p>
     * If this annotation is not specified, you have to create a new handler
     * instance every time you add it to a pipeline because it has unshared
     * state such as member variables.
     * <p>
     * This annotation is provided for documentation purpose, just like
     * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
     */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}
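
Because Sharable is retained at run time, a pipeline can look it up reflectively before it accepts the same handler instance a second time. The following is a small sketch of such a lookup. SketchSharable and the two handler classes are hypothetical and only mirror the meta-annotations shown above.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class SharableSketch {
    // Hypothetical marker annotation with the same meta-annotations as Sharable.
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface SketchSharable {
    }

    @SketchSharable
    static class StatelessHandler {
    }

    static class StatefulHandler {
        int bytesSeen; // per-connection state, so one instance must not be shared
    }

    // RUNTIME retention is what makes this reflective lookup possible.
    static boolean isSharable(Object handler) {
        return handler.getClass().isAnnotationPresent(SketchSharable.class);
    }

    public static void main(String[] args) {
        System.out.println(isSharable(new StatelessHandler())); // true
        System.out.println(isSharable(new StatefulHandler()));  // false
    }
}
```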

The wrapper around a handler is the CTX, and its base class is AbstractChannelHandlerContext

AbstractChannelHandlerContext

Attributes

// We focus on the propagation mechanism of events in pipeline
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    // Successor
    volatile AbstractChannelHandlerContext next;
    // Predecessor
    volatile AbstractChannelHandlerContext prev;

    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
     */
    // Add pending - handlerAdded has not been executed yet, possibly because the Channel has not been registered
    private static final int ADD_PENDING = 1;
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     */
    // Added to the pipeline - the corresponding handlerAdded method has been executed
    private static final int ADD_COMPLETE = 2;
    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    // Removed from the pipeline - the corresponding handlerRemoved method has been executed
    private static final int REMOVE_COMPLETE = 3;
    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    // This is also the default state - the newly created handler
    private static final int INIT = 0;
    // Outer pipe container
    private final DefaultChannelPipeline pipeline;
    // Name of the CTX; generated by default when it is added to the pipeline without one
    private final String name;
    // Usually true
    private final boolean ordered;
    // A mask value associated with the method overridden by handler
    private final int executionMask;

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    // If the handler operation takes a long time, you can specify the group to execute
    final EventExecutor executor;
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // There is no need to make this volatile as at worse it will just create a few more instances then needed.
    private Tasks invokeTasks;
    // The default is INIT
    private volatile int handlerState = INIT;

As can be seen from the attributes, the pipeline stores its contexts in a doubly linked list, linked through next and prev

Over its lifecycle, a CTX goes through several states: INIT, ADD_PENDING, ADD_COMPLETE, and REMOVE_COMPLETE

If the channel is not yet registered when the handler is added, its executor is still null and the CTX is set to ADD_PENDING. Once the channel is registered and handlerAdded is invoked, the state becomes ADD_COMPLETE. When the handler is removed or the channel is closed, the state becomes REMOVE_COMPLETE

Constructor

We have been curious about how events are passed along the pipeline. There is no rush; let's unveil it step by step

When an event propagates, how does a CTX find the next CTX that can handle it? This must involve some kind of identifier

    // Parameter 1: the pipeline, the outer container that holds the CTX
    // Parameter 2: the executor to use instead of the NioEventLoop, for handlers that take a long time to execute
    // Parameter 3: the name of the handler in the pipeline
    // Parameter 4: the handler class that actually processes the business logic
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        // handlerClass is the handler's actual class
        // mask() computes a mask, which is used to find the next suitable CTX when an event is passed along
        // Return value: the final mask records which methods the handler overrides
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        // Usually true
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

The mask() method is key: its return value is assigned to executionMask which, as the name suggests, is a mask that controls execution

The argument is the current handlerClass. By now we know that whenever a framework works with a Class object, reflection is usually involved. So where is the reflection here?

# ChannelHandlerMask.mask()

A new class is involved: ChannelHandlerMask

    // Parameter: the handler's actual class
    // mask() computes a mask, which is used to find the next suitable CTX when an event is passed along
    static int mask(Class<? extends ChannelHandler> clazz) {
        // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
        // lookup in the future.
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) { // We consider this branch: nothing is cached yet
            // Return value: the final mask records which methods the handler overrides
            mask = mask0(clazz);
            cache.put(clazz, mask); // Cache the result to improve performance
        }
        return mask;
    }
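
The caching in mask() is a per-thread memoization: each thread keeps its own map, so no locking is needed, and weak keys allow handler classes to be unloaded. Below is a rough sketch of the same pattern with a plain ThreadLocal instead of FastThreadLocal. MaskCacheSketch, slowMask() and the computations counter are invented for the demo, and the value slowMask() returns is arbitrary.

```java
import java.util.Map;
import java.util.WeakHashMap;

final class MaskCacheSketch {
    // One cache per thread; weak keys so that cached classes can still be unloaded.
    private static final ThreadLocal<Map<Class<?>, Integer>> CACHE =
            ThreadLocal.withInitial(() -> new WeakHashMap<>(32));

    // Counts how often the slow path runs (demo only, not thread-safe).
    static int computations;

    static int mask(Class<?> clazz) {
        Map<Class<?>, Integer> cache = CACHE.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            mask = slowMask(clazz); // cache miss: compute once
            cache.put(clazz, mask);
        }
        return mask;
    }

    // Placeholder for the expensive reflective computation.
    private static int slowMask(Class<?> clazz) {
        computations++;
        return clazz.getName().length();
    }

    public static void main(String[] args) {
        mask(String.class);
        mask(String.class); // served from the cache
        System.out.println(computations); // 1
    }
}
```

In Netty the slow path is mask0(), which comes next.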
    // Parameter: the handler's actual class
    // Return value: the final mask records which methods the handler overrides
    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        // 0b 0000 0000 0000 0000 0000 0000 0000 0001
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            // Condition: the current handler implements ChannelInboundHandler
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                // 0b 0000 0000 0000 0000 0000 0001 1111 1111
                mask |= MASK_ALL_INBOUND;
                // Parameter 1: handler class
                // Parameter 2: method name
                // Parameter 3: ChannelHandlerContext.class
                // isSkippable - checks whether the method found on the handler carries @Skip. An overriding method
                // does not carry the annotation, so only overridden methods stay in the mask
                // true - the current handler has not overridden the specified method
                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    //    0b 0000 0000 0000 0000 0000 0001 1111 1111
                    // &  0b 1111 1111 1111 1111 1111 1111 1111 1101
                    // -> 0b 0000 0000 0000 0000 0000 0001 1111 1101
                    mask &= ~MASK_CHANNEL_REGISTERED; // Remove this method's bit from the mask
                }
                }
                if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_UNREGISTERED;
                }
                if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_ACTIVE;
                }
                if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_INACTIVE;
                }
                if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_CHANNEL_READ;
                }
                if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_READ_COMPLETE;
                }
                if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
                }
                if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_USER_EVENT_TRIGGERED;
                }
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_OUTBOUND;

                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_CONNECT;
                }
                if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DISCONNECT;
                }
                if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_CLOSE;
                }
                if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DEREGISTER;
                }
                if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                    mask &= ~MASK_READ;
                }
                if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                        Object.class, ChannelPromise.class)) {
                    mask &= ~MASK_WRITE;
                }
                if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                    mask &= ~MASK_FLUSH;
                }
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        return mask; // The final mask records which methods the handler overrides
    }

As you can see, the mask is computed in the mask0() method, and the key is the call to isSkippable()

    // Parameter 1: handler class
    // Parameter 2: method name
    // Parameter 3: the method's parameter types, e.g. ChannelHandlerContext.class
    @SuppressWarnings("rawtypes")
    private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    // Look up the method by reflection, using its name and parameter types; these mirror the methods of ChannelInboundHandler
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                // The method was found
                // Condition 2: the handler extends ChannelInboundHandlerAdapter and did not override the method, so @Skip is still present
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }

The isSkippable() method uses reflection to look up a method on the given class by its name and parameter types, and reports whether the handler merely inherits it. If the handler overrides the method, it is one we are interested in and the event must be delivered to it

A method that is not overridden only forwards the event to the next CTX in the parent class's implementation, while an overridden method is obviously the business logic we care about

How do we tell whether a method is overridden? By checking for the @Skip annotation: as we can see, the implementation in the parent class carries @Skip

For example, ChannelInboundHandlerAdapter

    @Skip // If a subclass overrides the method, the annotation disappears
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

So, through reflection we can check one by one whether each target method has been overridden. If it has not, its bit is cleared from the mask, so the final mask represents exactly the methods implemented by the handler wrapped in the current CTX
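
The whole trick rests on one property of Java annotations: an annotation on a method is not carried over to a method that overrides it. A self-contained sketch of the check follows. SketchSkip, Adapter and MyHandler are hypothetical stand-ins rather than the Netty classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class SkipSketch {
    // Hypothetical marker, analogous to @Skip.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface SketchSkip {
    }

    // Stand-in for an adapter whose default implementations are all skippable.
    public static class Adapter {
        @SketchSkip
        public void channelActive() {
        }

        @SketchSkip
        public void channelRead(Object msg) {
        }
    }

    public static class MyHandler extends Adapter {
        @Override
        public void channelRead(Object msg) {
            // The override does not carry @SketchSkip.
        }
    }

    static boolean isSkippable(Class<?> type, String name, Class<?>... paramTypes) {
        try {
            // getMethod returns the most specific public method, inherited or overridden.
            Method m = type.getMethod(name, paramTypes);
            return m.isAnnotationPresent(SketchSkip.class);
        } catch (NoSuchMethodException e) {
            return false; // unknown method: assume it cannot be skipped
        }
    }

    public static void main(String[] args) {
        System.out.println(isSkippable(MyHandler.class, "channelActive"));             // true
        System.out.println(isSkippable(MyHandler.class, "channelRead", Object.class)); // false
    }
}
```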

Each target method corresponds to a unique mask bit, as defined in the constants of the class

// This is a helper class
final class ChannelHandlerMask {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelHandlerMask.class);

    // Using to mask which methods must be called for a ChannelHandler.
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_INACTIVE = 1 << 4;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
    static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
    static final int MASK_BIND = 1 << 9;
    static final int MASK_CONNECT = 1 << 10;
    static final int MASK_DISCONNECT = 1 << 11;
    static final int MASK_CLOSE = 1 << 12;
    static final int MASK_DEREGISTER = 1 << 13;
    static final int MASK_READ = 1 << 14;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;

    // Calculate the related inbound mask: 0000 0000 0001 1111 1110
    static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
    // calculate the mask associated with inbound :(including MASK_EXCEPTION_CAUGHT) 0000 0000 0001 1111 1111
    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
    // Calculate the mask associated with outbound: 1 1111 1110 0000 0000
    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
    // calculate the mask associated with the outbound :(including MASK_EXCEPTION_CAUGHT) 1 1111 1110 0000 0001
    private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

    private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
            new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
                @Override
                protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
                    return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
                }
            };
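
To make the bit arithmetic concrete, here is a small worked example using the same bit positions as the constants above. MaskArithmeticSketch and its two helper methods are only illustrative.

```java
final class MaskArithmeticSketch {
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_ALL_INBOUND = 0b1_1111_1111; // bits 0..8, as in the listing above

    // Clearing a bit marks the corresponding method as "not implemented, skip it".
    static int clear(int mask, int bit) {
        return mask & ~bit;
    }

    // A set bit means the handler wants to see the corresponding event.
    static boolean handles(int mask, int bit) {
        return (mask & bit) != 0;
    }

    public static void main(String[] args) {
        int mask = clear(MASK_ALL_INBOUND, MASK_CHANNEL_REGISTERED);
        System.out.println(Integer.toBinaryString(mask));           // 111111101
        System.out.println(handles(mask, MASK_CHANNEL_REGISTERED)); // false
        System.out.println(handles(mask, MASK_CHANNEL_READ));       // true
    }
}
```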

Now let's look at the concrete type of the CTX, namely DefaultChannelHandlerContext

DefaultChannelHandlerContext

// Implement handler() - so CTX is fully functional!
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    // The handler is enclosed internally
    private final ChannelHandler handler;

    // Parameter 1: the pipeline, the outer container that holds the CTX
    // Parameter 2: the executor to use instead of the NioEventLoop, for handlers that take a long time to execute
    // Parameter 3: the name of the handler in the pipeline
    // Parameter 4: the handler that actually processes the business logic
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // Passes the handler's class up to AbstractChannelHandlerContext, which computes the mask from it
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}

As you can see, it encapsulates a handler, and now we have a pretty good idea of what CTX does

DefaultChannelPipeline

This is the pipeline we are looking at

addLast

Let's look at what happens when a handler is added to the pipeline

    // Parameter: the business-level handler; this is the most commonly used add method
    public final ChannelPipeline addLast(ChannelHandler handler) {
        // In this case, name is null
        return addLast(null, handler);
    }

    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        // Parameter 1: group - null
        // Parameter 2: name - null if not explicitly specified
        // Parameter 3: the business-level handler
        return addLast(null, name, handler);
    }

    // Parameter 1: group - null if not specified
    // Parameter 2: name - null if not explicitly specified
    // Parameter 3: the business-level handler
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        // The CTX that bridges the pipeline and the handler
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // Sets the handler's added attribute to true to indicate that it has been added to a pipeline
            checkMultiplicity(handler);
            // Wrap the handler (a ChannelInitializer during startup) in a DefaultChannelHandlerContext
            // Parameter 1: event executor group, usually null if not specified
            // Parameter 2: filterName(name, handler) - if name == null, generate a name for the CTX
            // Generated name: handlerType + "#" + number (the number is incremented to avoid duplicate names)
            // Parameter 3: the current handler
            newCtx = newContext(group, filterName(name, handler), handler);

            // Add CTX to pipeline!
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // true - the pipeline's channel is not registered and no NioEventLoop has been bound yet, i.e. executor == null
            // This branch prevents a null pointer exception
            if (!registered) { // For the ChannelInitializer, the channel is not registered at this point
                newCtx.setAddPending(); // Set the CTX state to ADD_PENDING; handlerAdded has not been called yet
                // Wrap the current CTX in a task and append it to a singly linked list kept by the pipeline
                callHandlerCallbackLater(newCtx, true); // Run by the executor once the channel is registered (see invokeHandlerAddedIfNeeded())
                return this; // so we can return right away
            }
            // At this point, the channel is already registered
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        // Call the handlerAdded method of the CTX's handler
        callHandlerAdded0(newCtx);
        return this;
    }

As you can see, newCtx is created in newContext(). filterName() generates a name for the CTX when none is given; the generated name is handlerType + "#" + number, where the number is incremented to avoid duplicate names

    // Parameter 1: event executor group, usually null if not specified
    // Parameter 2: filterName(name, handler) - if name == null, generate a name for the CTX
    // Generated name: handlerType + "#" + number (the number is incremented to avoid duplicate names)
    // Parameter 3: the current handler
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
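
The naming rule can be sketched in a few lines. This is a simplified illustration, not the Netty implementation: HandlerNameSketch keeps the used names in a plain set, whereas the real pipeline looks names up among its contexts.

```java
import java.util.HashSet;
import java.util.Set;

final class HandlerNameSketch {
    private final Set<String> usedNames = new HashSet<>();

    // Returns the given name, or generates "<SimpleClassName>#<n>" with the first free n.
    String filterName(String name, Object handler) {
        if (name == null) {
            name = generateName(handler);
        } else if (usedNames.contains(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        usedNames.add(name);
        return name;
    }

    private String generateName(Object handler) {
        String base = handler.getClass().getSimpleName() + "#";
        for (int i = 0; ; i++) {
            String candidate = base + i;
            if (!usedNames.contains(candidate)) {
                return candidate; // the counter only grows until a free name is found
            }
        }
    }

    public static void main(String[] args) {
        HandlerNameSketch names = new HandlerNameSketch();
        System.out.println(names.filterName(null, "a handler"));      // String#0
        System.out.println(names.filterName(null, "another one"));    // String#1
        System.out.println(names.filterName("decoder", new Object())); // decoder
    }
}
```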

Then the CTX is added to the pipeline in addLast0()

    // addLast0 inserts the CTX just before the tail
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
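
The same four pointer updates can be tried out on a toy list. The sketch below assumes fixed head and tail sentinel nodes, which is why the insertion never has to handle an empty list. ContextListSketch and Node are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

final class ContextListSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Sentinels: real nodes always live between head and tail.
    final Node head = new Node("head");
    final Node tail = new Node("tail");

    ContextListSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Splice the new node in just before the tail, as addLast0 does.
    void addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        ContextListSketch list = new ContextListSketch();
        list.addLast("decoder");
        list.addLast("businessHandler");
        System.out.println(list.names()); // [head, decoder, businessHandler, tail]
    }
}
```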

When the ChannelInitializer is added, its state should change and handlerAdded() should be triggered, but the channel is not registered yet, so the callback cannot run. For this reason the pipeline maintains a singly linked list of pending callbacks

The corresponding code:

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // true - the pipeline's channel is not registered and no NioEventLoop has been bound yet, i.e. executor == null
            // This branch prevents a null pointer exception
            if (!registered) { // For the ChannelInitializer, the channel is not registered at this point
                newCtx.setAddPending(); // Set the CTX state to ADD_PENDING; handlerAdded has not been called yet
                // Wrap the current CTX in a task and append it to a singly linked list kept by the pipeline
                callHandlerCallbackLater(newCtx, true); // Run by the executor once the channel is registered (see invokeHandlerAddedIfNeeded())
                return this; // so we can return right away
            }

If the channel is not registered, the state of CTX is set to ADD_PENDING, and the current CTX is wrapped as a task and added to the one-way linked list

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        // Wrap the CTX (during startup, the one that wraps the ChannelInitializer) in a PendingHandlerAddedTask
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        // Append the task to the end of the list
        if (pending == null) {
            pendingHandlerCallbackHead = task; // During startup, the task stored here wraps the ChannelInitializer's CTX
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

The purpose is that, once the channel is registered, the tasks in the linked list are taken out and executed one by one, calling the corresponding handlerAdded method and changing the CTX state to ADD_COMPLETE

The corresponding method is DefaultChannelPipeline#callHandlerAddedForAllHandlers. After the channel is registered, the singly linked list is traversed in a task that runs on the EventLoop thread

For example:

    // Be aware that this method is executed by the EventLoop thread
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
            registered = true;
            // Head of the singly linked list of pending tasks; during server startup, its first task wraps the ChannelInitializer's CTX
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        // Traverse the singly linked list and execute each task
        while (task != null) {
            // The list stores the tasks to execute: when a handler is added before the channel is registered,
            // its CTX is wrapped in a task and appended to the list for the executor to run
            task.execute();
            task = task.next;
        }
    }

So, once the CTX has been wrapped in a task and stored in the singly linked list, addLast can return immediately, because the channel will traverse the list and execute the corresponding tasks after it is registered
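
The defer-until-registered idea can be reduced to a few lines of plain Java. The sketch below is a simplification under stated assumptions: PendingCallbackSketch, add() and onRegistered() are invented names, and callbacks are plain Runnables rather than CTX-bound tasks. It keeps two details from the code above: the queue is a singly linked list, and the callbacks run outside the lock.

```java
import java.util.ArrayList;
import java.util.List;

final class PendingCallbackSketch {
    private static final class Task {
        final Runnable action;
        Task next;

        Task(Runnable action) {
            this.action = action;
        }
    }

    private Task pendingHead;
    private boolean registered;

    // Before registration the callback is queued; afterwards it runs right away.
    void add(Runnable callback) {
        synchronized (this) {
            if (!registered) {
                Task task = new Task(callback);
                if (pendingHead == null) {
                    pendingHead = task;
                } else {
                    Task t = pendingHead;
                    while (t.next != null) {
                        t = t.next;
                    }
                    t.next = task;
                }
                return;
            }
        }
        callback.run();
    }

    // Detach the list under the lock, then run the callbacks outside of it.
    void onRegistered() {
        Task task;
        synchronized (this) {
            registered = true;
            task = pendingHead;
            pendingHead = null;
        }
        for (; task != null; task = task.next) {
            task.action.run();
        }
    }

    public static void main(String[] args) {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        List<String> log = new ArrayList<>();
        pipeline.add(() -> log.add("initializer added"));
        System.out.println(log); // []
        pipeline.onRegistered();
        System.out.println(log); // [initializer added]
    }
}
```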

What a task does depends on its concrete type, so let's consider the case of PendingHandlerAddedTask

    private final class PendingHandlerAddedTask extends PendingHandlerCallback {

        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

        @Override
        void execute() {
            // Get the EventLoop in this step
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                executor, ctx.name(), e);
                    }
                    atomicRemoveFromHandlerList(ctx);
                    ctx.setRemoved();
                }
            }
        }
    }

Either way, callHandlerAdded0() ends up being executed, directly or through the executor
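
The inEventLoop() check in execute() is a common pattern: run the work inline if the caller is already on the loop thread, otherwise hand it over to the loop. Here is a rough stand-alone sketch, assuming a single-threaded ExecutorService plays the role of the EventLoop. InEventLoopSketch and runOnLoop() are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class InEventLoopSketch {
    private volatile Thread loopThread;

    // Single-threaded executor standing in for an EventLoop; the factory remembers its thread.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise submit to the loop.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    // Returns the name of the thread that ended up running the innermost task.
    static String demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        // Called from a foreign thread: the outer task is submitted to the executor.
        loop.runOnLoop(() -> {
            // Now on the loop thread: this nested call takes the inline branch.
            loop.runOnLoop(() -> ranOn.complete(Thread.currentThread().getName()));
        });
        try {
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // sketch-event-loop
    }
}
```

Back in the pipeline, callHandlerAdded0() itself looks like this: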

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded(); // Execute the callback
        } catch (Throwable t) {
            boolean removed = false;
            try {
                atomicRemoveFromHandlerList(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

callHandlerAdded

    final void callHandlerAdded() throws Exception {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        // Update the current CTX state to ADD_COMPLETE
        if (setAddComplete()) {
            // During server startup, handler() returns the ChannelInitializer
            // Execute the handlerAdded method
            handler().handlerAdded(this);
        }
    }

    // Change the state
    final boolean setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE) {
                return false;
            }
            // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
            // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
            // exposing ordering guarantees.
            if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return true;
            }
        }
    }

When the executor runs the task, the CTX state is changed to ADD_COMPLETE with a CAS, and then handlerAdded() of the wrapped handler is executed.
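
The state handling can be reproduced with nothing but the JDK. The sketch below uses the same AtomicIntegerFieldUpdater technique on a hypothetical HandlerStateSketch class. The transitions follow the description above, but the method bodies are a simplified assumption rather than a copy of the Netty code.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class HandlerStateSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    // The updater performs CAS on the volatile int field without allocating an AtomicInteger per instance.
    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "state");

    private volatile int state = INIT;

    boolean setAddPending() {
        return STATE.compareAndSet(this, INIT, ADD_PENDING);
    }

    // Retry loop: succeeds from any state except REMOVE_COMPLETE, which is final.
    boolean setAddComplete() {
        for (;;) {
            int old = state;
            if (old == REMOVE_COMPLETE) {
                return false;
            }
            if (STATE.compareAndSet(this, old, ADD_COMPLETE)) {
                return true;
            }
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        HandlerStateSketch ctx = new HandlerStateSketch();
        System.out.println(ctx.setAddPending());  // true
        System.out.println(ctx.setAddComplete()); // true
        ctx.setRemoved();
        System.out.println(ctx.setAddComplete()); // false: a removed CTX never becomes active again
    }
}
```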

So, the add logic phase of CTX is complete!
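The retry-until-CAS-succeeds pattern can be tried outside Netty. The following is only a minimal sketch: the class `CtxStateSketch`, its `setRemoved()` helper and the numeric state values are assumptions made for illustration, not Netty's actual code.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical stand-in for a handler context; only the state handling is modeled. */
final class CtxStateSketch {
    // Illustrative state values, chosen for this sketch.
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<CtxStateSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(CtxStateSketch.class, "state");

    // The updater requires a volatile int field.
    private volatile int state = INIT;

    /** Moves to ADD_COMPLETE unless the context was already removed. */
    boolean setAddComplete() {
        for (;;) {
            int old = state;
            if (old == REMOVE_COMPLETE) {
                return false; // never resurrect a removed context
            }
            if (STATE_UPDATER.compareAndSet(this, old, ADD_COMPLETE)) {
                return true;
            }
            // Another thread changed the state in between; re-read and retry.
        }
    }

    /** Hypothetical helper that marks the context as removed. */
    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }
}
```

A fresh instance returns true from `setAddComplete()` and ends up in ADD_COMPLETE, while an instance on which `setRemoved()` was called first returns false and keeps its REMOVE_COMPLETE state.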

Let’s look at how events are propagated in the pipeline.

Since most of the propagation code in the pipeline follows the same template, fireChannelRegistered() is used as the example.

fireChannelRegistered

To be clear, calling one of the pipeline's fire methods for an inbound event always starts from HeadContext, whereas calling fireXXX on a ctx starts from the ctx after it. This is why, as mentioned before, calling the ctx's fireXXX method lets you short-circuit part of the chain and skip the ctx nodes that are not of interest.

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        // findContextInbound(MASK_CHANNEL_REGISTERED) parameter: the mask associated with channelRegistered()
        // As you might guess, this mask value is used to find the next ctx interested in handling the event
        // findContextInbound - find the ctx that implements the target method
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered(); // The first call is actually HeadContext.invokeChannelRegistered
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private void invokeChannelRegistered() {
        // Check handler status
        if (invokeHandler()) {
            try {
                // Call handler.channelRegistered()
                ((ChannelInboundHandler) handler()).channelRegistered(this); // The first call is actually HeadContext.channelRegistered
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

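When the handler state is valid, the handler's method is executed; otherwise the event is simply passed on to the next ctx.

Valid here means that the ctx is in the ADD_COMPLETE state (or still ADD_PENDING when its executor does not guarantee ordering). This is logical, because the handler's methods should only be called once the ctx has been added successfully (the channel has been registered).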

    private boolean invokeHandler() {
        // Store in local variable to reduce volatile reads.
        int handlerState = this.handlerState;
        // True once the handler has been added, or while the add is still pending on an unordered executor
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

So, at this point we arrive at HeadContext.channelRegistered()

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            // Since firstRegistration == false, this step actually does nothing
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered(); // In the case of server startup demo, this is actually passed to LoggingHandler
        }

Next, we come to AbstractChannelHandlerContext.fireChannelRegistered()

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        // findContextInbound(MASK_CHANNEL_REGISTERED) parameter: the mask associated with channelRegistered()
        // As you might guess, this mask value is used to find the next ctx interested in handling the event
        // findContextInbound - find the ctx that implements the target method
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }
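To make the difference between firing on the pipeline and firing on a ctx concrete, here is a minimal sketch of a singly linked chain. Everything in it (`FireStartSketch`, `Node`, `addLast`, `fire`) is a hypothetical name for illustration, and it assumes every node forwards the event, which real handlers are not obliged to do.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy chain of named nodes; names and methods are hypothetical, not Netty's API. */
final class FireStartSketch {
    static final class Node {
        final String name;
        Node next;

        Node(String name) {
            this.name = name;
        }

        /** Like ctx.fireXXX: delivery starts at the node AFTER this one. */
        List<String> fire() {
            List<String> visited = new ArrayList<>();
            for (Node n = next; n != null; n = n.next) {
                visited.add(n.name);
            }
            return visited;
        }
    }

    final Node head = new Node("head");
    private Node last = head;

    /** Appends a node to the end of the chain and returns it. */
    Node addLast(String name) {
        Node n = new Node(name);
        last.next = n;
        last = n;
        return n;
    }

    /** Like pipeline.fireXXX: delivery starts at head itself. */
    List<String> fire() {
        List<String> visited = new ArrayList<>();
        visited.add(head.name);
        visited.addAll(head.fire());
        return visited;
    }
}
```

With nodes a, b and c added in that order, firing on the chain visits head, a, b, c, while firing on node a only visits b and c.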

findContextInbound()

    // Parameter: the mask associated with the target method
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        // Find the appropriate CTX
        do {
            ctx = ctx.next; // Iterate over the next CTX
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); // Return true - to indicate that the current CTX is not the one we want to find and skip
        return ctx;
    }

It loops through the subsequent ctx nodes in the pipeline. But what is the condition for skipping one?

skipContext()

    // Parameter one: the ctx currently being examined
    // Parameter two: the executor (event loop) of the ctx that started the search
    // Parameter three: the mask value associated with the target method
    // Parameter four: MASK_ONLY_INBOUND
    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
        // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
        // Can be read as: a coarse pre-check (filter) || the real logic, (ctx.executionMask & mask) == 0
        // Return true - to indicate that the current ctx is not the one we want to find and skip
        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
                // everything to preserve ordering.
                //
                // See https://github.com/netty/netty/issues/10067
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

In short, the check is whether the executionMask of the ctx contains the target mask (here MASK_CHANNEL_REGISTERED). If it does not, skipContext returns true and the loop moves on to the next ctx. If it does, skipContext returns false, the loop ends, and the outer method invokeChannelRegistered is executed.

That outer method then executes the corresponding method of the handler in the ctx that was found.

And that is exactly how event passing is implemented.
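The mask test itself is plain bit arithmetic. The sketch below is a simplified illustration under stated assumptions: the class `MaskSketch`, the helper `findNext` and the bit values are made up for this example, and the executor comparison from skipContext is left out.

```java
/** Toy version of the mask check; constants and helpers are hypothetical. */
final class MaskSketch {
    // Illustrative bit values: one bit per event type.
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_READ = 1 << 5;

    /** Returns true when a handler with this executionMask should be skipped for the event. */
    static boolean skip(int executionMask, int eventMask) {
        return (executionMask & eventMask) == 0;
    }

    /** Returns the index of the first handler at or after 'from' that wants the event, or -1. */
    static int findNext(int[] executionMasks, int from, int eventMask) {
        for (int i = from; i < executionMasks.length; i++) {
            if (!skip(executionMasks[i], eventMask)) {
                return i;
            }
        }
        return -1;
    }
}
```

For example, with three handlers whose masks are READ, ACTIVE | READ and REGISTERED, a search for MASK_CHANNEL_REGISTERED starting at index 0 skips the first two handlers and stops at index 2.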

So, there is one point to watch out for here: for a handler that we add ourselves (wrapped in a ctx), if an overridden method does not explicitly call super.channelXXX or ctx.fireChannelXXX, the event is cut off at that handler and is not propagated any further.

This is especially important for write events, because they may involve data that is ultimately sent out to the client!
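The cut-off effect is easy to reproduce in a toy model. This is a minimal sketch, not Netty code: `PropagationSketch`, `Ctx`, `Handler` and `run` are hypothetical names, and the only behavior modeled is that a message reaches the next ctx solely when the current handler calls `ctx.fireChannelRead`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of explicit forwarding; all names here are hypothetical. */
final class PropagationSketch {
    interface Handler {
        void channelRead(Ctx ctx, String msg);
    }

    static final class Ctx {
        final String name;
        final Handler handler;
        final List<String> log;
        Ctx next;

        Ctx(String name, Handler handler, List<String> log) {
            this.name = name;
            this.handler = handler;
            this.log = log;
        }

        /** Hands the message to the next context, if any. */
        void fireChannelRead(String msg) {
            if (next != null) {
                next.invoke(msg);
            }
        }

        /** Records that this context saw the message, then runs its handler. */
        void invoke(String msg) {
            log.add(name);
            handler.channelRead(this, msg);
        }
    }

    /** Builds first -> middle -> last and returns the names that saw the message. */
    static List<String> run(boolean middleForwards) {
        List<String> log = new ArrayList<>();
        Handler forwarding = (ctx, msg) -> ctx.fireChannelRead(msg);
        Handler swallowing = (ctx, msg) -> { /* forgets to call ctx.fireChannelRead */ };

        Ctx first = new Ctx("first", forwarding, log);
        Ctx middle = new Ctx("middle", middleForwards ? forwarding : swallowing, log);
        Ctx last = new Ctx("last", forwarding, log);
        first.next = middle;
        middle.next = last;

        first.invoke("hello");
        return log;
    }
}
```

When the middle handler forwards, all three contexts see the message. When it does not, the message stops at middle and last is never invoked.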

So, at this point, the interpretation of pipeline is finished!