Start with the port-binding method, **bind()**.

AbstractBootstrap.java — trace the call chain all the way down through the overloaded bind() methods

/**
 * Creates a new {@link Channel} and binds it to the given port.
 *
 * @param inetPort the local port to bind to
 * @return the future produced by {@link #bind(SocketAddress)}
 */
public ChannelFuture bind(int inetPort) {
    // Wrap the bare port in a socket address and hand it to the SocketAddress overload.
    final SocketAddress portOnlyAddress = new InetSocketAddress(inetPort);
    return bind(portOnlyAddress);
}

/**
 * Creates a new {@link Channel} and binds it to the given host and port.
 *
 * @param inetHost the local host name or address string
 * @param inetPort the local port to bind to
 * @return the future produced by {@link #bind(SocketAddress)}
 */
public ChannelFuture bind(String inetHost, int inetPort) {
    // Build the address via SocketUtils, then delegate to the SocketAddress overload.
    final SocketAddress hostAndPort = SocketUtils.socketAddress(inetHost, inetPort);
    return bind(hostAndPort);
}

/**
 * Creates a new {@link Channel} and binds it to the given address and port.
 *
 * @param inetHost the local address to bind to
 * @param inetPort the local port to bind to
 * @return the future produced by {@link #bind(SocketAddress)}
 */
public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    // Combine address and port, then delegate to the SocketAddress overload.
    final SocketAddress hostAndPort = new InetSocketAddress(inetHost, inetPort);
    return bind(hostAndPort);
}

/**
 * Creates a new {@link Channel} and binds it to the given local address.
 * All other {@code bind(...)} overloads end up here.
 *
 * @param localAddress the address to bind to; must not be {@code null}
 * @return the future returned by {@code doBind(localAddress)}
 * @throws NullPointerException if {@code localAddress} is {@code null}
 */
public ChannelFuture bind(SocketAddress localAddress) {
    // Check the bootstrap configuration first (the walkthrough notes this covers
    // group and channelFactory); it runs before the argument check.
    validate();
    if (localAddress != null) {
        return doBind(localAddress);
    }
    throw new NullPointerException("localAddress");
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    Create a channel asynchronously, initialize it, and register it with selector
  	// Prepare to trace the initAndRegister() method
    final ChannelFuture regFuture = initAndRegister();
    // Get the channel from the future
    final Channel channel = regFuture.channel();
    // If an exception occurs during an asynchronous operation, the future is returned directly
    if(regFuture.cause() ! =null) {
        return regFuture;
    }
    // Handle the case when the current asynchronous operation completes (the task ends normally, or an exception occurs during execution, or the task is canceled)
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        // Create a channelPromise instance
        ChannelPromise promise = channel.newPromise();
        // Continue binding
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {  // Handle the case that the current asynchronous operation has not yet completed
        // Registration future is almost always fulfilled already, but just in case it's not.
        // Pending
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // Add a listener for the future that will trigger the callback when the asynchronous operation completes
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if(cause ! =null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    // Change the promise value
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586promise.registered(); doBind0(regFuture, channel, localAddress, promise); }}});returnpromise; }}Copy the code

initAndRegister(): create, initialize, and register the channel, returning a ChannelFuture

