The Netty reactor thread is the engine that drives the entire Netty framework, and server port binding and the establishment of new connections are the fuse that ignites that engine.

Netty creates the corresponding channels during server port binding and new connection establishment. How a channel behaves is closely tied to the concept of the pipeline. A pipeline can be regarded as an assembly line: the raw material (a byte stream) comes in, is processed, and is finally output.

In this article, I will use the establishment of a new connection as an example and explain how the Netty pipeline works, in the following parts:

  • Pipeline initialization
  • Pipeline add node
  • Pipeline delete node

Pipeline initialization

In the article on new connection establishment, we learned that Netty's core components are created when a NioSocketChannel is created.

The pipeline is one of them and is created by the following code:

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

AbstractChannel

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

The pipeline stores a reference to its channel. Right after it is created, the pipeline consists of just two nodes, head and tail, linked to each other.

Each node in a pipeline is a ChannelHandlerContext object. Each context node wraps a ChannelHandler, the component that actually does the work, and holds the context that handler needs to perform its operations. Because the pipeline holds a reference to the channel, all of this context information can be reached from a node.

By default, a pipeline has two nodes, head and tail. The next article analyzes these two special nodes; this one focuses on the pipeline itself.
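The head/tail wiring can be tried out without Netty. The following is a minimal sketch in plain Java; SentinelPipelineSketch and Node are made-up names for illustration, not Netty classes. It only shows the idea that two sentinel nodes bracket the list, so even an empty pipeline has a well-defined first and last node.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of a pipeline: a doubly linked list guarded by two sentinel nodes. */
final class SentinelPipelineSketch {

    /** One node of the list; plays the role of a context. */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    SentinelPipelineSketch() {
        // Same wiring as in the constructor shown above: head <-> tail
        head.next = tail;
        tail.prev = head;
    }

    /** Walks from head to tail and collects the node names in order. */
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(new SentinelPipelineSketch().names()); // prints [head, tail]
    }
}
```

Because head and tail always exist, code that later inserts or removes nodes never has to handle an empty list as a special case.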

Pipeline add node

Here is a very common piece of user code:

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new Spliter());
        p.addLast(new Decoder());
        p.addLast(new BusinessHandler());
        p.addLast(new Encoder());
    }
});

First, a Spliter splits the raw TCP byte stream into packets, then a Decoder decodes each packet and passes it to the business processor BusinessHandler. After the business logic has run, the result is encoded by the Encoder and written out.

The entire pipeline structure is as follows

I used two colors to distinguish the two types of nodes in the pipeline. One is ChannelInboundHandler, which handles inbound events; the most typical case is reading and processing the data stream. The other is ChannelOutboundHandler, which handles outbound events, such as a call to one of the writeAndFlush() methods.

Regardless of handler type, the ChannelHandlerContext objects that wrap the handlers are linked to one another in a doubly linked list. What distinguishes one ChannelHandlerContext from another is whether it is inbound or outbound. We can see how Netty handles this when a node is added.

DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1. Check whether duplicate handlers exist
        checkMultiplicity(handler);
        // 2. Create a node
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3. Add nodes
        addLast0(newCtx);
    }
   
    // 4. Call back user methods
    callHandlerAdded0(newCtx);
    
    return this;
}

A plain synchronized block is used here to prevent multiple threads from concurrently modifying the doubly linked list that underlies the pipeline.
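Why the lock matters can be seen in a small experiment. This is a sketch under assumptions, not Netty code: SynchronizedAddSketch is a made-up class in which several threads append to one shared list. An append consists of four pointer updates, and the lock makes them appear as a single step, so no node is lost.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: several threads appending to one linked list under a single lock. */
final class SynchronizedAddSketch {

    static final class Node {
        Node prev;
        Node next;
    }

    private final Node head = new Node();
    private final Node tail = new Node();

    SynchronizedAddSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** The four pointer updates must be seen as one atomic step, hence the lock. */
    void addLast(Node node) {
        synchronized (this) {
            Node prev = tail.prev;
            node.prev = prev;
            node.next = tail;
            prev.next = node;
            tail.prev = node;
        }
    }

    /** Counts the nodes between the two sentinels. */
    int size() {
        synchronized (this) {
            int count = 0;
            for (Node n = head.next; n != tail; n = n.next) {
                count++;
            }
            return count;
        }
    }

    /** Starts the given number of workers, each appending perThread nodes, and returns the final size. */
    static int run(int threads, int perThread) {
        SynchronizedAddSketch list = new SynchronizedAddSketch();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    list.addLast(new Node());
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return list.size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

Without the synchronized block in addLast, two threads could both read the same tail.prev and one of the two new nodes would be overwritten.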

Let’s walk through the above code step by step

1. Check whether duplicate handlers exist

When user code adds a handler, it first checks to see if the handler has been added

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

Netty uses the member variable added to record whether a handler has already been added. The code is simple: if the handler being added is not sharable and has already been added, an exception is thrown; otherwise the handler is marked as added.

A sharable handler can be added to pipelines any number of times. To make a handler in user code sharable, we simply annotate it with @Sharable, as shown below:

@Sharable
public class BusinessHandler {

}

A sharable handler is usually obtained through Spring injection rather than created with new each time.

isSharable() works by checking whether the handler's class is annotated with @Sharable.

ChannelHandlerAdapter

public boolean isSharable() {
    Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

Here Netty also caches the sharable status of each handler class in a thread-local map to squeeze out performance. With many concurrent connections, handlers are added constantly, so this method is called very frequently.
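The duplicate check and the cached annotation lookup can be reproduced with the JDK alone. The sketch below is an approximation under assumptions: the @Shared annotation, the Handler base class and the plain ThreadLocal cache are stand-ins invented for this example, not Netty's @Sharable, ChannelHandlerAdapter or InternalThreadLocalMap.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.WeakHashMap;

/** Hypothetical sketch of the "added" flag plus a per-thread cache of the annotation lookup. */
final class SharableCheckSketch {

    /** Stand-in annotation; it must be retained at runtime to be visible to reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Shared { }

    private static final ThreadLocal<Map<Class<?>, Boolean>> CACHE =
            ThreadLocal.withInitial(() -> new WeakHashMap<Class<?>, Boolean>());

    /** Stand-in for a handler base class. */
    abstract static class Handler {
        boolean added;

        boolean isSharable() {
            Map<Class<?>, Boolean> cache = CACHE.get();
            Boolean sharable = cache.get(getClass());
            if (sharable == null) {
                // Reflection runs once per class and thread; later calls hit the cache
                sharable = getClass().isAnnotationPresent(Shared.class);
                cache.put(getClass(), sharable);
            }
            return sharable;
        }
    }

    @Shared
    static final class StatelessHandler extends Handler { }

    static final class StatefulHandler extends Handler { }

    /** Rejects a second add of a handler instance unless its class is marked @Shared. */
    static void checkMultiplicity(Handler h) {
        if (!h.isSharable() && h.added) {
            throw new IllegalStateException(
                    h.getClass().getName() + " is not sharable and was already added");
        }
        h.added = true;
    }

    public static void main(String[] args) {
        Handler shared = new StatelessHandler();
        checkMultiplicity(shared);
        checkMultiplicity(shared); // allowed: the class carries @Shared

        Handler exclusive = new StatefulHandler();
        checkMultiplicity(exclusive);
        try {
            checkMultiplicity(exclusive);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A thread-local map needs no locking, which is presumably why a per-thread cache is attractive on a hot path like this one.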

2. Create a node

Back to the main flow, look at the code that creates the context

newCtx = newContext(group, filterName(name, handler), handler);

Here, filterName(name, handler) produces a unique name for the handler:

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}

If we pass in a null name, Netty generates a default name for us. Otherwise, Netty checks for a duplicate name and returns the name if the check passes.

The default name Netty generates is the simple class name followed by #0.

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private String generateName(ChannelHandler handler) {
    // Check whether a default name has already been generated and cached
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    // If not, generate the default name and add it to the cache
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }

    // Make sure the default name does not conflict with an existing one
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}

Netty uses a FastThreadLocal variable to cache the mapping from handler class to default name. When generating a name, it first checks whether the default name (simple class name followed by #0) is already in the cache. If not, generateName0() is called to generate it, and the result is added to the cache.

Then context0() is used to check whether the pipeline already contains a context with that name:

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

context0() walks the linked list of ChannelHandlerContext nodes. As soon as it finds a context whose name equals the given name, it returns that context, which the caller treats as a name conflict; if there is none, it returns null.

If context0(name) != null, a context with the default name already exists, so Netty tries simple class name#1, #2, and so on until it finds a name that is not taken, for example simple class name#3.
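The naming rule is easy to check in isolation. The following sketch is a simplification under assumptions: HandlerNameSketch is a made-up class, and a Set of names stands in for walking the context list with context0(). It produces the same kind of names: #0 first, then #1, #2 and so on for further handlers of the same type.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch of default-name generation: "SimpleClassName#0", then #1, #2, ... on conflict. */
final class HandlerNameSketch {

    /** Names already present in the pipeline (stands in for walking the context list). */
    private final Set<String> namesInUse = new LinkedHashSet<>();

    /** Returns a default name for the type that is unique among the names in use, and registers it. */
    String addWithDefaultName(Class<?> handlerType) {
        String name = handlerType.getSimpleName() + "#0";
        if (namesInUse.contains(name)) {
            // Strip the trailing "0" and probe "#1", "#2", ... until a free name is found
            String baseName = name.substring(0, name.length() - 1);
            for (int i = 1; ; i++) {
                String candidate = baseName + i;
                if (!namesInUse.contains(candidate)) {
                    name = candidate;
                    break;
                }
            }
        }
        namesInUse.add(name);
        return name;
    }

    /** Registers a user-supplied name, rejecting duplicates. */
    String addWithExplicitName(String name) {
        if (!namesInUse.add(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        HandlerNameSketch names = new HandlerNameSketch();
        System.out.println(names.addWithDefaultName(StringBuilder.class)); // prints StringBuilder#0
        System.out.println(names.addWithDefaultName(StringBuilder.class)); // prints StringBuilder#1
        System.out.println(names.addWithDefaultName(StringBuilder.class)); // prints StringBuilder#2
    }
}
```

Note that the probing is linear in the number of handlers of the same type already present, which is fine for the handful of handlers a pipeline normally holds.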

If user code specifies a name when adding a Handler, the only thing to do is to check for duplication

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

With the name settled, we move on to creating the context. From the call chain above we know that group is null, so childExecutor(group) also returns null.

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    // ...
}


DefaultChannelHandlerContext

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

The DefaultChannelHandlerContext constructor passes its parameters up to the parent class and keeps a reference to the handler. Now into the parent class:

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}

Netty uses two fields to indicate whether a ChannelHandlerContext is inbound, outbound, or both. The two booleans are determined by the following two small functions:

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

The instanceof operator is used to check the interface type. Therefore, if a handler implements both interfaces, it is both an inbound and an outbound handler, as in the following class.

MessageToMessageCodec, a commonly used codec base class that combines the decode and encode operations, extends ChannelDuplexHandler:

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;
 }
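The instanceof-based classification can be illustrated with a few toy types. This is a sketch with invented names (Handler, InboundHandler, OutboundHandler, Decoder, Encoder, Codec), not the Netty interfaces; it only shows that a class implementing both interfaces ends up with both flags set.

```java
/** Hypothetical sketch: classifying a handler by the interfaces it implements. */
final class DirectionSketch {

    // Stand-ins for a handler interface and its inbound and outbound variants
    interface Handler { }
    interface InboundHandler extends Handler { }
    interface OutboundHandler extends Handler { }

    static final class Decoder implements InboundHandler { }
    static final class Encoder implements OutboundHandler { }

    /** Implements both interfaces, like a duplex codec. */
    static final class Codec implements InboundHandler, OutboundHandler { }

    static boolean isInbound(Handler h) {
        return h instanceof InboundHandler;
    }

    static boolean isOutbound(Handler h) {
        return h instanceof OutboundHandler;
    }

    static String describe(Handler h) {
        return h.getClass().getSimpleName()
                + " inbound=" + isInbound(h) + " outbound=" + isOutbound(h);
    }

    public static void main(String[] args) {
        System.out.println(describe(new Decoder())); // prints Decoder inbound=true outbound=false
        System.out.println(describe(new Encoder())); // prints Encoder inbound=false outbound=true
        System.out.println(describe(new Codec()));   // prints Codec inbound=true outbound=true
    }
}
```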


After the context is created, it is finally time to add the created context to the pipeline

3. Add a node

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}

This is essentially an insertion into a doubly linked list: the new context is linked in between the old last node and tail.

After the operation, the context has been added to the pipeline.

That completes adding a node to the pipeline. The other addXXX() methods can be understood along the same lines.
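The four pointer assignments can be exercised on a toy list. The sketch below uses the made-up class AddLastSketch with string-named nodes instead of contexts; walking the list in both directions is a quick way to confirm that the prev and next links stay consistent after each insertion.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of appending a node just before the tail sentinel. */
final class AddLastSketch {

    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    AddLastSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Links a new node between the current last node and tail. */
    AddLastSketch addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev; // 1
        newNode.next = tail; // 2
        prev.next = newNode; // 3
        tail.prev = newNode; // 4
        return this;
    }

    /** Names from head to tail, sentinels excluded. */
    List<String> forward() {
        List<String> names = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    /** Names from tail to head, sentinels excluded. */
    List<String> backward() {
        List<String> names = new ArrayList<>();
        for (Node n = tail.prev; n != head; n = n.prev) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        AddLastSketch p = new AddLastSketch()
                .addLast("Spliter").addLast("Decoder").addLast("BusinessHandler").addLast("Encoder");
        System.out.println(p.forward());  // prints [Spliter, Decoder, BusinessHandler, Encoder]
        System.out.println(p.backward()); // prints [Encoder, BusinessHandler, Decoder, Spliter]
    }
}
```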

4. Call back user methods

DefaultChannelPipeline

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
    ctx.setAddComplete();
}


By step 4 the new node has been added to the pipeline, so Netty calls back into user code via ctx.handler().handlerAdded(ctx). Typical user code looks like this:

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Called back after this node has been added
        // do something
    }
}

Next, set the state of the node

AbstractChannelHandlerContext

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}

A CAS loop sets the node's state to ADD_COMPLETE, unless the state is already REMOVE_COMPLETE (the node has been removed), in which case it is left unchanged.
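The same state transition can be reproduced with the JDK's AtomicIntegerFieldUpdater. The following is a minimal sketch, assuming made-up constant values and a made-up class name HandlerStateSketch; it shows that an add after a removal leaves the state at REMOVE_COMPLETE.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical sketch of the add-complete transition guarded by a CAS loop. */
final class HandlerStateSketch {

    // The numeric values are illustrative only
    static final int INIT = 0;
    static final int ADD_COMPLETE = 1;
    static final int REMOVE_COMPLETE = 2;

    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "handlerState");

    // The updater requires a volatile int field
    private volatile int handlerState = INIT;

    void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            // A removed node stays removed; otherwise retry until the CAS succeeds
            if (oldState == REMOVE_COMPLETE
                    || STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }

    int state() {
        return handlerState;
    }

    public static void main(String[] args) {
        HandlerStateSketch added = new HandlerStateSketch();
        added.setAddComplete();
        System.out.println(added.state() == ADD_COMPLETE); // prints true

        HandlerStateSketch removedFirst = new HandlerStateSketch();
        removedFirst.setRemoved();
        removedFirst.setAddComplete(); // no effect: the node is already removed
        System.out.println(removedFirst.state() == REMOVE_COMPLETE); // prints true
    }
}
```

The loop retries only when another thread changed the state between the read and the compareAndSet, so a concurrent removal can never be overwritten by a late add.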

Pipeline delete node

One of Netty's most notable features is that handlers are pluggable, so a pipeline can be rearranged dynamically. For example, when a connection is first established, it needs to be authenticated; once authentication succeeds, the authentication handler can be removed, and the next time the pipeline propagates an event, that handler is no longer invoked.

Below is a minimal implementation of such an authentication handler. The first packet carries the authentication data. If verification succeeds, the handler removes itself; otherwise the connection is closed:

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(data)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        // ...
    }
}

The key line is ctx.pipeline().remove(this):

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}


The remove operation is much simpler than the add operation. It is divided into three steps: locate the node to be deleted, adjust the doubly linked list pointers to delete it, and call back the user function.

1. Locate the node to be deleted

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }
        ctx = ctx.next;
    }
}

To find the context that corresponds to this handler, we walk the doubly linked list until we reach a context whose handler is the very same object as the given handler. That context is the node to delete.

2. Adjust the doubly linked list pointers to delete the node

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;
    synchronized (this) {
        // 2. Adjust the doubly linked list pointers to delete the node
        remove0(ctx);
    }
    // 3. Callback user function
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}


The process is simpler than adding a node: the node's predecessor and successor are linked directly to each other, bypassing the node.

As a result, the deleted node can no longer be reached from the pipeline.

This shows how the authentication handler above works. Because nothing references the deleted node any more, it is eventually reclaimed by the GC.
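The self-removal pattern can be tried end to end on a toy pipeline. The sketch below is an illustration under assumptions, not Netty code: the Pipeline, Handler, AuthHandler and BusinessHandler types are invented, messages are plain strings, and "token" is a made-up credential. Unlike the AuthHandler above, a failed check here simply swallows the message instead of closing a connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch: a handler that unlinks itself from the chain once it has done its job. */
final class SelfRemovingHandlerSketch {

    interface Handler {
        /** Returns true if the message should travel on to the next node. */
        boolean onRead(Pipeline pipeline, String msg);
    }

    static final class Node {
        final Handler handler;
        Node prev;
        Node next;

        Node(Handler handler) {
            this.handler = handler;
        }
    }

    static final class Pipeline {
        private final Node head = new Node(null);
        private final Node tail = new Node(null);

        Pipeline() {
            head.next = tail;
            tail.prev = head;
        }

        void addLast(Handler handler) {
            Node node = new Node(handler);
            node.prev = tail.prev;
            node.next = tail;
            tail.prev.next = node;
            tail.prev = node;
        }

        void remove(Handler handler) {
            for (Node n = head.next; n != tail; n = n.next) {
                if (n.handler == handler) { // identity comparison, not equals()
                    n.prev.next = n.next;
                    n.next.prev = n.prev;
                    return;
                }
            }
            throw new NoSuchElementException(handler.getClass().getName());
        }

        void fireRead(String msg) {
            for (Node n = head.next; n != tail; n = n.next) {
                if (!n.handler.onRead(this, msg)) {
                    return;
                }
            }
        }
    }

    static final class AuthHandler implements Handler {
        @Override
        public boolean onRead(Pipeline pipeline, String msg) {
            if ("token".equals(msg)) { // "token" is a made-up credential
                pipeline.remove(this);
            }
            return false; // the authentication message is never passed on
        }
    }

    static final class BusinessHandler implements Handler {
        final List<String> received = new ArrayList<>();

        @Override
        public boolean onRead(Pipeline pipeline, String msg) {
            received.add(msg);
            return true;
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        BusinessHandler business = new BusinessHandler();
        pipeline.addLast(new AuthHandler());
        pipeline.addLast(business);

        pipeline.fireRead("token"); // consumed by AuthHandler, which then removes itself
        pipeline.fireRead("hello"); // reaches BusinessHandler directly
        pipeline.fireRead("world");
        System.out.println(business.received); // prints [hello, world]
    }
}
```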

3. Call back the user function

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}

By step 3 the node has been removed from the pipeline, so Netty calls back into user code via ctx.handler().handlerRemoved(ctx). Typical code looks like this:

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // Called back after this node has been removed; a good place to release resources
        // do something
    }
}

Finally, set the node’s state to removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}
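The try/finally around the removal callback is worth a closer look, because user code in handlerRemoved() may throw. A minimal sketch, with invented Handler and Context types rather than the Netty ones, shows that the node is still marked as removed in that case.

```java
/** Hypothetical sketch: try/finally marks the node as removed even when the user callback throws. */
final class RemovalCallbackSketch {

    interface Handler {
        void handlerRemoved();
    }

    static final class Context {
        final Handler handler;
        boolean removed;

        Context(Handler handler) {
            this.handler = handler;
        }
    }

    static void callHandlerRemoved(Context ctx) {
        try {
            ctx.handler.handlerRemoved();
        } finally {
            ctx.removed = true; // runs whether or not the callback threw
        }
    }

    public static void main(String[] args) {
        Context failing = new Context(() -> {
            throw new IllegalStateException("cleanup failed");
        });
        try {
            callHandlerRemoved(failing);
        } catch (IllegalStateException e) {
            System.out.println("callback threw: " + e.getMessage());
        }
        System.out.println("removed = " + failing.removed); // prints removed = true
    }
}
```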

The other removeXXX() methods work in much the same way and can be analyzed along the same lines.

Conclusion

1. Using the creation of a new connection as an example: a channel is created when a new connection is established, and the pipeline for that channel is created along with the channel. Each node in the pipeline is a ChannelHandlerContext, which gives access to all the context information of the pipeline and the channel.

2. The pipeline is a doubly linked list, so adding and deleting nodes only requires adjusting the links.

3. Each node in the pipeline wraps a concrete ChannelHandler. Whether a node is inbound, outbound, or both is determined by whether its ChannelHandler is a ChannelInboundHandler, a ChannelOutboundHandler, or both.

The next article will continue the analysis of pipeline, stay tuned!
