Preface

Netty is built on the Reactor pattern, specifically the variant that uses multiple reactors with multiple threads. This article introduces several of Netty's important concepts and draws on the article listed at the end.

ChannelFuture

Let’s take a look at some of the more important interfaces in Netty

public interface Future<V> extends java.util.concurrent.Future<V> {

    // Returns true if the I/O operation completed successfully
    boolean isSuccess();

    boolean isCancellable();

    // Returns the cause of the failure if the I/O operation failed
    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // Waits for the Future task to complete and rethrows the failure cause if it failed
    Future<V> sync() throws InterruptedException;

    Future<V> syncUninterruptibly();

    // Waits for the Future task to complete without rethrowing a failure
    Future<V> await() throws InterruptedException;

    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    boolean await(long timeoutMillis) throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    // Returns the result immediately; returns null if the Future task has not completed
    V getNow();
}

Netty's Future interface extends the JDK's java.util.concurrent.Future, which was introduced in JDK 1.5. Why reinvent the wheel when a Future interface already exists?

The reason is that the JDK 1.5 Future does not meet Netty's requirements. The JDK's Future can retrieve the result of an asynchronous computation and offers methods to check whether a task has been cancelled or has completed. However, the caller has no way of knowing when the task will complete, and therefore when to call get() to fetch the result; polling isDone() in a loop is not an option either, because it wastes CPU. Netty's Future makes up for this to some extent. By registering listeners, you are told when the task completes and can decide what to do next, and isSuccess() tells you whether the task succeeded.
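To make the listener idea concrete, here is a minimal sketch that uses only the JDK. The class `ListenableResult` and its methods are hypothetical names invented for this illustration; it is not how Netty implements its Future, only the general shape of "register a callback instead of polling".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for a listener-capable future; hypothetical, not Netty's implementation. */
final class ListenableResult<V> {
    private final List<Consumer<V>> listeners = new ArrayList<>();
    private V value;
    private boolean done;

    /** Registers a callback; runs it immediately if the result is already available. */
    synchronized void addListener(Consumer<V> listener) {
        if (done) {
            listener.accept(value);
        } else {
            listeners.add(listener);
        }
    }

    /** Stores the result once and notifies every registered listener. */
    synchronized void complete(V result) {
        if (done) {
            throw new IllegalStateException("already complete");
        }
        value = result;
        done = true;
        for (Consumer<V> listener : listeners) {
            listener.accept(result);
        }
        listeners.clear();
    }

    synchronized boolean isDone() {
        return done;
    }
}
```

The caller never loops on isDone(): a listener added before completion runs when complete() is called, and a listener added afterwards runs straight away.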

public interface ChannelFuture extends Future<Void> {

    // Returns the Channel associated with this ChannelFuture
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();
}

ChannelFuture extends Netty's Future interface. All I/O operations in Netty are asynchronous, so the fact that a method has returned does not mean the I/O operation has completed. ChannelFuture therefore represents the result of an asynchronous I/O operation. Its methods mirror those of Netty's Future; what is new is that a ChannelFuture is associated with a Channel.

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    // Called when the Future task associated with this operation completes
    void operationComplete(F future) throws Exception;
}

The listener callback is defined in the GenericFutureListener interface; its operationComplete() method is called back when the Future task completes.

public interface Promise<V> extends Future<V> {

    // Marks the Future task as a success and notifies all listeners.
    // Throws IllegalStateException if the task has already completed.
    Promise<V> setSuccess(V result);

    // Marks the Future task as a success and notifies all listeners.
    // Returns false instead of throwing if the task has already completed.
    boolean trySuccess(V result);

    // Marks the Future task as a failure and notifies all listeners.
    // Throws IllegalStateException if the task has already completed.
    Promise<V> setFailure(Throwable cause);

    // Marks the Future task as a failure and notifies all listeners.
    // Returns false instead of throwing if the task has already completed.
    boolean tryFailure(Throwable cause);

    // Marks the Future task as uncancellable
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

Promise also extends Netty's Future interface. Because Netty's Future has no methods for writing a result, Netty adds Promise as the writable side, which is used to set the outcome of an I/O operation. When the task completes, setSuccess() or setFailure() is called and the registered listeners are called back. After that, any thread blocked in await() or sync() is woken up and returns.
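The hand-off between the thread that sets the outcome and the thread that waits for it can be sketched with plain JDK monitors. `WaitablePromise`, `awaitResult()` and `demo()` are hypothetical names for this illustration. Unlike Netty's await(), which returns the promise itself, `awaitResult()` returns the value directly to keep the sketch short.

```java
/** Toy promise showing how the writer wakes threads blocked on the reader side; not Netty's code. */
final class WaitablePromise<V> {
    private V result;
    private boolean done;

    /** Write side: records the outcome once and wakes every waiting thread. */
    synchronized boolean trySuccess(V value) {
        if (done) {
            return false;
        }
        result = value;
        done = true;
        notifyAll();
        return true;
    }

    /** Read side: blocks the calling thread until the outcome has been set. */
    synchronized V awaitResult() throws InterruptedException {
        while (!done) {
            wait();
        }
        return result;
    }

    /** One thread sets the outcome while the calling thread waits for it. */
    static String demo() {
        WaitablePromise<String> promise = new WaitablePromise<>();
        Thread setter = new Thread(() -> promise.trySuccess("written"));
        setter.start();
        try {
            String outcome = promise.awaitResult(); // returns only after trySuccess has run
            setter.join();
            return outcome;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The while loop around wait() guards against spurious wakeups and also covers the case where the outcome was set before the waiter arrived.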

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
​
    @Override
    Channel channel();
​
    @Override
    ChannelPromise setSuccess(Void result);
​
    ChannelPromise setSuccess();
​
    boolean trySuccess();
​
    @Override
    ChannelPromise setFailure(Throwable cause);
​
    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
​
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
​
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
​
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
​
    @Override
    ChannelPromise sync() throws InterruptedException;
​
    @Override
    ChannelPromise syncUninterruptibly();
​
    @Override
    ChannelPromise await() throws InterruptedException;
​
    @Override
    ChannelPromise awaitUninterruptibly();
​
    ChannelPromise unvoid();
}
​

The ChannelPromise interface combines the features of ChannelFuture and Promise. Its methods closely mirror those of the previous interfaces, except that they return ChannelPromise.

After looking at the above interfaces, let’s look at the implementation of these interfaces in Netty

The class diagram shows that DefaultPromise implements Promise, and that DefaultChannelPromise implements ChannelPromise and extends DefaultPromise. DefaultPromise does not implement ChannelFuture, so it has none of the ChannelFuture-related features. To see how all of the interfaces above are implemented in Netty, DefaultChannelPromise is therefore the class to look at.

public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {

    private final Channel channel;
    private long checkpoint;

    /**
     * Creates a new instance.
     *
     * @param channel
     *        the {@link Channel} associated with this future
     */
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = checkNotNull(channel, "channel");
    }

    @Override
    public ChannelPromise setSuccess(Void result) {
        super.setSuccess(result);
        return this;
    }

    @Override
    public ChannelPromise setFailure(Throwable cause) {
        super.setFailure(cause);
        return this;
    }

    @Override
    public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
        super.addListener(listener);
        return this;
    }
}

In the code for DefaultChannelPromise, many methods simply delegate to the parent class DefaultPromise, so let's turn to the code for DefaultPromise.

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

@Override
public boolean trySuccess(V result) {
    return setSuccess0(result);
}

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            // Notify the listeners
            notifyListeners();
        }
        return true;
    }
    return false;
}

Notice that setSuccess sets the value and then notifies all listeners. If the value cannot be set because the promise has already completed, it throws an exception. trySuccess does the same thing, but returns false instead of throwing.
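The difference between the two methods comes down to what happens when the compare-and-set loses. Below is a minimal sketch using the JDK's AtomicReference in place of the field updater shown above. `CasPromise` is a hypothetical class for illustration, and it leaves out listeners, waiters and the uncancellable state.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Minimal CAS-based result holder modelled on the snippet above; a sketch, not Netty's class. */
final class CasPromise<V> {
    // Sentinel stored in place of a null result, so that null can keep meaning "not completed".
    private static final Object SUCCESS = new Object();

    private final AtomicReference<Object> result = new AtomicReference<>();

    /** Returns false instead of throwing when the promise is already complete. */
    boolean trySuccess(V value) {
        return result.compareAndSet(null, value == null ? SUCCESS : value);
    }

    /** Same state transition as trySuccess, but a second completion is reported as an error. */
    CasPromise<V> setSuccess(V value) {
        if (trySuccess(value)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    boolean isDone() {
        return result.get() != null;
    }
}
```

Because only one compareAndSet from null can succeed, exactly one caller completes the promise even when several threads race; every other caller either gets false or an IllegalStateException, depending on which method it used.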

ChannelPipeline

A ChannelPipeline is a container associated with a Channel. It stores multiple ChannelHandlerContext objects, each of which holds a ChannelHandler that we wrote. The ChannelHandlerContexts are linked together as a doubly linked list, and I/O events pass in order through each ChannelHandler in the ChannelPipeline.

As shown above

Netty events are classified into inbound events and outbound events. Inbound events are triggered by the I/O thread, such as a TCP connection being established or data being read. Outbound events are I/O operations initiated by the user, such as connect, write, and read requests.

p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());

A class that starts with Inbound means that it is an Inbound Handler, and a class that starts with Outbound means that it is an Outbound Handler. Let’s guess the order in which these ChannelHandlers are executed.

Handlers 3 and 4 do not implement ChannelInboundHandler, handlers 1 and 2 do not implement ChannelOutboundHandler, and handler 5 implements both. If inbound events simply ran first and outbound events second, each in registration order, the guess would be 1->2->5->3->4->5.

In fact, that guess is wrong. Inbound events are processed from front to back and outbound events from back to front, so inbound events pass through 1->2->5 and outbound events pass through 5->4->3, which gives 1->2->5->5->4->3 overall.
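The traversal rule can be checked with a small JDK-only model. `PipelineOrderSketch` and its methods are hypothetical names for this illustration. The model assumes every handler passes the event on to the next one, which real handlers have to do explicitly.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy pipeline that records which handlers see an event; hypothetical names, not Netty's API. */
final class PipelineOrderSketch {
    /** One entry per handler: its name and which directions it handles. */
    static final class Entry {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Entry(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    void addLast(String name, boolean inbound, boolean outbound) {
        entries.add(new Entry(name, inbound, outbound));
    }

    /** Inbound events walk from the head towards the tail, skipping outbound-only handlers. */
    List<String> fireInbound() {
        List<String> visited = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).inbound) {
                visited.add(entries.get(i).name);
            }
        }
        return visited;
    }

    /** Outbound events walk from the tail towards the head, skipping inbound-only handlers. */
    List<String> fireOutbound() {
        List<String> visited = new ArrayList<>();
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).outbound) {
                visited.add(entries.get(i).name);
            }
        }
        return visited;
    }

    /** Mirrors the five addLast calls from the text. */
    static PipelineOrderSketch example() {
        PipelineOrderSketch p = new PipelineOrderSketch();
        p.addLast("1", true, false);
        p.addLast("2", true, false);
        p.addLast("3", false, true);
        p.addLast("4", false, true);
        p.addLast("5", true, true);
        return p;
    }
}
```

Calling `example().fireInbound()` yields 1, 2, 5 and `example().fireOutbound()` yields 5, 4, 3, matching the order described above.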

When the ChannelPipeline is created

A ChannelPipeline and a Channel are in one-to-one correspondence, so the ChannelPipeline is created when the Channel is created.

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    // Create the DefaultChannelPipeline
    return new DefaultChannelPipeline(this);
}

// Create the two linked-list nodes
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    // TailContext extends AbstractChannelHandlerContext, which shows that the nodes
    // of a ChannelPipeline are ChannelHandlerContexts, not ChannelHandlers
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
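With the head and tail wired together like this, adding a handler amounts to splicing a node in just before the tail. The sketch below shows that list manipulation on its own. `ContextListSketch` and `Node` are hypothetical names, and nothing here models handlers or events.

```java
import java.util.ArrayList;
import java.util.List;

/** Doubly linked list with fixed head and tail sentinels; a sketch of the layout only. */
final class ContextListSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    ContextListSketch() {
        // An empty list is just head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    /** Splices a new node in just before the tail sentinel, so the tail always stays last. */
    void addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    /** Names from head to tail, sentinels included. */
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

Keeping permanent sentinels at both ends means insertion never has to special-case an empty list: there is always a node before and after the insertion point.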

NioEventLoopGroup

NioEventLoopGroup is a thread pool, and each NioEventLoop is a thread created by the NioEventLoopGroup.

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
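The rule for the default pool size in the constructor chain above can be written out on its own. This sketch substitutes JDK calls (Integer.getInteger and Runtime.availableProcessors) for Netty's SystemPropertyUtil and NettyRuntime helpers, so it only approximates the real lookup. `DefaultThreadsSketch` is a hypothetical name.

```java
/** Sketch of the default-size rule; uses only JDK calls in place of Netty's utilities. */
final class DefaultThreadsSketch {
    /** Resolves the pool size: 0 means "use the default", anything else is taken as given. */
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultThreads() : nThreads;
    }

    /** At least 1: the system property if set, otherwise twice the processor count. */
    static int defaultThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }
}
```

So calling the no-argument constructor, which passes 0, ends up with twice the number of available processors unless the io.netty.eventLoopThreads system property says otherwise.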

Following the constructor chain all the way down eventually leads to the constructor that does the real work:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    checkPositive(nThreads, "nThreads");

    // The constructors above passed null, so a ThreadPerTaskExecutor is created here
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Create the child executor
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // Creating a child failed: gracefully shut down the children created so far
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // Choose the appropriate round-robin strategy
    chooser = chooserFactory.newChooser(children);

    // Create a new listener
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // Register the listener on every child
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

// The chooser differs depending on whether the number of executors is a power of 2
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // Check whether the number of executors is a power of 2
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
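Why does the power-of-two case get its own chooser? When the length is a power of two, the round-robin index can be computed with a bit mask instead of a modulo. The sketch below shows both paths using JDK classes only. `ChooserSketch` is a hypothetical name, and using Math.floorMod for the general case is a choice made for this sketch rather than something taken from Netty's source.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Round-robin index selection in two flavours; a sketch of the idea, not Netty's classes. */
final class ChooserSketch {
    private final AtomicInteger counter = new AtomicInteger();
    private final int length;

    ChooserSketch(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length must be positive: " + length);
        }
        this.length = length;
    }

    /** A positive int is a power of two when it has exactly one bit set. */
    static boolean isPowerOfTwo(int val) {
        return val > 0 && (val & -val) == val;
    }

    /** Returns the index of the next executor to use. */
    int next() {
        int n = counter.getAndIncrement();
        if (isPowerOfTwo(length)) {
            // length - 1 is an all-ones mask, so a bitwise AND replaces the modulo.
            return n & (length - 1);
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        return Math.floorMod(n, length);
    }
}
```

Both branches hand out indices 0, 1, ..., length - 1 and then wrap around; the mask version simply avoids a division on a path that is hit for every new channel.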

NioEventLoopGroup is a thread pool, and each NioEventLoop is a thread in that pool. The newChild() method creates a NioEventLoop.

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    // The selector is bound to this NioEventLoop here
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    // Assign the task queue.
    // A NioEventLoop spends part of its time on I/O tasks and part on non-I/O tasks;
    // the non-I/O tasks are held in this queue.
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

So at this point, the NioEventLoop is associated with the selector
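To picture what "one thread, one selector, one task queue" looks like, here is a small sketch built on the JDK's java.nio Selector. `MiniEventLoop` and all of its methods are hypothetical. No channels are registered, so the I/O half of the loop only ever wakes up and finds nothing ready. It is meant to show the shape of such a loop rather than how NioEventLoop actually runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** One thread that owns a Selector and a task queue; a sketch of the shape, not NioEventLoop. */
final class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final Thread thread = new Thread(this, "mini-event-loop");
    private volatile boolean running = true;

    MiniEventLoop() throws IOException {
        this.selector = Selector.open(); // the selector belongs to this loop for its whole life
        thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    /** Queues a non-I/O task and wakes the loop in case it is blocked in select(). */
    void execute(Runnable task) {
        taskQueue.add(task);
        selector.wakeup();
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void shutdown() throws InterruptedException {
        running = false;
        selector.wakeup();
        thread.join();
    }

    @Override
    public void run() {
        try {
            while (running) {
                selector.select();               // I/O part: wait for ready channels or a wakeup
                selector.selectedKeys().clear(); // nothing is registered in this sketch
                Runnable task;
                while ((task = taskQueue.poll()) != null) {
                    task.run();                  // non-I/O part: drain the queued tasks
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                selector.close();
            } catch (IOException ignored) {
                // nothing useful to do while shutting down
            }
        }
    }

    /** Runs one task on the loop and reports whether it ran on the loop's own thread. */
    static boolean demo() {
        try {
            MiniEventLoop loop = new MiniEventLoop();
            loop.start();
            CountDownLatch ran = new CountDownLatch(1);
            AtomicBoolean onLoopThread = new AtomicBoolean();
            loop.execute(() -> {
                onLoopThread.set(loop.inEventLoop());
                ran.countDown();
            });
            ran.await();
            loop.shutdown();
            return onLoopThread.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because tasks submitted from other threads are only ever run by the loop's own thread, code that touches the selector or the channels does not need additional locking.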

To be continued…..

References

juejin.cn/post/684490…