Address: juejin.cn/post/684490…

Said in the previous

The Netty source code used in this article is 4.1.31.final, there are some differences between different versions.

JDK Future

Before we talk about Netty’s asynchronous Future, let’s take a look at the JDK’s built-in Future mechanism.

Let’s start with a little bit of code

public class JDKFuture {

    static ExecutorService executors = new ThreadPoolExecutor(1.1.60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(16));
    public static void main(String[] args) throws Exception{
        int cnt = 1;
        Future[] jdkFuture=new Future[cnt];
        Object jdkFutureResult;
        for(int i = 0; i < cnt; i++){ jdkFuture[i] = executors.submit(new JDKCallable(i));
        }
        System.out.println(String.format("%s is about to fetch task execution result at %s", Thread.currentThread(), new Date()));
        jdkFutureResult = jdkFuture[0].get();
        System.out.println(String.format("%s in %s task results obtained %s", Thread.currentThread(), new Date(), jdkFutureResult));
        executors.shutdown();
    }

    static class JDKCallable implements Callable{

        int index;

        JDKCallable(int ind){
            this.index = ind;
        }

        public Object call(a) throws Exception {
            try {
                System.out.println(String.format("Thread [%s] submits task [%s]", Thread.currentThread(), this.index));
              // It takes 2 seconds to simulate time-consuming operation
                Thread.sleep(2000);
                System.out.println(String.format("Thread [%s] completes task [%s]", Thread.currentThread(), this.index));
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            return String.format("Task %s execution result".this.index); }}}Copy the code

The output is:

[Thread [Thread pool -1-thread-1.5,main]] Commit task [0]
Thread[main,5, the main] in Mon Dec16 16:40:38 CST 2019Thread[Thread[pool-1-thread-1.5,main]] Execute a task [0Thread[main,5, the main] in Mon Dec16 16:40:40 CST 2019The task result is obtained0The execution resultCopy the code

We can see that the main thread is waiting 2 seconds to use future.get() because the child thread has not finished processing the result. This is where the JDK’s future mechanism is not perfect. Because the JDK’s own future mechanism is not perfect, so Netty implemented a future mechanism.

Netty asynchronous Future/Promise

Netty’s Future is asynchronous, so how is it implemented? Next, start from the source code.

Take a look at Netty’s Future and Promise interfaces

Future

/** * The result of an asynchronous operation * The result of an asynchronous operation */
public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess(a);
    boolean isCancellable(a);
    Throwable cause(a);
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> sync(a) throws InterruptedException;
    Future<V> syncUninterruptibly(a);
    Future<V> await(a) throws InterruptedException;
    Future<V> awaitUninterruptibly(a);
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    V getNow(a);
  
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}
Copy the code

Promise

A Promise is a special Future that is writable, and writable means you can change the results inside.

/**
 * Special {@linkFuture} which is writable. * A writable special Future * inherits the Future, and the inherited methods are not listed */
public interface Promise<V> extends Future<V> {

    /**
     * Marks this future as a success and notifies all
     * listeners.
     * If it is success or failed already it will throw an {@linkIllegalStateException}. * Mark the future as success and notify all listeners * that exceptions will be thrown if successful or unsuccessful */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a success. Otherwise {@codeFalse} because this future is * already marked as either a success or a failure. Failure to set the above method throws an exception */
    boolean trySuccess(V result);

  	// These two are similar to the above
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable(a);
}
Copy the code

The source code interpretation

See here all the students are the default is to use netty to write procedures ~, haven’t written words can see the official document or I another netty use.

Then start the interpretation of the source code.

So where to start?

We all know it! Write (MSG), write(MSG), write(MSG), write(MSG), write(MSG).

start

Now that you know the difference between channel().write and ctx.write, let’s start with channel().write.

No, I think I'd better add something to that, otherwise I feel uncomfortable.

There is a pipeline in Netty, that is, the event call chain. During development, we add our own handle to the call chain to handle events. However, in this pipeline, Netty adds two handles, Head and tail, to facilitate the Netty framework to handle events.

Initialize DefaultChannelPipeline (); add two handles, head and tail to the initialization code. See the answers below for details

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        // ChannelInboundHandler
        this.tail = new DefaultChannelPipeline.TailContext(this);
	      // ChannelInboundHandler && ChannelOutboundHandler
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }
Copy the code

Real start

Yes, let’s start with channel().write(MSG).

Trace the code channel().write(), which first calls the writeAndFlush method of DefaultChannelPipeline.

1.DefaultChannelPipeline#writeAndFlush

    public final ChannelFuture writeAndFlush(Object msg) {
        return this.tail.writeAndFlush(msg);
    }
Copy the code

Tail.tail is the tailHandle initialized in the constructor above, while write is an out-of-stack event that is passed from tailHandle forward to headHandle.

public ChannelFuture writeAndFlush(Object msg) {
  	// There is a new promise, and the promise will always be delivered, always delivered.....
    return this.writeAndFlush(msg, this.newPromise());
}
Copy the code

