“This is the 29th day of my participation in the August Gwen Challenge

1. Initialize the server socket

1.1 Creating a Server Channel

When Netty creates a Channel, it encapsulates the Jdk Channel as its own Channel. So start exploring how Netty created Jdk channels. So let’s start with a common piece of code:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
            .handler(new ServerHandler())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    // ch.pipeline().addLast(new ServerHandler());
                    / /..}}); ChannelFuture f = b.bind(8888).sync();

    f.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
Copy the code

Netty encapsulates its own channel

  • Entry method: bing(int port)

  • InitAndRegister () : initializes and registers

  • Channel = channelFactory. NewChannel () : call the Jdk reflection to create a server channel. The channel (NioServerSocketChannel. Class) created a ReflectiveChannelFactory by passing in the class called reflection to create a channel, This Channel is NioServerSocketChannel.

NioServerSocketChannel constructor:

public NioServerSocketChannel(a) {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
Copy the code
  • NewSocket () : Returns the Jdk ServerSocketChannel.
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Copy the code
  • NioServerSocketChannelConfig () : the reflection created the Jdk TCP Channel configuration parameters. Super above traces all the way to the parent class.
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Non-blocking mode
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2); }}throw new ChannelException("Failed to enter non-blocking mode.", e); }}Copy the code
  • Call AbstractNioChannel() ===> configureBlocking(false) : non-blocking mode.
  • Super (parent) calls the parent class AbstractChannel() : creates id, unsafe, pipeline. Three properties of a Channel in Netty
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
Copy the code

1.2 Initializing a Server Channel

NewChannel () : When the server is created, it is initialized.

final ChannelFuture initAndRegister(a) {
    Channel channel = null;
    try {
        / / create the Channel
        channel = channelFactory.newChannel();
        // Initialize the Channel
        init(channel);
    } catch (Throwable t) {
        if(channel ! =null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
	/ / register the selector
    ChannelFuture regFuture = config().group().register(channel);
    if(regFuture.cause() ! =null) {
        if (channel.isRegistered()) {
            channel.close();
        } else{ channel.unsafe().closeForcibly(); }}// If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    // added to the event loop's task queue for later execution.
    // i.e. It's safe to attempt bind() or connect() now:
    // because bind() or connect() will be executed *after* the scheduled registration task is executed
    // because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
Copy the code
  • Initialization entry: init(channel)

  • Set ChannelOptions, ChannelAttrs Sets the properties of the connection

  • Set ChildOptions, ChildAttrs Sets the properties of the connection

  • Handler ===>.handler(new ServerHandler()))

  • Add ServerBootstrapAcceptor Adds the connector

void init(Channel channel) throws Exception {
    / / set ChannelOptions
    finalMap<ChannelOption<? >, Object> options = options0();synchronized (options) {
        channel.config().setOptions(options);
    }
	/ / set ChannelAttrs
    finalMap<AttributeKey<? >, Object> attrs = attrs0();synchronized (attrs) {
        for(Entry<AttributeKey<? >, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); }}// Get the pipeline for the server Channel. When a server Channel is created, a pipeline is created
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    finalEntry<ChannelOption<? >, Object>[] currentChildOptions;finalEntry<AttributeKey<? >, Object>[] currentChildAttrs;// Save ChildOptions set by the user
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    // ChildAttrs set when the user is saved
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
	// 
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // Add a user-defined configured handler to the pipeline's processing chain. The handler here is configured in the user code
            ChannelHandler handler = config.handler();
            if(handler ! =null) {
                pipeline.addLast(handler);
            }
			
            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run(a) {
                    // Add connector, a special Handler. Pass in childHandler, childOptions, and ChildAttrs
                    // This is where the user-configured childHandler is configured. So every time a new connection is made, childHandler is triggered
                    pipeline.addLast(newServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code

In general, initializing a Channel is a simple matter of integrating user-configured handlers, Options, and Attrs into the pipeline.

1.3 registered the selector

The initAndRegister() method is only used for init initialization. Selector ===> ‘ChannelFuture regFuture = config().group().register(channel); The final call is AbstractChannel’s Register method.

Steps:

  • Entry: AbstractChannel. The register ()
  • Binding thread: AbstractChannel. Enclosing eventLoop = eventLoop
  • The method that actually performs the registration: register0()
    • Call the underlying Jdk for selector registration: doRegister()
    • invokeHandlerAddedIfNeeded()
    • Broadcast events: fireChannelRegistered()
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // Omit some if judgment code
    
	// Bind threads
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        // Actual registration
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run(a) {
                    // Actual registrationregister0(promise); }}); }catch (Throwable t) {
            // Omit some exception handling}}}private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
        }
        boolean firstRegistration = neverRegistered;
        // Call the underlying Jdk registry selector
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // Make sure the handlerAdded() method in ChannelHandler is executed
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // Make sure the handlerAdded() method in ChannelHandler is executed
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // The Channel is not yet bound, so it is false
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805beginRead(); }}}catch (Throwable t) {
        // Close the channel directly to avoid FD leak.closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}Copy the code

When we implemented our custom ChannelHandler, we rewrote the following: HandlerAdded (ChannelHandlerContext CTX), channelRegistered(ChannelHandlerContext CTX) and channelActive(ChannelHandlerContext CTX) CTX), the three methods will be called back.

pipeline.invokeHandlerAddedIfNeeded(); =====> handlerAdded(ChannelHandlerContext ctx)

pipeline.fireChannelRegistered(); =====> channelRegistered(ChannelHandlerContext ctx)

pipeline.fireChannelActive(); ChannelActive (ChannelHandlerContext CTX) is not triggered because isActive() returns false. The triggering of the callback action occurs in the port binding below

An implementation of the doRedister() method, which uses Nio, enters AbstractNioChannel:

@Override
protected void doRegister(a) throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Finally call the underlying SELECTOR in the Jdk to register. JavaChannel () was mentioned earlier, and Netty channels are further encapsulated by channels created in the Jdk.
            selectionKey = javaChannel().register(eventLoop().selector, 0.this);
            return;
        } catch (CancelledKeyException e) {
            if(! selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throwe; }}}}Copy the code

