Netty source code analysis series

  • Netty source code parsing series – server start process parsing
  • Netty source code parsing series – client connection access and read I/O parsing
  • 5 minutes to understand the pipeline model – Netty source code parsing

I. Introduction to pipeline

1. What is pipeline

The pipeline concept was first used in the Unix operating system. It lets programs with different functions communicate with each other, which helps software achieve “high cohesion, low coupling”. It is a chain model that strings different programs or components together so that they work as a single line.

2. Netty's ChannelPipeline

A ChannelPipeline is a doubly linked list of handlers that process or intercept a Channel's inbound and outbound events. Events flow through the ChannelPipeline, and ChannelHandlers can be added or removed to handle different business logic. Put simply, a ChannelPipeline is an assembly line in a factory, and a ChannelHandler is a worker on that line. A ChannelPipeline is created automatically when its Channel is created, and each Channel has its own ChannelPipeline.
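The assembly-line analogy can be sketched in a few lines of plain Java. This is only an illustrative model, not Netty's API: the `AssemblyLine` class and its methods are hypothetical names, and each "worker" is simply a function from `String` to `String`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a pipeline: an ordered list of workers (handlers).
class AssemblyLine {
    private final List<UnaryOperator<String>> workers = new ArrayList<>();

    // Appends a worker to the end of the line.
    AssemblyLine addLast(UnaryOperator<String> worker) {
        workers.add(worker);
        return this;
    }

    // Takes a worker off the line; returns true if it was present.
    boolean remove(UnaryOperator<String> worker) {
        return workers.remove(worker);
    }

    // Passes the item past every worker in order and returns the result.
    String process(String item) {
        String current = item;
        for (UnaryOperator<String> worker : workers) {
            current = worker.apply(current);
        }
        return current;
    }
}

class AssemblyLineDemo {
    static String run() {
        UnaryOperator<String> trim = String::trim;
        UnaryOperator<String> upper = String::toUpperCase;
        AssemblyLine line = new AssemblyLine().addLast(trim).addLast(upper);
        String first = line.process("  hello ");   // both workers applied
        line.remove(upper);                         // workers can be removed at runtime
        String second = line.process("  hello ");  // only trim remains
        return first + "|" + second;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints HELLO|hello
    }
}
```

The point of the sketch is that the line itself knows nothing about what each worker does; it only fixes the order in which they see the item.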

3. Process Netty I/O events

[Figure: how I/O events flow through the handlers in a ChannelPipeline]


II. ChannelHandlerContext

1. What is ChannelHandlerContext

ChannelHandlerContext is the context that connects a ChannelHandler to its ChannelPipeline. Every handler added to a pipeline gets its own ChannelHandlerContext instance, which manages how events propagate from that ChannelHandler through the ChannelPipeline.

2. Relationship between ChannelHandlerContext and ChannelPipeline and ChannelHandler

A ChannelPipeline is created automatically along with its Channel. It holds the Channel and organizes all the handlers together, much like a factory assembly line. A ChannelHandler contains independent functional logic, can be registered with multiple ChannelPipelines, and does not hold a Channel; it is the equivalent of a factory worker. A ChannelHandlerContext is the context that connects a ChannelHandler to a ChannelPipeline. It holds the ChannelPipeline and controls the direction in which events propagate from the ChannelHandler, like the supervisor on the line.
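The three roles can be modeled with a small plain-Java sketch. The types `SimpleHandler`, `SimpleContext` and `SimplePipeline` are hypothetical and only mirror the ownership described above; in real Netty a handler shared between pipelines must also be marked `@Sharable`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler: independent logic, no reference to any channel.
interface SimpleHandler {
    String handle(String msg);
}

// Hypothetical context: links one handler to one pipeline.
final class SimpleContext {
    final SimplePipeline pipeline;
    final SimpleHandler handler;

    SimpleContext(SimplePipeline pipeline, SimpleHandler handler) {
        this.pipeline = pipeline;
        this.handler = handler;
    }
}

// Hypothetical pipeline: holds its channel (here just a name) and its contexts.
final class SimplePipeline {
    final String channelName;
    final List<SimpleContext> contexts = new ArrayList<>();

    SimplePipeline(String channelName) {
        this.channelName = channelName;
    }

    // Every added handler gets its own new context.
    SimpleContext addLast(SimpleHandler handler) {
        SimpleContext ctx = new SimpleContext(this, handler);
        contexts.add(ctx);
        return ctx;
    }
}

class RelationshipDemo {
    public static void main(String[] args) {
        SimpleHandler shared = msg -> msg.toUpperCase(); // stateless worker
        SimplePipeline p1 = new SimplePipeline("channel-1");
        SimplePipeline p2 = new SimplePipeline("channel-2");
        SimpleContext c1 = p1.addLast(shared);
        SimpleContext c2 = p2.addLast(shared);
        System.out.println(c1.handler == c2.handler); // true: one handler instance
        System.out.println(c1 != c2);                 // true: one context per pipeline
        System.out.println(c1.pipeline.channelName + " / " + c2.pipeline.channelName);
    }
}
```

The handler never learns which channel it serves; that knowledge lives in the context, which is why one handler instance can sit in several pipelines.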

III. Propagation of Inbound events

1. What are Inbound events?

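(1) channelRegistered
Fired when the channel is registered with its EventLoop. Triggered by pipeline.fireChannelRegistered().

(2) channelUnregistered
Fired when the channel is deregistered from its EventLoop. Triggered by pipeline.fireChannelUnregistered().

(3) channelActive
Fired when the channel becomes active. Triggered by pipeline.fireChannelActive().

(4) channelInactive
Fired when the channel becomes inactive. Triggered by pipeline.fireChannelInactive().

(5) channelRead
Fired when the channel has read a message. Triggered by pipeline.fireChannelRead().

(6) channelReadComplete
Fired when the channel has finished the current read operation. Triggered by pipeline.fireChannelReadComplete().

(7) channelWritabilityChanged
Fired when the Channel's writable state changes. Checking it before writing helps avoid the OOM that writing too fast can cause. Triggered by pipeline.fireChannelWritabilityChanged().

(8) userEventTriggered
Fired for a user-defined event. Triggered by ctx.fireUserEventTriggered(evt).

(9) exceptionCaught
Fired when an exception is thrown while an Inbound event or an I/O operation is being processed.

Note: Inbound events are generally triggered by I/O activity.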
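To make the list above more concrete, here is a minimal plain-Java sketch that replays the order in which these callbacks typically arrive for one connection that receives a single message. The `LifecycleListener` interface and the `simulate` method are hypothetical, and the order shown is the usual one for a simple connection, stated here as an assumption rather than taken from the source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener exposing a subset of the inbound callbacks listed above.
interface LifecycleListener {
    void channelRegistered();
    void channelActive();
    void channelRead(Object msg);
    void channelReadComplete();
    void channelInactive();
    void channelUnregistered();
}

// Records the name of every callback it receives, in order.
class RecordingListener implements LifecycleListener {
    final List<String> calls = new ArrayList<>();

    @Override public void channelRegistered()     { calls.add("channelRegistered"); }
    @Override public void channelActive()         { calls.add("channelActive"); }
    @Override public void channelRead(Object msg) { calls.add("channelRead(" + msg + ")"); }
    @Override public void channelReadComplete()   { calls.add("channelReadComplete"); }
    @Override public void channelInactive()       { calls.add("channelInactive"); }
    @Override public void channelUnregistered()   { calls.add("channelUnregistered"); }
}

class LifecycleDemo {
    // Replays the assumed life cycle of one connection that receives one message.
    static List<String> simulate(Object msg) {
        RecordingListener listener = new RecordingListener();
        listener.channelRegistered();   // channel attached to its event loop
        listener.channelActive();       // connection established
        listener.channelRead(msg);      // data arrived
        listener.channelReadComplete(); // current read batch finished
        listener.channelInactive();     // connection closed
        listener.channelUnregistered(); // channel detached from its event loop
        return listener.calls;
    }

    public static void main(String[] args) {
        simulate("hello").forEach(System.out::println);
    }
}
```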

2. Add read events

As covered in the earlier articles on the server start process and on client connection access and read I/O, a new connection goes through the registration process, and channelRegistered is called once registration succeeds. We start with this method:

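public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Let the user-supplied initializer add its handlers to the pipeline
    initChannel((C) ctx.channel());
    // The initializer has done its job, so it removes itself from the pipeline
    ctx.pipeline().remove(this);
    // Pass the registered event on to the next handler
    ctx.fireChannelRegistered();
}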

initChannel is overridden by the ChannelInitializer subclass that was passed as the childHandler parameter when the server was started:

private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel");
        ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
        ch.pipeline().addLast(new IOHandler());
    }
}

Let’s recall where the pipeline is created:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

A pipeline is automatically created when a channel is created

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

The constructor creates two default nodes, TailContext (an InboundHandler) and HeadContext (an OutboundHandler), and links them to each other. Next, look at the addLast method:

@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

A name is generated for each handler here, using the handler's class name plus “#0”.

@Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ...
    for (ChannelHandler h : handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, generateName(h), h);
    }
    return this;
}

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }
    return this;
}

Because the pipeline must be thread safe, the method takes a lock so that concurrent callers cannot corrupt it. Inside the lock it checks the handler name for duplicates, wraps the handler in a DefaultChannelHandlerContext, and then adds that context to the pipeline:
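The combination of name generation, duplicate check and locking can be sketched without Netty. `HandlerNameRegistry` below is a hypothetical class; in particular, how a clash on the generated “#0” name is resolved (trying “#1”, “#2”, and so on) is an assumption of this sketch, not something the excerpts above show.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry that models only the naming and locking of addLast.
class HandlerNameRegistry {
    private final Map<String, Object> name2handler = new LinkedHashMap<>();

    // Default name: simple class name plus "#0".
    // Assumption: if that name is taken, try "#1", "#2", ... until one is free.
    private String generateName(Object handler) {
        String base = handler.getClass().getSimpleName() + "#";
        int i = 0;
        while (name2handler.containsKey(base + i)) {
            i++;
        }
        return base + i;
    }

    // Adds a handler under an explicit name; the lock makes check-then-put atomic.
    synchronized HandlerNameRegistry addLast(String name, Object handler) {
        if (name2handler.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        name2handler.put(name, handler);
        return this;
    }

    // Adds a handler under a generated name.
    synchronized HandlerNameRegistry addLast(Object handler) {
        return addLast(generateName(handler), handler);
    }

    // Returns the registered names in insertion order.
    synchronized List<String> names() {
        return new ArrayList<>(name2handler.keySet());
    }
}

class HandlerNameRegistryDemo {
    public static void main(String[] args) {
        HandlerNameRegistry registry = new HandlerNameRegistry();
        // StringBuilder objects are used as stand-in "handlers" here.
        registry.addLast(new StringBuilder()).addLast(new StringBuilder());
        System.out.println(registry.names()); // [StringBuilder#0, StringBuilder#1]
        try {
            registry.addLast("StringBuilder#0", new Object());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Duplicate handler name: StringBuilder#0
        }
    }
}
```

Without the lock, two threads could both pass the duplicate check for the same name before either one stores its handler.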

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

addLast0 does four things:

(1) Checks the DefaultChannelHandlerContext for repeated use: if the handler is not sharable across multiple pipelines and has already been added to a pipeline, an exception is thrown.

(2) Links the new context in just before tail. Before IdleStateHandler is added: HeadContext –> IOChannelInitialize –> TailContext. After IdleStateHandler is added: HeadContext –> IOChannelInitialize –> IdleStateHandler –> TailContext

(3) Maps the handler name to its DefaultChannelHandlerContext.

(4) Calls back the handler's handlerAdded event.

Finally, IOChannelInitialize is removed, which leaves:

HeadContext –> IdleStateHandler –> IOHandler –> TailContext
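The pointer updates behind that final chain can be reproduced with a minimal doubly linked list. `LinkNode` and `MiniPipeline` are hypothetical names; the `addLast` body follows the same four assignments as addLast0, and the `remove` method is a plain unlink written for this sketch.

```java
// Hypothetical list node: a name plus prev/next pointers.
class LinkNode {
    final String name;
    LinkNode prev;
    LinkNode next;

    LinkNode(String name) {
        this.name = name;
    }
}

class MiniPipeline {
    final LinkNode head = new LinkNode("HeadContext");
    final LinkNode tail = new LinkNode("TailContext");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Same pointer updates as addLast0: splice the node in just before tail.
    LinkNode addLast(String name) {
        LinkNode node = new LinkNode(name);
        LinkNode prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return node;
    }

    // Unlinks a node by connecting its neighbours to each other.
    void remove(LinkNode node) {
        node.prev.next = node.next;
        node.next.prev = node.prev;
    }

    // Renders the chain from head to tail.
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (LinkNode n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }
}

class MiniPipelineDemo {
    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline();
        LinkNode init = p.addLast("IOChannelInitialize");
        p.addLast("IdleStateHandler");
        p.addLast("IOHandler");
        p.remove(init); // the initializer removes itself once it is done
        System.out.println(p.describe());
        // prints HeadContext -> IdleStateHandler -> IOHandler -> TailContext
    }
}
```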

3. pipeline.fireChannelRead() event parsing

Here we walk through the read event, which is fairly typical; the other events flow in much the same way.

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    ...
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
    ...
}

When the event loop thread detects a read (or accept) event, the unsafe.read() method is called:

@Override
public final void read() {
    ...
    pipeline.fireChannelRead(byteBuf);
    ...
}

The inbound event starts at head and ends at tail

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
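The inEventLoop() branch in the method above follows a common pattern: run the work inline if the caller is already on the handler's thread, otherwise hand it to that thread. A minimal sketch of the pattern using only java.util.concurrent is shown here; `MiniEventLoop` and `runOnLoop` are hypothetical names and this is not how Netty's event loop is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-threaded loop; it models only inEventLoop() and execute().
class MiniEventLoop {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread.set(t); // remember which thread is the loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    // Run directly when already on the loop thread; otherwise queue the task.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MiniEventLoopDemo {
    static String run() {
        MiniEventLoop loop = new MiniEventLoop();
        StringBuffer log = new StringBuffer();
        // Called from the caller's thread, so this task is queued on the loop thread.
        loop.runOnLoop(() -> {
            log.append(Thread.currentThread().getName());
            // Already on the loop thread, so this nested call runs inline.
            loop.runOnLoop(() -> log.append("+inline"));
            log.append("+done");
        });
        loop.shutdownAndWait();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints mini-event-loop+inline+done
    }
}
```

Keeping every handler call on one thread is what lets handler code avoid its own locking.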

Find the next Inbound handler in the pipeline:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

Starting from HeadContext, the next Inbound handler is IdleStateHandler:

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}

IdleStateHandler marks the channel as reading (reading = true) and passes the event to the next handler:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    System.out.println(msg.toString());
}

IOHandler overrides channelRead() and calls the parent class's channelRead method:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

Proceed to call the next handler on the event chain

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

Here the channelRead method of TailContext is called, which releases the message buffer.

Summary: Inbound events propagate from the HeadContext node all the way to the TailContext node.
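The whole inbound walk, including the release at the tail, can be condensed into a small plain-Java model. All types here (`RefMsg`, `ReadHandler`, `ReadCtx`) are hypothetical stand-ins; the reference count is a bare integer and only illustrates that the tail frees a message nobody else consumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reference-counted message, standing in for a pooled buffer.
class RefMsg {
    final String text;
    private int refCnt = 1;

    RefMsg(String text) {
        this.text = text;
    }

    int refCnt() {
        return refCnt;
    }

    void release() {
        refCnt--;
    }
}

interface ReadHandler {
    void channelRead(ReadCtx ctx, RefMsg msg);
}

// Hypothetical context: wraps a handler and knows the next context.
class ReadCtx {
    final String name;
    final ReadHandler handler;
    ReadCtx next;

    ReadCtx(String name, ReadHandler handler) {
        this.name = name;
        this.handler = handler;
    }

    // Hands the message to the next context's handler.
    void fireChannelRead(RefMsg msg) {
        next.handler.channelRead(next, msg);
    }
}

class TailReleaseDemo {
    static final List<String> LOG = new ArrayList<>();

    static RefMsg run() {
        LOG.clear();
        ReadCtx head = new ReadCtx("HeadContext", (ctx, m) -> ctx.fireChannelRead(m));
        ReadCtx io = new ReadCtx("IOHandler", (ctx, m) -> {
            LOG.add("IOHandler saw " + m.text);
            ctx.fireChannelRead(m); // keep the event moving toward the tail
        });
        ReadCtx tail = new ReadCtx("TailContext", (ctx, m) -> {
            LOG.add("TailContext released " + m.text);
            m.release(); // end of the line: free the message
        });
        head.next = io;
        io.next = tail;

        RefMsg msg = new RefMsg("hello");
        head.handler.channelRead(head, msg); // the event enters at the head
        return msg;
    }

    public static void main(String[] args) {
        RefMsg msg = run();
        LOG.forEach(System.out::println);
        System.out.println("refCnt = " + msg.refCnt()); // refCnt = 0
    }
}
```

In this model the event only reaches the tail because each handler explicitly forwards it; a handler that stopped forwarding would end the propagation at that point.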

IV. Propagation of Outbound events

1. What are the Outbound events?

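(1) bind
Binds the channel to a local address.

(2) close
Closes the channel.

(3) connect
Connects the channel to a remote address.

(4) disconnect
Disconnects the channel from the remote peer.

(5) deregister
After disconnect, deregisters the channel from its EventLoop.

(6) read
Requests that data be read from the channel, which registers interest in OP_READ.

(7) write
Writes a message into the outbound buffer.

(8) flush
Flushes the buffered messages to the channel.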

2. Parse the write event

ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
ctx.channel().write(resp);

In our project, calling write does not write directly to the channel. The data first goes into a buffer, and flush must then be called to push it to the channel; alternatively, writeAndFlush does both in one call. Here we walk through the write event as a typical example of the Outbound flow; the other events flow in much the same way.
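The difference between write, flush and writeAndFlush can be shown with a toy channel. `BufferedChannel` is a hypothetical class in which a queue plays the role of the outbound buffer and a list plays the role of the socket; it is a sketch of the behaviour described above, not Netty's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical channel: write() only queues, flush() moves queued data to the "socket".
class BufferedChannel {
    private final Deque<String> outboundBuffer = new ArrayDeque<>();
    private final List<String> socket = new ArrayList<>();

    // Queues the message; nothing reaches the socket yet.
    void write(String msg) {
        outboundBuffer.addLast(msg);
    }

    // Drains everything queued so far to the socket, in order.
    void flush() {
        while (!outboundBuffer.isEmpty()) {
            socket.add(outboundBuffer.pollFirst());
        }
    }

    void writeAndFlush(String msg) {
        write(msg);
        flush();
    }

    int pending() {
        return outboundBuffer.size();
    }

    List<String> sent() {
        return new ArrayList<>(socket);
    }
}

class BufferedChannelDemo {
    public static void main(String[] args) {
        BufferedChannel ch = new BufferedChannel();
        ch.write("hello");
        System.out.println(ch.pending() + " pending, sent " + ch.sent()); // 1 pending, sent []
        ch.flush();
        System.out.println(ch.pending() + " pending, sent " + ch.sent()); // 0 pending, sent [hello]
        ch.writeAndFlush("world");
        System.out.println(ch.sent()); // [hello, world]
    }
}
```

A write that is never followed by a flush stays in the buffer, which is an easy mistake to make when a response seems to "disappear".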

@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

Calling write through the channel bound to the context delegates to that channel's pipeline, which invokes the handlers on its event chain:

@Override
public ChannelFuture write(Object msg) {
    return tail.write(msg);
}

Write events are called from tail to head, as opposed to read events

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    ...
    write(msg, false, promise);
    ...
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
        ...
    }
    ...
}

After several hops, the previous Outbound handler on the event chain is looked up:

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
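The backward search mirrors the forward search used for inbound events, and the two can be compared side by side in a small model. `FlowNode` and `FlowDemo` are hypothetical; the inbound and outbound flags assigned to each node are assumptions made for this sketch (HeadContext outbound only, TailContext and IOHandler inbound only, IdleStateHandler both).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical context node with the two flags the search loops rely on.
class FlowNode {
    final String name;
    final boolean inbound;
    final boolean outbound;
    FlowNode prev;
    FlowNode next;

    FlowNode(String name, boolean inbound, boolean outbound) {
        this.name = name;
        this.inbound = inbound;
        this.outbound = outbound;
    }

    // Walk forward until a node that handles inbound events is found.
    FlowNode findContextInbound() {
        FlowNode ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    // Walk backward until a node that handles outbound events is found.
    FlowNode findContextOutbound() {
        FlowNode ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
}

class FlowDemo {
    // Flags are assumptions of this sketch, see the paragraph above.
    final FlowNode head = new FlowNode("HeadContext", false, true);
    final FlowNode idle = new FlowNode("IdleStateHandler", true, true);
    final FlowNode io = new FlowNode("IOHandler", true, false);
    final FlowNode tail = new FlowNode("TailContext", true, false);

    FlowDemo() {
        FlowNode[] chain = {head, idle, io, tail};
        for (int i = 0; i + 1 < chain.length; i++) {
            chain[i].next = chain[i + 1];
            chain[i + 1].prev = chain[i];
        }
    }

    // Nodes visited by an inbound event that starts at head.
    List<String> inboundPath() {
        List<String> visited = new ArrayList<>();
        for (FlowNode ctx = head; ctx != tail; ) {
            ctx = ctx.findContextInbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    // Nodes visited by an outbound event that starts at tail.
    List<String> outboundPath() {
        List<String> visited = new ArrayList<>();
        for (FlowNode ctx = tail; ctx != head; ) {
            ctx = ctx.findContextOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        FlowDemo demo = new FlowDemo();
        System.out.println("inbound:  " + demo.inboundPath());  // [IdleStateHandler, IOHandler, TailContext]
        System.out.println("outbound: " + demo.outboundPath()); // [IdleStateHandler, HeadContext]
    }
}
```

Under these assumed flags, a write that starts at the tail skips IOHandler entirely and reaches HeadContext after a single intermediate hop.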

IdleStateHandler is both an Inbound and an Outbound handler, so the write event passes through it. The last handler on the chain is HeadContext:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

@Override
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    outboundBuffer.addMessage(msg, size, promise);
    ...
}

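From this we can see that the data finally ends up in the outbound buffer. That completes our walk through how the Netty pipeline model propagates inbound and outbound events, which can be summarized as follows: inbound events propagate from HeadContext to TailContext, and outbound events propagate from TailContext to HeadContext.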