Next came to AbstractChannelHandlerContext writeAndFlush.

    /** * Perform write and flush *@param msg
     * @param promise
     */
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        This method returns true only after ChannelHandler#handlerAdded is called
        if (invokeHandler()) {
            // write continues delivery
            invokeWrite0(msg, promise);
            // flush data
            invokeFlush0();
        } else{ writeAndFlush(msg, promise); }}private void write(Object msg, boolean flush, ChannelPromise promise) {
        // Look for the next OutboundHandle, because it is output
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        // The thread where the next OutboundHandle is located
        EventExecutor executor = next.executor();
        // If the threads are in the same Thread pool (since Netty channels bind only one Thread in a ThreadPool, different threads also mean different Thread pools)
        if (executor.inEventLoop()) {
            // In the same thread pool (which means the same thread),
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else{ next.invokeWrite(m, promise); }}else {
            // In different thread pools, a task needs to be created and submitted to the next thread pool
            final AbstractWriteTask task;
            if (flush) {
                // Commit to the next thread pool && flush
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            // Since it is a write event, so then submits the task to the next OutboundHandle thread for execution
            if(! safeExecute(executor, task, promise, m)) {// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                // Failed to submit the tasktask.cancel(); }}}Copy the code

2. HeadContext# write, flush

Now comes the most important part of this article, HeadContext!

The write and flush methods of HeadContext are actually implemented by calling unsafe methods.

write

	// If writeAndFlush is used, flush is called after write
	@Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // This call AbstrachUnsafe. Write
            unsafe.write(msg, promise);
        }

	// This is the unsafe write method
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // outboundBuffer = null indicates that the channel is closed and the future result needs to be set to false
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0; }}catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            // Add MSG to buffer
            outboundBuffer.addMessage(msg, size, promise);
        }
Copy the code

flush

In the case of WriteAndFlush, the Head flush method is called after the write call, which is the same as the AbstractUnsafe flush

        /** * write the flush */
        @Override
        public final void flush(a) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            // Buffer is marked to be flush
            outboundBuffer.addFlush();
            // Then there is the real flush
            flush0();
        }
Copy the code

What is a ChannelOutboundBuffer?

The ChannelOutboundBuffer simply stores data written by the current channel and writes it out when it calls Flush.

Follow the source has been after flush0, will call to AbstractNioMessageChannel# doWrite method. (still doRead method above, is called when data received)

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            //
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                // Determine the write event
                if((interestOps & SelectionKey.OP_WRITE) ! =0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }
            try {
                // Loop out the data
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                    // Actually write out the data
                    JavaChannel ().send(nioData, mi);
                    JavaChannel ().register()
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break; }}// Write successfully to remove the data just written from buffer
                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    // Write failed
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break; }}catch (Exception e) {
                // Whether to continue to write the following data after the error
                if (continueOnWriteError()) {
                    in.remove(e);
                } else {
                    throwe; }}}}Copy the code

3.Promise

Up here, the data is written out, so what’s the relevance of promises? Don’t see that?

To be honest, it’s pretty deep down. It’s! Put it in buffer.remove()!

    public boolean remove(a) {
        // Just write out the data Entry
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

      	// writeAndFlush DefaultPromise()
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

	      // Remove from buffer
        removeEntry(e);

        if(! e.cancelled) {// only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
	          / /!!!!!! To highlight!!!!!!
						// The result of the promise is set, and trySuccess is called to notify all listeners
            / /!!!!!! To highlight!!!!!!
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false.true);
        }

        // recycle the entry
	      // Resets Entry information for reuse.
        NewInstance (MSG, size, total(MSG), promise); In contrast, newInstance is obtaining a cached Entry
        e.recycle();
        return true;
    }
Copy the code

The promise notifies all listeners that the data has been written successfully and safeSuccess(Promise) is inside when buffer.remove() is called, The trySuccess() of the Promise is eventually called to trigger notifyListeners of all listeners.

4.NotifyListener

This is invoked on Promise#trySuccess to notify all listeners that the action is complete.

    /** * Notifies listeners that the task is complete */
    private void notifyListeners(a) {
        // Get the future thread (pool)
        EventExecutor executor = executor();
        // Execute notification is the current thread directly callback information
        // currentThread == this.executor
        if (executor.inEventLoop()) {
            // Get the ThreadLocal variable
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            // The number of levels in listen
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    // Notify all listeners
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return; }}// If the executor is not the current thread, the future's owning executor will be executed
        // means that the executor that adds the notification may be the previous executor, and then the next executor (the current thread) executes the notification
        // The notification is now returned to the previous executor
        // The notification is not executed by the current thread, encapsulated as a task, and the previously submitted thread completes the notification (callback).
        safeExecute(executor, new Runnable() {
            @Override
            public void run(a) { notifyListenersNow(); }}); }Copy the code

conclusion

Netty’s Future asynchronous mechanism encapsulates the notification into a Task after the operation is complete, and the Promise thread (Executors) executes it.


The last

This content is over here, the last last, thank you very much to see here!! Your reading is an affirmation of the author!! Feel the article has the help of the officer easily point praise again walk bai (finally exposed me is to cheat praise (◒. ◒)), each of your likes is very important to the author (incredibly true) and a double to the author’s writing!!