1.4 Port Binding

Steps:

  • Entry: AbstractUnsafe. The bind ()
    • doBing()
      • Call the Jdk’s underlying port binding: javaChannel().bing()
    • Transmission events: pipeline. FireChannelActive ()
      • The binding rebind to accept events: before HeadContext. ReadIfIsAutoRead ()

AbstractChannel:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if(! promise.setUncancellable() || ! ensureOpen(promise)) {return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceofInetSocketAddress && ! ((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && ! PlatformDependent.isWindows() && ! PlatformDependent.isRoot()) {// Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }
	// Whether the binding is successful
    boolean wasActive = isActive();
    try {
        / / port binding, NioServerSocketChannel. DoBind (...).
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
	
    if(! wasActive && isActive()) { invokeLater(new Runnable() {
            @Override
            public void run(a) {
                // Propagate active eventspipeline.fireChannelActive(); }}); } safeSetSuccess(promise); }Copy the code

IsActive () method with the code, after the completion port binding, will trigger the active event, this time to go to spread: pipeline. FireChannelActive ().

@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}
Copy the code

Using Nio so doBind call NioServerSocketChannel. DoBind (…).

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else{ javaChannel().socket().bind(localAddress, config.getBacklog()); }}Copy the code

Pipeline. FireChannelActive () time in Netty spread, simple talk about here, have a chance to detail behind!

DefaultChannelPipeline:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Propagate active events
    ctx.fireChannelActive();
	// Trigger the read event for a channel
    readIfIsAutoRead();
}
Copy the code

Eventually read event is invoked to AbstractNioChannel. DoBeginRead ()

@Override
protected void doBeginRead(a) throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    SelectionKey = javaChannel().register(eventLoop().selector, 0, this);
    / / the interestOps = 0
    final SelectionKey selectionKey = this.selectionKey;
    if(! selectionKey.isValid()) {return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {  / / 0, 0 = 0
        // The ops of selectionKey is 0 and the operation of 0 is OP_READselectionKey.interestOps(interestOps | readInterestOp); }}Copy the code

Second, the summary

NewChannel (create Channel) ===> init() ===> diBind() ===> diBind() ===> diBind() ===> diBind() ===> diBind()