Netty source code analysis series
- Netty source code parsing series – server start process parsing
- Netty source code parsing series – client connection access and read I/O parsing
- 5 minutes to understand pipeline model -Netty source code parsing
I. Introduction to pipeline
1. What is pipeline
Pipelines were first used in the Unix operating system, where they let programs with different functions communicate with each other, making software more "high cohesion, low coupling". A pipeline is a "chain model" that strings different programs or components together so that they work in a straight line.
2. Netty's ChannelPipeline
A ChannelPipeline is a doubly linked list of ChannelHandlers that process or intercept the inbound and outbound events of a Channel. Events flow through the ChannelPipeline, and ChannelHandlers can be added or removed to handle different business logic. Put simply, a ChannelPipeline is an assembly line in a factory, and a ChannelHandler is a worker on that line. A ChannelPipeline is created automatically when its Channel is created, and each Channel has its own ChannelPipeline.
3. Process Netty I/O events
[Figure: I/O events flowing through the handlers of a ChannelPipeline]
II. ChannelHandlerContext
1. What is ChannelHandlerContext
ChannelHandlerContext is the context that connects a ChannelHandler to a ChannelPipeline. Each time a handler is added, a ChannelHandlerContext instance is created for it. The context manages how events propagate from that ChannelHandler through the ChannelPipeline.
2. Relationship between ChannelHandlerContext and ChannelPipeline and ChannelHandler
A ChannelPipeline is created automatically when its Channel is created. It holds the Channel and organizes all the handlers together, like an assembly line in a factory. A ChannelHandler contains independent functional logic, can be registered with multiple ChannelPipelines, and does not hold a reference to the Channel; it is like a worker in the factory. A ChannelHandlerContext is the context that connects a ChannelHandler to a ChannelPipeline. It holds the ChannelPipeline and controls the direction in which the ChannelHandler's events propagate through it, like the team leader on the assembly line.
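To make the three roles concrete, here is a minimal plain-Java sketch. It is not Netty's code: MiniHandler, MiniContext, and MiniPipeline are hypothetical stand-ins for ChannelHandler, ChannelHandlerContext, and ChannelPipeline, and the channel is reduced to a name. It only illustrates that one handler instance can serve two pipelines while each addition gets its own context.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ChannelHandler: pure logic, holds no channel or pipeline.
interface MiniHandler {
    String handle(String msg);
}

// Hypothetical stand-in for ChannelHandlerContext: created once per addition,
// it ties one handler to one pipeline.
final class MiniContext {
    final MiniPipeline pipeline;
    final MiniHandler handler;

    MiniContext(MiniPipeline pipeline, MiniHandler handler) {
        this.pipeline = pipeline;
        this.handler = handler;
    }
}

// Hypothetical stand-in for ChannelPipeline: holds its channel (here just a name)
// and keeps the contexts in order.
final class MiniPipeline {
    final String channelName;
    final List<MiniContext> contexts = new ArrayList<>();

    MiniPipeline(String channelName) {
        this.channelName = channelName;
    }

    MiniContext addLast(MiniHandler handler) {
        MiniContext ctx = new MiniContext(this, handler); // one context per addition
        contexts.add(ctx);
        return ctx;
    }
}

final class RolesDemo {
    public static void main(String[] args) {
        MiniHandler upper = String::toUpperCase; // one shared handler instance
        MiniContext a = new MiniPipeline("channel-A").addLast(upper);
        MiniContext b = new MiniPipeline("channel-B").addLast(upper);
        System.out.println("same handler: " + (a.handler == b.handler));
        System.out.println("same context: " + (a == b));
        System.out.println(a.pipeline.channelName + " / " + b.pipeline.channelName);
    }
}
```

In real Netty, sharing one handler instance across pipelines is only allowed when the handler is marked as sharable, which is what the multiplicity check discussed later enforces.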
III. Propagation of Inbound events
1. What are Inbound events?
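(1) channelRegistered: the channel was registered with its EventLoop; triggered by pipeline.fireChannelRegistered()
(2) channelUnregistered: the channel was deregistered from its EventLoop; triggered by pipeline.fireChannelUnregistered()
(3) channelActive: the channel became active; triggered by pipeline.fireChannelActive()
(4) channelInactive: the channel became inactive; triggered by pipeline.fireChannelInactive()
(5) channelRead: the channel read a message; triggered by pipeline.fireChannelRead()
(6) channelReadComplete: the channel finished the current read operation; triggered by pipeline.fireChannelReadComplete()
(7) channelWritabilityChanged: the Channel's writable state changed, which can be used to throttle writes and avoid an OOM; triggered by pipeline.fireChannelWritabilityChanged()
(8) userEventTriggered: a user-defined event was fired; triggered by ctx.fireUserEventTriggered(evt)
(9) exceptionCaught: an exception occurred while an Inbound handler was processing an I/O event
Note: Inbound events are generally triggered by the I/O thread.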
2. Add read events
As we saw in the earlier articles in this series (the server start process, and client connection access and read I/O), a new connection goes through a registration step. After registration succeeds, channelRegistered is called, so we start with this method:
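public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Run the user-supplied initChannel, which adds the real handlers
    initChannel((C) ctx.channel());
    // The initializer has done its job, so it removes itself from the pipeline
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}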
initChannel is implemented by the childHandler configured at server startup; it overrides the method declared in the parent class, ChannelInitializer:
private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel");
        ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
        ch.pipeline().addLast(new IOHandler());
    }
}
Let's recall where the pipeline is created:
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}
A pipeline is automatically created when a channel is created
public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
The constructor creates two default handlers: TailContext, which is an Inbound handler, and HeadContext, which is an Outbound handler. Next, let's look at the addLast method:
@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
A name is generated for each handler here: the handler's class name plus "#0".
@Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ...
    for (ChannelHandler h : handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, generateName(h), h);
    }
    return this;
}
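The naming rule used by generateName can be sketched in a few lines of plain Java. This is only an illustration, not Netty's implementation: HandlerNames and its generateName helper are hypothetical, and the clash handling (trying "#1", "#2", and so on when a name is taken) is an assumption, since the text above only states the class-name-plus-"#0" rule.

```java
import java.util.HashSet;
import java.util.Set;

final class HandlerNames {
    // Hypothetical helper: derive a default name from the handler's simple class name.
    static String generateName(Object handler, Set<String> namesInUse) {
        String base = handler.getClass().getSimpleName() + "#";
        // Assumption: on a clash, try the next numeric suffix until a free name is found.
        for (int i = 0; ; i++) {
            String candidate = base + i;
            if (!namesInUse.contains(candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        Set<String> used = new HashSet<>();
        String first = generateName(new StringBuilder(), used);
        used.add(first);
        String second = generateName(new StringBuilder(), used);
        System.out.println(first + ", " + second); // StringBuilder#0, StringBuilder#1
    }
}
```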
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }
    return this;
}
Because the pipeline must be thread safe, a lock guards concurrent access. Inside the lock, the handler name is checked for duplicates, the handler is wrapped in a DefaultChannelHandlerContext, and the context is then added to the pipeline.
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
    name2ctx.put(name, newCtx);
    callHandlerAdded(newCtx);
}
Four things happen here:
(1) A multiplicity check is run on the DefaultChannelHandlerContext: if its handler cannot be shared among multiple pipelines and has already been added to a pipeline, an exception is thrown.
(2) The new context is linked in just before tail. Before IdleStateHandler is added, the chain is HeadContext --> IOChannelInitialize --> TailContext. After IdleStateHandler is added, it is HeadContext --> IOChannelInitialize --> IdleStateHandler --> TailContext.
(3) A mapping between the handler name and the DefaultChannelHandlerContext is established.
(4) The handler's "added" callback is invoked to notify it that the addition has finished.
Finally, IOChannelInitialize is removed, leaving:
HeadContext --> IdleStateHandler --> IOHandler --> TailContext
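The way the chain evolves can be reproduced with a small plain-Java sketch of the same pointer updates. LinkedPipeline and its Node class are hypothetical stand-ins (not Netty classes), handlers are reduced to their names, and the sharable check and the callback are left out.

```java
import java.util.HashMap;
import java.util.Map;

final class LinkedPipeline {
    // Hypothetical node type standing in for AbstractChannelHandlerContext.
    static final class Node {
        final String name;
        Node prev, next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("HeadContext");
    private final Node tail = new Node("TailContext");
    private final Map<String, Node> name2ctx = new HashMap<>();

    LinkedPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Same four pointer updates as addLast0: splice the node in just before tail.
    LinkedPipeline addLast(String name) {
        if (name2ctx.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        name2ctx.put(name, node);
        return this;
    }

    // Unlink a node by name, as happens to the initializer once it has run.
    LinkedPipeline remove(String name) {
        Node node = name2ctx.remove(name);
        if (node == null) {
            throw new IllegalArgumentException("No such handler: " + name);
        }
        node.prev.next = node.next;
        node.next.prev = node.prev;
        return this;
    }

    String chain() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" --> ");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        LinkedPipeline p = new LinkedPipeline().addLast("IOChannelInitialize");
        System.out.println(p.chain());
        p.addLast("IdleStateHandler").addLast("IOHandler");
        System.out.println(p.chain());
        p.remove("IOChannelInitialize");
        System.out.println(p.chain());
    }
}
```

Because every insertion happens immediately before tail, handlers end up in the order in which addLast was called, and removing the initializer leaves the rest of the chain intact.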
3. Parsing the pipeline.fireChannelRead() event
Here we walk through the read event as a typical example; the other events flow in basically the same way.
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    ...
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
    ...
}
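When the event loop thread detects a ready read (or accept) event, the unsafe.read() method is called. For data arriving on an already accepted connection, this is a worker thread rather than the boss thread.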
@Override
public final void read() {
    ...
    pipeline.fireChannelRead(byteBuf);
    ...
}
The inbound event starts at head and ends at tail
@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
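The if/else in fireChannelRead runs the next handler inline when the caller is already on that handler's executor thread, and otherwise hands the call over as a task. The following plain-Java sketch shows only that dispatch rule. MiniEventLoop is a hypothetical class built on a JDK single-thread executor; Netty's EventExecutor is far richer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded loop used only to illustrate the dispatch rule.
final class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise queue the task.
    // Returns true if the task ran inline.
    boolean dispatch(Runnable task) {
        if (inEventLoop()) {
            task.run();
            return true;
        }
        executor.execute(task);
        return false;
    }

    void shutdownAndWait() {
        executor.shutdown(); // already queued tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns a log of where each call ran.
    static List<String> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> log = new CopyOnWriteArrayList<>();
        boolean outerInline = loop.dispatch(() -> {
            log.add("outer on " + Thread.currentThread().getName());
            boolean innerInline = loop.dispatch(
                    () -> log.add("inner on " + Thread.currentThread().getName()));
            log.add("inner inline: " + innerInline);
        });
        loop.shutdownAndWait();
        log.add("outer inline: " + outerInline);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The call made from the main thread is queued, while the nested call made from inside the loop runs immediately. Keeping every handler invocation on one thread is what lets handler code avoid locking.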
Find the next Inbound handler in the pipeline:
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
Starting from HeadContext, the next Inbound handler found is IdleStateHandler, so its channelRead is invoked:
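The do/while loop in findContextInbound always steps forward at least once and then skips every context that is not flagged as inbound. Here is a minimal plain-Java sketch of that lookup. InboundLookup and Ctx are hypothetical, and the outbound-only "Encoder" node is invented purely to show a context being skipped.

```java
final class InboundLookup {
    // Hypothetical context node carrying the same inbound flag the loop checks.
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    // Same shape as findContextInbound: step once, then keep stepping
    // while the current node is not an inbound one.
    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    // Builds head -> Encoder (outbound only, invented) -> IdleStateHandler
    // -> IOHandler -> tail and returns the head.
    static Ctx buildChain() {
        Ctx head = new Ctx("HeadContext", false);
        Ctx encoder = new Ctx("Encoder", false);
        Ctx idle = new Ctx("IdleStateHandler", true);
        Ctx io = new Ctx("IOHandler", true);
        Ctx tail = new Ctx("TailContext", true);
        head.next = encoder;
        encoder.next = idle;
        idle.next = io;
        io.next = tail;
        return head;
    }

    public static void main(String[] args) {
        Ctx ctx = buildChain();
        while (ctx.next != null) {
            ctx = findContextInbound(ctx);
            System.out.println(ctx.name);
        }
    }
}
```

The loop has no null check, so it relies on the last node being inbound. That is why the tail context is always an Inbound handler.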
private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}
IdleStateHandler sets the channel's reading flag to true and passes the event on to the next handler:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    System.out.println(msg.toString());
}
IOHandler overrides the channelRead() method and calls the parent channelRead method
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}
Proceed to call the next handler on the event chain
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
Here the channelRead method of TailContext is called, which releases the msg buffer.
Summary: Inbound events propagate from the HeadContext node onward and end at the TailContext node.
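The role of the tail as a safety net can be sketched in plain Java: if every handler passes the message on, the tail discards and releases it. TailReleaseDemo, Msg, and Inbound are hypothetical names, and the one-field reference count is a simplification of what a real reference-counted buffer does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TailReleaseDemo {
    // Hypothetical reference-counted message with a deliberately tiny contract.
    static final class Msg {
        final String payload;
        private int refCnt = 1;

        Msg(String payload) {
            this.payload = payload;
        }

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    // An inbound handler returns true to pass the message to the next handler.
    interface Inbound {
        boolean channelRead(Msg msg, List<String> out);
    }

    // Walk the handlers in order; if all of them pass the message on,
    // the tail logs it as discarded and releases it.
    static void fireChannelRead(List<Inbound> handlers, Msg msg, List<String> out) {
        for (Inbound h : handlers) {
            if (!h.channelRead(msg, out)) {
                return; // the handler consumed the message and owns its release
            }
        }
        out.add("tail: discarded " + msg.payload);
        msg.release();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Inbound passThrough = (msg, out) -> {
            out.add("pass " + msg.payload);
            return true;
        };
        Msg hello = new Msg("hello");
        fireChannelRead(Arrays.asList(passThrough, passThrough), hello, log);
        log.forEach(System.out::println);
        System.out.println("refCnt = " + hello.refCnt());
    }
}
```

A handler that stops propagation takes over responsibility for releasing the message, since the tail will never see it.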
IV. Propagation of Outbound events
1. What are the Outbound events?
(1) bind
(2) close
(3) connect
(4) disconnect
(5) deregister: detaches the channel from its EventLoop
(6) read: requests a read, that is, registers interest in OP_READ
(7) write
(8) flush
2. Parse the write event
ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
ctx.channel().write(resp);
When we call write in a project, the data is not written directly to the channel. It is first placed in a buffer, and only a flush pushes it out to the channel; alternatively, writeAndFlush does both in one call. Here we walk through the write event as a typical example of the Outbound flow; the other events flow in a similar way.
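The difference between write and flush can be sketched with a queue and a list in plain Java. BufferedChannel is a hypothetical class: the queue plays the role of the outbound buffer, and the "wire" list stands in for the socket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BufferedChannel {
    private final Deque<String> outboundBuffer = new ArrayDeque<>(); // pending writes
    private final List<String> wire = new ArrayList<>();             // stands in for the socket

    // write only queues the message; nothing reaches the wire yet.
    void write(String msg) {
        outboundBuffer.addLast(msg);
    }

    // flush drains everything queued so far, in order.
    void flush() {
        while (!outboundBuffer.isEmpty()) {
            wire.add(outboundBuffer.pollFirst());
        }
    }

    void writeAndFlush(String msg) {
        write(msg);
        flush();
    }

    int pending() {
        return outboundBuffer.size();
    }

    List<String> wire() {
        return new ArrayList<>(wire);
    }

    public static void main(String[] args) {
        BufferedChannel ch = new BufferedChannel();
        ch.write("hello");
        System.out.println("after write: pending=" + ch.pending() + " wire=" + ch.wire());
        ch.flush();
        System.out.println("after flush: pending=" + ch.pending() + " wire=" + ch.wire());
    }
}
```

Separating the two steps lets several small writes be sent together with a single flush.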
@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}
Calling write on the channel bound to the context delegates to the channel's pipeline, which runs the handlers on that channel's event chain:
@Override
public ChannelFuture write(Object msg) {
    return tail.write(msg);
}
Write events travel from tail to head, the opposite direction to read events.
@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    ...
    write(msg, false, promise);
    ...
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
        ...
    }
    ...
}
After several delegating calls, we look up the previous Outbound handler on the event chain:
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
IdleStateHandler acts as both an Inbound and an Outbound handler, so the write passes through it and finally reaches HeadContext:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
@Override
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    outboundBuffer.addMessage(msg, size, promise);
    ...
}
From here we can see that the data finally ends up in the outbound buffer. Having walked through how the Netty pipeline model passes inbound and outbound events, we can summarize: Inbound events propagate from HeadContext toward TailContext, and Outbound events propagate from TailContext toward HeadContext.
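The two propagation directions can be put side by side in one plain-Java sketch. TwoWayPipeline and Ctx are hypothetical; the sketch assumes every handler forwards the event, and the flags given to IdleStateHandler (both directions) and IOHandler (inbound only) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoWayPipeline {
    // Hypothetical context node with one flag per direction.
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev, next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // As in the article: head is an Outbound handler, tail is an Inbound handler.
    private final Ctx head = new Ctx("HeadContext", false, true);
    private final Ctx tail = new Ctx("TailContext", true, false);

    TwoWayPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    TwoWayPipeline addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Inbound: start after head and follow next, visiting inbound contexts.
    List<String> fireChannelRead() {
        List<String> visited = new ArrayList<>();
        for (Ctx c = head.next; c != null; c = c.next) {
            if (c.inbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }

    // Outbound: start before tail and follow prev, visiting outbound contexts.
    List<String> write() {
        List<String> visited = new ArrayList<>();
        for (Ctx c = tail.prev; c != null; c = c.prev) {
            if (c.outbound) {
                visited.add(c.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        TwoWayPipeline p = new TwoWayPipeline()
                .addLast("IdleStateHandler", true, true)
                .addLast("IOHandler", true, false);
        System.out.println("read:  " + p.fireChannelRead());
        System.out.println("write: " + p.write());
    }
}
```

With this chain, a read visits IdleStateHandler, IOHandler, and TailContext in that order, while a write visits IdleStateHandler and then HeadContext.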