Netty responsibility chain design pattern
Design patterns
The chain of responsibility pattern creates a chain of processing objects for the request
Initiating a request is decoupled from processing it: the handlers on the chain are responsible for processing the request, and the client only needs to send the request to the chain without caring about how it is processed or passed along.
Implementing the chain of responsibility pattern
Implementing the chain of responsibility pattern takes four elements:
- An abstract handler class
- Concrete handler implementation classes
- A structure that stores the handlers
- The execution of the handlers
Collection storage – examples of pseudocode
// Abstract handler class
abstract class AbstractHandler {
    abstract void doHandler(Object args);
}
// Concrete handler implementations
class Handler1 extends AbstractHandler {
    void doHandler(Object args) {
        // handle the request
    }
}
class Handler2 extends AbstractHandler {
    void doHandler(Object args) {
        // handle the request
    }
}
class Handler3 extends AbstractHandler {
    void doHandler(Object args) {
        // handle the request
    }
}
// Create a collection and store all handler instances
List<AbstractHandler> handlers = new ArrayList<>();
handlers.add(new Handler1());
handlers.add(new Handler2());
handlers.add(new Handler3());
// Call each handler in turn to process the request
void process(Object request) {
    for (AbstractHandler handler : handlers) {
        handler.doHandler(request);
    }
}
// Initiate a request and process it through the chain of responsibility
call.process(request);
Tomcat uses this chain of responsibility model
Linked list calls – pseudocode examples
// Abstract handler class
abstract class AbstractHandler {
    AbstractHandler next; // next handler in the chain
    abstract void doHandler(Object args);
}
// Concrete handler implementations
class Handler1 extends AbstractHandler {
    void doHandler(Object args) {
        // handle the request
    }
}
class Handler2 extends AbstractHandler {
    void doHandler(Object args) {
        // handle the request
    }
}
class Handler3 extends AbstractHandler {
    void doHandler(Object args) {
        // handle the request
    }
}
// Store the handlers in a linked list
pipeline = start [new Handler1() -> new Handler2() -> new Handler3()] end
// Process the request, calling the handlers from the start of the list
void process(Object request) {
    handler = pipeline.findOne;
    while (handler != null) {
        handler.doHandler(request);
        handler = handler.next;
    }
}
Netty uses this linked-list form of the chain of responsibility pattern, so we will analyze Netty's chained calls with this structure in mind. Apart from how the handlers are stored, it does not differ from the previous form.
A custom chain of responsibility
Now that we have the linked-list pseudocode and want to use it to clarify how Netty makes its calls, we can implement it in the simplest possible code to make the later sections easier to follow:
/**
 * @author daniel
 * @version 1.0.0
 * @date 2021/12/12
 */
public class PipelineDemo {
    public static void main(String[] args) {
        AbstractHandler handler1 = new Handler1();
        AbstractHandler handler2 = new Handler2();
        AbstractHandler handler3 = new Handler3();
        handler1.setNextHandler(handler2);
        handler2.setNextHandler(handler3);
        HandlerChainContext handlerChainContext = new HandlerChainContext(handler1);
        handlerChainContext.startRun("Ding Dai Guang");
    }
}

/**
 * Handler context: maintains the linked list and drives the execution of the handlers in it
 */
class HandlerChainContext {
    AbstractHandler currentHandler;

    /**
     * Initializes the context with the head of the chain
     * @param currentHandler head of the chain
     */
    public HandlerChainContext(AbstractHandler currentHandler) {
        this.currentHandler = currentHandler;
    }

    /**
     * Entry point of the chain
     * @param args request data
     */
    public void startRun(Object args) {
        currentHandler.doHandler(this, args);
    }

    /**
     * Executes the next handler
     * @param args request data
     */
    public void runNext(Object args) {
        AbstractHandler nextHandler = currentHandler.getNextHandler();
        currentHandler = nextHandler;
        if (nextHandler == null) {
            System.out.println("The end");
            return;
        }
        nextHandler.doHandler(this, args);
    }
}

/**
 * Abstract handler class
 */
abstract class AbstractHandler {
    AbstractHandler nextHandler;

    public AbstractHandler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(AbstractHandler nextHandler) {
        this.nextHandler = nextHandler;
    }

    abstract void doHandler(HandlerChainContext handlerChainContext, Object args);
}

/**
 * Handler implementation 1
 */
class Handler1 extends AbstractHandler {
    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler1";
        System.out.println("It has been processed by the Handler1 handler and the result is :" + args);
        // Execute the next handler
        handlerChainContext.runNext(args);
    }
}

class Handler2 extends AbstractHandler {
    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler2";
        System.out.println("It has been processed by the Handler2 processor and the result is :" + args);
        // Execute the next handler
        handlerChainContext.runNext(args);
    }
}

class Handler3 extends AbstractHandler {
    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler3";
        System.out.println("It has been processed by the Handler3 processor, and the result is :" + args);
        // Execute the next handler
        handlerChainContext.runNext(args);
    }
}
The result is as follows:
It has been processed by the Handler1 handler and the result is :Ding Dai Guang --- handler1
It has been processed by the Handler2 processor and the result is :Ding Dai Guang --- handler1 --- handler2
It has been processed by the Handler3 processor, and the result is :Ding Dai Guang --- handler1 --- handler2 --- handler3
The end
This is a simple linked-list chain of responsibility implemented strictly according to the pseudocode above. Netty's implementation is of course more complex and more robust than this, but simple as it is, the demo is a great help in understanding Netty's chain of responsibility.
ChannelPipeline responsibility chain in Netty
- In Netty, the pipeline holds all the handlers for a channel.
- Each Channel has its own dedicated pipeline, created automatically together with the Channel. When a port is bound, the Channel is created in the initAndRegister method as follows:
channel = channelFactory.newChannel();
Debugging, together with the analysis above, shows that the channel created here is a NioServerSocketChannel, so we can look directly at its constructors:
/**
 * Create a new instance
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * Create a new instance using the given {@link ServerSocketChannel}.
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Following the super call upward leads to the constructor of the parent class AbstractNioChannel:
/**
 * Create a new instance
 *
 * @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false); // Set to non-blocking mode
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                    "Failed to close a partially initialized socket.", e2);
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
This constructor sets the NIO channel to non-blocking mode, and at this point we are already calling the NIO API provided by the JDK. Continue with the constructor of its parent class AbstractChannel, as follows:
/**
 * Creates a new instance.
 *
 * @param parent
 *        the parent of this channel. {@code null} if there's no parent.
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline(); // Create the pipeline
}
The pipeline is created by the newChannelPipeline method, which in turn calls the DefaultChannelPipeline constructor:
/**
 * Returns a new {@link DefaultChannelPipeline} instance.
 */
protected DefaultChannelPipeline newChannelPipeline() {
    // Create a DefaultChannelPipeline object, the pipeline object responsible for managing the chain
    return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
    tail = new TailContext(this); // Create the tail of the linked list
    head = new HeadContext(this); // Create the head of the linked list
    // There is no handler in between yet, only head and tail linked to each other
    head.next = tail;
    tail.prev = head;
}
Now it looks a lot like the simple custom demo. It might seem that DefaultChannelPipeline does not contain a single handler yet, but that is not the case: HeadContext and TailContext are both handlers, just special ones.
What other handlers can sit on this chain? Let's take a look……
- Both inbound events (such as read) and outbound operations (such as write) invoke handlers on the pipeline.
The next thing we need to do is:
- What do inbound events and outbound operations mean?
- What does the Handler do?
- How does Pipeline maintain handlers?
- What is the execution flow of handler?
Inbound events and outbound events
An inbound event is typically triggered when an I/O thread produces inbound data. For example, when the EventLoop receives OP_READ from the selector, the inbound handling calls SocketChannel.read(ByteBuffer) to receive the data, which causes the channelRead method of the next inbound handler in the channel's ChannelPipeline to be called. In short, data arrives at the socket from the lower layer and is automatically dispatched to the matching inbound handlers.
NIO data -> Netty inbound handler: the Netty layer processes the data
An outbound event is usually triggered when an actual output operation is requested. For example, the bind method requests that the server socket be bound to the given SocketAddress, which causes the bind method of the next outbound handler in the ChannelPipeline to be called. In short, the application explicitly invokes the outbound handlers, which pass the operation or data down to the underlying socket.
Netty outbound handler -> NIO: the Netty layer hands data to the lower layer for processing
For these two types of events, Netty provides the following more specific events:
- Inbound events
  - fireChannelRegistered: channel registered event
  - fireChannelUnregistered: channel deregistered event
  - fireChannelActive: channel active event
  - fireChannelInactive: channel inactive event
  - fireExceptionCaught: exception event
  - fireUserEventTriggered: user-defined event
  - fireChannelRead: channel read event
  - fireChannelReadComplete: channel read complete event
  - fireChannelWritabilityChanged: channel writability changed event
- Outbound events
  - bind: port binding event
  - connect: connection event
  - disconnect: disconnection event
  - close: close event
  - deregister: deregistration event
  - flush: flushes data to the network
  - read: read event, used to register OP_READ with the selector
  - write: write event
  - writeAndFlush: writes data and flushes it
That covers the events for now; the event mechanism will become clearer once we read the source. Apart from events, the thing this article mentions most is the handler, and we already know how Netty stores handlers (the pipeline). So besides the HeadContext and TailContext handlers at the two ends, what other handlers are there?
What is a handler in a Pipeline?
All handlers derive from the top-level handler interface ChannelHandler, which handles or intercepts I/O events and forwards them to the next handler in the ChannelPipeline, as follows:
package io.netty.channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
As you can see, the top-level interface defines very little. In practice, it has two main sub-interfaces:
- ChannelInboundHandler: processes inbound I/O events
- ChannelOutboundHandler: processes outbound I/O events
These correspond to the events above, meaning the events are handled in these handlers.
Netty provides some simple adapter classes that implement the inbound and outbound interfaces:
- ChannelInboundHandlerAdapter: handles inbound I/O events
- ChannelOutboundHandlerAdapter: handles outbound I/O events
- ChannelDuplexHandler: supports handling both inbound and outbound events
As with the custom chain of responsibility, what a Netty pipeline actually stores is not the ChannelHandler itself but a context object, ChannelHandlerContext. The handler is wrapped in the context object and interacts with the ChannelPipeline it belongs to through that context: passing events up or down the chain and modifying the pipeline are all done through the context object.
Now that we know what a handler is and which inbound and outbound events it handles, how does Netty maintain the whole chain of ChannelHandlers?
How to maintain handlers in Pipeline
One premise is that a ChannelPipeline is thread-safe, which means that a ChannelHandler can be added, removed, or replaced at any time. The usual practice is to add ChannelHandlers to the pipeline during initialization. The pipeline offers a rich API for managing handlers (looking ahead in Defau… Pipeline), as follows:
- addFirst: inserts at the front
- addLast: inserts at the end
- addBefore: inserts before the specified handler
- addAfter: inserts after the specified handler
- remove: removes the specified handler
- removeFirst: removes the first handler
- removeLast: removes the last handler
- replace: replaces the specified handler
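To make the list operations concrete, here is a minimal sketch of a pipeline kept as a doubly linked list between two sentinel nodes. It is plain Java, not Netty code: `MiniPipeline`, `Node`, and `names()` are hypothetical names, handlers are reduced to their names, and coarse `synchronized` methods stand in for whatever locking the real implementation uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified pipeline: named nodes linked between a head and a tail sentinel. */
class MiniPipeline {
    /** One node per handler; stands in for a handler context. */
    private static final class Node {
        final String name;
        Node prev, next;
        Node(String name) { this.name = name; }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    /** Links newNode directly after the given node. */
    private void linkAfter(Node node, Node newNode) {
        newNode.prev = node;
        newNode.next = node.next;
        node.next.prev = newNode;
        node.next = newNode;
    }

    /** Finds the first node with the given name, skipping the sentinels. */
    private Node find(String name) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.name.equals(name)) return n;
        }
        throw new IllegalArgumentException("no such handler: " + name);
    }

    synchronized MiniPipeline addFirst(String name) { linkAfter(head, new Node(name)); return this; }
    synchronized MiniPipeline addLast(String name) { linkAfter(tail.prev, new Node(name)); return this; }
    synchronized MiniPipeline addBefore(String base, String name) { linkAfter(find(base).prev, new Node(name)); return this; }
    synchronized MiniPipeline addAfter(String base, String name) { linkAfter(find(base), new Node(name)); return this; }

    synchronized MiniPipeline remove(String name) {
        Node n = find(name);
        n.prev.next = n.next;
        n.next.prev = n.prev;
        return this;
    }

    synchronized MiniPipeline replace(String oldName, String newName) {
        linkAfter(find(oldName), new Node(newName)); // put the new node right behind the old one
        return remove(oldName);                      // then unlink the old one
    }

    /** Handler names from first to last, excluding the sentinels. */
    synchronized List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) out.add(n.name);
        return out;
    }
}
```

For example, `new MiniPipeline().addLast("decoder").addLast("business").addFirst("logger").remove("logger").names()` yields the names in chain order. Because the sentinels are always present, every insert and removal is the same four-pointer update with no special case for an empty chain.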
So far the first three questions have been answered in theory, so let's look at the source; otherwise it all stays theoretical… Admittedly, with so many ready-made APIs you rarely need the source in daily work, and knowing it mostly pays off in interviews…
Handler and event source code
We have already looked at the code of the top-level interface above; it contains very little code and very few methods. Next are its two sub-interfaces.
ChannelInboundHandler
package io.netty.channel;
/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
ChannelOutboundHandler
package io.netty.channel;
import java.net.SocketAddress;
/**
 * {@link ChannelHandler} which will get notified for IO-outbound-operations.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
These two sub-interfaces, which extend the top-level interface ChannelHandler, define the methods for inbound and outbound events respectively. Together they cover all the events we normally deal with in development.
We rarely implement these interfaces directly unless we need a fully custom handler, but we do need to know what they can do. In most cases we should use the adapters Netty provides to simplify development, as follows:
ChannelInboundHandlerAdapter
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Skip
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Skip
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Skip
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Skip
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Skip
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Skip
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Skip
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Skip
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
ChannelOutboundHandlerAdapter
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Skip
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.disconnect(promise);
}
@Skip
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
ctx.close(promise);
}
@Skip
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Skip
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
ChannelDuplexHandler
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
As you can see, these three adapters already implement the interfaces for us, so we can use them directly. To customize the handling of an event, we only need to extend one of the adapters and override a single method; there is no need to implement the top-level interfaces and write every method by hand.
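The adapter idea can be sketched without Netty at all. In the following plain-Java sketch every type is hypothetical (`SketchContext`, `SketchInboundHandler`, `SketchInboundAdapter`, `UpperCaseHandler`) and the interface is cut down to two events; it only illustrates how a pass-through adapter lets a subclass override one method and inherit the rest.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a context: "forwarding" an event just records it. */
class SketchContext {
    final List<String> forwarded = new ArrayList<>();
    void fireActive() { forwarded.add("active"); }
    void fireRead(Object msg) { forwarded.add("read:" + msg); }
}

/** Hypothetical inbound interface reduced to two events. */
interface SketchInboundHandler {
    void channelActive(SketchContext ctx);
    void channelRead(SketchContext ctx, Object msg);
}

/** Adapter: every method simply passes the event on unchanged. */
class SketchInboundAdapter implements SketchInboundHandler {
    @Override
    public void channelActive(SketchContext ctx) { ctx.fireActive(); }

    @Override
    public void channelRead(SketchContext ctx, Object msg) { ctx.fireRead(msg); }
}

/** A handler that customizes only channelRead and inherits the pass-through channelActive. */
class UpperCaseHandler extends SketchInboundAdapter {
    @Override
    public void channelRead(SketchContext ctx, Object msg) {
        ctx.fireRead(msg.toString().toUpperCase());
    }
}
```

Calling `channelActive` on an `UpperCaseHandler` falls through to the adapter and forwards the event untouched, while `channelRead` forwards the upper-cased message.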
The ChannelHandlerContext type appears throughout the three adapters. It acts as the context and maintains the whole handler chain, but it is only an interface, so we can analyze its default implementation class, DefaultChannelHandlerContext, as follows:
DefaultChannelHandlerContext
package io.netty.channel;
import io.netty.util.concurrent.EventExecutor;
/** * The default context implementation class */
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler; // The handler that needs to be executed is stored in the context
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler; // The handler passed in when the context object is created
}
@Override
public ChannelHandler handler() {
return handler;
}
}
As with the custom chain of responsibility, a handler is stored in the context and executed through the context. But something looks wrong: this is the implementation class, so where are the links that connect one handler to the next? No need to worry, just look at its parent class, AbstractChannelHandlerContext.
AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next; // Next processor
volatile AbstractChannelHandlerContext prev; // The previous processor
}
Here it is: AbstractChannelHandlerContext implements the ChannelHandlerContext interface and has two fields, next and prev, that link each context to its successor and predecessor.
This raises a question: shouldn't a single context object be able to maintain the whole chain? Why is a separate context object stored for each handler?
Here is a guess, to be corrected later if it turns out to be wrong: in the custom demo, each handler stored a reference to the next handler, whereas in Netty a handler holds no reference to any other handler, so multiple context objects are needed to record the ordering of the chain. This is only a guess and not necessarily right…
The default pipeline implementation class DefaultChannelPipeline implements the ChannelPipeline interface, so it must contain the implementation of the handler-chain API, as follows:
/** * Default pipe implementation class */
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head; // head of the linked list
final AbstractChannelHandlerContext tail; // tail of the linked list
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this); // Create the end of the list
head = new HeadContext(this); // Create the linked list header
// There is no processor in the middle, so far only end to end
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler);
}
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
newCtx = newContext(group, name, handler); // Create a new context object
addFirst0(newCtx); // Actually handle the context procedure
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
/**
 * Actually links the context in as the first node of the chain
 * @param newCtx the newly created handler context
 */
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next; // The current first context
newCtx.prev = head; // The new context's predecessor is the head
newCtx.next = nextCtx; // The new context's successor is the current first context
head.next = newCtx; // The head's successor is now the new context
nextCtx.prev = newCtx; // The former first context's predecessor is now the new context
}
}
As you can see, the pipeline initializes a head and a tail, but these are context objects rather than bare handlers, and the API likewise wraps each handler in a context object before linking it in. So our guess was correct: a pipeline object stores two special context objects, and through them (via prev and next) it strings together a chain of context objects, each of which holds the actual handler. The advantage is that any handler object can be used anywhere: because a handler holds no references to other handlers, it stays pure, and it is simply wrapped in a new context object wherever it is used. Wonderful……
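The benefit of keeping the links in the context rather than in the handler can be sketched in a few lines of plain Java. All names here (`PureHandler`, `LinkContext`, `SharedHandlerDemo`) are hypothetical and the chain is singly linked for brevity; the point is only that one handler instance can sit at different positions in two chains because each chain wraps it in its own context.

```java
/** Hypothetical handler: holds no prev/next links, so one instance can be reused anywhere. */
interface PureHandler {
    String handle(String msg);
}

/** Hypothetical context: owns the link to its successor and wraps exactly one handler. */
class LinkContext {
    final PureHandler handler;
    LinkContext next;

    LinkContext(PureHandler handler) { this.handler = handler; }

    /** Runs this handler, then passes the result to the next context if there is one. */
    String fire(String msg) {
        String result = handler.handle(msg);
        return next == null ? result : next.fire(result);
    }
}

class SharedHandlerDemo {
    /** Builds a chain by wrapping each handler in a fresh context; returns the first context. */
    static LinkContext chain(PureHandler... handlers) {
        LinkContext first = null, last = null;
        for (PureHandler h : handlers) {
            LinkContext ctx = new LinkContext(h);
            if (first == null) { first = ctx; } else { last.next = ctx; }
            last = ctx;
        }
        return first;
    }

    public static void main(String[] args) {
        PureHandler trim = String::trim;              // one shared instance
        PureHandler upper = String::toUpperCase;
        PureHandler exclaim = s -> s + "!";

        LinkContext pipelineA = chain(trim, upper);   // trim is first here
        LinkContext pipelineB = chain(exclaim, trim); // same trim object, last here

        System.out.println(pipelineA.fire("  hello "));
        System.out.println(pipelineB.fire(" hi "));
    }
}
```

If the `next` pointer lived inside the handler, as in the custom demo earlier in this article, the shared `trim` instance could only ever have one successor and could not take part in both chains.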
Put the previous three questions together and draw a picture as follows:
With the first three questions answered both in theory and in the source, the next step is to analyze how handlers are executed.
Handler execution analysis
As shown in the figure above, even when a pipeline contains multiple handlers (both inbound and outbound), it automatically selects the right ones when an event is executed: an inbound event (125) skips outbound handlers, and an outbound event (521) skips inbound handlers.
In the context object that wraps a handler, methods starting with fire propagate and process inbound events, and the remaining methods propagate and process outbound events.
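The selection rule can be sketched as two walks over the same linked list. This is a simplified plain-Java model with hypothetical names (`DirCtx`, `DirectionDemo`), where each context only records whether its handler is inbound or outbound; it relies on the fact that in Netty inbound events travel from head to tail and outbound operations from tail to head.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical context node that only records the direction of its handler. */
class DirCtx {
    final String name;
    final boolean inbound;
    DirCtx prev, next;

    DirCtx(String name, boolean inbound) {
        this.name = name;
        this.inbound = inbound;
    }
}

class DirectionDemo {
    /** Links the contexts in the given order and returns them as [first .. last]. */
    static DirCtx[] link(DirCtx... ctxs) {
        for (int i = 1; i < ctxs.length; i++) {
            ctxs[i - 1].next = ctxs[i];
            ctxs[i].prev = ctxs[i - 1];
        }
        return ctxs;
    }

    /** Inbound event: walk first -> last, visiting inbound handlers only. */
    static List<String> fireInbound(DirCtx first) {
        List<String> visited = new ArrayList<>();
        for (DirCtx c = first; c != null; c = c.next) {
            if (c.inbound) visited.add(c.name);
        }
        return visited;
    }

    /** Outbound operation: walk last -> first, visiting outbound handlers only. */
    static List<String> fireOutbound(DirCtx last) {
        List<String> visited = new ArrayList<>();
        for (DirCtx c = last; c != null; c = c.prev) {
            if (!c.inbound) visited.add(c.name);
        }
        return visited;
    }
}
```

With a chain linked as in1, out1, in2, out2, `fireInbound` on the first node visits in1 then in2, and `fireOutbound` on the last node visits out2 then out1. The sketch ignores handlers that are both inbound and outbound, which a real pipeline also has to account for.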
Analyze processing of registered inbound events
A registered event represents the binding of a channel to a selector in NIO; in Netty it represents the binding of a Channel to an EventLoop, which happens, for example, as part of the bind method.
As noted earlier, each Channel has its own pipeline, created automatically when the Channel is created in the initAndRegister method. After the Channel is created, it is initialized with the init method of ServerBootstrap, as follows:
ServerBootstrap
@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());
    ChannelPipeline p = channel.pipeline(); // Get the pipeline object in the channel
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    // ChannelInitializer is a special handler that initializes the channel; it runs once and is then removed
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) { // Executed when the ChannelInitializer is triggered, after registration succeeds
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
Before the ChannelInitializer is added, the pipeline's chain contains only two handlers, HeadContext and TailContext (HeadContext->TailContext). p.addLast() then adds a ChannelInitializer handler, which is declared like this:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {}
First, we can see that ChannelInitializer is an inbound handler. We focus on three of its methods, initChannel, channelRegistered, and handlerAdded, as follows:
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    // If handlerAdded(...) has already run, channelRegistered should in theory not be called again
    if (initChannel(ctx)) { // This handler is removed after the initChannel method is called
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();
        // We are done with init the Channel, removing all the state for the Channel now.
        removeState(ctx);
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
}

/**
 * {@inheritDoc} If override this method ensure you call super!
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) { // Call the initialization method if the current channel is already registered
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {
            // We are done with init the Channel, removing the initializer now.
            removeState(ctx); // Clear the initializer's state for this context
        }
    }
}

/**
 * Channel initialization
 */
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel()); // Usually the initChannel method we implemented when adding the ChannelInitializer
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this); // ChannelInitializer removes itself from the pipeline after execution to avoid repeated initialization
            }
        }
        return true;
    }
    return false;
}
The channelRegistered method has a comment worth noting if handlerAdd(…) Once executed, the channelRegistered method should theoretically never be called again. Why?
If you look at the handlerAdd method, you’ll see that if the channel has been initialized it removes the handler from the chain of responsibility, and the channelRegistered method above really won’t be called again.
Either channelRegistered or handlerAdd executes initChannel(ChannelHandlerContext), which calls initChannel((C) ctx.channel()); This method is the code that we override when we execute p.addlast (), and can also be found in initChannel(ChannelHandlerContext) to remove the current handler. Pipeline.remove (this)
From the analysis of ChannelInitializer, we can also realize the magic of dynamic addition and deletion of processors in the responsibility chain
The handler in the responsibility chain has changed since the p.addlast () method was executed. That is, HeadContext->ChannelInitializer ->TailContext . = =
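The self-removing behaviour described above can be sketched in plain Java without Netty. Everything here (MiniHandler, MiniPipeline, MiniInitializer, InitializerDemo) is a hypothetical stand-in invented for illustration; it only mimics the shape of the real classes: callbacks are deferred until registration, a set guards against re-entrance, and the initializer removes itself in a finally block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for illustration; these are not Netty classes.
interface MiniHandler {
    String name();
    // Called once the handler has been added and the pipeline is registered.
    default void handlerAdded(MiniPipeline pipeline) {}
}

class MiniPipeline {
    private final List<MiniHandler> handlers = new ArrayList<>();
    private final List<MiniHandler> pendingAdded = new ArrayList<>();
    private boolean registered;

    MiniPipeline addLast(MiniHandler handler) {
        handlers.add(handler);
        if (registered) {
            handler.handlerAdded(this);
        } else {
            pendingAdded.add(handler); // callback deferred until registration
        }
        return this;
    }

    void remove(MiniHandler handler) { handlers.remove(handler); }

    // Rough analogue of invokeHandlerAddedIfNeeded(): run the deferred callbacks.
    void register() {
        registered = true;
        List<MiniHandler> pending = new ArrayList<>(pendingAdded);
        pendingAdded.clear();
        for (MiniHandler h : pending) h.handlerAdded(this);
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        out.add("Head");
        for (MiniHandler h : handlers) out.add(h.name());
        out.add("Tail");
        return out;
    }
}

abstract class MiniInitializer implements MiniHandler {
    private final Set<MiniPipeline> initialized = new HashSet<>(); // guard against re-entrance

    protected abstract void initChannel(MiniPipeline pipeline);

    @Override public String name() { return "Initializer"; }

    @Override public void handlerAdded(MiniPipeline pipeline) {
        if (initialized.add(pipeline)) {   // add() returns false on a second call
            try {
                initChannel(pipeline);
            } finally {
                pipeline.remove(this);     // the initializer removes itself
            }
        }
    }
}

class InitializerDemo {
    // Returns the handler names before and after registration.
    static List<List<String>> run() {
        MiniPipeline p = new MiniPipeline();
        p.addLast(new MiniInitializer() {
            @Override protected void initChannel(MiniPipeline pipeline) {
                pipeline.addLast(() -> "Logging");
            }
        });
        List<String> before = p.names();   // [Head, Initializer, Tail]
        p.register();
        List<String> after = p.names();    // [Head, Logging, Tail]
        return Arrays.asList(before, after);
    }
}
```

In this sketch the initializer is in the chain only between addLast and register; afterwards only the handlers it installed remain.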
This completes the init() method called from initAndRegister().
Next we look at the register method. As we saw earlier, the register method that is actually called lives in AbstractChannel (in its inner AbstractUnsafe class), as follows:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// eventLoop --> selector
// promise --> channel
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// Check whether the calling thread is the EventLoop thread; if not, submit the registration as a task
if (eventLoop.inEventLoop()) {
// The actual registration
register0(promise);
} else {
try {
// Once a task is submitted, the EventLoop starts polling
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
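The statement AbstractChannel.this.eventLoop = eventLoop binds the EventLoop to the channel. eventLoop.inEventLoop() then checks whether the current thread is the EventLoop's own thread. If it is not, the registration is submitted to the EventLoop as a task: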
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
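The hand-off above can be sketched with the JDK alone. MiniEventLoop and RegisterDemo are hypothetical names, and a ThreadPoolExecutor with one core thread stands in for the real EventLoop; the JDK pool starts its worker thread lazily on the first submitted task, which makes it a convenient way to observe the "no thread until the first task" behaviour.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded loop for illustration; not Netty's EventLoop.
class MiniEventLoop {
    private final ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    private volatile Thread loopThread;

    boolean inEventLoop() { return Thread.currentThread() == loopThread; }

    boolean threadStarted() { return executor.getPoolSize() > 0; }

    void execute(Runnable task) {
        executor.execute(() -> {
            loopThread = Thread.currentThread(); // remember which thread is the loop
            task.run();
        });
    }

    // Same shape as register(): run inline on the loop thread, otherwise hand off.
    void register(Runnable register0) {
        if (inEventLoop()) {
            register0.run();
        } else {
            execute(register0);
        }
    }

    void shutdown() { executor.shutdown(); }
}

class RegisterDemo {
    // Returns {thread started before, task ran on loop thread, thread started after}.
    static boolean[] run() {
        MiniEventLoop loop = new MiniEventLoop();
        boolean startedBefore = loop.threadStarted(); // no worker thread yet
        boolean[] ranOnLoop = new boolean[1];
        CountDownLatch done = new CountDownLatch(1);
        loop.register(() -> {                          // called from a non-loop thread
            ranOnLoop[0] = loop.inEventLoop();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean startedAfter = loop.threadStarted();
        loop.shutdown();
        return new boolean[] {startedBefore, ranOnLoop[0], startedAfter};
    }
}
```

Calling RegisterDemo.run() from an ordinary thread takes the else branch, so the registration callback runs on the loop's thread rather than the caller's.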
To be clear, a newly created EventLoop does not have a running thread yet. The thread is created through the thread factory only when the first task is executed. We won't go into the details here; instead we focus on register0(promise), as follows:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // For NIO channels, register the Channel with the NioEventLoop's selector; essentially a plain JDK NIO register call
// Registration is complete
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded(); // Trigger the pending handlerAdded callbacks
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); // Propagate channel registration completion event
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
The code before registration completes needs no elaboration. Let's study pipeline.invokeHandlerAddedIfNeeded(). This is where the channel formally starts executing events in its handlers, in this case a handlerAdded event. There are currently three handlers and only the second one has a corresponding method, so that method is the one that runs. The lower-level code on the way there is not important; you can step through it in a debugger.
Focus on the second handler: its handlerAdded method calls initChannel(ctx) and then removeState(ctx). So after pipeline.invokeHandlerAddedIfNeeded() executes, only two handlers should be left in the pipeline, HeadContext and TailContext. But is this really the case?
Calling initChannel(ctx) in handlerAdded executes the abstract method we implemented when we added the handler. It is easy to forget what that implementation was, so take a closer look:
@Override
public void initChannel(final Channel ch) { // Executed when the ChannelInitializer is triggered, once the channel has been registered
final ChannelPipeline pipeline = ch.pipeline(); // The channel's pipeline
ChannelHandler handler = config.handler(); // Read the handler from the configuration
if (handler != null) {
pipeline.addLast(handler); // Add the configured handler if there is one
}
ch.eventLoop().execute(new Runnable() { // Finally, submit a task that adds the ServerBootstrapAcceptor handler
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
What is config.handler()? It is the handler we configured before calling the bind method, as follows:
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
After initChannel executes, a LoggingHandler must be present in the responsibility chain. LoggingHandler extends ChannelDuplexHandler, a combined handler: both inbound and outbound events invoke methods in it. As the name suggests, it logs events.
Finally, a ServerBootstrapAcceptor handler is added.
That is, after pipeline.invokeHandlerAddedIfNeeded() there should be four handlers in the chain: HeadContext -> LoggingHandler -> ServerBootstrapAcceptor -> TailContext. Before the original handler is removed there would of course be five.
However, when the method finishes executing, the ServerBootstrapAcceptor handler is not yet in the chain of responsibility. It is added by a task submitted to the EventLoop, and that task only runs after the current one finishes. Let's keep looking.
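One way to see why the acceptor is missing at that moment: in the initChannel code shown earlier, LoggingHandler is added directly, while ServerBootstrapAcceptor is added from a task handed to ch.eventLoop().execute(...). The sketch below imitates that with a plain task queue. MiniTaskLoop and DeferredAddDemo are hypothetical names, and the queue is only an assumed simplification of how a single-threaded loop runs tasks one after another.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded task queue; a stand-in for the EventLoop's task queue.
class MiniTaskLoop {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void execute(Runnable task) { tasks.add(task); }

    // Run queued tasks one at a time, including tasks queued while running.
    void runAllTasks() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}

class DeferredAddDemo {
    static List<String> snapshots() {
        MiniTaskLoop loop = new MiniTaskLoop();
        List<String> pipeline = new ArrayList<>();
        List<String> snapshots = new ArrayList<>();

        // The "registration" task: the initializer adds one handler directly
        // and queues a second task that adds the acceptor later.
        loop.execute(() -> {
            pipeline.add("LoggingHandler");
            loop.execute(() -> pipeline.add("ServerBootstrapAcceptor"));
            snapshots.add("after initChannel: " + pipeline);
        });
        loop.runAllTasks();
        snapshots.add("after queue drained: " + pipeline);
        return snapshots;
    }
}
```

The first snapshot contains only LoggingHandler; the acceptor shows up in the second one, after the queued task has had its turn.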
Next comes pipeline.fireChannelRegistered(), which propagates the channel registration completion event. This event is simple: it passes through the handlers in order, each executing its channelRegistered method, until a handler does not pass the event on to the next one. The last handler is TailContext.
The notification of the registration event is asynchronous: once the task has been submitted, the EventLoop thread processes the event.
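The rule that an inbound event travels from head towards tail and stops at the first handler that does not forward it can be sketched as a linked list walk. InboundNode and InboundDemo are hypothetical names; the boolean flag simply stands for "this handler calls the next one".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical context node; it forwards the event only if its handler asks it to.
class InboundNode {
    final String name;
    final boolean forwards;
    InboundNode next;

    InboundNode(String name, boolean forwards) {
        this.name = name;
        this.forwards = forwards;
    }

    void fireRegistered(List<String> visited) {
        visited.add(name);                       // this handler sees the event
        if (forwards && next != null) {
            next.fireRegistered(visited);        // pass it on towards the tail
        }
    }
}

class InboundDemo {
    static List<String> fire(boolean middleForwards) {
        InboundNode head = new InboundNode("Head", true);
        InboundNode logging = new InboundNode("Logging", middleForwards);
        InboundNode tail = new InboundNode("Tail", false);
        head.next = logging;
        logging.next = tail;

        List<String> visited = new ArrayList<>();
        head.fireRegistered(visited);
        return visited;
    }
}
```

InboundDemo.fire(true) visits Head, Logging and Tail; InboundDemo.fire(false) stops after Logging because that node does not forward the event.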
At this point the register inbound event is done; next comes the bind outbound event.
Analyze the handling of the bind outbound event
The code that actually performs port binding is as follows:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() { // Submit a task to the NioEventLoop to bind the port
// Only bind if registration completed successfully
if (regFuture.isSuccess()) {
// Bind
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
This method submits the bind task asynchronously to the EventLoop. Focus on the channel.bind(...) call, which actually invokes AbstractChannel.bind, as follows:
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// Trigger a binding event in the Netty responsibility chain, which is initiated by the application layer code to the bottom layer and belongs to outBound
return pipeline.bind(localAddress, promise);
}
pipeline.bind(localAddress, promise) in turn is DefaultChannelPipeline.bind, as follows:
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
As you can see, the bind outbound event starts from tail. This is one of the characteristics of outbound events: they start at the tail. If a handler does not override the bind method, AbstractChannelHandlerContext.bind is called, as follows:
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); // Find the next outbound event handler
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise); // Event execution
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
Finding the "next" outbound handler actually means finding the previous handler in the chain, because outbound events travel in the opposite direction. Once it is found, next.invokeBind(...) is called to execute it, as follows:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); // Actually call the bind method in the handler
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
In the chain of responsibility above, the bind method in LoggingHandler is called first. Whether the search continues is up to the code in each handler; it goes on until the bind outbound event is no longer propagated, which in this case is at the bind method in HeadContext, as follows:
@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
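The backwards walk of an outbound event can be sketched as follows. OutboundNode and OutboundDemo are hypothetical names, and the handlesOutbound flag is an assumed simplification of the mask check done by findContextOutbound. In this sketch every outbound handler forwards the event; in real code each handler decides that for itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical doubly linked context; only the backward link matters for outbound events.
class OutboundNode {
    final String name;
    final boolean handlesOutbound;
    final OutboundNode prev;

    OutboundNode(String name, boolean handlesOutbound, OutboundNode prev) {
        this.name = name;
        this.handlesOutbound = handlesOutbound;
        this.prev = prev;
    }

    // Walk backwards, skipping inbound-only handlers. The head handles outbound
    // events, so the loop always terminates there at the latest.
    OutboundNode findOutbound() {
        OutboundNode ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.handlesOutbound);
        return ctx;
    }

    void bind(List<String> trace) {
        OutboundNode next = findOutbound();
        trace.add(next.name);            // next handler's bind runs
        if (next.prev != null) {         // the head has no predecessor: it does the real bind
            next.bind(trace);
        }
    }
}

class OutboundDemo {
    static List<String> bindFromTail() {
        OutboundNode head = new OutboundNode("Head", true, null);
        OutboundNode logging = new OutboundNode("Logging", true, head);        // duplex handler
        OutboundNode acceptor = new OutboundNode("Acceptor", false, logging);  // inbound only
        OutboundNode tail = new OutboundNode("Tail", false, acceptor);

        List<String> trace = new ArrayList<>();
        tail.bind(trace);
        return trace;
    }
}
```

Starting at the tail, the inbound-only acceptor is skipped, the duplex logging handler sees the bind, and the head ends the walk.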
What is unsafe here? It is assigned in the HeadContext constructor, as follows:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
unsafe is the channel's Unsafe object, so the bind method in AbstractChannel (its inner AbstractUnsafe class) is called, as follows:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
We only need to focus on doBind(localAddress). We know the channel is a NioServerSocketChannel, so we can go straight to its doBind method, as follows:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
At last, we reach the underlying JDK NIO logic for port binding.
At this point, the server is ready to use.
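For reference, the JDK call that the chain finally reaches can be tried on its own. This is a minimal sketch using only java.nio; NioBindDemo is a hypothetical name, and binding to port 0 on the loopback address is a choice made here so the example does not depend on a free fixed port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

class NioBindDemo {
    // Binds a non-blocking server channel to an ephemeral loopback port
    // and returns the port the operating system picked.
    static int bindEphemeral(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Same JDK method as javaChannel().bind(localAddress, backlog) above.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The backlog argument plays the same role as the SO_BACKLOG option set on the bootstrap earlier.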
Next we turn to ServerBootstrapAcceptor, the handler that deals with accepted connections.
Analyze the processing of the accept inbound event
By now the server has started and the EventLoop is polling for events. When the EventLoop polls an accept event, propagation of the inbound event begins. That is the precondition. Note that the selector here is the JDK NIO selector itself, merely wrapped by Netty.
The polling happens in NioEventLoop.run(), which calls processSelectedKeys(), as follows:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
// Polling with events in the selector generated when the NioEventLoop is initialized
// But this selector will only work if it is bound to the channel (it must be bound when these events are executed)
// Reads the event from the selector bound to the channel
processSelectedKeysPlain(selector.selectedKeys());
}
}
/**
 * Handle events
 * @param selectedKeys event collection
 */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) { // No events
return;
}
// Iterate over the selected keys
Iterator<SelectionKey> i = selectedKeys.iterator();
// Loop
for (;;) {
// The key wrapping the ready event
final SelectionKey k = i.next();
// Get the channel attached to the key
final Object a = k.attachment();
i.remove(); // Remove the key from the selected set
if (a instanceof AbstractNioChannel) { // Check whether it is a Netty channel
// Handle a single event
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
/**
 * Handle a single event
 * @param k the selection key wrapping the event
 * @param ch the Netty channel the event belongs to
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// k --> the key wrapping the event
// ch --> the Netty channel corresponding to the event
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { // If the key is no longer valid, enter this branch and close the channel through unsafe
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop(); // Get the channel's NIO event loop
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps(); // Get the event type
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // The connect event is primarily used by the client
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
// Write events are handled here
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// Accept and read are processed here (server focus)
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
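The bit tests in processSelectedKey can be isolated into a small function. ReadyOpsDemo and the action names it returns are hypothetical labels chosen here; the SelectionKey constants and the order of the checks (connect, then write, then read or accept) follow the code above.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

class ReadyOpsDemo {
    // Returns the actions taken for a readyOps value, in the order they would run.
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is treated as readable too, mirroring the spin-loop workaround.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

For example, ReadyOpsDemo.dispatch(SelectionKey.OP_ACCEPT) yields only the read action, which is why a new connection on the server channel ends up in unsafe.read().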
unsafe.read() is the key method for handling new connections. Since unsafe belongs to the channel itself, we go straight to the read() method of NioMessageUnsafe, as follows:
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
The first call in this method is int localRead = doReadMessages(readBuf), and it is the one to focus on. AbstractNioMessageChannel only declares it as an abstract method, so we go directly to its subclass NioServerSocketChannel, as follows:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel()); // Accept the connection and get the JDK SocketChannel object
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch)); // Wrap the JDK SocketChannel in the NioSocketChannel used by Netty
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
As you can see, the only thing this method does is bridge JDK NIO to Netty by wrapping the accepted JDK SocketChannel in a NioSocketChannel. This is also the first time a client channel is involved; everything so far revolved around NioServerSocketChannel.
Now that new connections have been accepted, the code iterates over them and propagates a read inbound event for each, as follows:
for (int i = 0; i < size; i ++) { // Iterate over the new connections read
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); // Enter the responsibility chain, propagating the read inbound event
}
This is where ServerBootstrapAcceptor comes in. Its channelRead method is as follows:
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg; // msg is the new channel produced by accept
child.pipeline().addLast(childHandler); // Add a new handler: the ChannelInitializer configured at server startup
setChannelOptions(child, childOptions, logger); // Apply the channel options
setAttributes(child, childAttrs);
try {
// Register the channel: select a NioEventLoop from the sub EventLoopGroup to handle its I/O,
// bind the channel to that EventLoop, trigger the register and active events,
// then register OP_READ automatically (same as for main)
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
This method does the dispatching: main accepts the new connection and hands it to sub, which executes the subsequent logic (read).
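The main/sub hand-off can be sketched with plain collections. MiniWorkerGroup and MiniAcceptor are hypothetical names, connections are represented by strings, and picking workers in round-robin order is an assumption of this sketch rather than something the code above shows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker group; round-robin selection is an assumption of this sketch.
class MiniWorkerGroup {
    private final List<List<String>> workers = new ArrayList<>();
    private final AtomicInteger index = new AtomicInteger();

    MiniWorkerGroup(int size) {
        for (int i = 0; i < size; i++) {
            workers.add(new ArrayList<>());
        }
    }

    // Assign the connection to the next worker and return that worker's index.
    int register(String connection) {
        int chosen = Math.floorMod(index.getAndIncrement(), workers.size());
        workers.get(chosen).add(connection);
        return chosen;
    }

    List<String> connectionsOf(int worker) { return workers.get(worker); }
}

// Hypothetical acceptor: lives in the main pipeline and only forwards to the sub group.
class MiniAcceptor {
    private final MiniWorkerGroup childGroup;

    MiniAcceptor(MiniWorkerGroup childGroup) { this.childGroup = childGroup; }

    // Analogue of channelRead: the "message" is the accepted connection itself.
    int channelRead(String acceptedConnection) {
        return childGroup.register(acceptedConnection);
    }
}
```

With two workers, three accepted connections land on worker 0, worker 1 and worker 0 again; from then on each connection is served only by the worker it was registered with.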
By now the handlers in the server pipeline should be clear: how the server receives new connections and how they are distributed once received. After that, is a read event polled by an EventLoop in childGroup? Let's see.
Analyze the handling of read inbound events
After analyzing the three events above, let's summarize. What are the guiding questions for analyzing the whole chain?
- What event is this?
- What handlers are currently in the pipeline at the time the event is executed or dispatched?
- Which ones are triggered?
- What is the order of execution?
- What handlers are left after the event is executed, and are there any additions or deletions?
The read event also starts with unsafe.read(), this time polled by an EventLoop in sub. The client channel is built on AbstractNioByteChannel, so that class's read method is called. It looks like this:
@Override
public final void read() { // The client has sent data; read the specific data
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator); // Request a byte buffer
allocHandle.lastBytesRead(doReadBytes(byteBuf)); // Read specific data into the byte buffer
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); // Distribute the read contents to the responsibility chain for processing by the handler
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
This resembles the other read method: that one reads connections, this one reads content. The chain of responsibility then processes the data that was read.
We now know that this is an inbound read event and that the pipeline has three handlers (read the registration logic at the end of the accept event if you want to know exactly when they were added): HeadContext, EchoServerHandler (custom) and TailContext. All three are triggered, starting from HeadContext, so we only need to look at the custom EchoServerHandler, as follows:
package io.netty.example.echo;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
When the read event is triggered, the handler writes the data back, which triggers the write outbound event.
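The split between ctx.write(msg) in channelRead and ctx.flush() in channelReadComplete can be pictured with a tiny model. MiniEchoChannel is a hypothetical class; it assumes, as a simplification, that write only queues a message and flush is what pushes the queued messages out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel: write() only queues, flush() pushes queued messages to the "wire".
class MiniEchoChannel {
    private final List<String> pending = new ArrayList<>();
    final List<String> wire = new ArrayList<>();

    void write(String msg) { pending.add(msg); }

    void flush() {
        wire.addAll(pending);
        pending.clear();
    }

    // Echo behaviour: each message read is written back, and one flush follows the batch.
    void onReadBatch(List<String> messages) {
        for (String msg : messages) {
            write(msg);   // channelRead -> ctx.write(msg)
        }
        flush();          // channelReadComplete -> ctx.flush()
    }
}
```

Batching the flush this way means several messages read in one pass cost a single flush instead of one per message.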
Custom handlers
Now that we understand the call flow, in practice we can write three custom handlers to carry the business logic:
- Protocol decoder: converts byte arrays into Java objects for processing
- Business logic handler: processes Java objects
- Protocol encoder: encodes the returned Java objects as byte arrays
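The three roles in the list above can be sketched as three composed functions. CodecChainDemo is a hypothetical name, and the "protocol" is deliberately trivial (UTF-8 text in, UTF-8 text out); a real decoder and encoder would deal with framing and a proper message format.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class CodecChainDemo {
    // Decoder: bytes -> Java object (a String here, the simplest possible "protocol").
    static final Function<byte[], String> DECODER =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

    // Business logic: works on Java objects only, never on raw bytes.
    static final Function<String, String> BUSINESS =
            request -> "echo:" + request.trim();

    // Encoder: Java object -> bytes.
    static final Function<String, byte[]> ENCODER =
            response -> response.getBytes(StandardCharsets.UTF_8);

    // The chain: decode, process, encode.
    static byte[] handle(byte[] inbound) {
        return DECODER.andThen(BUSINESS).andThen(ENCODER).apply(inbound);
    }
}
```

Keeping the business step free of byte handling is the point of the split: the decoder and encoder can change with the wire format while the middle step stays the same.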
Web address: http://175.24.172.160:3000/#/netty/responsibilityChainDesignMode