Preface

This chapter describes the Netty startup process, including the client and server.

  • Understand Future and Promise
  • Server startup: create Channel, initialize Channel, register Channel, bind
  • Client startup: create Channel, initialize Channel, register Channel, connect
  • Outbound event propagation: read/bind on the server, connect on the client

1. Future and Promise

Let’s start by reviewing JUC’s Future. Typically, we submit a Callable to an ExecutorService and get back a JUC Future, which is used to retrieve the result of the asynchronous task.

public interface Future<V> {
    // Cancel the task.
    // mayInterruptIfRunning=true interrupts a task that is already running; otherwise only tasks that have not started can be cancelled
    boolean cancel(boolean mayInterruptIfRunning);
    // Whether the task was cancelled before it completed normally
    boolean isCancelled();
    // Whether the task is finished, including normal termination, abnormal termination, and cancellation
    boolean isDone();
    // Wait for the task to complete and return result V. If the task threw an exception, an ExecutionException is thrown
    V get() throws InterruptedException, ExecutionException;
    // Wait up to the given timeout for the task to complete and return result V. If the wait times out, a TimeoutException is thrown
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
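
As a quick refresher, the submit-then-get pattern described above can be sketched with the JDK alone. The class and method names (JucFutureDemo, describe) are made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class JucFutureDemo {
    // Submits the task to a throwaway executor and describes how its Future ended.
    static String describe(Callable<Integer> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(task);
            // get() blocks the caller until the task has finished
            return "result=" + future.get();
        } catch (ExecutionException e) {
            // An exception thrown by the task arrives wrapped in ExecutionException
            return "failed=" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(() -> 6 * 7));
        System.out.println(describe(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

Note that the caller only learns why the task ended by catching the exception from get; this is the gap that Netty’s Future fills with explicit state queries.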

Netty’s Future extends JUC’s Future, mainly in the following ways:

  • JUC’s isDone only tells you that an asynchronous task has reached a final state, not why it ended. Netty’s Future breaks isDone down into success, failure (exception), and cancellation.
  • Similar to JUC’s get method, the sync and await methods block until completion, but they return the current Future, so the caller can then act on its success, failure, or cancellation.
  • Listeners can be added to be notified when the Future completes.
  • The getNow method gives non-blocking access to the result.

public interface Future<V> extends java.util.concurrent.Future<V> {
    // isDone is true and the task completed successfully
    boolean isSuccess();
    // Whether the task can still be cancelled via cancel(boolean)
    boolean isCancellable();
    // If the future is not done yet, or it succeeded, null is returned
    // Otherwise, returns the exception that caused the failure
    Throwable cause();
    // Add listeners to the current future
    // If the future is not finished, the listener is notified once future.isDone=true
    // If the future has already finished by the time this is called, the listener is notified immediately
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // Remove listeners
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // Wait for the task to complete; unlike JDK get, await does not throw the task's execution exception
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeoutMillis);
    // Wait for the task to complete. If the task failed, the failure is rethrown, similar to JDK Future's get
    // await, by contrast, only waits for completion and never throws the task's exception
    Future<V> sync() throws InterruptedException;
    // Unlike sync, this does not throw InterruptedException. It swallows the interrupt and sets the thread's interrupt flag to true instead
    Future<V> syncUninterruptibly();
    // Return the result immediately, or null if the future has not finished
    // Do not rely on getNow() == null to check whether the task is complete; use isDone instead
    V getNow();
}

A Future faces the caller and is used to retrieve the result of an asynchronous operation; a Promise faces the implementer and is used to set that result. Typically, when the result is set through the Promise, the Future’s state changes and any caller blocked waiting on it is woken up.

Netty’s Promise extends Future and adds the methods that mark an asynchronous task as succeeded or failed.

public interface Promise<V> extends Future<V> {
    // Mark the future as successful; throws IllegalStateException if it is already complete
    Promise<V> setSuccess(V result);
    // Mark the future as successful; returns true if the mark took effect, false if it was already complete
    boolean trySuccess(V result);
    // Mark the future as failed; throws IllegalStateException if it is already complete
    Promise<V> setFailure(Throwable cause);
    // Mark the future as failed; returns true if the mark took effect, false if it was already complete
    boolean tryFailure(Throwable cause);
    // Make the future uncancellable
    boolean setUncancellable();
}
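
To make the caller/implementer split concrete, here is a minimal sketch of a promise in plain Java. MiniPromise and its inner Failure class are hypothetical names for this illustration and are not Netty classes; the sketch only assumes that a single compare-and-set decides which completion wins, and it leaves out waiting and listeners.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

final class MiniPromise<V> {
    // Marks a stored Throwable as a failure rather than a successful value.
    private static final class Failure {
        final Throwable cause;
        Failure(Throwable cause) { this.cause = cause; }
    }

    // null means "not completed yet"; the first successful CAS decides the outcome.
    private final AtomicReference<Object> result = new AtomicReference<>();

    // Implementer side: try* reports whether this call won the race.
    boolean trySuccess(V value) {
        // This sketch rejects null values; it has no sentinel for "completed with null".
        return result.compareAndSet(null, Objects.requireNonNull(value, "value"));
    }

    boolean tryFailure(Throwable cause) {
        return result.compareAndSet(null, new Failure(Objects.requireNonNull(cause, "cause")));
    }

    // Implementer side: set* throws if the promise was already completed.
    MiniPromise<V> setSuccess(V value) {
        if (trySuccess(value)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    // Caller side: read-only view of the outcome.
    boolean isDone() { return result.get() != null; }

    boolean isSuccess() {
        Object r = result.get();
        return r != null && !(r instanceof Failure);
    }

    Throwable cause() {
        Object r = result.get();
        return r instanceof Failure ? ((Failure) r).cause : null;
    }

    public static void main(String[] args) {
        MiniPromise<String> p = new MiniPromise<>();
        System.out.println(p.trySuccess("ok"));                          // first completion wins
        System.out.println(p.tryFailure(new RuntimeException("late")));  // already completed
        System.out.println(p.isSuccess());
    }
}
```

The try* variants suit code paths where losing the race is normal (for example, a timeout racing with a real result), while the set* variants treat a second completion as a programming error.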

ChannelFuture is the Future associated with a Channel instance; its result type is Void.

public interface ChannelFuture extends Future<Void> {
    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();
}

ChannelPromise is the Promise associated with a Channel instance; its result type is also Void.

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    @Override
    Channel channel();
    // A shortcut to setSuccess(null)
    ChannelPromise setSuccess();
    // A shortcut to trySuccess(null)
    boolean trySuccess();
}

Let’s look at the default implementation of Future and Promise.

AbstractFuture implements the get methods of JUC’s Future. They simply delegate to the await methods of Netty’s Future and then adapt the outcome to the JDK interface: return the result, or throw CancellationException or ExecutionException. In other words, Netty’s await is roughly JDK’s get without the result handling.

public abstract class AbstractFuture<V> implements Future<V> {
    @Override
    public V get() throws InterruptedException, ExecutionException {
        await();
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (await(timeout, unit)) {
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }
        throw new TimeoutException();
    }
}

DefaultPromise is the skeleton implementation of Netty’s Future and Promise.

The member variables are as follows:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    // result atomic update
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // Task execution result
    private volatile Object result;
    // The corresponding EventLoop
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    // Either a single GenericFutureListener or a DefaultFutureListeners
    private Object listeners;
    /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    // Number of threads blocked in wait() on this promise
    private short waiters;

    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
    // Whether listeners are currently being notified
    private boolean notifyingListeners;
}

Let’s look at some representative methods.

The first is await, declared in the Future interface. Note that the checkDeadLock call here is defensive programming: the framework cannot know how well the caller understands it, so it actively detects an incorrect await call to keep the program robust. It also implies that await may only be called from a thread other than the Promise’s own EventLoop thread, while the wake-up is normally performed by that EventLoop thread.

@Override
public Promise<V> await() throws InterruptedException {
    // Return immediately if the future is already complete
    if (isDone()) {
        return this;
    }
    // If the current thread has been interrupted, throw InterruptedException
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // Check whether the thread calling await (the current thread) is the Promise's own EventLoop thread
    // If so, throw an exception, because blocking that thread would deadlock; any other thread may wait on the Future
    checkDeadLock();
    // Block in Object.wait() until the completing thread wakes us up
    synchronized (this) {
        while (!isDone()) {
            incWaiters(); // waiters++
            try {
                wait();
            } finally {
                decWaiters(); // waiters--
            }
        }
    }
    return this;
}

protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}
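
The wait/notify handshake and the deadlock check can be tried out with the JDK alone. The sketch below is an illustration, not Netty code: AwaitDemo is a hypothetical class, a single-threaded executor plays the role of the EventLoop, and IllegalStateException stands in for BlockingOperationException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AwaitDemo {
    private final Thread owner;   // stands in for the promise's EventLoop thread
    private boolean done;
    private int waiters;

    AwaitDemo(Thread owner) { this.owner = owner; }

    synchronized boolean isDone() { return done; }

    void await() throws InterruptedException {
        if (isDone()) {
            return;
        }
        // Defensive check: the owner is the thread that will call complete(),
        // so letting it block here would mean nobody is left to wake it up.
        if (Thread.currentThread() == owner) {
            throw new IllegalStateException("await() called on the completing thread");
        }
        synchronized (this) {
            while (!done) {
                waiters++;
                try {
                    wait();
                } finally {
                    waiters--;
                }
            }
        }
    }

    synchronized void complete() {
        done = true;
        if (waiters > 0) {
            notifyAll();
        }
    }

    // Runs both cases against a single-threaded executor playing the EventLoop.
    static String run() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            Callable<Thread> whoAmI = Thread::currentThread;
            AwaitDemo demo = new AwaitDemo(loop.submit(whoAmI).get());
            Callable<String> awaitOnLoop = () -> {
                try {
                    demo.await();
                    return "awaited";
                } catch (IllegalStateException e) {
                    return "rejected";
                }
            };
            String onLoop = loop.submit(awaitOnLoop).get();
            loop.execute(demo::complete);   // the owner thread completes...
            demo.await();                   // ...while the calling thread waits
            return onLoop + "," + demo.isDone();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without the owner check, the task that calls await on the loop thread would block forever, because the complete call queued behind it could never run.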

The await overloads with a timeout are not shown here; underneath they simply loop over the JDK’s wait(timeout) method. Next, the sync method: it also blocks by calling await, the only difference being that it rethrows the exception if the Future failed.

@Override
public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}
private void rethrowIfFailed() {
  Throwable cause = cause();
  if (cause == null) {
    return;
  }
  PlatformDependent.throwException(cause);
}

The getNow method returns the result field directly, except in a few special cases where it returns null.

@Override
public V getNow() {
    Object result = this.result;
    // Failed, succeeded with a null value (e.g. V is Void), or only marked uncancellable so far
    if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
        return null;
    }
    return (V) result;
}
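
The role of the SUCCESS sentinel is easiest to see in isolation. The following sketch uses a hypothetical NullableResult class (not a Netty type) to show why a null from getNow cannot be used as a completion check.

```java
import java.util.concurrent.atomic.AtomicReference;

final class NullableResult<V> {
    // Sentinel stored in place of a null value, so that "completed with null"
    // can be told apart from "not completed yet".
    private static final Object SUCCESS = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean trySuccess(V value) {
        return result.compareAndSet(null, value == null ? SUCCESS : value);
    }

    boolean isDone() { return result.get() != null; }

    @SuppressWarnings("unchecked")
    V getNow() {
        Object r = result.get();
        return r == SUCCESS ? null : (V) r;
    }

    public static void main(String[] args) {
        NullableResult<Void> r = new NullableResult<>();
        System.out.println(r.getNow() + " " + r.isDone());  // before completion
        r.trySuccess(null);
        System.out.println(r.getNow() + " " + r.isDone());  // after completion
    }
}
```

Both lines print null for getNow; only isDone changes from false to true, which is why isDone is the reliable check.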

Then look at the implementation of the Promise abstract method, setSuccess.

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
  // If result is null, store the SUCCESS sentinel object instead, matching getNow above
  return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
  // CAS this.result to objResult, from either null or UNCANCELLABLE
  if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
      RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
    // Wake up waiting threads (notifyAll)
    if (checkNotifyWaiters()) {
      // Notify all listeners (this ends up in notifyListenersNow below)
      notifyListeners();
    }
    return true;
  }
  return false;
}
// If there are threads waiting on the Future, wake them all up; returns whether there are listeners to notify
private synchronized boolean checkNotifyWaiters() {
  if (waiters > 0) {
    notifyAll();
  }
  return listeners != null;
}
// Notify all listeners
private void notifyListenersNow() {
  Object listeners;
  // Set the notifyingListeners flag and take the listeners that need to be notified
  synchronized (this) {
    if (notifyingListeners || this.listeners == null) {
      return;
    }
    notifyingListeners = true;
    listeners = this.listeners;
    this.listeners = null;
  }
  // Notify the listeners
  for (;;) {
    if (listeners instanceof DefaultFutureListeners) {
      // For DefaultFutureListeners, loop over all listeners it holds and notify each
      notifyListeners0((DefaultFutureListeners) listeners);
    } else {
      // For a single GenericFutureListener, notify it directly
      notifyListener0(this, (GenericFutureListener<?>) listeners);
    }
    // After each round, check again for newly added listeners and keep looping if there are any
    synchronized (this) {
      if (this.listeners == null) {
        notifyingListeners = false;
        return;
      }
      listeners = this.listeners;
      this.listeners = null;
    }
  }
}
// Notify multiple listeners
private void notifyListeners0(DefaultFutureListeners listeners) {
  GenericFutureListener<?>[] a = listeners.listeners();
  int size = listeners.size();
  for (int i = 0; i < size; i ++) {
    notifyListener0(this, a[i]);
  }
}
// Notify a single listener
private static void notifyListener0(Future future, GenericFutureListener l) {
  try {
    l.operationComplete(future);
  } catch (Throwable t) {
    // Exception handling elided
  }
}
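
The take-a-batch-then-recheck loop in notifyListenersNow can be reproduced in a few lines. This is a simplified sketch under stated assumptions: ListenerLoopDemo is a hypothetical class, listeners are plain Runnables held in a List, and everything runs on one thread.

```java
import java.util.ArrayList;
import java.util.List;

final class ListenerLoopDemo {
    private List<Runnable> listeners;   // null means nothing is pending
    private boolean notifying;
    private boolean done;

    synchronized boolean isDone() { return done; }

    void addListener(Runnable listener) {
        synchronized (this) {
            if (listeners == null) {
                listeners = new ArrayList<>();
            }
            listeners.add(listener);
        }
        if (isDone()) {
            notifyListeners();
        }
    }

    void complete() {
        synchronized (this) {
            done = true;
        }
        notifyListeners();
    }

    private void notifyListeners() {
        List<Runnable> batch;
        synchronized (this) {
            // A notification already in progress will pick up new listeners itself
            if (notifying || listeners == null) {
                return;
            }
            notifying = true;
            batch = listeners;
            listeners = null;
        }
        for (;;) {
            for (Runnable l : batch) {
                l.run();   // runs outside the lock
            }
            synchronized (this) {
                if (listeners == null) {
                    notifying = false;
                    return;
                }
                // Listeners were added while notifying: take them and loop again
                batch = listeners;
                listeners = null;
            }
        }
    }

    static List<String> run() {
        ListenerLoopDemo demo = new ListenerLoopDemo();
        List<String> log = new ArrayList<>();
        demo.addListener(() -> {
            log.add("first");
            demo.addListener(() -> log.add("added-during-notify"));
        });
        demo.addListener(() -> log.add("second"));
        demo.complete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The listener added from inside a callback is not run re-entrantly; it is collected by the recheck at the end of the round, so it runs after the listeners of the current batch.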

DefaultChannelPromise is the commonly used default ChannelFuture and ChannelPromise implementation; it is bound to a Channel.

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    public DefaultChannelPromise(Channel channel) {
        this.channel = checkNotNull(channel, "channel");
    }

    @Override
    public Channel channel() {
        return channel;
    }
}

2. Start the server

The Server startup entry is in the AbstractBootstrap#doBind method.

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Create and register a Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 2. Bind the port
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // If the registration is not complete yet, handle the bind asynchronously...
    }
}

Both the client and the server need to create, initialize, and register a Channel; only the Channel type differs. With the usual Bootstrap configuration, the server uses NioServerSocketChannel and the client uses NioSocketChannel.

The entry point for Channel initialization and registration is AbstractBootstrap#initAndRegister.

// Factory for Channel instances; creates different Channel instances according to the configured Channel type
private volatile ChannelFactory<? extends C> channelFactory;
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. Create a Channel
        channel = channelFactory.newChannel();
        // 2. Initialize Channel
        init(channel);
    } catch (Throwable t) {
        // On failure, close the channel
    }
    // 3. Register Channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // On failure, close the channel
    }
    return regFuture;
}

1. Create Channel

First, ReflectiveChannelFactory creates a Channel instance by reflection according to the configured channel type. For example, ServerBootstrap is usually configured with channel(NioServerSocketChannel.class), so this is where the NioServerSocketChannel instance is created.

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException();
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException();
        }
    }
}

The NioServerSocketChannel constructor is not trivial, because the inheritance hierarchy is deep.

First, NioServerSocketChannel creates the JDK ServerSocketChannel and passes it, together with the OP_ACCEPT interest op, to the parent constructor.

public NioServerSocketChannel() {
  this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 1. Create a JDK ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
  try {
    return provider.openServerSocketChannel();
  } catch (IOException e) {
    throw new ChannelException();
  }
}
public NioServerSocketChannel(ServerSocketChannel channel) {
  // 2. Call the parent constructor, passing in the JDK channel and the OP_ACCEPT interest op
  super(null, channel, SelectionKey.OP_ACCEPT);
  config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

AbstractNioMessageChannel just passes the arguments further up.

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}

AbstractNioChannel stores the JDK channel and the interest op, and sets the channel to non-blocking mode.

public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    protected final int readInterestOp;
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        // Set non-blocking
        ch.configureBlocking(false);
    }
}

Finally, AbstractChannel creates the io.netty.channel.Channel.Unsafe instance that actually performs the low-level communication (for NioServerSocketChannel, AbstractNioMessageChannel creates a NioMessageUnsafe), and a DefaultChannelPipeline, which propagates inbound and outbound events and holds the chain of ChannelHandlers.

protected AbstractChannel(Channel parent) {
  this.parent = parent;
  id = newId();
  unsafe = newUnsafe();
  pipeline = newChannelPipeline();
}
// AbstractNioMessageChannel -> NioMessageUnsafe
protected abstract AbstractUnsafe newUnsafe();

protected DefaultChannelPipeline newChannelPipeline() {
  return new DefaultChannelPipeline(this);
}

2. Initialize Channel

Next, AbstractBootstrap’s abstract init method is called; it is implemented by ServerBootstrap and Bootstrap respectively. ServerBootstrap’s version initializes the NioServerSocketChannel on the server side.

@Override
void init(Channel channel) {
    // 1. Set ServerChannel option and attr
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    // 2. Set the ServerChannel handler
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            // Add the handler configured for the ServerChannel
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // Add Netty's own ServerBootstrapAcceptor to handle connection events
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // When constructing ServerBootstrapAcceptor, pass in the child-related configuration
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

Server initialization of NioServerSocketChannel does two things:

  • Apply the configured options and attrs to the channel’s config and attributes.
  • Add a ChannelInitializer to the channel’s pipeline. This special ChannelHandler has its initChannel hook triggered after the channel is registered with a Selector, at which point Netty’s own ServerBootstrapAcceptor is added to handle ACCEPT events.

ServerBootstrapAcceptor is Netty’s own ChannelInboundHandler. It is constructed with the configured childXXX settings, which it uses to handle each accepted client connection: initializing the client channel and registering it with a Selector.

Next, see how DefaultChannelPipeline’s addLast method wraps a ChannelHandler in a ChannelHandlerContext and adds it to the Pipeline’s linked list.

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // Check whether a non-sharable handler is being added to a pipeline more than once
        // A handler annotated with @Sharable is considered thread-safe and shareable
        checkMultiplicity(handler);

        // Create a DefaultChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);

        // head -> newCtx -> tail
        addLast0(newCtx);

        // If the channel is not yet registered with a selector, queue a task on the pendingHandlerCallbackHead linked list
        // Once the channel is registered with the selector, the ChannelHandler.handlerAdded hook is executed
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        // The channel is registered but we are not on its EventLoop: execute the ChannelHandler.handlerAdded hook asynchronously
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // The channel is registered and we are on its EventLoop: execute the ChannelHandler.handlerAdded hook synchronously
    callHandlerAdded0(newCtx);
    return this;
}

There are a few key points here.

1. The @Sharable annotation

By default, a single ChannelHandler instance cannot be added to more than one pipeline. Only when the user is sure the ChannelHandler is thread-safe should it be annotated with @Sharable, which allows the same instance to be added to different pipelines multiple times.

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        // A handler without @Sharable may only be added once; a @Sharable (thread-safe) handler may be added to several pipelines
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException();
        }
        h.added = true;
    }
}
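
The same guard can be demonstrated without Netty. In the sketch below every name (SharableDemo, the nested Sharable annotation, Handler, CountingHandler, LoggingHandler) is hypothetical and only mirrors the shape of the check above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class SharableDemo {
    // Hypothetical marker annotation standing in for Netty's @Sharable.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Sharable {}

    static class Handler {
        boolean added;
        boolean isSharable() {
            return getClass().isAnnotationPresent(Sharable.class);
        }
    }

    static final class CountingHandler extends Handler {}   // imagine per-connection state here

    @Sharable
    static final class LoggingHandler extends Handler {}    // stateless, safe to share

    static void checkMultiplicity(Handler h) {
        if (!h.isSharable() && h.added) {
            throw new IllegalStateException(
                    h.getClass().getSimpleName() + " is not @Sharable and was already added");
        }
        h.added = true;
    }

    // Adds the same instance twice and reports whether the second add was accepted.
    static boolean canAddTwice(Handler h) {
        checkMultiplicity(h);
        try {
            checkMultiplicity(h);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canAddTwice(new LoggingHandler()));
        System.out.println(canAddTwice(new CountingHandler()));
    }
}
```

The annotation is only a declaration by the author of the handler; the check trusts it and does nothing to verify that the handler really is thread-safe.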

2. ChannelHandlerContext

Second, the linked list in the Pipeline is made up of ChannelHandlerContext nodes, not ChannelHandlers.

When addLast is called without an EventExecutorGroup (for example an EventLoopGroup), the DefaultChannelHandlerContext it creates is not bound directly to an EventLoop. If you want a handler to run on a separately assigned thread, pass a group to addLast.

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// Select an EventLoop from the EventLoopGroup to bind to the Context
private EventExecutor childExecutor(EventExecutorGroup group) {
  if (group == null) {
    return null;
  }
  // ...
}

DefaultChannelHandlerContext extends AbstractChannelHandlerContext and adds nothing special: it only implements the handler method, which returns the ChannelHandler bound to this context.

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}

If a Context is not bound directly to an EventLoop, how does Netty keep its execution serialized? By default, a Context without its own executor uses the EventLoop bound to the channel, as the executor method of AbstractChannelHandlerContext shows.

@Override
public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

3. HeadContext and TailContext

In ChannelPipeline, besides the Contexts the user adds through addLast, the doubly linked list of Contexts starts with a HeadContext. HeadContext propagates inbound events and, for outbound operations, delegates to the Unsafe instance to perform the actual I/O.

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        unsafe.flush();
    }
}

In addition, the doubly linked list ends with a TailContext, which is where outbound operations start propagating and where unhandled inbound messages are released.

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, TailContext.class);
        setAddComplete();
    }
  
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(ctx, msg);
    }
}
// DefaultChannelPipeline: log a message that no handler consumed and release it
protected void onUnhandledInboundMessage(Object msg) {
  try {
    logger.debug(
      "Discarded inbound message {} that reached at the tail of the pipeline. " +
      "Please check your pipeline configuration.", msg);
  } finally {
    ReferenceCountUtil.release(msg);
  }
}
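
The head and tail sentinels, and the two directions of travel, can be sketched as a small doubly linked list. MiniPipeline is a hypothetical class for illustration; unlike the real pipeline it does not distinguish inbound from outbound handlers, so every node appears on both paths.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniPipeline {
    private static final class Node {
        final String name;
        Node prev;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Sentinels that are always present, like HeadContext and TailContext.
    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Inserts the new node just before tail: head <-> ... <-> node <-> tail
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    // Inbound events start at head and follow the next pointers.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            path.add(n.name);
        }
        return path;
    }

    // Outbound operations start at tail and follow the prev pointers.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            path.add(n.name);
        }
        return path;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline().addLast("decoder").addLast("business");
        System.out.println(p.inboundPath());
        System.out.println(p.outboundPath());
    }
}
```

Because both sentinels always exist, addLast never has to handle an empty list, and an outbound operation always has a final stop at head where the real I/O can happen.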

4. When the handlerAdded hook is triggered

The addLast method normally fires the ChannelHandler’s handlerAdded hook, but during Channel initialization the hook is deferred until the Channel has been registered with the Selector.

So the first scenario in which handlerAdded fires is right after Channel registration completes: addLast builds a linked list of tasks, and all of them are executed once the Channel is registered.

// DefaultChannelPipeline
private PendingHandlerCallback pendingHandlerCallbackHead;
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    // Create a callback, add it to the list, and wait for the appropriate time to trigger
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
private abstract static class PendingHandlerCallback implements Runnable {
  final AbstractChannelHandlerContext ctx;
  PendingHandlerCallback next;

  PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
    this.ctx = ctx;
  }

  abstract void execute();
}

The second scenario is when the Channel is already registered: addLast checks whether the current thread is the corresponding EventLoop thread. If so, the hook is executed synchronously; otherwise it is submitted to the EventLoop for execution.
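
The deferral described in this section can be sketched as follows. PendingCallbackDemo is a hypothetical, single-threaded illustration: hooks queued before registration sit in a singly linked list and are drained in order on registration, while hooks added afterwards run immediately. The EventLoop thread check is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class PendingCallbackDemo {
    // Singly linked list node, mirroring the head/next structure described above.
    private static final class Pending {
        final Runnable task;
        Pending next;
        Pending(Runnable task) { this.task = task; }
    }

    private final List<String> log = new ArrayList<>();
    private Pending pendingHead;
    private boolean registered;

    void addLast(String name) {
        Runnable hook = () -> log.add("handlerAdded:" + name);
        if (!registered) {
            // Not registered yet: append the hook to the tail of the pending list
            Pending task = new Pending(hook);
            if (pendingHead == null) {
                pendingHead = task;
            } else {
                Pending p = pendingHead;
                while (p.next != null) {
                    p = p.next;
                }
                p.next = task;
            }
            return;
        }
        // Already registered: run the hook right away
        hook.run();
    }

    void register() {
        log.add("registered");
        registered = true;
        // Drain the pending hooks in the order they were queued
        Pending p = pendingHead;
        pendingHead = null;
        while (p != null) {
            p.task.run();
            p = p.next;
        }
    }

    List<String> log() { return log; }

    public static void main(String[] args) {
        PendingCallbackDemo demo = new PendingCallbackDemo();
        demo.addLast("a");
        demo.addLast("b");
        demo.register();
        demo.addLast("c");
        System.out.println(demo.log());
    }
}
```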

3. Register Channel

The last step in the initAndRegister method is to register the initialized channel instance with a JDK selector.

final ChannelFuture initAndRegister() {
    // ...
    // 3. Register Channel
    ChannelFuture regFuture = config().group().register(channel);
    // ...
    return regFuture;
}

First, the EventLoopGroup is obtained from the configuration; on the server side the instance is a MultithreadEventLoopGroup.

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

The next method first selects an EventLoop from the EventLoopGroup.

The channel is then registered through SingleThreadEventLoop#register, which wraps the channel in a DefaultChannelPromise and calls the overloaded register method. Notice that the EventLoop itself is bound to the Promise: the secret to Netty’s serialized execution is binding an execution thread to each instance.

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

Ultimately, the register method of the channel’s Unsafe is called with the EventLoop and the ChannelPromise.

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

This calls the register method of AbstractUnsafe, an inner class of AbstractChannel and the skeleton implementation of Unsafe. The two most important things it does are binding the EventLoop to the current Channel instance and running the actual registration logic in register0.

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  // Duplicate registration is not allowed
  if (isRegistered()) {
    promise.setFailure(...);
    return;
  }
  // Check that this EventLoop is supported by the Unsafe, to guard against an incompatible EventLoop being passed in
  if (!isCompatible(eventLoop)) {
    promise.setFailure(...);
    return;
  }

  // [Important] Bind the channel to the eventLoop
  AbstractChannel.this.eventLoop = eventLoop;

  if (eventLoop.inEventLoop()) {
    register0(promise);
  } else {
    // The server usually starts on the main thread, which is not the EventLoop thread, so register0 is called asynchronously here
    try {
      eventLoop.execute(new Runnable() {
        @Override
        public void run() {
          register0(promise);
        }
      });
    } catch (Throwable t) {
      // ...
    }
  }
}

SingleThreadEventExecutor, the abstract parent class of NioEventLoop, puts the task into its task queue. The EventLoop itself will be studied in more detail later.

// Default immediate=true
private void execute(Runnable task, boolean immediate) {
    // Called from main: main != this.thread -> inEventLoop = false
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue
    addTask(task);
    if (!inEventLoop) {
        // If the current thread is not the thread bound to the EventLoop, try to start the EventLoop's thread
        startThread();
        if (isShutdown()) {
            // ...
        }
    }
    // Wake up the EventLoop
    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
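
The queue-then-lazily-start behaviour can be sketched with a blocking queue and one thread. MiniEventLoop is a hypothetical class for illustration: it has no selector, no wakeup logic, and no shutdown states, and it assumes a daemon thread is acceptable for the demo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        // Always enqueue first, whoever the caller is
        tasks.add(task);
        // Only a caller outside the loop can find the loop thread not started yet
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::runLoop, "mini-event-loop");
            t.setDaemon(true);
            thread = t;
            t.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                tasks.take().run();   // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            // shutdown() was called: leave the loop
        }
    }

    void shutdown() {
        Thread t = thread;
        if (t != null) {
            t.interrupt();
        }
    }

    // Submits one task from the calling thread and reports:
    // started before submit, task ran on the loop thread, caller is on the loop thread
    static String run() {
        MiniEventLoop loop = new MiniEventLoop();
        boolean startedBefore = loop.isStarted();
        CompletableFuture<Boolean> ranOnLoop = new CompletableFuture<>();
        loop.execute(() -> ranOnLoop.complete(loop.inEventLoop()));
        try {
            return startedBefore + "," + ranOnLoop.join() + "," + loop.inEventLoop();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This mirrors the server startup case: the main thread submits the registration task, finds that it is not the loop thread, and thereby causes the loop thread to start and pick the task up.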

Take a look at the AbstractUnsafe#register0 implementation.

private void register0(ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    boolean firstRegistration = neverRegistered;
    // Register the Channel with the JDK Selector
    doRegister();
    neverRegistered = false;
    registered = true;
    // Run the pendingHandlerCallbackHead tasks queued during the init phase,
    // so that each ChannelHandler's handlerAdded hook executes.
    // This triggers the initChannel method of ChannelInitializer
    pipeline.invokeHandlerAddedIfNeeded();

    // Call the promise's trySuccess method to wake up threads blocked on the future
    safeSetSuccess(promise);
    // Fire the inbound channelRegistered event
    pipeline.fireChannelRegistered();
    if (isActive()) {
        if (firstRegistration) {
            // First registration: fire the inbound channelActive event
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            // Re-registration: register interest in READ events on the selector
            beginRead();
        }
    }
}

First, doRegister registers the Channel with a JDK Selector. doRegister is a template method declared in AbstractChannel and overridden by AbstractNioChannel.

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Register with interest ops 0; the Channel itself is the attachment
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

Then, on the first registration, pipeline.invokeHandlerAddedIfNeeded runs the handlerAdded hook tasks that were cached during the init phase (see section 2-4 on when the hook is triggered). This calls the initChannel method of the configured ChannelInitializer, which adds the configured ChannelHandlers to the ChannelPipeline.

After that, the Promise is marked as successful, which wakes up any threads blocked on the Future and notifies its listeners.

Finally, the channelRegistered event is fired. If the Channel is already active, the first registration also fires channelActive, while a re-registration calls beginRead (when autoRead is enabled) to set the interest op to READ.
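The ordering of these steps can be illustrated with a small model that queues handlerAdded callbacks until the channel is registered. It is a minimal sketch, not Netty code: DeferredAddPipeline and its methods are hypothetical names, and the log strings merely label the steps described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the "pending handlerAdded callback" idea; not Netty's implementation.
final class DeferredAddPipeline {
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    private final List<String> log = new ArrayList<>();
    private boolean registered;

    void addLast(String name) {
        Runnable handlerAdded = () -> log.add("handlerAdded:" + name);
        if (registered) {
            // Already registered: the hook can run right away
            handlerAdded.run();
        } else {
            // Not registered yet: cache the hook until registration
            pendingCallbacks.add(handlerAdded);
        }
    }

    void register() {
        log.add("doRegister");
        registered = true;
        // Stand-in for invokeHandlerAddedIfNeeded: flush the cached hooks
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
        log.add("promiseSuccess");
        log.add("fireChannelRegistered");
    }

    static List<String> demo() {
        DeferredAddPipeline pipeline = new DeferredAddPipeline();
        pipeline.addLast("initializer"); // added during init, before registration
        pipeline.register();
        pipeline.addLast("late");        // added after registration
        return pipeline.log;
    }
}
```

In demo(), the hook for "initializer" is recorded only after doRegister, and before the promise is completed and channelRegistered is fired, while the hook for "late" runs immediately.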

The channelActive event also ends up setting the interest op to READ. The pipeline propagates this inbound channelActive event starting from HeadContext.

final AbstractChannelHandlerContext head;

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

After propagating the channelActive event, HeadContext calls readIfIsAutoRead, which by default (autoRead is enabled) calls the channel's read method directly.

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

The read operation sets the SelectionKey's interest op to READ (ACCEPT for a server channel).

AbstractChannel in turn hands the outbound read event to the pipeline.

@Override
public Channel read() {
    pipeline.read();
    return this;
}

DefaultChannelPipeline passes the read event to TailContext for propagation.

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

The read event propagates from tail to head, and HeadContext calls beginRead on unsafe to set the interest op to READ.

// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

Finally, AbstractNioChannel's doBeginRead method is called. For NioServerSocketChannel the interest op is ACCEPT rather than READ. The method registers that interest op on the JDK SelectionKey.

// For NioServerSocketChannel, readInterestOp is SelectionKey.OP_ACCEPT (1 << 4)
protected final int readInterestOp;

@Override
protected void doBeginRead() throws Exception {
    // ...
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
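The two-step pattern (register with interest ops 0, then OR in the read interest op later) can be reproduced with the JDK NIO API alone. The following is a minimal sketch: InterestOpsSketch and registerThenBeginRead are hypothetical names, and the attachment string stands in for the Channel object that Netty attaches.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; it uses only JDK NIO calls.
final class InterestOpsSketch {
    // Returns {interest ops right after registration, interest ops after the beginRead step}.
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a Selector
            server.configureBlocking(false);

            // Step 1 (doRegister): register with interest ops 0, so no events are selected yet
            SelectionKey key = server.register(selector, 0, "attachment");
            int afterRegister = key.interestOps();

            // Step 2 (doBeginRead): add the read interest op only if it is not set already
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Separating the two steps lets registration finish, and the pipeline be fully initialized, before the selector starts reporting ACCEPT events for the channel.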

4. Bind ports

Back in AbstractBootstrap#doBind: at this point initAndRegister may still be running register on the EventLoop thread, or registration may already have finished. Depending on which is the case, doBind0 is called either directly or later from a listener.

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Create, initialize and register the Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 2. Bind the port
    if (regFuture.isDone()) {
        // Registration has already completed, so doBind0 is called directly
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration is still in progress, so doBind0 is called from a listener.
        // addListener also checks isDone, so a listener added after completion is still notified
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
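The "call directly if done, otherwise attach a listener" branch can be sketched with the JDK's CompletableFuture standing in for Netty's ChannelFuture and ChannelPromise. This is an illustration under assumptions: BindAfterRegisterSketch, the trace list, and the "bound" result are hypothetical, and unlike the real doBind0 the sketch does not hop onto an event loop thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; CompletableFuture plays the role of both future and promise.
final class BindAfterRegisterSketch {
    // Runs the doBind0 stand-in now if registration is done, otherwise once it completes.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, List<String> trace) {
        CompletableFuture<String> bindPromise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            trace.add("direct");
            doBind0(regFuture, bindPromise);
        } else {
            trace.add("listener");
            regFuture.whenComplete((ignored, cause) -> doBind0(regFuture, bindPromise));
        }
        return bindPromise;
    }

    private static void doBind0(CompletableFuture<Void> regFuture, CompletableFuture<String> bindPromise) {
        if (regFuture.isCompletedExceptionally()) {
            // Registration failed: propagate a failure to the bind promise
            bindPromise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            bindPromise.complete("bound");
        }
    }

    static String demo() {
        List<String> trace = new ArrayList<>();
        // Case 1: registration already finished
        CompletableFuture<String> first = doBind(CompletableFuture.<Void>completedFuture(null), trace);
        // Case 2: registration still pending when doBind is called
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> second = doBind(pending, trace);
        boolean doneBeforeRegistration = second.isDone();
        pending.complete(null); // registration finishes, the listener fires
        return trace + " " + first.join() + " " + doneBeforeRegistration + " " + second.join();
    }
}
```

In the second case the caller receives the bind promise immediately, but it completes only after the registration future does, which is the role PendingRegistrationPromise plays in doBind.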

doBind0 still submits the Channel's bind call to the EventLoop thread, so it is serialized with the other tasks for that Channel.

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

Next, follow the propagation path of the outbound bind event, which mirrors that of the outbound read event.

AbstractChannel#bind asks the Channel's pipeline to propagate the outbound bind event.

private final DefaultChannelPipeline pipeline;
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

DefaultChannelPipeline directly fires the bind method of TailContext.

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

What actually runs is the bind method of the abstract class AbstractChannelHandlerContext.

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // Find the next Handler to focus on the bind event
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    // Serialize the invokeBind method
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
  ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
}

The bind event propagates from TailContext all the way to HeadContext. In the EchoServer example, two handlers were added: LoggingHandler prints logs, and ServerBootstrapAcceptor accepts client channels.

LoggingHandler handles the bind event, so it is invoked.

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "BIND", localAddress));
    }
    // Propagate to the next Context
    ctx.bind(localAddress, promise);
}

LoggingHandler finally calls ctx.bind, and the outbound bind event keeps propagating toward the head. ServerBootstrapAcceptor is also in the pipeline, but because it is a pure inbound handler, findContextOutbound skips it and it is never invoked for bind.

For EchoServer, the real next handler is HeadContext, which delegates to Unsafe for the low-level I/O operation.

@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
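The tail-to-head walk that skips inbound-only handlers can be modelled with a tiny doubly linked list. This is a simplified sketch, not Netty's implementation: OutboundPipelineSketch, Ctx, and the boolean outbound flag (standing in for the MASK_BIND check) are hypothetical, and the handler order used in demo() is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature pipeline; the boolean flag replaces Netty's execution mask.
final class OutboundPipelineSketch {
    static final class Ctx {
        final String name;
        final boolean outbound; // whether the wrapped handler handles outbound events such as bind
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    private final Ctx head = new Ctx("HeadContext", true);
    private final Ctx tail = new Ctx("TailContext", false);

    OutboundPipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        Ctx last = tail.prev;
        ctx.prev = last;
        ctx.next = tail;
        last.next = ctx;
        tail.prev = ctx;
    }

    // Walk towards the head until a context that handles outbound events is found.
    private static Ctx findContextOutbound(Ctx ctx) {
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Returns the names of the contexts that see the bind event, in visiting order.
    List<String> bind() {
        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            visited.add(ctx.name); // a real handler would run here and then call ctx.bind(...)
        }
        return visited;
    }

    static List<String> demo() {
        OutboundPipelineSketch pipeline = new OutboundPipelineSketch();
        pipeline.addLast("LoggingHandler", true);
        pipeline.addLast("ServerBootstrapAcceptor", false);
        return pipeline.bind();
    }
}
```

In demo(), the bind event visits LoggingHandler and then HeadContext, and ServerBootstrapAcceptor is never invoked because it does not handle outbound events.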

AbstractChannel.AbstractUnsafe calls the doBind method of the subclass NioServerSocketChannel, which binds the JDK channel to the port.

// AbstractChannel.AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    // Call the subclass's doBind
    doBind(localAddress);
    if (!wasActive && isActive()) {
        // If initAndRegister did not fire channelActive earlier, fire it now
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}

// NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    }
}

3. Start the client

The client startup process is roughly the same as the server:

  • Create the Channel
  • Initialize the Channel
  • Register the Channel
  • Connect to the server and propagate the outbound connect event

Focus on a few points:

  • A different Channel type is created; see the NioSocketChannel constructors
  • The Channels register interest in different events: ACCEPT on the server, READ on the client
  • The Channel is initialized by the init method of Bootstrap
  • Propagation of the outbound connect event

Bootstrap#doResolveAndConnect is the entry point for client startup.

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 1. Create, initialize and register the Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 2. Connect
    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // ... register a listener and continue asynchronously
    }
}

1. Create Channel

NioSocketChannel's inheritance hierarchy is roughly the same as NioServerSocketChannel's, except that its direct parent class is AbstractNioByteChannel.

public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}
// 1. Create a JDK SocketChannel
private static SocketChannel newSocket(SelectorProvider provider) {
  return provider.openSocketChannel();
}
// 2. Call the superclass constructor
public NioSocketChannel(Channel parent, SocketChannel socket) {
  super(parent, socket);
  config = new NioSocketChannelConfig(this, socket.socket());
}

AbstractNioByteChannel registers interest in READ, whereas the server-side NioServerSocketChannel (an AbstractNioMessageChannel) uses ACCEPT.

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

2. Initialize Channel

The init method of Bootstrap is shown below. It is simpler than the one in ServerBootstrap: no special handler is added and no childGroup configuration is involved. The key call is ChannelPipeline's addLast method, which was already covered in the server startup section.

void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    // Add the configured handler to ChannelPipeline
    p.addLast(config.handler());
    // The superclass method sets the option of a Channel
    setChannelOptions(channel, newOptionsArray(), logger);
    // The parent class method sets the attr of the Channel
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}

3. Register Channel

The process for a client to register a NioSocketChannel to a Selector is the same as the process for a server NioServerSocketChannel.

After registration completes, the handlerAdded hook is triggered, which in turn calls the initChannel method of the configured ChannelInitializer.

4. Connect

Bootstrap finally calls doConnect, which invokes the Channel's connect method. Channel is a ChannelOutboundInvoker, so this triggers the outbound connect event.

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    // Run on the EventLoop so that connect is serialized with the Channel's other tasks
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

The connect method of AbstractChannel, the parent class of NioSocketChannel, directly calls DefaultChannelPipeline's connect method. DefaultChannelPipeline is also a ChannelOutboundInvoker that triggers outbound events.

// AbstractChannel
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, localAddress, promise);
}

The pipeline calls TailContext to trigger the outbound connect event. TailContext extends AbstractChannelHandlerContext, which is itself a ChannelOutboundInvoker.

@Override
public final ChannelFuture connect( SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}

AbstractChannelHandlerContext, the abstract superclass of TailContext, passes the connect event on to the next handler, and so on until it reaches HeadContext, which is a ChannelOutboundHandler.

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // Find the next handler context execution that focuses on the CONNECT event
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
      // ...
    }
    return promise;
}

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
	((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}

HeadContext calls Unsafe to perform the actual connect operation.

@Override
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}

AbstractNioChannel.AbstractNioUnsafe#connect does three things:

  • Calls SocketChannel to perform a non-blocking connect
  • Handles connection timeouts by submitting a delayed task to the EventLoop
  • Adds a listener to the Future to handle the asynchronous callback after the connect attempt completes
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  boolean wasActive = isActive();
  // 1. Call SocketChannel.connect; in non-blocking mode it returns false if the connection is not yet established
  if (doConnect(remoteAddress, localAddress)) {
    fulfillConnectPromise(promise, wasActive);
  } else {
    connectPromise = promise;
    requestedRemoteAddress = remoteAddress;
    // 2. Handle connection timeout by submitting a delayed task to the EventLoop. The default timeout is 30 seconds
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
      connectTimeoutFuture = eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
          ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
          ConnectTimeoutException cause =
            new ConnectTimeoutException("connection timed out: " + remoteAddress);
          if (connectPromise != null && connectPromise.tryFailure(cause)) {
            close(voidPromise());
          }
        }
      }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    // 3. Add a listener that cleans up if the connect attempt is cancelled
    promise.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
          if (connectTimeoutFuture != null) {
            connectTimeoutFuture.cancel(false);
          }
          connectPromise = null;
          close(voidPromise());
        }
      }
    });
  }
}
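The timeout mechanism (a delayed task that fails the promise unless the connect attempt completes first, cancelled once it is no longer needed) can be sketched with JDK classes only. The names ConnectTimeoutSketch and withTimeout are hypothetical, a ScheduledExecutorService stands in for the EventLoop, a CompletableFuture stands in for the ChannelPromise, and java.net.ConnectException stands in for Netty's ConnectTimeoutException.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class built only on java.util.concurrent.
final class ConnectTimeoutSketch {
    // Returns a promise that fails with a timeout unless someone completes it first.
    static CompletableFuture<String> withTimeout(ScheduledExecutorService loop, long timeoutMillis) {
        CompletableFuture<String> connectPromise = new CompletableFuture<>();
        // Delayed task: fail the promise if it is still pending when the timeout elapses
        ScheduledFuture<?> timeoutTask = loop.schedule(() -> {
            connectPromise.completeExceptionally(new ConnectException("connection timed out"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Once the promise completes for any reason, the timeout task is no longer needed
        connectPromise.whenComplete((value, cause) -> timeoutTask.cancel(false));
        return connectPromise;
    }

    static String demo() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            // Case 1: the "connection" completes long before the timeout
            CompletableFuture<String> fast = withTimeout(loop, 5_000);
            fast.complete("connected");

            // Case 2: nothing completes the promise, so the delayed task fails it
            CompletableFuture<String> slow = withTimeout(loop, 50);
            String slowResult;
            try {
                slowResult = slow.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                slowResult = "timeout";
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            return fast.join() + "," + slowResult;
        } finally {
            loop.shutdownNow();
        }
    }
}
```

Because completeExceptionally has no effect on an already completed promise, a late timeout cannot overwrite a successful result, which is the same guarantee tryFailure provides in the Netty code.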

NioSocketChannel#doConnect calls the connect method of the JDK SocketChannel. As seen during server startup, the AbstractNioChannel constructor puts the Channel into non-blocking mode, so connect usually returns false and the Channel's interest op is set to CONNECT.

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
  // ...
  // Execute the JDK's SocketChannel connect method
  boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
  // In non-blocking mode connect usually returns false; in that case set the interest op to CONNECT
  if (!connected) {
    selectionKey().interestOps(SelectionKey.OP_CONNECT);
  }
  return connected;
}

How the EventLoop polls Selector.select is left for later. Here the focus is the callback that runs after the CONNECT event fires. The entry point is NioUnsafe#finishConnect, implemented by AbstractNioUnsafe#finishConnect.

public final void finishConnect() {
  assert eventLoop().inEventLoop();

  try {
    boolean wasActive = isActive();
    // 1. Call the JDK's SocketChannel.finishConnect
    doFinishConnect();
    // 2. Set the Promise result and fire the inbound channelActive event
    fulfillConnectPromise(connectPromise, wasActive);
  } catch (Throwable t) {
    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
  } finally {
    // 3. Cancel the connection timeout task that was submitted to the EventLoop
    if (connectTimeoutFuture != null) {
      connectTimeoutFuture.cancel(false);
    }
    connectPromise = null;
  }
}

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
  // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
  // We still need to ensure we call fireChannelActive() in this case.
  boolean active = isActive();
  // trySuccess() will return false if a user cancelled the connection attempt.
  boolean promiseSet = promise.trySuccess();
  // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
  // because what happened is what happened.
  if (!wasActive && active) {
    pipeline().fireChannelActive();
  }
  // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
  if (!promiseSet) {
    close(voidPromise());
  }
}
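The whole non-blocking connect sequence (connect, register interest in CONNECT, wait for the selector, then finishConnect) can be tried with JDK NIO alone. The sketch below is an assumption-laden illustration rather than Netty code: NonBlockingConnectSketch is a hypothetical name, the local ServerSocketChannel on an ephemeral loopback port exists only to give the client something to connect to, and the 5 second deadline is arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it uses only JDK NIO calls.
final class NonBlockingConnectSketch {
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Local listener on an ephemeral loopback port, used only as a connect target
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            SelectionKey key = client.register(selector, 0);

            // doConnect stand-in: may return true at once, or false if still in progress
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                key.interestOps(SelectionKey.OP_CONNECT);
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                while (!connected && System.nanoTime() < deadline) {
                    selector.select(100);
                    if (key.isConnectable()) {
                        // finishConnect stand-in: completes the connection or throws on failure
                        connected = client.finishConnect();
                    }
                    selector.selectedKeys().clear();
                }
                if (connected) {
                    // Stop selecting for CONNECT once the connection is established
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch handles both outcomes of connect, just as AbstractNioUnsafe#connect does: an immediate success skips the selector entirely, and a pending connect is completed later through finishConnect.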

conclusion

  • The Future is caller-oriented for retrieving asynchronous execution results; Promises are for method implementers and are used to set the outcome of task execution.
  • Server startup: create Channel, initialize Channel, register Channel, bind.
  • Client startup: create Channel, initialize Channel, register Channel, connect.
  • After a Channel is registered with a Selector, the handlerAdded hook is triggered and the initChannel method of ChannelInitializer is executed.
  • A ChannelHandler is wrapped in a ChannelHandlerContext and added to the ChannelPipeline. A ChannelPipeline assembles a doubly linked list of ChannelHandlerContexts, with HeadContext as its head node and TailContext as its tail node. Outbound events propagate from TailContext to HeadContext, and inbound events propagate from HeadContext to TailContext.

  • Outbound events all propagate the same way. Examples: the outbound read event that the server triggers after registering a Channel (setting the interest op to ACCEPT), the server's outbound bind event, and the client's outbound connect event. The propagation path is ChannelOutboundInvoker -> ChannelOutboundHandler -> Unsafe.

    The key classes involved are as follows: Channel (ChannelOutboundInvoker) -> Pipeline (ChannelOutboundInvoker) -> TailContext (ChannelOutboundInvoker) -> ChannelOutboundHandlers -> HeadContext (ChannelOutboundHandler) -> Unsafe.

  • When channelActive is fired: after bind completes on the server, and after connect completes on the client.

  • What ChannelInitializer does: after the server NioServerSocketChannel or the client NioSocketChannel is registered, its initChannel method is triggered to add the business ChannelHandlers.