Interpretation of the Pipeline
Let’s look at the interface design of the pipeline first.
The Invoker interfaces (ChannelInboundInvoker and ChannelOutboundInvoker) specify the inbound and outbound events that can be passed through the pipeline. The pipeline itself is like a container: it provides methods to add, remove, replace and look up the elements it stores, and it redeclares some of the methods inherited from the Invokers with a different return type.
Concretely, ChannelPipeline provides methods to add, remove, replace and look up handlers and their ChannelHandlerContext. As we already saw in the startup process, what the pipeline stores is not the handler itself but a wrapper around it, DefaultChannelHandlerContext, and that wrapper is the real key.
It also overrides the methods inherited from the Invokers, changing the return type to ChannelPipeline so that calls are more convenient to chain:
/**
 * Redeclare the methods inherited from the Invokers with ChannelPipeline as the return type to make them easier to use
 */
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
@Override
ChannelPipeline flush();
Next, let’s look at the handlers stored in the pipeline
ChannelHandler
// The top-level interface defines two callback methods related to handler addition and removal
// The callbacks to these two methods are actually performed by the EventLoop corresponding to the Channel.
public interface ChannelHandler {
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
// This method can be used to do some operations related to the release of resources
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*
* @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
* implement the method there.
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
/**
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*/
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
The wrapper around a handler is built on the following abstract class
AbstractChannelHandlerContext
Attributes
// We focus on the propagation mechanism of events in pipeline
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// Successor node
volatile AbstractChannelHandlerContext next;
// Predecessor node
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
 */
// Pending state: handlerAdded has not been executed yet, possibly because the Channel is not registered yet
private static final int ADD_PENDING = 1;
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
 */
// Added to the pipeline: the corresponding handlerAdded method has been executed
private static final int ADD_COMPLETE = 2;
/**
 * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
// Removed from the pipeline: the corresponding handlerRemoved method has been executed
private static final int REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
// This is also the default state - the newly created handler
private static final int INIT = 0;
// Outer pipe container
private final DefaultChannelPipeline pipeline;
// Name of the CTX; a default name is generated when the handler is added to the pipeline without one
private final String name;
// Usually true
private final boolean ordered;
// A mask value associated with the method overridden by handler
private final int executionMask;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
// If the handler operation takes a long time, you can specify the group to execute
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Tasks invokeTasks;
// The default is INIT
private volatile int handlerState = INIT;
As the attributes show, the pipeline stores its contexts as a doubly linked list, linked through next and prev.
Over its lifetime a CTX goes through several states: INIT, ADD_PENDING, ADD_COMPLETE and REMOVE_COMPLETE.
If the channel is not yet registered, its executor is null and the CTX is in ADD_PENDING. Once the channel is registered, the state becomes ADD_COMPLETE. When the handler is removed or the channel is closed, the state becomes REMOVE_COMPLETE.
Constructor
We have been curious about how events travel through the pipeline. There is no hurry; let’s unveil it step by step.
When a CTX propagates an event, how does it find the next CTX that can handle it? Is some kind of identifier involved?
// Parameter 1: the enclosing pipeline, the container that holds the CTX
// Parameter 2: the executor; one can be specified for handlers that take a long time to run, otherwise the channel's NioEventLoop is used
// Parameter 3: the name of the handler in the pipeline
// Parameter 4: the class of the handler that actually processes the business logic
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    // Argument: the actual class of the handler
    // mask() computes a bit mask that is used to find the next suitable CTX when an event is propagated
    // Return value: the mask records which methods the handler overrides
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    // Usually true
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}
The mask() method is the key: its return value is assigned to executionMask which, as the name suggests, is a mask that controls execution.
The argument is the handler’s class. By now we know that when a framework works with a Class object, reflection is almost always involved. So where is the reflection here?
ChannelHandlerMask#mask()
A new class is involved: ChannelHandlerMask
// Parameter: the actual class of the handler
// mask() computes a bit mask that is used to find the next suitable CTX when an event is propagated
static int mask(Class<? extends ChannelHandler> clazz) {
    // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
    // lookup in the future.
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    Integer mask = cache.get(clazz);
    if (mask == null) { // We follow this branch: nothing is cached yet
        // Return value: the mask records which methods the handler overrides
        mask = mask0(clazz);
        cache.put(clazz, mask); // Cache the result to avoid repeating the reflection work
    }
    return mask;
}
// Parameter: the actual class of the handler
// Return value: the mask records which methods the handler overrides
private static int mask0(Class<? extends ChannelHandler> handlerType) {
// 0b 0000 0000 0000 0000 0000 0000 0000 0001
int mask = MASK_EXCEPTION_CAUGHT;
try {
// Condition: the handler type implements ChannelInboundHandler
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
// 0b 0000 0000 0000 0000 0000 0001 1111 1111
mask |= MASK_ALL_INBOUND;
// Parameter 1: the handler class
// Parameter 2: the method name
// Parameter 3: the method's parameter types, here ChannelHandlerContext.class
// isSkippable checks whether the handler has overridden the given method: an overriding method carries no @Skip annotation
// true means the handler has not overridden the method
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
// 0b 0000 0000 0000 0000 0000 0001 1111 1111
// &
// 0b 1111 1111 1111 1111 1111 1111 1111 1101
// -> 0b 0000 0000 0000 0000 0000 0001 1111 1101
mask &= ~MASK_CHANNEL_REGISTERED; // Clear this method's bit from the mask
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask; // The final mask records which methods the handler overrides
}
As you can see, the mask is computed in the mask0() method, the key being the isSkippable() method call
// Parameter 1: the handler class
// Parameter 2: the method name
// Parameter 3: the method's parameter types, for example ChannelHandlerContext.class
@SuppressWarnings("rawtypes")
private static boolean isSkippable(
        final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
    return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
        @Override
        public Boolean run() throws Exception {
            Method m;
            try {
                // Look up the method by reflection; name and parameter types match the declarations in ChannelInboundHandler / ChannelOutboundHandler
                m = handlerType.getMethod(methodName, paramTypes);
            } catch (NoSuchMethodException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                }
                return false;
            }
            // The method was found. It is skippable only if it still carries @Skip, that is, the handler
            // inherits it from an adapter such as ChannelInboundHandlerAdapter and has not overridden it
            return m != null && m.isAnnotationPresent(Skip.class);
        }
    });
}
isSkippable() uses reflection to look up the method on the given class by name and parameter types, and so determines whether the handler actually handles the event we are interested in. If it does, the event must be delivered to it.
A method that is not overridden just runs the parent class’s implementation, which only passes the event on to the next CTX. An overridden method, by contrast, is the business logic we care about.
How can we tell whether a method is overridden? By checking for the @Skip annotation: the implementations in the parent class carry @Skip.
For example, in ChannelInboundHandlerAdapter:
@Skip // If a subclass overrides the method, the annotation disappears
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
So with this reflection we can check the target methods one by one. If a method is not overridden, its bit is cleared from the mask, so the final mask represents exactly the methods implemented by the handler wrapped in the current CTX
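The override check can be tried out without Netty. The following is a minimal sketch under stated assumptions: `SkipMaskSketch`, its nested `Skip` annotation, the `Adapter` and `DataHandler` classes and the two mask constants are all hypothetical stand-ins, not Netty types. It relies only on the fact that a method annotation is not carried over to an overriding method.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-ins for illustration; none of these names are Netty's.
final class SkipMaskSketch {
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Skip { }

    static final int MASK_ON_OPEN = 1;      // bit 0
    static final int MASK_ON_DATA = 1 << 1; // bit 1

    // Adapter whose default methods do nothing useful, so they are marked @Skip.
    public static class Adapter {
        @Skip public void onOpen() { }
        @Skip public void onData(Object msg) { }
    }

    // Overrides onData only; the overriding method carries no @Skip.
    public static class DataHandler extends Adapter {
        @Override public void onData(Object msg) { }
    }

    // True if the most specific public method still carries @Skip.
    static boolean isSkippable(Class<?> type, String name, Class<?>... params) {
        try {
            Method m = type.getMethod(name, params);
            return m.isAnnotationPresent(Skip.class);
        } catch (NoSuchMethodException e) {
            return false; // cannot prove the method is skippable
        }
    }

    // Start with every bit set, then clear the bit of each skippable method.
    static int mask(Class<?> type) {
        int mask = MASK_ON_OPEN | MASK_ON_DATA;
        if (isSkippable(type, "onOpen")) {
            mask &= ~MASK_ON_OPEN;
        }
        if (isSkippable(type, "onData", Object.class)) {
            mask &= ~MASK_ON_DATA;
        }
        return mask;
    }

    public static void main(String[] args) {
        System.out.println("Adapter mask:     " + mask(Adapter.class));
        System.out.println("DataHandler mask: " + mask(DataHandler.class));
    }
}
```

In this sketch the plain adapter ends up with an empty mask, while the subclass keeps only the bit of the method it overrides.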
Each target method corresponds to a unique mask bit, defined as constants in ChannelHandlerMask
// This is a helper class
final class ChannelHandlerMask {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelHandlerMask.class);
// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
// Inbound-only mask: 0b 0000 0000 0001 1111 1110
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
// Inbound mask including MASK_EXCEPTION_CAUGHT: 0b 0000 0000 0001 1111 1111
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
// Outbound-only mask: 0b 1 1111 1110 0000 0000
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
// Outbound mask including MASK_EXCEPTION_CAUGHT: 0b 1 1111 1110 0000 0001
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};
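To double-check the bit patterns in the comments above, the arithmetic can be reproduced in a few lines. This is a small sketch, assuming only the bit positions listed in the constants (bit 0 for exceptionCaught, bits 1 to 8 for inbound events, bits 9 to 16 for outbound operations); the class `MaskArithmeticSketch` and the helper `bitRange` are hypothetical names introduced for illustration.

```java
// Hypothetical helper class; it only mirrors the bit layout described above.
final class MaskArithmeticSketch {
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;

    // ORs together the bits from..to (inclusive).
    static int bitRange(int from, int to) {
        int mask = 0;
        for (int bit = from; bit <= to; bit++) {
            mask |= 1 << bit;
        }
        return mask;
    }

    static final int MASK_ONLY_INBOUND = bitRange(1, 8);
    static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
    static final int MASK_ONLY_OUTBOUND = bitRange(9, 16);
    static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

    static String bits(int value) {
        return Integer.toBinaryString(value);
    }

    public static void main(String[] args) {
        int mask = MASK_ALL_INBOUND;
        System.out.println("all inbound:        " + bits(mask));
        // Clearing one bit marks the corresponding method as "not overridden".
        mask &= ~MASK_CHANNEL_REGISTERED;
        System.out.println("without registered: " + bits(mask));
        System.out.println("only outbound:      " + bits(MASK_ONLY_OUTBOUND));
        System.out.println("all outbound:       " + bits(MASK_ALL_OUTBOUND));
    }
}
```

Clearing a bit with `mask &= ~BIT` and testing it with `(mask & BIT) == 0` are the only two operations the propagation logic needs, which is why a single int is enough to describe a handler.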
Now let’s look at the concrete type of the CTX, namely DefaultChannelHandlerContext
DefaultChannelHandlerContext
// Implements handler(), which makes the CTX fully functional
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    // The wrapped handler
    private final ChannelHandler handler;

    // Parameter 1: the enclosing pipeline, the container that holds the CTX
    // Parameter 2: the executor; one can be specified for handlers that take a long time to run, otherwise the channel's NioEventLoop is used
    // Parameter 3: the name of the handler in the pipeline
    // Parameter 4: the handler that actually processes the business logic
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // Passing the handler's class up indirectly exposes the handler to AbstractChannelHandlerContext
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
As you can see, it encapsulates a handler, and now we have a pretty good idea of what CTX does
DefaultChannelPipeline
This is the pipeline we are looking at
addLast
Let’s look at what happens when a handler is added to the pipeline
// Parameter: the business-level handler. This is the most commonly used overload
public final ChannelPipeline addLast(ChannelHandler handler) {
    // No name is given here, so name is null
    return addLast(null, handler);
}

@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    // Parameter 1: group, null here
    // Parameter 2: name, also null when not specified explicitly
    // Parameter 3: the business-level handler
    return addLast(null, name, handler);
}

// Parameter 1: group, null when not specified
// Parameter 2: name, also null when not specified explicitly
// Parameter 3: the business-level handler
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    // The CTX that wraps the handler
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // Sets the handler's added flag to true to record that it has been added to a pipeline
        checkMultiplicity(handler);
        // This step wraps the handler (a ChannelInitializer during startup) in a DefaultChannelHandlerContext
        // Parameter 1: the event executor group, usually null when not specified
        // Parameter 2: filterName(name, handler) generates a name for the CTX when name == null
        //   Generated name: handlerType + "#" + number (the number is incremented to avoid duplicate names)
        // Parameter 3: the current handler
        newCtx = newContext(group, filterName(name, handler), handler);
        // Link the CTX into the pipeline
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        // true: the channel that owns the pipeline is not registered and no NioEventLoop is bound yet, that is, executor == null
        // Deferring the callback avoids a NullPointerException
        if (!registered) { // For the ChannelInitializer, the channel is not registered at this point
            newCtx.setAddPending(); // Set the state to ADD_PENDING; handlerAdded() is not called yet
            // Wrap the current CTX in a task and append it to the pipeline's singly linked list of pending callbacks
            // The task is run by the corresponding executor after the channel is registered (see invokeHandlerAddedIfNeeded())
            callHandlerCallbackLater(newCtx, true);
            return this; // Nothing more to do for now
        }
        // At this point the channel is already registered
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // Call handlerAdded() of the handler wrapped by the CTX
    callHandlerAdded0(newCtx);
    return this;
}
As you can see, newCtx is created in newContext(). filterName() generates a name for the CTX when none is given; the generated name is handlerType + "#" + number, where the number is incremented to avoid duplicate names
// Parameter 1: the event executor group, usually null when not specified
// Parameter 2: the name; filterName(name, handler) has already generated one if name was null
//   Generated name: handlerType + "#" + number (the number is incremented to avoid duplicate names)
// Parameter 3: the current handler
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
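The naming rule described above (type name, "#", then an increasing number until the name is free) can be sketched as follows. This is only an illustration of the rule as stated in the text, not Netty's implementation: the class `HandlerNameSketch`, the `names` set and the exception message are assumptions made for the example.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the naming rule; not Netty's actual code.
final class HandlerNameSketch {
    // Names already used in this (imaginary) pipeline.
    private final Set<String> names = new HashSet<>();

    // Returns the given name if it is free, or a generated one when name is null.
    String filterName(String name, Object handler) {
        if (name == null) {
            return generateName(handler);
        }
        if (!names.add(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        return name;
    }

    // handlerType + "#" + number; the number grows until the name is unused.
    private String generateName(Object handler) {
        String base = handler.getClass().getSimpleName() + "#";
        for (int i = 0; ; i++) {
            String candidate = base + i;
            if (names.add(candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        HandlerNameSketch pipeline = new HandlerNameSketch();
        System.out.println(pipeline.filterName(null, new StringBuilder()));
        System.out.println(pipeline.filterName(null, new StringBuilder()));
        System.out.println(pipeline.filterName("codec", new Object()));
    }
}
```

Adding two unnamed handlers of the same type therefore yields two distinct names, while an explicitly chosen name is rejected if it is already taken.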
Then the CTX is linked into the pipeline by addLast0()
// addLast0 inserts the new CTX just before the tail node
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}
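The four pointer assignments are the classic "insert before a sentinel tail" operation on a doubly linked list. A self-contained sketch, assuming fixed head and tail sentinel nodes; `LinkedPipelineSketch` and `Node` are hypothetical names used only for this illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a pipeline as a doubly linked list with two sentinels.
final class LinkedPipelineSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    LinkedPipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Same four assignments as addLast0: the new node goes right before tail.
    void addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;
        newNode.next = tail;
        prev.next = newNode;
        tail.prev = newNode;
    }

    // Walks the list from head to tail and collects the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        LinkedPipelineSketch pipeline = new LinkedPipelineSketch();
        pipeline.addLast("decoder");
        pipeline.addLast("business");
        System.out.println(pipeline.names());
    }
}
```

Because head and tail always exist, the insert never has to handle an empty list as a special case.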
After the ChannelInitializer has been linked in, its state must be updated and handlerAdded() triggered. But the channel is not registered yet, so the callback is deferred through a singly linked list maintained by the pipeline
The corresponding code:
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// true: the channel that owns the pipeline is not registered and no NioEventLoop is bound yet, that is, executor == null
// Deferring the callback avoids a NullPointerException
if (!registered) { // For the ChannelInitializer, the channel is not registered at this point
    newCtx.setAddPending(); // Set the state to ADD_PENDING; handlerAdded() is not called yet
    // Wrap the current CTX in a task and append it to the pipeline's singly linked list of pending callbacks
    // The task is run by the corresponding executor after the channel is registered (see invokeHandlerAddedIfNeeded())
    callHandlerCallbackLater(newCtx, true);
    return this; // Nothing more to do for now
}
If the channel is not registered, the state of the CTX is set to ADD_PENDING, and the CTX is wrapped in a task that is appended to the singly linked list
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;
    // Wrap the CTX (the one that wraps the ChannelInitializer during startup) in a PendingHandlerAddedTask
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    // The list is empty
    if (pending == null) {
        pendingHandlerCallbackHead = task; // During startup, the head therefore holds the task that wraps the ChannelInitializer's CTX
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
The purpose is that, once the channel is registered, the tasks are taken from the list one by one, each running the corresponding handlerAdded() method and changing the CTX state to ADD_COMPLETE
The implementing method is DefaultChannelPipeline#callHandlerAddedForAllHandlers. When the channel is registered, an asynchronous task is submitted, and the singly linked list is traversed inside that task
As follows:
// Be aware that this method is executed by the EventLoop thread
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        // This Channel itself was registered.
        registered = true;
        // The singly linked list of pending tasks; in the server startup analysis its head is the task that wraps the ChannelInitializer
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    // Traverse the singly linked list and run each task
    while (task != null) {
        // The list stores tasks waiting to be executed: while the channel was not registered, each CTX was
        // wrapped in a task and appended here for the executor to run
        task.execute();
        task = task.next;
    }
}
So once the CTX has been wrapped in a task and stored in the singly linked list, addLast can return immediately, because after registration the channel takes care of traversing the list and executing the tasks
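The "defer until registered, then drain" pattern can be modelled in isolation. The sketch below is a simplification under stated assumptions: `PendingCallbackSketch`, `Task`, `add` and `register` are hypothetical names, the callback is reduced to writing a log entry, and everything runs on one thread, whereas the real pipeline hands the tasks to an executor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of deferred handlerAdded callbacks.
final class PendingCallbackSketch {
    private static final class Task {
        final Runnable action;
        Task next;

        Task(Runnable action) {
            this.action = action;
        }
    }

    private Task pendingHead;
    private boolean registered;
    final List<String> log = new ArrayList<>();

    // Before registration the callback is queued; afterwards it runs at once.
    synchronized void add(String handlerName) {
        Runnable callback = () -> log.add("handlerAdded:" + handlerName);
        if (!registered) {
            append(new Task(callback));
            return;
        }
        callback.run();
    }

    // Appends to the tail of the singly linked list.
    private void append(Task task) {
        if (pendingHead == null) {
            pendingHead = task;
            return;
        }
        Task pending = pendingHead;
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }

    // Marks the channel as registered and drains the list outside the lock.
    void register() {
        Task task;
        synchronized (this) {
            registered = true;
            task = pendingHead;
            pendingHead = null; // allow the tasks to be garbage collected
        }
        while (task != null) {
            task.action.run();
            task = task.next;
        }
    }

    public static void main(String[] args) {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.add("initializer");
        System.out.println("before register: " + pipeline.log);
        pipeline.register();
        System.out.println("after register:  " + pipeline.log);
    }
}
```

Draining outside the synchronized block mirrors the reasoning in the original comment: a callback may itself try to add a handler, and doing that while the lock is held could deadlock.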
What a task does depends on its concrete type, so let’s take PendingHandlerAddedTask as the example
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        // Get the executor of the CTX, normally the channel's EventLoop
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                            executor, ctx.name(), e);
                }
                atomicRemoveFromHandlerList(ctx);
                ctx.setRemoved();
            }
        }
    }
}
Obviously, callHandlerAdded0() will be executed
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.callHandlerAdded(); // Invoke the callback
    } catch (Throwable t) {
        boolean removed = false;
        try {
            atomicRemoveFromHandlerList(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }
        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}
callHandlerAdded
final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    // Update the state of the current CTX to ADD_COMPLETE
    if (setAddComplete()) {
        // During server startup, handler() returns the ChannelInitializer
        // Execute its handlerAdded() method
        handler().handlerAdded(this);
    }
}

// Change the state
final boolean setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE) {
            return false;
        }
        // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
        // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
        // exposing ordering guarantees.
        if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return true;
        }
    }
}
When the executor runs the task, the CTX state is changed to ADD_COMPLETE with a CAS, and then handlerAdded() of the wrapped handler is executed.
At this point the add phase of the CTX is complete!
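The state transitions can be exercised on their own with the JDK's AtomicIntegerFieldUpdater. This is a reduced sketch: the class `HandlerStateSketch` and its method set are assumptions made for the example, and only the retry loop follows the setAddComplete() code shown above.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical, stripped-down model of the CTX state machine.
final class HandlerStateSketch {
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "state");

    // The updater requires a volatile int field.
    private volatile int state = INIT;

    // INIT -> ADD_PENDING, only if nothing else changed the state first.
    boolean setAddPending() {
        return STATE.compareAndSet(this, INIT, ADD_PENDING);
    }

    // Retry loop: move to ADD_COMPLETE unless the CTX was already removed.
    boolean setAddComplete() {
        for (;;) {
            int oldState = state;
            if (oldState == REMOVE_COMPLETE) {
                return false;
            }
            if (STATE.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return true;
            }
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        HandlerStateSketch ctx = new HandlerStateSketch();
        ctx.setAddPending();
        System.out.println("added:   " + ctx.setAddComplete() + ", state=" + ctx.state());
        ctx.setRemoved();
        System.out.println("re-add:  " + ctx.setAddComplete() + ", state=" + ctx.state());
    }
}
```

The loop matters because another thread may change the state between the read and the compareAndSet; in that case the CAS fails and the check against REMOVE_COMPLETE is repeated with the fresh value.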
Let’s look at how events are propagated in the pipeline.
Since most of the code in pipeline is template code, fireChannelRegistered() is used as an example
fireChannelRegistered
To be clear, calling a fireXXX method on the pipeline (for an inbound event) always starts from HeadContext, whereas calling fireXXX on a CTX starts from the CTX after it. That is why we say that calling fireXXX on a CTX short-circuits part of the pipeline, and CTXs that are not interested in the event are filtered out
// Note: this is the CTX-level implementation; the pipeline-level method hands HeadContext to the static invokeChannelRegistered(...) below
@Override
public ChannelHandlerContext fireChannelRegistered() {
    // findContextInbound(MASK_CHANNEL_REGISTERED): the argument is the mask bit of channelRegistered()
    // As you might guess, this mask is used to find the next interested CTX to handle the event
    // findContextInbound finds the next CTX whose handler implements the target method
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered(); // The first one executed is actually HeadContext.invokeChannelRegistered
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    // Check the handler state
    if (invokeHandler()) {
        try {
            // Call handler.channelRegistered()
            ((ChannelInboundHandler) handler()).channelRegistered(this); // The first one executed is actually HeadContext.channelRegistered
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRegistered();
    }
}
When the handler state is valid, the handler’s method is executed
Valid here means that the CTX is in ADD_COMPLETE. This is logical: the handler’s methods should be called only after the CTX has been added successfully (the channel has been registered)
private boolean invokeHandler() {
    // Store in local variable to reduce volatile reads.
    int handlerState = this.handlerState;
    // Invocable once added; if ordering is not guaranteed, ADD_PENDING is accepted as well
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
So we now arrive at HeadContext#channelRegistered()
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
// Since firstRegistration == false, this step actually does nothing
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered(); // In the case of server startup demo, this is actually passed to LoggingHandler
}
Next we arrive at AbstractChannelHandlerContext#fireChannelRegistered()
@Override
public ChannelHandlerContext fireChannelRegistered() {
    // findContextInbound(MASK_CHANNEL_REGISTERED): the argument is the mask bit of channelRegistered()
    // As you might guess, this mask is used to find the next interested CTX to handle the event
    // findContextInbound finds the next CTX whose handler implements the target method
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}
findContextInbound()
// Parameter: the mask associated with the target method
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
// Find the appropriate CTX
do {
ctx = ctx.next; // Iterate over the next CTX
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); // Return true - to indicate that the current CTX is not the one we want to find and skip
return ctx;
}
The loop walks through the subsequent CTXs in the pipeline. What is the condition for skipping one?
skipContext()
// Parameter 1: the CTX being examined
// Parameter 2: the executor of the CTX that is propagating the event
// Parameter 3: the mask bit of the target method
// Parameter 4: MASK_ONLY_INBOUND
private static boolean skipContext(
        AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
    // Read this as: coarse filter || the real test, (ctx.executionMask & mask) == 0
    // Returning true means the current CTX is not the one we are looking for and is skipped
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
            // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
            // everything to preserve ordering.
            //
            // See https://github.com/netty/netty/issues/10067
            (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
Take MASK_CHANNEL_REGISTERED as an example. If the executionMask of a CTX does not contain the MASK_CHANNEL_REGISTERED bit, skipContext() returns true and the CTX is skipped. If the bit is present, it returns false and the loop ends. Control then returns to the outer method, invokeChannelRegistered, which receives the CTX that was found
So this method executes the corresponding method of the handler in that CTX
And that is how an event gets passed along
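The search loop is easy to model with plain objects. The sketch below deliberately ignores the executor comparison in skipContext() and keeps only the mask test, so it is a simplification rather than the real logic; `MaskPropagationSketch`, `Ctx` and `link` are hypothetical names. The last node has every bit set so that the loop always terminates, in the same way a tail context handles every inbound event.

```java
// Hypothetical model of the mask-based search; executor handling is left out.
final class MaskPropagationSketch {
    static final int MASK_REGISTERED = 1 << 1;
    static final int MASK_READ = 1 << 5;
    static final int MASK_ALL = MASK_REGISTERED | MASK_READ;

    static final class Ctx {
        final String name;
        final int executionMask;
        Ctx next;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }
    }

    // Skips every following CTX whose mask lacks the requested bit.
    static Ctx findContextInbound(Ctx from, int mask) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

    // Chains the nodes in order and returns the first one.
    static Ctx link(Ctx... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
        }
        return nodes[0];
    }

    public static void main(String[] args) {
        Ctx head = link(
                new Ctx("head", MASK_ALL),
                new Ctx("reader", MASK_READ),
                new Ctx("registrar", MASK_REGISTERED),
                new Ctx("tail", MASK_ALL));
        System.out.println(findContextInbound(head, MASK_REGISTERED).name);
        System.out.println(findContextInbound(head, MASK_READ).name);
    }
}
```

A "registered" event starting at head jumps straight over the reader node, because that node's mask says its handler did not override the corresponding method.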
There is one point to watch out for here. If the handler we add (wrapped in a CTX) overrides a method without explicitly calling super.channelXXX or ctx.fireChannelXXX, the event is cut off at that handler and propagates no further
This matters especially for write events, because the data may never actually be sent to the client!
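The cut-off effect can be demonstrated with a toy chain. This is a sketch only: `PropagationCutoffSketch`, its `Handler` interface and `Ctx` class are hypothetical and far simpler than Netty's types, but the mechanism is the same, since the next node is reached only when the current handler explicitly fires the event onward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy chain showing what happens when a handler forgets to forward.
final class PropagationCutoffSketch {
    interface Handler {
        void channelRead(Ctx ctx, String msg, List<String> log);
    }

    static final class Ctx {
        final String name;
        final Handler handler;
        Ctx next;

        Ctx(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Records the visit, then lets the handler decide whether to go on.
        void invoke(String msg, List<String> log) {
            log.add(name);
            handler.channelRead(this, msg, log);
        }

        // Passes the event to the next CTX, if there is one.
        void fireChannelRead(String msg, List<String> log) {
            if (next != null) {
                next.invoke(msg, log);
            }
        }
    }

    // Builds first -> middle -> last and returns the names that saw the event.
    static List<String> run(boolean middleForwards) {
        Handler forwarding = (ctx, msg, log) -> ctx.fireChannelRead(msg, log);
        Handler swallowing = (ctx, msg, log) -> { /* forgets to call ctx.fireChannelRead */ };
        Ctx first = new Ctx("first", forwarding);
        Ctx middle = new Ctx("middle", middleForwards ? forwarding : swallowing);
        Ctx last = new Ctx("last", forwarding);
        first.next = middle;
        middle.next = last;
        List<String> log = new ArrayList<>();
        first.invoke("hello", log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println("forwarding middle: " + run(true));
        System.out.println("swallowing middle: " + run(false));
    }
}
```

When the middle handler swallows the event, the last node never sees it, which is exactly the situation to avoid unless stopping the event is intended.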
That completes our reading of the pipeline!