preface
This chapter describes the Netty startup process, including the client and server.
- Understand Future and Promise
- Server startup: create Channel, initialize Channel, register Channel, bind
- Client startup: create Channel, initialize Channel, register Channel, connect
- Push event propagation: read/bind on the server, connect on the client
Future and Promise
Let’s start by reviewing JUC’s Future. Typically we submit Callable to Executor to get the JUC’s Future, which can be used to get the execution results of asynchronous tasks.
public interface Future<V> {
// Cancel the task
// mayInterruptIfRunning=true to interrupt the task in progress; Otherwise, unexecuted tasks can only be cancelled
boolean cancel(boolean mayInterruptIfRunning);
// Whether the task is cancelled before normal execution is complete
boolean isCancelled(a);
// Returns whether the task is finished, including normal termination, abnormal termination, and cancellation.
boolean isDone(a);
// Wait for the task to complete and return result V. If an exception occurs during task execution, an ExecutionException will be thrown
V get(a) throws InterruptedException, ExecutionException;
// Wait for the task to complete and return result V, TimeoutException is thrown
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Copy the code
Netty’s Future inherits JUC’s Future. It mainly extends the following points:
- The isDone of JUC can only represent the final state of an asynchronous task, but the client does not know why the task ended. So Netty’s Future distinguishes between isDone subdivisions: success, exception, and cancellation.
- Similar to JUC’s get method, sync and wait methods are provided to return the current Future, and other things can be done based on the Future’s success/exception/cancellation.
- Add listeners to listen for Future completion events.
- Added the getNow method to support non-blocking access to execution results.
public interface Future<V> extends java.util.concurrent.Future<V> {
// isDone is true and the execution is successful
boolean isSuccess(a);
// isDone is true and is a canceled scenario
boolean isCancellable(a);
If sDone is false, null is returned
// Otherwise, return an exception during execution
Throwable cause(a);
// Add listeners for the current future
// If the future is not finished, the Listener will be notified when future.isDone=true
// If the future has finished executing by the time it is called, the Listener is notified immediately
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// Delete the listener
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// await task completion, unlike JDK get, does not throw a task execution exception
Future<V> await(a) throws InterruptedException;
Future<V> awaitUninterruptibly(a);
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeoutMillis);
// Wait for the task to complete. If an exception occurs, a GET similar to JDK Future will be thrown
// Wait only waits for the task to complete and does not actively throw an exception
Future<V> sync(a) throws InterruptedException;
The difference with sync is that it does not raise interrupt exceptions. Instead, it sets the Thread interrupt bit to true and eats the interrupt exceptions
Future<V> syncUninterruptibly(a);
// Return the execution result immediately, or null if the future has not finished executing
// Do not rely on getNow=null to check whether the task is complete or use isDone to check whether the task is complete
V getNow(a);
}
Copy the code
The Future is caller-oriented for retrieving asynchronous execution results; Promises are for method implementers and are used to set the outcome of task execution. Typically, when a Promise sets the result, the Future’s state changes, waking up the method caller blocking the wait.
Netty’s Promise, which inherits the Future, is responsible for setting asynchronous tasks to succeed or fail.
public interface Promise<V> extends Future<V> {
// Mark the future as successful, and throw an IllegalStateException if the mark fails
Promise<V> setSuccess(V result);
// Mark future success, which returns true
boolean trySuccess(V result);
// Mark the future as failed, and an IllegalStateException will be thrown if the mark fails
Promise<V> setFailure(Throwable cause);
// Mark future failed, success returns true
boolean tryFailure(Throwable cause);
// Set future to uncancelable
boolean setUncancellable(a);
}
Copy the code
ChannelFuture is the Future associated with the Channel instance, and the Future task returns Void.
public interface ChannelFuture extends Future<Void> {
/** * Returns a channel where the I/O operation associated with this * future takes place. */
Channel channel(a);
}
Copy the code
ChannelPromise is a Promise associated with a Channel instance, and the task returns Void.
public interface ChannelPromise extends ChannelFuture.Promise<Void> {
@Override
Channel channel(a);
// a shortcut to setSuccess(null)
ChannelPromise setSuccess(a);
// A shortcut to trySuccess(null)
boolean trySuccess(a);
}
Copy the code
Let’s look at the default implementation of Future and Promise.
AbstractFuture gets methods that implement JUC futures are actually await methods that delegate NettyFuture, but are finally adapted to JDK interface implementations. By implication, the await of NettyFuture is approximately equal to the GET of JDKFuture.
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get(a) throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw newTimeoutException(); }}Copy the code
DefaultPromise is the skeleton implementation of Netty’s Future and Promise.
The member variables are as follows:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// result atomic update
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// Task execution result
private volatile Object result;
/ / the corresponding EventLoop
private final EventExecutor executor;
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@codenull}, it means either 1) no listeners were added yet or 2) all listeners were notified. * * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor. */
/ / GenericFutureListener or DefaultFutureListeners
private Object listeners;
/** * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll(). * /
// Number of threads waiting for synchronized(this) locks
private short waiters;
/** * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the * executor changes. */
// Whether all listeners have been notified
private boolean notifyingListeners;
}
Copy the code
Let’s look at some representative methods.
The first is the abstract method await of the Future interface. Note that the checkDeadLock method here is defensive programming. The provider of the framework does not know whether the caller understands the framework or not, so it is better to actively detect to improve the robustness of the program in case the caller calls the await method incorrectly. This also means that the await method will only be invoked in the EventLoop thread corresponding to the non-Promise, and the wake operation will only be invoked in the EventLoop.
@Override
public Promise<V> await(a) throws InterruptedException {
// Determine if the future is completed
if (isDone()) {
return this;
}
If the current thread is interrupted, InterruptedException is thrown
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// Verify that the thread calling the await method (the current thread) is the corresponding EventLoop thread
// If so, throw an exception, otherwise other threads will wait on the Future
checkDeadLock();
// object. wait for the EventLoop thread to wake up
synchronized (this) {
while(! isDone()) { incWaiters();// waiters++
try {
wait();
} finally {
decWaiters(); // waiters--}}}return this;
}
protected void checkDeadLock(a) {
EventExecutor e = executor();
if(e ! =null && e.inEventLoop()) {
throw newBlockingOperationException(toString()); }}Copy the code
Await the await method with timeout. The underlying await method just loops through the JDK’s wait(timout) method. Let’s look at the implementation of the sync method. The sync method also blocks and waits by calling the await method, the only difference being that an exception is thrown if the Future terminates due to an exception.
@Override
public Promise<V> sync(a) throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed(a) {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
Copy the code
The getNow method returns the result variable directly, but null for special cases.
@Override
public V getNow(a) {
Object result = this.result;
// exception/success but V is void/cannot be cancelled
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}
Copy the code
Then look at the implementation of the Promise abstract method, setSuccess.
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// If result = null, set it to SUCCESS constant Object, echoing getNow above
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// cas sets this.result = objectResult
if (RESULT_UPDATER.compareAndSet(this.null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// notifyAll
if (checkNotifyWaiters()) {
// Notice all listeners
notifyListeners();
}
return true;
}
return false;
}
// If there are threads waiting for the Future, wake them all up
private synchronized boolean checkNotifyWaiters(a) {
if (waiters > 0) {
notifyAll();
}
returnlisteners ! =null;
}
// Notice all listeners
private void notifyListenersNow(a) {
Object listeners;
// Modify the notifyingListeners flag and obtain all notices to be made
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
/ / notify the listeners
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
// All listeners in the DefaultFutureListeners loop are notified
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// If it is GenericFutureListener, notify the Listener directly
notifyListener0(this, (GenericFutureListener<? >) listeners); }// After each notice is completed, check again for new listeners to notice, and continue the cycle if any
synchronized (this) {
if (this.listeners == null) {
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null; }}}// Notice of multiple listeners
private void notifyListeners0(DefaultFutureListeners listeners) { GenericFutureListener<? >[] a = listeners.listeners();int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]); }}// Notify a single listener
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
}
}
Copy the code
DefaultChannelPromise is a common default Future and Promise implementation that binds channels.
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise.FlushCheckpoint {
private final Channel channel;
public DefaultChannelPromise(Channel channel) {
this.channel = checkNotNull(channel, "channel");
}
@Override
public Channel channel(a) {
returnchannel; }}Copy the code
2. Start the server
The Server startup entry is in the AbstractBootstrap#doBind method.
private ChannelFuture doBind(final SocketAddress localAddress) {
Create and register a Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if(regFuture.cause() ! =null) {
return regFuture;
}
// 2. Bind ports
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// If the registration is not complete, asynchronous processing...}}Copy the code
Both the client and the server need to create/initialize/register a Channel, just of a different type. According to the BootStrap configuration, the Server is the NioServerSocketChannel and the Client is the NioSocketChannel.
The Channel initializes and registers the entry in AbstractBootstrap#initAndRegister.
// Build a factory for Channel instances. Create different Channel instances according to the Channel type
private volatile ChannelFactory<? extends C> channelFactory;
final ChannelFuture initAndRegister(a) {
Channel channel = null;
try {
1. Create a Channel
channel = channelFactory.newChannel();
// 2. Initialize Channel
init(channel);
} catch (Throwable t) {
// Failed to close channel
}
// 3. Register Channel
ChannelFuture regFuture = config().group().register(channel);
if(regFuture.cause() ! =null) {
// Failed to close channel
}
return regFuture;
}
Copy the code
1. Create Channel
First ReflectiveChannelFactory will according to the type of the channel, through the reflection to create different instances of the channel, such as ServerBootStrap general configuration of the channel (NioServerSocketChannel. Class), This is where the NioServerSocketChannel instance is created.
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw newIllegalArgumentException(); }}@Override
public T newChannel(a) {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw newChannelException(); }}}Copy the code
The NioServerSocketChannel construction method is not simple, the inheritance structure is very deep.
First NioServerSocketChannel creates the JDK’s ServerSocketChannel, passing the JDK’s channel and the concerned ACCEPT event into the parent class construct.
public NioServerSocketChannel(a) {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// 1. Create a JDK ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw newChannelException(); }}public NioServerSocketChannel(ServerSocketChannel channel) {
// 2. Call the parent class construct to pass in the JDK channel and ACCEPT event enumerations
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Copy the code
AbstractNioMessageChannel continue to upgrade passthrough.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp); }}Copy the code
AbstractNioChannel saves JDK channels and concerns and sets the channel non-blocking.
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
// Set non-blocking
ch.configureBlocking(false); }}Copy the code
Finally AbstractChannel created io.net ty. Channel. Channel. The Unsafe really responsible for the underlying communication, For NioServerSocketChannel founded by AbstractNioMessageChannel NioMessageUnsafe, DefaultChannelPipeline was created to propagate on and off the stack events and combine ChannelHandler lists.
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
// AbstractNioMessageChannel -> NioMessageUnsafe
protected abstract AbstractUnsafe newUnsafe(a);
protected DefaultChannelPipeline newChannelPipeline(a) {
return new DefaultChannelPipeline(this);
}
Copy the code
2. Initialize Channel
The init abstract methods of AbstractBootstrap are then called, implemented by ServerBootStrap and BootStrap respectively. ServerBootStrap initializes NioServerSocketChannel on the server.
@Override
void init(Channel channel) {
// 1. Set ServerChannel option and attr
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
// 2. Set the ServerChannel handler
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
finalEntry<ChannelOption<? >, Object>[] currentChildOptions;synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
finalEntry<AttributeKey<? >, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// Handler for ServerChannel
ChannelHandler handler = config.handler();
if(handler ! =null) {
pipeline.addLast(handler);
}
// Add Netty's own ServerBootstrapAcceptor to handle connection events
ch.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
// When constructing ServerBootstrapAcceptor, pass in the child-related configuration
pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code
Server initialization of NioServerSocketChannel does two things:
- Put these configurations into the config and attr of the channel based on the configured options and attr.
- In a channel Pipeline, add ChannelInitializer, the special ChannelHandler that triggers the initChannel hook after a channel is registered with a Selector. Netty’s own ServerBootstrapAcceptor is added to handle ACCEPT events.
When constructing ServerBootstrapAcceptor, Netty’s InboundChannelHandler passes in the configured childXXX. To handle the client channel connection, initialize and bind the Selector to the client channel.
Next, see how DefaultChannelPipeline’s addLast method converts ChannelHandler to ChannelHandlerContext to the Pipeline’s list.
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// Check whether non-thread-safe handlers are repeatedly added to different pipelines
// @sharable annotation annotation handler is considered thread-safe and shareable
checkMultiplicity(handler);
/ / create DefaultChannelHandlerContext
newCtx = newContext(group, filterName(name, handler), handler);
// head -> newCtx -> tail
addLast0(newCtx);
/ / when the channel didn't register to the selector, build a Task on pendingHandlerCallbackHead chain list first
/ / for the channel registration to the selector, perform ChannelHandler. HandlerAdded hook
if(! registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx,true);
return this;
}
/ / when the channel is registered, asynchronous execution ChannelHandler. HandlerAdded hook
EventExecutor executor = newCtx.executor();
if(! executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor);return this; }}To synchronize, / / channel has been registered to perform ChannelHandler. HandlerAdded hook
callHandlerAdded0(newCtx);
return this;
}
Copy the code
There are a few key points here.
1. @shareable annotation
By default, ChannelHandler cannot be added to multiple pipelines as singletons. Only when the client confirms that the ChannelHandler is thread safe can the client use the @shareable annotation. To allow singleton handlers to be added to different pipelines multiple times.
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// The Sharable annotation can be added multiple times to different pipelines if the Handler is thread-safe
if(! h.isSharable() && h.added) {throw new ChannelPipelineException();
}
h.added = true; }}Copy the code
2, ChannelHandlerContext
The second is the list structure in the Pipeline, which is connected by the ChannelHandlerContext, not the ChannelHandler.
When addLast no incoming EventLoopGroup specified by default created DefaultChannelHandlerContext will not directly binding EventLoop, if you want a Handler assigned a thread executing alone, AddLast can pass in an EventLoopGroup.
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// Select an EventLoop from the EventLoopGroup to bind to the Context
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
/ /...
}
Copy the code
DefaultChannelHandlerContext inheritance AbstractChannelHandlerContext, nothing special, just realized the handler method to get with the current context ChannelHandler binding.
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@Override
public ChannelHandler handler(a) {
returnhandler; }}Copy the code
If Context is not directly bound to EventLoop, how does Netty serialize? In fact, if the Context is not bound to an EventLoop, the default is to take the EventLoop bound to channel. See AbstractChannelHandlerContext executor method returns EventLoop.
@Override
public EventExecutor executor(a) {
if (executor == null) {
return channel().eventLoop();
} else {
returnexecutor; }}Copy the code
3. HeadContext and TailContext
In ChannelPipeline, in addition to the Context added by the user through the addLast method, the Context two-way list will have a HeadContext. The HeadContext propagates the loaded event and operates on the UNSAFE instance for IO operations.
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler.ChannelInboundHandler {
private final Unsafe unsafe;
@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
@Override
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) { unsafe.flush(); }}Copy the code
In addition, Context has a TailContext at the end of the bidirectional list, which is mainly used to propagate events out of the stack and recycle resources.
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { onUnhandledInboundMessage(ctx, msg); }}/ / AbstractChannelHandlerContext out deal with untreated MSG, release resources
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally{ ReferenceCountUtil.release(msg); }}Copy the code
4. When to trigger the channelAdded hook
The addLast method normally fires the channelAdded hook of the ChannelHandler, but during Channel initialization it does not fire the channelAdded hook until the Channel is registered with the Selector.
Therefore, the first scenario where the channelAdded hook is triggered is after the Channel registration is completed. In addLast method, a linked list of tasks will be constructed and all tasks will be executed after the Channel registration is completed.
// DefaultChannelPipeline
private PendingHandlerCallback pendingHandlerCallbackHead;
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
// Create a callback, add it to the list, and wait for the appropriate time to trigger
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while(pending.next ! =null) { pending = pending.next; } pending.next = task; }}private abstract static class PendingHandlerCallback implements Runnable {
final AbstractChannelHandlerContext ctx;
PendingHandlerCallback next;
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
abstract void execute(a);
}
Copy the code
In addition to the previous scenario, determine whether the current thread is the corresponding EventLoop thread, if so, execute the hook synchronously, otherwise submit to EventLoop to execute the hook.
3. Register Channel
The last step in the initAndRegister method is to register the initialized channel instance with a JDK selector.
final ChannelFuture initAndRegister(a) {
// ...
// 3. Register Channel
ChannelFuture regFuture = config().group().register(channel);
// ...
return regFuture;
}
Copy the code
First of all, according to the configuration to obtain corresponding EventLoopGroup for Server side is MultithreadEventLoopGroup EventLoopGroup instance.
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
Copy the code
First, the Next method selects an EventLoop in the EventLoopGroup.
The channel is then registered with the SingleThreadEventLoop#register method, which wraps the channel into a DefaultChannelPromise and calls another overloaded register method, Notice that the EventLoop itself is bound to the Promise (the secret to Netty serialization is to bind the execution thread to the instance).
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
Copy the code
Ultimately, the Unsafe Register method is called to register the ChannelPromise.
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
Copy the code
The AbstractChannel inner class AbstractUnsafe register method is called, which is the barebones implementation of Unsafe. The two most important things to do here are to bind the EventLoop to the current Channel instance and the actual registration logic for register0.
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// Duplicate registration is not allowed
if(isRegistered()) { promise.setFailure(...) ;return;
}
// Check whether EventLoop is supported by Unsafe to prevent incorrect EventLoop being passed
if(! isCompatible(eventLoop)) { promise.setFailure(...) );return;
}
[important] Bind channel and eventLoop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// The server usually starts on the main thread, not the same thread as EventLoop, where register0 is called asynchronously
try {
eventLoop.execute(new Runnable() {
@Override
public void run(a) { register0(promise); }}); }catch (Throwable t) {
// ...}}}Copy the code
NioEventLoop abstract parent class, SingleThreadEventExecutor will return to that task in the queue EventLoop subsequent understanding again.
// Default immediate=true
private void execute(Runnable task, boolean immediate) {
// main ! = this.thread -> inEventLoop = false
boolean inEventLoop = inEventLoop();
// Add the task to the task queue
addTask(task);
if(! inEventLoop) {// If the current thread is not an EventLoop bound thread, try to start the EventLoop corresponding thread
startThread();
if (isShutdown()) {
// ... }}/ / wake EventLoop
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
Copy the code
Take a look at the AbstractUnsafe#register0 implementation.
private void register0(ChannelPromise promise) {
if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
}
boolean firstRegistration = neverRegistered;
// Register the Selector in the JDK
doRegister();
neverRegistered = false;
registered = true;
/ / add pendingHandlerCallbackHead task trigger the init phase, the purpose is to perform the ChannelHandler channelAdded hook
// This triggers the init method of ChannelInitializer
pipeline.invokeHandlerAddedIfNeeded();
Call promise's trySuccess method to wake up the thread blocking on the future
safeSetSuccess(promise);
// The channelRegistered event is pushed
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// If it is the first registration, the channelActive event is pushed
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// For non-first-time registration, focus on READ events on selectorbeginRead(); }}}Copy the code
First, doRegister registers a Channel with a JDK Selector. DoRegister is an abstract method implemented by AbstractNioChannel.
@Override
protected void doRegister(a) throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0.this);
return;
} catch (CancelledKeyException e) {
if(! selected) { eventLoop().selectNow(); selected =true;
} else {
throwe; }}}}Copy the code
Then, if it is registered for the first time, Pipeline# invokeHandlerAddedIfNeeded will trigger the init phase before the cache handlerAdded hook task (see 2-4 when to trigger channelAdded), The initChannel method of the configured ChannelInitializer is triggered to add the configured ChannelHandler to the ChannelPipeline.
After performing the above step, set the Promise result to success, wake up the threads blocking on the Future and fire any listeners.
Finally, the channelRegistered event is triggered. If the first registration is also triggered, the channelActive event is set to READ for non-first registration.
The channelActive event is ultimately set to READ. Here Pipeline propagates the pushed channelActive event from HeadContext.
final AbstractChannelHandlerContext head;
@Override
public final ChannelPipeline fireChannelActive(a) {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
Copy the code
After propagating the channelActive event, HeadContext uses the readIfIsAUtoRead method, which by default fires the channel’s read method directly.
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead(a) {
if(channel.config().isAutoRead()) { channel.read(); }}Copy the code
The read event is used to set the SelectionKey to read (ACCEPT).
AbstractChannel in turn propagates the read off the stack event in the Pipeline.
@Override
public Channel read(a) {
pipeline.read();
return this;
}
Copy the code
DefaultChannelPipeline passes the read event to TailContext for propagation.
@Override
public final ChannelPipeline read(a) {
tail.read();
return this;
}
Copy the code
The read event is propagated from tail to head, which calls the beginRead method on unsafe to change the concerned event to read.
// HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
Copy the code
Finally, AbstractNioChannel’s doBeginRead method is called, and for NioServerSocketChannel, instead of focusing on the READ event, we’re focusing on the ACCEPT event. Register the concerned event with the JDK’s selectionKey.
OP_ACCEPT = 1 << 4
protected final int readInterestOp;
@Override
protected void doBeginRead(a) throws Exception {
// ...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); }}Copy the code
4. Bind ports
Go back to the AbstractBootstrap#doBind method. At this point initAndRegister may still be executing the Register method (the EventLoop thread is executing) or register may have finished executing and the doBind0 method will be called synchronously or asynchronously.
private ChannelFuture doBind(final SocketAddress localAddress) {
Create and register a Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 2. Bind ports
if (regFuture.isDone()) {
// The register method is complete, and doBind0 is synchronized
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// doBind0 is executed asynchronously
AddListener also validates isDone, which is also a synchronous operation.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); }});returnpromise; }}Copy the code
DoBind0 is serialized and still uses the EventLoop thread to perform the bind method for a Channel.
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else{ promise.setFailure(regFuture.cause()); }}}); }Copy the code
Look at the outgoing link for the Bind outgoing stack event, just like the read outgoing stack event.
AbstractChannel#bind calls pipeline corresponding to channel to propagate the bind out of the stack event.
private final DefaultChannelPipeline pipeline;
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
Copy the code
DefaultChannelPipeline directly fires the bind method of TailContext.
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
Copy the code
Is actually perform abstract AbstractChannelHandlerContext bind method.
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// Find the next Handler to focus on the bind event
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
// Serialize the invokeBind method
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run(a) {
next.invokeBind(localAddress, promise);
}
}, promise, null.false);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
}
Copy the code
The bind event is propagated from TailContext all the way to HeadContext. For example, in the EchoServer case, we added two handlers. The LoggingHandler is responsible for printing logs, and the ServerBootstrapAcceptor is responsible for accessing client channels.
LoggingHandler looks at the bind event, so it gets executed.
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
// Propagate to the next Context
ctx.bind(localAddress, promise);
}
Copy the code
The LoggingHandler finally calls context. bind, and the bind ejection event propagates further down the stack. The next Handler is ServerBootstrapAcceptor, but because ServerBootstrapAcceptor is a pure InboundHandler, it will not actually be executed.
For EchoServer, the real next Handler is HeadContext. And HeadContext is the operation Unsafe for low-level IO operations.
@Override
public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
Copy the code
AbstractChannel AbstractUnsafe call subclasses NioServerSocketChannel doBind method, the implementation of the JDK Channel binding port.
// AbstractChannel.AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
// Call the subclass doBind
doBind(localAddress);
if(! wasActive && isActive()) {// If initAndRegister did not trigger channelActive before, do so
invokeLater(new Runnable() {
@Override
public void run(a) { pipeline.fireChannelActive(); }}); } safeSetSuccess(promise); }// NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); }}Copy the code
3. Start the client
The client startup process is roughly the same as the server:
- Create a Channel
- Initialize the Channel
- Registration of the Channel
- Connect to server CONNECT and propagate the connect off stack event
Focus on a few points:
- Different Channel types are created, NioSocketChannel constructors
- Channels focus on different events. The server is ACCEPT and the client is READ
- When initializing a Channel, the init method of BootStrap is used
- Connect Out-of-stack event propagation
Bootstrap#doResolveAndConnect is the entry point for client startup.
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
// 1. Create, initialize, register Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 2. connect
if (regFuture.isDone()) {
if(! regFuture.isSuccess()) {return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
/ /... The Listener is registered and processed asynchronously}}Copy the code
1. Create Channel
NioSocketChannel has roughly the same inheritance as NioServerSocketChannel, except for the direct parent class AbstractNioByteChannel.
public NioSocketChannel(a) {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
// 1. Create a JDK SocketChannel
private static SocketChannel newSocket(SelectorProvider provider) {
return provider.openSocketChannel();
}
// 2. Call the superclass constructor
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
Copy the code
AbstractNioByteChannel attention is READ, unlike AbstractNioMessageChannel is ACCEPT.
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
Copy the code
2. Initialize Channel
The init method of BootStrap is as follows, which is relatively simple compared to ServerBootStrap. No special Handler is added and no childGroup configuration is involved. The focus is on ChannelPipeline’s addLast method, which starts on the Server side and has already been seen.
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
// Add the configured handler to ChannelPipeline
p.addLast(config.handler());
// The superclass method sets the option of a Channel
setChannelOptions(channel, newOptionsArray(), logger);
// The parent class method sets the attr of the Channel
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
Copy the code
3. Register Channel
The process for a client to register a NioSocketChannel to a Selector is the same as the process for a server NioServerSocketChannel.
After the registration is complete, the channelAdded hook is triggered, triggering the initChannel method of the configured ChannelInitializer.
4, the connect
Bootstrap finally calls doConnect, executing the connect method of Channel, which is the ChannelOutboundInvoker that triggers the out-of-stack event.
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// serialize EventLoop
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else{ channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); }}); }Copy the code
NioSocketChannel’s parent class AbstractChannel’s Connect method is called directly from DefaultChannelPipeline’s Connect method. DefaultChannelPipeline is also a ChannelOutboundInvoker that triggers an out of stack event.
// AbstractChannel
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}
Copy the code
Pipeline call TailContext trigger connect the stack events, TailContext AbstractChannelHandlerContext inheritance is a ChannelOutboundInvoker.
@Override
public final ChannelFuture connect( SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
Copy the code
TailContext abstract superclass AbstractChannelHandlerContext spread the connect event to the next Handler, has spread to HeadContext the ChannelOutboundHandler.
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// Find the next handler context execution that focuses on the CONNECT event
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
// ...
}
return promise;
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}
Copy the code
The HeadContext call Unsafe performs the actual connect operation.
@Override
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
Copy the code
AbstractNioChannel. AbstractNioUnsafe# connect:
- Call SocketChannel to perform a non-blocking connect
- Resolve connection timeouts by submitting a deferred task to EventLoop
- Add a Listener to the Future to handle asynchronous callbacks after the connection completes
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
Socketchannel.connect, which returns false if it is not blocked
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// 2. Handle connection timeout by submitting a delayed task to EventLoop. Default timeout is 30 seconds
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run(a) {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if(connectPromise ! =null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// 3. Add a Listener to receive a callback when a non-blocking SocketChannel connection completes
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if(connectTimeoutFuture ! =null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null; close(voidPromise()); }}}); }}Copy the code
NioSocketChannel# doConnect, perform the JDK SocketChannel the connect method, the structure of the AbstractNioChannel methods on the Server side start seen, will set up the Channel nonblocking, So the Channel focus event is set to CONNECT.
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
// ...
// Execute the JDK's SocketChannel connect method
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
// If SocketChannel is not blocking, return false and set the concern event to CONNECT
if(! connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); }return connected;
}
Copy the code
Instead of looking at how the EventLoop polls Selector. Select, we’re looking at the callback after the CONNECT event. The entry is NioUnsafe#finishConnect, and the implementation is AbstractNioUnsafe#finishConnect.
public final void finishConnect(a) {
assert eventLoop(a).inEventLoop(a);
try {
boolean wasActive = isActive();
/ / 1. Calling JDK SocketChannel. FinishConnect
doFinishConnect();
// 2. Set the Promise result and trigger the ChannelActive push event
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// 3. Cancel the connection timeout detection task submitted to EventLoop
if(connectTimeoutFuture ! =null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null; }}private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if(! wasActive && active) { pipeline().fireChannelActive(); }// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
Copy the code
conclusion
- The Future is caller-oriented for retrieving asynchronous execution results; Promises are for method implementers and are used to set the outcome of task execution.
- Server startup: create Channel, initialize Channel, register Channel, bind.
- Client startup: create Channel, initialize Channel, register Channel, connect.
- After a Channel is registered with a Selector, the ChannelAdded event is triggered and the initChannel method of ChannelInitializer is executed.
- A ChannelHandler encapsulates a ChannelHandlerContext into the ChannelPipeline. A ChannelPipeline assembles a two-way linked list of ChannelHandlerContext, with HeadContext and TailContext as its head node. Out-of-stack events are propagated from TailContext to HeadContext, and in-stack events are propagated from HeadContext to TailContext.
-
The propagation mode of the out-of-stack event, for example: the read out-of-stack event triggered by the server after registering a Channel (set the concern event to ACCEPT), the server bind out-of-stack event, and the client connect out-of-stack event. The out-of-stack event is propagated ChannelOutboundInvoker->ChannelOutboundHandler->Unsafe.
The key classes involved are as follows: Channel (ChannelOutboundInvoker) ->Pipeline (ChannelOutboundInvoker) ->TailContext (ChannelOutboundInvoker) ->ChannelOutboundH Andlers – > HeadContext (ChannelOutboundHandler) – > the Unsafe.
-
ChannelActive Trigger time: After bind is completed on the server and connect is completed on the client.
-
ChannelInitializer provides the following functions: After the server NioServerSocketChannel or client NioSocketChannel is registered, the initChannel method is triggered to add the Service ChannelHandler.