Netty source code interpretation

EventLoop source

1 Instantiate the NioEventLoopGroup

The server code in our discard example instantiates NioEventLoopGroup with the no-argument constructor, so let's start from that constructor. See io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()

    public NioEventLoopGroup() {
        this(0);
    }

    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
        // Note the SelectorProvider obtained here; it should remind you of the Selector in Java NIO
        this(nThreads, executor, SelectorProvider.provider());
    }
    
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

Follow super to see what else happens. See: io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // With the no-argument constructor, 0 is passed in here, so the final number of threads is DEFAULT_EVENT_LOOP_THREADS
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

So the default number of threads is DEFAULT_EVENT_LOOP_THREADS. What is this number? Following its definition shows that it is twice the number of processor cores: Runtime.getRuntime().availableProcessors() * 2. Continue following super. See io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // Instantiate the executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // The EventExecutor array
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // Instantiate each member of the array
                children[i] = newChild(executor, args);
                success = true;
            }
            ...
        }

        // Create a chooser for the eventLoops. When there are several EventLoops,
        // the chooser decides which eventLoop thread a Channel runs on.
        chooser = chooserFactory.newChooser(children);
    }

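To make the role of chooserFactory.newChooser(children) more concrete, here is a minimal round-robin sketch in plain Java. The class name RoundRobinChooser and its generic element type are hypothetical and exist only for illustration; Netty's real chooser sits behind EventExecutorChooserFactory and may differ in detail.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified chooser: hands out array elements in round-robin order.
final class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooser(T[] children) {
        if (children == null || children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
    }

    // Math.floorMod keeps the index non-negative even after the counter overflows.
    T next() {
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }
}
```

Each new Channel would ask such a chooser for the next event loop, so channels are spread evenly over the array.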

Next, follow the newChild method. See io.netty.channel.nio.NioEventLoopGroup#newChild

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        // This is where an eventLoop is instantiated; let's follow its constructor
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

Follow the constructor. See io.netty.channel.nio.NioEventLoop#NioEventLoop

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // We won't follow super here; just note that the executor is stored as a member variable,
        // which the eventLoop uses later to start its thread
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        // The Java NIO Selector is created here and becomes a member variable of the eventLoop
        selector = openSelector();
        selectStrategy = strategy;
    }

To summarize: by default the group creates an array of EventLoops whose size is twice the number of processor cores, then instantiates each eventLoop. Instantiating an eventLoop essentially means creating its member variables: the Executor and the Selector.
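The summary above can be sketched with the JDK alone. This is a simplified model under stated assumptions, not Netty code: MiniLoopGroup, DEFAULT_THREADS and resolveThreads are hypothetical names, and each "event loop" is reduced to the Selector it would own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical model of the group: one Selector per would-be event loop.
final class MiniLoopGroup implements AutoCloseable {
    // Stand-in for DEFAULT_EVENT_LOOP_THREADS: twice the number of cores.
    static final int DEFAULT_THREADS =
            Math.max(1, Runtime.getRuntime().availableProcessors() * 2);

    final Selector[] selectors;

    // 0 means "use the default", mirroring the no-argument constructor path.
    static int resolveThreads(int nThreads) {
        return nThreads == 0 ? DEFAULT_THREADS : nThreads;
    }

    MiniLoopGroup(int nThreads) {
        int n = resolveThreads(nThreads);
        if (n <= 0) {
            throw new IllegalArgumentException("nThreads: " + n + " (expected: > 0)");
        }
        SelectorProvider provider = SelectorProvider.provider();
        selectors = new Selector[n];
        try {
            for (int i = 0; i < n; i++) {
                selectors[i] = provider.openSelector(); // one Selector per loop
            }
        } catch (IOException e) {
            close(); // release the selectors opened so far
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        for (Selector s : selectors) {
            if (s != null) {
                try {
                    s.close();
                } catch (IOException ignored) {
                    // best effort in this sketch
                }
            }
        }
    }
}
```

For example, new MiniLoopGroup(3) opens three selectors, while passing 0 falls back to the default size.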

2 Execution logic of eventLoop

Recall that earlier, after the ServerSocketChannel was created, it was registered on one of the eventLoops. See io.netty.channel.AbstractChannel.AbstractUnsafe#register

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // The eventLoop starts executing here; let's follow it
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }

Follow the execute method. See io.netty.util.concurrent.SingleThreadEventExecutor#execute

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            // We are currently on the main thread, so this branch runs and starts a new thread
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    // execute() calls this method
    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                // Follow this call
                doStartThread();
            }
        }
    }

    // startThread() calls this method
    private void doStartThread() {
        assert thread == null;
        // Finally, a new thread is started through the ThreadPerTaskExecutor
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Save the current thread as the eventLoop's thread
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // What does this call do?
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    ...
                }
            }
        });
    }
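The lazy start shown above (the first execute() call from an outside thread flips a state flag with compare-and-set and only then starts the worker thread) can be sketched with the JDK alone. LazySingleThreadExecutor is a hypothetical, heavily simplified class: it has no shutdown or rejection handling, and it uses a blocking queue in place of the selector-driven loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified single-threaded executor (not Netty's class).
final class LazySingleThreadExecutor implements Executor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // no-op after the first successful start
        }
        tasks.add(task);
    }

    private void startThread() {
        // Only the caller that wins the CAS creates the worker thread.
        if (state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            Thread worker = new Thread(this::runLoop, "lazy-loop");
            worker.setDaemon(true); // do not keep the JVM alive in this sketch
            worker.start();
        }
    }

    private void runLoop() {
        thread = Thread.currentThread(); // remember the loop thread
        for (;;) {
            try {
                tasks.take().run();
            } catch (InterruptedException e) {
                return; // an interrupt stops the loop
            } catch (Throwable t) {
                // a failing task must not kill the loop; a real loop would log this
            }
        }
    }
}
```

However many threads call execute(), every task runs on the single worker thread, which is the property the eventLoop relies on.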

Let's follow SingleThreadEventExecutor.this.run(). See io.netty.channel.nio.NioEventLoop#run

    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        // (1) This select should look familiar; think back to Java NIO
                        select(wakenUp.getAndSet(false));
                        ...
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // (2) Does this look familiar too?
                        processSelectedKeys();
                    } finally {
                        // (3) Run the tasks queued on this eventLoop, such as the register0 task submitted earlier
                        runAllTasks();
                    }
                }
                ...
        }
    }
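The three numbered steps in run() can be condensed into a small JDK-only sketch: select, process the selected keys, then run the queued tasks. MiniEventLoop, submit and runOnce are hypothetical names, and treating a Runnable attachment as the I/O handler is an assumption made for illustration, not what Netty does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified event loop (not Netty's NioEventLoop).
final class MiniEventLoop implements AutoCloseable {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Queue a task and wake the selector so a blocked select() returns promptly.
    void submit(Runnable task) {
        tasks.add(task);
        selector.wakeup();
    }

    // One loop iteration; timeoutMillis must be > 0. Returns the number of tasks run.
    int runOnce(long timeoutMillis) {
        try {
            // (1) select: do not block when tasks are already waiting
            if (tasks.isEmpty()) {
                selector.select(timeoutMillis);
            } else {
                selector.selectNow();
            }
            // (2) process the selected keys
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                Object attachment = key.attachment();
                if (attachment instanceof Runnable) {
                    ((Runnable) attachment).run(); // assumed I/O handler
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // (3) run all queued tasks
        int ran = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    @Override
    public void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A real loop calls the equivalent of runOnce forever on its own thread; the sketch exposes a single iteration so that it is easy to try out.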

(1) select

Let's follow this method. See io.netty.channel.nio.NioEventLoop#select

    private void select(boolean oldWakenUp) throws IOException {
        // Get the Java NIO selector
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            // The for loop keeps polling to see whether any event has occurred
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                ...
            }
        }
    }

In the code above we only need to notice that Netty does, in the end, call the Java NIO Selector to poll, using a timeout derived from a deadline. The method is written in a more complex way in order to work around the well-known empty-polling bug in the JDK's Selector. We won't discuss that here; it can be covered as a separate topic later.
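One detail in select() that is easy to miss is the timeout arithmetic: adding 500000 nanoseconds (half a millisecond) before dividing by 1000000 rounds the remaining time to the nearest millisecond instead of truncating it. The helper below isolates just that expression; SelectTimeout and timeoutMillis are hypothetical names used for illustration.

```java
// Hypothetical helper isolating the timeout expression from select().
final class SelectTimeout {
    // Remaining time until the deadline, rounded to the nearest millisecond.
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }
}
```

With 0.4 ms remaining the result is 0, so the loop takes the selectNow() branch; with 1.5 ms remaining it is 2, so select(2) is called.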

(2) processSelectedKeys()

Once an event has occurred, it is handled by processSelectedKeys(). See: io.netty.channel.nio.NioEventLoop#processSelectedKeys

    private void processSelectedKeys() {
        // You may wonder why selectedKeys is not fetched from the selector here. Netty has optimized
        // the selector's selectedKeys; we won't go into that now (it could be a separate topic) and
        // will follow the main flow first.
        if (selectedKeys != null) {
            // This branch is executed
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        // As in Java NIO, iterate over the keys and handle the events that occurred
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }

            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                // Since we created a NioServerSocketChannel, this is the branch that handles our events
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            ...
        }
    }

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // Get the unsafe member variable of the current AbstractNioChannel
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            // Familiar again? This is the Java NIO code for handling the various events
            int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // Remember what we did in Java NIO? We accepted a SocketChannel and registered it for read events.
                // How Netty handles new connections is discussed in the next chapter.
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
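The readyOps checks in processSelectedKey are plain bit tests against the SelectionKey constants. The sketch below mirrors only the order and the conditions of those checks and returns the names of the calls that would be made; ReadyOpsDispatch and actionsFor are hypothetical names, and no real channel is involved.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the order of the readyOps checks.
final class ReadyOpsDispatch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 also lands here.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

For a server channel the ready set is OP_ACCEPT, so only the "read" branch fires, which is why unsafe.read() is the entry point for new connections.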

That concludes the walk through the eventLoop. Let's review the previous chapter together with Java NIO and think about what we have done: we created a NioServerSocketChannel, started the selector's polling loop, and registered the OP_ACCEPT event on the selector. Isn't that the same process we wrote by hand in Java NIO? Seen this way, it is easier to remember.
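For comparison, here is roughly what that hand-written Java NIO flow looks like: open a ServerSocketChannel, register OP_ACCEPT on a Selector, poll, and accept. This is a minimal sketch, assuming a loopback connection made from the same method so that it terminates; PlainNioAccept and acceptOne are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical demo of the plain Java NIO accept flow.
final class PlainNioAccept {
    // Accepts a single loopback connection and returns how many were accepted.
    static int acceptOne() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(loopback, 0)); // port 0: any free port
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            int accepted = 0;
            // A blocking client connect is enough to make the server key acceptable.
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                if (selector.select(5000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isAcceptable()) {
                            // Closed right after registration because the sketch stops here.
                            try (SocketChannel ch = server.accept()) {
                                if (ch != null) {
                                    ch.configureBlocking(false);
                                    ch.register(selector, SelectionKey.OP_READ);
                                    accepted++;
                                }
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Netty spreads the same steps across NioEventLoopGroup, NioEventLoop and the channel's unsafe object, but the underlying calls are the ones shown here.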