final ChannelFuture initAndRegister(a) {
    Channel channel = null;
    try {
        / / create parentChannel
        // Create a Channel with no arguments using reflection newInstance
        channel = channelFactory.newChannel();
        // After the object is created, initialize the channel
        init(channel);
    } catch (Throwable t) {
        if(channel ! =null) { // If the condition is true, the channel was created successfully, but there was a problem during initialization
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            // Close the channel forcibly
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // There is a problem with creating a channel
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register parentChannel(select eventLoop from group, bind to channel, create and start the thread)
    ChannelFuture regFuture = config().group().register(channel);
    if(regFuture.cause() ! =null) {
        if (channel.isRegistered()) {
            channel.close();
        } else{ channel.unsafe().closeForcibly(); }}// If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    // added to the event loop's task queue for later execution.
    // i.e. It's safe to attempt bind() or connect() now:
    // because bind() or connect() will be executed *after* the scheduled registration task is executed
    // because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
Copy the code

The source code we are looking at is on the server side, so select ServerBootstrap.java to trace the init() initialization method

@Override
void init(Channel channel) throws Exception {
    // Get the options property in ServerBootstrap
    finalMap<ChannelOption<? >, Object> options = options0();synchronized (options) {
        // This is where the options property is initialized to the channel, which is us
        ChildHandler (new ChannelInitializer
      
       () {} set the channel
      
      	// Prepare to trace
        setChannelOptions(channel, options, logger);
    }

    // Get the attrs attribute in ServerBootstrap
    finalMap<AttributeKey<? >, Object> attrs = attrs0();synchronized (attrs) {
        // Write attr attributes to channels one by one
        for(Entry<AttributeKey<? >, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); }}// Get the pipeline for the current channel
    ChannelPipeline p = channel.pipeline();

    // Assign all attributes in ServerBootstrap starting with child to local variables
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    finalEntry<ChannelOption<? >, Object>[] currentChildOptions;finalEntry<AttributeKey<? >, Object>[] currentChildAttrs;synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // Add a ChannelInitializer handler to the pipeline
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // Get the handler() property value of serverBootstrap and add it to the pipeline
            ChannelHandler handler = config.handler();
            if(handler ! =null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run(a) {
                    ServerBootstrapAcceptor is called the connection handler
                    pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code

Set the properties of options

static void setChannelOptions(Channel channel, Map
       
        , Object> options, InternalLogger logger)
       > {
    / / traverse the options
    for(Map.Entry<ChannelOption<? >, Object> e: options.entrySet()) {// Initialize the currently traversed option to channelsetChannelOption(channel, e.getKey(), e.getValue(), logger); }}@SuppressWarnings("unchecked")
private static void setChannelOption(Channel channel, ChannelOption
        option, Object value, InternalLogger logger) {
    try {
        // Write option to channel config
        if(! channel.config().setOption((ChannelOption<Object>) option, value)) { logger.warn("Unknown channel option '{}' for channel '{}'", option, channel); }}catch (Throwable t) {
        logger.warn(
                "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t); }}static void setChannelOptions(Channel channel, Map.Entry
       
        , Object>[] options, InternalLogger logger)
       > {
    // If there are more than one, add them in a loop
    for (Map.Entry<ChannelOption<?>, Object> e: options) {
        setChannelOption(channel, e.getKey(), e.getValue(), logger);
    }
}
Copy the code

The property setting part of the trace is done, and we continue tracing back to initAndRegister()

Next, trace the details of register() in the statement `ChannelFuture regFuture = config().group().register(channel);`.

MultithreadEventLoopGroup.java

/**
 * Registers the channel with one event loop of this group.
 *
 * @param channel the channel to register
 * @return the future returned by the chosen event loop's {@code register(channel)}
 */
@Override
public ChannelFuture register(Channel channel) {
    // next() picks one event loop from the group (described in the walkthrough as polling).
    final EventLoop chosenLoop = next();
    return chosenLoop.register(channel);
}
Copy the code

SingleThreadEventLoop.java

/**
 * Registers the promise's channel with this event loop by delegating to the channel's
 * {@code unsafe()} object.
 *
 * @param promise the promise carrying the channel; must not be {@code null}
 * @return the same {@code promise} instance
 */
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // The registration itself is carried out by the channel's Unsafe, with this loop as the target.
    final Channel target = promise.channel();
    target.unsafe().register(this, promise);
    return promise;
}
Copy the code

AbstractChannel.java

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // If eventLoop is null, an exception is thrown. This is because the current channel is bound to the eventLoop
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // If the current channel is already registered, it ends
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // If the current eventLoop is incompatible with the current channel, end the eventLoop
    if(! isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    // This is where the current channel and eventLoop are bound
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {  // Check whether the thread currently executing is the same thread that the eventLoop is bound to
        register0(promise);
    } else {
        try {
            // eventLoop is essentially an executor and its execute() is called
            eventLoop.execute(new Runnable() {
                @Override
                public void run(a) {
                    / / registerregister0(promise); }}); }catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}}private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
                }
                boolean firstRegistration = neverRegistered;
                / / register
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805beginRead(); }}}catch (Throwable t) {
                // Close the channel directly to avoid FD leak.closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}Copy the code

AbstractNioChannel.java

@Override
protected void doRegister(a) throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Complete the registration of a NIO native channel with a NIO native Selector
            // The second argument is 0, which indicates that the current channel does not care about events.
            // Two reasons:
            // 1) This is a generic method that is called by all channel registrations. All channels contain three categories:
            // 1.1 Server parentChannel, whose concern should be OP_ACCEPT, receives connection-ready events
            // 1.2 The childChannel of the server should be concerned with OP_READ or OP_WRITE, i.e. read/write ready events
            // 1.3 client channel, whose focus should be OP_CONNECT, the connection-ready event
            // 2) Specify the event to be concerned with when a Netty encapsulated channel is created
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0.this);
            return;
        } catch (CancelledKeyException e) {
            if(! selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throwe; }}}// end-for
}
Copy the code

Back in AbstractChannel.java, trace the thread creation triggered by **eventLoop.execute(new Runnable() {...})**

SingleThreadEventExecutor.java

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    Return true if the current thread is the same thread to which the current eventLoop is bound, false otherwise
    boolean inEventLoop = inEventLoop();
    // Add the task to the task queue
    addTask(task);
    if(! inEventLoop) {// Create and start a thread
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true; }}catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if(reject) { reject(); }}}if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}


private void startThread(a) {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED); PlatformDependent.throwException(cause); }}}}private void doStartThread(a) {
        assert thread == null;
        // Call execute() of the child executor(that is, executor included in eventLoop)
        // This execute() does two things:
        // 1) Create a thread
        // 2) Start the thread
        executor.execute(new Runnable() {
            @Override
            public void run(a) {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // It calls an infinite loop for
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break; }}// Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates."); }}try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break; }}}finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if(! taskQueue.isEmpty()) {if (logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ') ');
                                }
                            }
                            terminationFuture.setSuccess(null); }}}}}); }Copy the code

Trace the thread creation method execute in the doStartThread() method

ThreadExecutorMap.java

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    // An executor created by an anonymous inner class
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            // Call the total executor's execute()executor.execute(apply(command, eventExecutor)); }}; }Copy the code

Continue into the executor.execute() call made inside that execute() method

ThreadPerTaskExecutor.java

/**
 * Runs {@code command} on a brand-new thread obtained from {@code threadFactory}.
 *
 * @param command the task the new thread will run
 */
@Override
public void execute(Runnable command) {
    // Ask the factory for a thread wrapping the command...
    final Thread worker = threadFactory.newThread(command);
    // ...and start it, which causes the command's run() to execute on that thread.
    worker.start();
}
Copy the code

Create the task thread and return it. At this point the thread is created

DefaultThreadFactory.java

@Override
public Thread newThread(Runnable r) {
    // create a thread
    Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
    // Initialize the thread
    try {
        if(t.isDaemon() ! = daemon) { t.setDaemon(daemon); }if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}

/**
 * Instantiates the concrete thread object used by this factory.
 *
 * @param r    the task the thread will run
 * @param name the name given to the thread
 * @return a new {@code FastThreadLocalThread} in this factory's {@code threadGroup}
 */
protected Thread newThread(Runnable r, String name) {
    final Thread created = new FastThreadLocalThread(threadGroup, r, name);
    return created;
}
Copy the code