Register operation of Channel

With the groundwork from the previous sections in place, we can start tying together what we have learned. This section covers the register operation, which is a critical step in our source code analysis.

register

We start with the connect method in EchoClient, or the bind(port) method in EchoServer, and go to initAndRegister:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1
            channel = channelFactory.newChannel();
            // 2 Bootstrap and ServerBootstrap differ somewhat here
            init(channel);
        } catch (Throwable t) {
            ...
        }
        // 3
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

We have already met initAndRegister twice. Earlier we covered step 1, the instantiation of the Channel, during which the Channel's internal Unsafe and Pipeline are instantiated as well. In step 2, init(channel), handlers are added to the pipeline (head + ChannelInitializer + tail).

In this section we will finally see how the initChannel method of ChannelInitializer gets invoked.

Now let's go further and look at step 3, register:

    ChannelFuture regFuture = config().group().register(channel);

The register step is very important. It happens after the channel has been instantiated, so let's briefly recall what instantiation did:

The underlying JDK Channel was instantiated and set to non-blocking, Unsafe was instantiated, the Pipeline was instantiated, and head, tail, and a ChannelInitializer instance were added to the Pipeline.

The config().group() call above returns the NioEventLoopGroup instance created earlier, and we call its register(channel) method:

    // MultithreadEventLoopGroup

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

The next() method simply selects one thread from the thread pool (remember chooserFactory), which means it picks a NioEventLoop instance. At this point we enter NioEventLoop.
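To make the selection step concrete, here is a minimal round-robin sketch in plain Java. It only illustrates the idea of handing out loops in turn; it is not Netty's chooser implementation. The class name RoundRobinChooser and the use of strings in place of NioEventLoop instances are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a chooser: hands out elements in round-robin order.
final class RoundRobinChooser<T> {
    private final List<T> loops;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooser(List<T> loops) {
        if (loops.isEmpty()) {
            throw new IllegalArgumentException("loops must not be empty");
        }
        this.loops = new ArrayList<>(loops);
    }

    // Each call returns the next element. floorMod keeps the index valid
    // even after the counter overflows to a negative value.
    T next() {
        return loops.get(Math.floorMod(idx.getAndIncrement(), loops.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

With three loops, the fourth call wraps around to the first loop again, so channels are spread evenly across the loops.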

NioEventLoop’s register(Channel) method is implemented in its parent SingleThreadEventLoop:

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

The above code instantiates a Promise with the current channel:

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // The promise is associated with the channel, and the channel holds the Unsafe instance;
        // the register operation is encapsulated in Unsafe
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

This retrieves the Unsafe instance associated with the channel and calls its register method:

Unsafe, as we said, encapsulates low-level implementations, but it’s not that low-level.

    // AbstractChannel#AbstractUnsafe

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ...
        // Set the eventLoop instance on this channel; from now on the channel has an eventLoop.
        // Subsequent operations on this channel should be submitted to this eventLoop
        AbstractChannel.this.eventLoop = eventLoop;

        // If the thread that initiates the register action is the eventLoop's own thread,
        // call register0(promise) directly
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // Otherwise, submit the task to the eventLoop;
                // the eventLoop's thread is responsible for calling register0(promise)
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                ...
            }
        }
    }

At this point, note that the Thread inside the NioEventLoop has not been instantiated yet.

These steps involve several classes (NioEventLoop, Promise, Channel, Unsafe, and so on), so it is important to sort out how they relate to each other.

As for the register operation itself, it returns the Promise instance directly after submitting the task to the eventLoop. The remaining register0 work is asynchronous and is carried out by the NioEventLoop instance.
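The "return the promise now, do the work later on the loop thread" pattern can be sketched with JDK classes alone. This is a toy model under stated assumptions, not Netty code: a single-thread ExecutorService stands in for the NioEventLoop, a CompletableFuture stands in for the ChannelPromise, and the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model: "register" hands the real work to a single loop thread
// and returns the promise right away.
final class AsyncRegisterSketch {

    // Returns immediately; the promise is completed later on the loop thread
    // with the thread that did the work (stands in for register0(promise)).
    static CompletableFuture<Thread> register(ExecutorService loop) {
        CompletableFuture<Thread> promise = new CompletableFuture<>();
        loop.execute(() -> promise.complete(Thread.currentThread()));
        return promise;
    }

    // True when the work ran on a thread other than the caller's.
    static boolean ranOnLoopThread() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            Thread worker = register(loop).join();
            return worker != Thread.currentThread();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on loop thread: " + ranOnLoopThread());
    }
}
```

The caller only learns the outcome through the promise, which is why the real register0 has to mark the promise as successful once it is done.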

Before looking at the register0(promise) method, let's first cover the thread inside NioEventLoop, which we still owe an explanation, and then come back to register0.

Once a Channel instance is registered with a NioEventLoop instance of the NioEventLoopGroup, all subsequent operations on that Channel are performed by that NioEventLoop instance.

The reason is simple: the Selector instance lives in the NioEventLoop instance, and once a Channel instance is registered with a Selector instance, its NIO events can only be handled in that instance.

NioEventLoop workflow

Earlier, when we analyzed the instantiation of the thread pool, we said that the Java thread in a NioEventLoop is not started at that point. Let's now look closely at the eventLoop.execute(runnable) call made during register. The code lives in the parent class SingleThreadEventExecutor:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        boolean inEventLoop = inEventLoop();
        // Add the task to the taskQueue.
        // If the taskQueue is full (its capacity is at least 16), the default strategy is to throw an exception
        addTask(task);
        if (!inEventLoop) {
            // If the task was not submitted by the NioEventLoop's own thread,
            // check whether the thread has been started. If not, start it
            startThread();
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

So this is where the thread in the NioEventLoop gets started.

In addition, the register operation we talked about in the previous section goes into taskQueue, so it is categorized as a non-IO operation.

Here is the source code of startThread, which checks whether the thread has already been started before deciding to start it:

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }

If the thread has not been started yet, doStartThread is called:

    private void doStartThread() {
        assert thread == null;
        // The executor here is the ThreadPerTaskExecutor instance passed in when the NioEventLoop was instantiated.
        // It is an executor that creates one thread per task.
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Look at this: the thread created in the executor is set as the thread of the NioEventLoop!
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Execute SingleThreadEventExecutor's run() method, which is implemented in NioEventLoop
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // ... we'll just ignore the code here
                }
            }
        });
    }
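The lazy start described above (queue the task, then start the worker thread only once, guarded by a compare-and-set) can be reproduced in a few lines of plain Java. The sketch below is a simplified model, not Netty's code: the class LazyLoop, its fields, and the demo helper are hypothetical, and an AtomicInteger replaces the field updater used in the real class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy single-thread executor whose worker thread is created lazily,
// on the first task submitted from outside the worker thread.
final class LazyLoop {
    private static final int NOT_STARTED = 0;
    private static final int STARTED = 1;

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger state = new AtomicInteger(NOT_STARTED);
    private final AtomicInteger threadsCreated = new AtomicInteger();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        taskQueue.add(task);
        if (!inEventLoop()) {
            startThread();
        }
    }

    private void startThread() {
        // The CAS guarantees that only one caller ever creates the worker thread.
        if (state.compareAndSet(NOT_STARTED, STARTED)) {
            threadsCreated.incrementAndGet();
            Thread t = new Thread(this::run, "lazy-loop");
            t.setDaemon(true); // let the JVM exit even though the loop never ends
            thread = t;
            t.start();
        }
    }

    // The worker loops forever, taking tasks from the queue one by one.
    private void run() {
        try {
            for (;;) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits two tasks from the calling thread and reports how many worker threads were created.
    static int demo() {
        LazyLoop loop = new LazyLoop();
        CountDownLatch done = new CountDownLatch(2);
        loop.execute(done::countDown);
        loop.execute(done::countDown);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not run");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loop.threadsCreated.get();
    }

    public static void main(String[] args) {
        System.out.println("worker threads created: " + demo());
    }
}
```

Both tasks run, yet only the first submission creates a thread; the second one finds the state already set to STARTED and just enqueues its task.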

Once the thread is started, it executes the run method in NioEventLoop, which is a very important method. This method certainly does not return easily: like a Worker in the JDK thread pool, it loops continuously looking for new work. It has to keep performing select operations and polling the taskQueue.

Let’s take a brief look at its source code, but not in depth here:

    @Override
    protected void run() {
        // The code is nested in a for loop
        for (;;) {
            try {
                // selectStrategy finally comes in useful.
                // It has two values, one is CONTINUE and one is SELECT.
                // Let's analyze this code:
                // 1. If taskQueue is not empty, i.e. hasTasks() returns true,
                //    selectNow() is executed, which does not block.
                // 2. If hasTasks() returns false, the SelectStrategy.SELECT branch is executed,
                //    and select(...) blocks.
                // This is easy to understand: whether there are tasks in the queue decides whether to block
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        // If !hasTasks(), we get here and perform a select, which blocks
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                // By default, ioRatio is 50
                final int ioRatio = this.ioRatio;

                if (ioRatio == 100) {
                    // If ioRatio is set to 100, perform the IO operations first, then the tasks in the task queue
                    try {
                        // 1. Perform IO operations. After the previous select, some channels may need to be handled.
                        processSelectedKeys();
                    } finally {
                        // 2. Execute non-IO tasks, i.e. the tasks in taskQueue
                        runAllTasks();
                    }
                } else {
                    // If ioRatio is not 100, limit the time of non-IO operations according to the time of IO operations
                    final long ioStartTime = System.nanoTime();
                    try {
                        // Perform IO operations
                        processSelectedKeys();
                    } finally {
                        // Use the time taken by the IO operations and ioRatio to compute the time budget for non-IO operations
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

The above code is the core of NioEventLoop. Two points are worth noting:

  1. First, the result of hasTasks() decides whether selectNow() or select(oldWakenUp) is executed. If tasks are waiting, the non-blocking selectNow() should be used; if no task is waiting, the blocking select can be used.

  2. ioRatio controls the share of time spent on IO operations:

  • If it is set to 100, the IO operations are performed first and then the tasks in the task queue are executed.

  • If it is not 100, the IO operations are still performed first and then the tasks in taskQueue are executed, but the total time spent on tasks is limited. In other words, the time allowed for non-IO operations is calculated from ioRatio and the time consumed by the IO operations.
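The budget calculation in the second point is plain arithmetic, taken from the expression ioTime * (100 - ioRatio) / ioRatio in the run method above. A small sketch makes the numbers tangible; the class and method names are invented for this example, and the range check is an assumption of the sketch (the real loop handles ioRatio == 100 in a separate branch).

```java
// Worked example of the non-IO time budget derived from ioRatio.
final class IoRatioBudget {

    // Time budget in nanoseconds for non-IO tasks, given how long the IO work took.
    // ioRatio must be strictly between 0 and 100 here; 100 means "no time limit"
    // and is handled by a separate branch in the loop.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // pretend the IO work took 8 ms
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> budget(ns)=" + taskBudgetNanos(ioTime, ratio));
        }
    }
}
```

With the default ioRatio of 50 the tasks get as much time as the IO work took; with 80 they get a quarter of it, and with 20 they get four times as much.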

Let's not worry about the details of select(oldWakenUp), processSelectedKeys(), and runAllTasks(…) for now. It is enough to understand what each of them does.
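The decision between the non-blocking and the blocking select, described in the first point above, boils down to one conditional. The sketch below shows that decision in isolation. It is an illustration, not the library's source: the class name is made up, and the constant is a marker value chosen for the example.

```java
import java.util.function.IntSupplier;

// Decides between "poll without blocking" and "block in select".
final class SelectStrategySketch {
    // Marker meaning "no tasks are waiting, so a blocking select is fine".
    static final int SELECT = -1;

    // If tasks are waiting, poll the selector without blocking and return
    // the number of ready keys; otherwise ask the caller to block in select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    public static void main(String[] args) {
        IntSupplier fakeSelectNow = () -> 3; // pretend three keys are ready
        System.out.println("with tasks:    " + calculateStrategy(fakeSelectNow, true));
        System.out.println("without tasks: " + calculateStrategy(fakeSelectNow, false));
    }
}
```

A non-negative result falls through to the default branch of the switch, so the loop goes straight on to processing keys and running tasks.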

The register task is the first task the NioEventLoop receives, so this is when the Thread is instantiated and started, after which execution enters the run method of the NioEventLoop.

Continue to register

Going back to the register0(promise) method: we now know that the register task goes into the taskQueue of the NioEventLoop and causes the thread in the NioEventLoop to start. That thread polls the taskQueue and then executes the register task.

Note that the thread in the eventLoop executes this method:

    // AbstractChannel

    private void register0(ChannelPromise promise) {
        try {
            ...
            boolean firstRegistration = neverRegistered;
            // *** Perform the underlying JDK operation: register the Channel with the Selector ***
            doRegister();

            neverRegistered = false;
            registered = true;
            // This step is critical because it involves ChannelInitializer's init(channel).
            // As we said earlier, init adds the handlers inside the ChannelInitializer to the pipeline
            pipeline.invokeHandlerAddedIfNeeded();

            // Since the current register method is executed in the eventLoop's thread,
            // the thread that submitted the register operation needs to be notified
            safeSetSuccess(promise);

            // The register operation succeeded, and every handler on the pipeline that cares about
            // the register event should perceive it, so fire an event into the pipeline
            pipeline.fireChannelRegistered();

            if (isActive()) {
                // If the channel is registered for the first time, fire the channelActive event
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // The channel was registered before;
                    // make it start listening for the OP_READ event immediately
                    beginRead();
                }
            }
        } catch (Throwable t) {
            ...
        }
    }

Let's look at the doRegister method first, and then at the pipeline.

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // For reference, the JDK method called here:
                // public final SelectionKey register(Selector sel, int ops, Object att) {...}
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }

Here the underlying JDK register is performed, registering the SocketChannel (or ServerSocketChannel) with the Selector. Note that the interest set is 0, that is, the channel listens for nothing yet.

Of course, this means that somewhere later the interest set of this selectionKey has to be changed, otherwise nothing will work.
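The "register with an empty interest set now, widen it later" idea can be tried with the JDK alone. The following sketch uses only standard java.nio classes; the class name and the string attachment (standing in for the Netty channel that is passed as `this`) are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Registers a channel with interest set 0, then switches it to OP_ACCEPT.
final class RegisterWithZeroOps {

    // Returns the interest sets observed before and after the change.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false); // a selectable channel must be non-blocking

            Object attachment = "netty-channel-placeholder"; // hypothetical attachment
            SelectionKey key = channel.register(selector, 0, attachment);
            int before = key.interestOps(); // nothing is being listened for yet

            key.interestOps(SelectionKey.OP_ACCEPT); // later: start listening for accepts
            int after = key.interestOps();

            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println("before=" + ops[0] + ", after=" + ops[1]);
    }
}
```

Because the key stays valid between the two calls, the registration and the choice of events to listen for can happen at different points in the channel's life.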

Now let's focus on the pipeline operations. As we saw when introducing NioSocketChannel's pipeline, the pipeline currently consists of head, a ChannelInitializer instance, and tail.

Next we will see how LoggingHandler and EchoClientHandler get added to the pipeline.

We continue to look at the code. After the register is successful, we do the following:

    pipeline.invokeHandlerAddedIfNeeded();

This step ends up in the handlerAdded method of the ChannelInitializer instance on the pipeline, which in turn calls its initChannel(ctx) method:

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

Then we look at initChannel(ctx), where we finally reach the initChannel(channel) method we introduced earlier:

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                // 1. Add our custom handlers to the pipeline
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                ...
            } finally {
                // 2. Remove the ChannelInitializer instance from the pipeline
                remove(ctx);
            }
            return true;
        }
        return false;
    }

As we said earlier, when the ChannelInitializer's initChannel(channel) is executed, the handlers inside it are added to the pipeline. The finally block above then removes the ChannelInitializer instance itself from the pipeline, so the finished pipeline consists of head, our own handlers (LoggingHandler and EchoClientHandler), and tail.

One question remains for you to think about: what happens if we add another ChannelInitializer instance inside a ChannelInitializer?
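The "add the real handlers, then remove yourself" behaviour can be modelled with a plain list. The sketch below is a toy, not Netty's pipeline: ToyPipeline, its methods, and the string handler names are all hypothetical, and the initializer is represented by a Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy pipeline: a list of handler names that always starts with head and ends with tail.
final class ToyPipeline {
    private final List<String> handlers = new ArrayList<>();
    private Consumer<ToyPipeline> initializer;

    ToyPipeline() {
        handlers.add("head");
        handlers.add("tail");
    }

    // Inserts just before tail, like addLast on a real pipeline.
    void addLast(String name) {
        handlers.add(handlers.size() - 1, name);
    }

    void addInitializer(Consumer<ToyPipeline> init) {
        initializer = init;
        addLast("initializer");
    }

    // Runs the initializer once, then removes its placeholder from the pipeline.
    void invokeHandlerAdded() {
        if (initializer == null) {
            return;
        }
        Consumer<ToyPipeline> init = initializer;
        initializer = null; // guard against running twice
        try {
            init.accept(this);
        } finally {
            handlers.remove("initializer");
        }
    }

    List<String> names() {
        return new ArrayList<>(handlers);
    }

    public static void main(String[] args) {
        ToyPipeline pipeline = new ToyPipeline();
        pipeline.addInitializer(p -> {
            p.addLast("LoggingHandler");
            p.addLast("EchoClientHandler");
        });
        System.out.println("before: " + pipeline.names());
        pipeline.invokeHandlerAdded();
        System.out.println("after:  " + pipeline.names());
    }
}
```

Removing the placeholder in a finally block mirrors the source above: the initializer disappears from the pipeline even if adding the handlers throws.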

Once the pipeline is set up, moving on, we execute this statement:

    pipeline.fireChannelRegistered();

We only need to understand the fireChannelRegistered() method here. Other methods such as fireChannelActive() and the rest of the fireXxx() family work in a similar way, so we will know what to expect when we meet them. Let's see what happens in this code:

    // DefaultChannelPipeline

    @Override
    public final ChannelPipeline fireChannelRegistered() {
        // Note that the argument passed here is head
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

That is, we throw a channelRegistered event into the pipeline. Register is an Inbound event, so the next thing the pipeline does is execute the channelRegistered method of the Inbound handlers in the pipeline.

From the above code, we can see that after the channelRegistered event is thrown into the pipeline, the first handler to be processed is the head.

Following the code further, we arrive at the processing of head, the first node of the pipeline:

    // AbstractChannelHandlerContext

    // next here is head
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        // Execute head's invokeChannelRegistered()
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

That is, head's invokeChannelRegistered() method is executed here, and it runs as a task in the NioEventLoop's taskQueue:

    // AbstractChannelHandlerContext

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                // handler() returns head
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

Let’s go to Head’s channelRegistered method:

    // HeadContext

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // 1. This step is head's own handling of the channelRegistered event. It is not our concern
        invokeHandlerAddedIfNeeded();
        // 2. Propagate the Inbound event onward
        ctx.fireChannelRegistered();
    }

head then executes the fireChannelRegistered() method:

    // AbstractChannelHandlerContext

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        // This is the key part:
        // findContextInbound() finds the next Inbound handler along the pipeline
        invokeChannelRegistered(findContextInbound());
        return this;
    }

Note: pipeline.fireChannelRegistered() throws the channelRegistered event into the pipeline so that the handlers in the pipeline can process it, whereas context.fireChannelRegistered() is called by a handler after it has processed the event, to pass the event on to the next handler.

The two have the same method name but come from different classes.

findContextInbound() finds the next Inbound handler, and then the methods above are repeated.

The code above should not be too confusing. The bottom line is that the event starts at head and works its way through all Inbound handlers, executing channelRegistered(ctx) on each of them.
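The walk from head through every Inbound handler can be sketched as a linked list traversal. This is a simplified model under stated assumptions, not Netty's implementation: InboundChain, Ctx, and the handler names "encoder" and "echo" are hypothetical, and each context only carries a flag saying whether it is inbound.

```java
import java.util.ArrayList;
import java.util.List;

// Toy doubly linked handler chain that propagates an inbound event from head.
final class InboundChain {

    // One node of the chain.
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    private final Ctx head = new Ctx("head", true);
    private final Ctx tail = new Ctx("tail", true);

    InboundChain() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(String name, boolean inbound) {
        Ctx ctx = new Ctx(name, inbound);
        Ctx prev = tail.prev;
        prev.next = ctx;
        ctx.prev = prev;
        ctx.next = tail;
        tail.prev = ctx;
    }

    // Walks forward from the given context to the next inbound one (or null at the end).
    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while (ctx != null && !ctx.inbound);
        return ctx;
    }

    // Starts at head; every visited handler passes the event on to the next inbound handler.
    List<String> fireChannelRegistered() {
        List<String> visited = new ArrayList<>();
        for (Ctx ctx = head; ctx != null; ctx = findContextInbound(ctx)) {
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        InboundChain chain = new InboundChain();
        chain.addLast("encoder", false); // outbound-only handler, skipped by inbound events
        chain.addLast("echo", true);
        System.out.println(chain.fireChannelRegistered());
    }
}
```

The outbound-only handler is skipped, which is exactly the filtering that findContextInbound() performs in the source above.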

With that, our register operation is truly complete.

Let’s go back to the initAndRegister method:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ...
        }
        ChannelFuture regFuture = config().group().register(channel);

        // If something went wrong during register
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // Why connect/bind can safely follow from here:
        // 1. If the register action was initiated from within the eventLoop,
        //    register must have completed by the time we get here
        // 2. If the register task has been submitted to the taskQueue of the eventLoop,
        //    then, since the subsequent connect or bind task enters the queue of the same eventLoop,
        //    connect or bind is guaranteed to be executed after register has succeeded
        return regFuture;
    }

Whether it is the server-side NioServerSocketChannel or the client-side NioSocketChannel, bind or connect always calls initAndRegister first, so everything described above applies to both.

Keep in mind that register is important, and make sure you understand what it does. After register comes bind or connect.
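The ordering guarantee (a bind or connect submitted after register always runs after it, because both go through the queue of the same single thread) can be observed with a JDK single-thread executor. This is only an analogy for the event loop's task queue; the class name and the string labels are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Two tasks submitted to one single-threaded loop run in submission order.
final class RegisterThenBind {

    static List<String> demo() {
        List<String> order = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            loop.execute(() -> order.add("register")); // queued first
            loop.execute(() -> order.add("bind"));     // queued second, so it runs second
        } finally {
            loop.shutdown(); // already queued tasks still run
        }
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No extra synchronization between the two tasks is needed: a FIFO queue drained by one thread is enough to keep them in order.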

Connect process and bind process analysis

The register operation we described above is the key. It sets up a lot of things. It is the starting point for NioSocketChannel and NioServerSocketChannel in Netty.

In this section, we’ll talk about connect and bind following register. This section is very simple.

Connect Process analysis

For the client-side NioSocketChannel, once the register described above has completed, the connect step connects to the server.

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // The register operation is performed here
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        // Check whether register isDone
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            ....
        }
    }

The intermediate steps are not worth much space here. Eventually we arrive at AbstractChannel's connect method:

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }

We see that the connect operation is handed over to the pipeline. Inside the pipeline, connect is an Outbound operation, so it starts from the pipeline's tail:

By contrast, the register operation we introduced earlier is Inbound and starts from head.

    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

Next, the pipeline executes the connect(…) method of each Outbound handler on it. So where does the real underlying connect operation take place? Remember our pipeline diagram?

Starting from tail and moving forward, the connect method of every Outbound handler along the way is executed, until we finally reach head, because head is also an Outbound handler. The connect operation we need is in head, which calls the connect method provided by unsafe:

    // HeadContext
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
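The direction of an Outbound operation, from the tail end back towards head, can be shown with a simple backward loop. The sketch below is illustrative only: OutboundWalk, Handler, and the handler names "encoder" and "echo" are made up, and a list stands in for the linked contexts of a real pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Visits outbound handlers from the tail side of a pipeline back to head.
final class OutboundWalk {

    static final class Handler {
        final String name;
        final boolean outbound;

        Handler(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // The list is ordered head ... tail; an outbound operation walks it backwards
    // and only stops at handlers that are outbound.
    static List<String> connect(List<Handler> pipeline) {
        List<String> visited = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Handler handler = pipeline.get(i);
            if (handler.outbound) {
                visited.add(handler.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Handler> pipeline = Arrays.asList(
                new Handler("head", true),     // head also handles outbound operations
                new Handler("encoder", true),
                new Handler("echo", false),    // inbound-only handler, skipped
                new Handler("tail", false));
        System.out.println(connect(pipeline));
    }
}
```

head is visited last, which is why it is the place where the operation is finally handed to unsafe.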

Next, let's see how connect in the unsafe class performs the underlying operation:

    // AbstractNioChannel.AbstractNioUnsafe

    @Override
    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ......
        try {
            ......
            boolean wasActive = isActive();
            // doConnect performs the underlying JDK SocketChannel connect
            // and then sets interestOps to SelectionKey.OP_CONNECT.
            // The return value indicates whether the connection has already succeeded
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;
                // The code below handles the connection timeout.
                // It uses the scheduled-task feature of NioEventLoop, which we have not covered
                // because I don't think it matters much
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause =
                                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

If the doConnect method above returns false, what happens next?

In the register operation described in the previous section, the channel was already registered with the selector, but with interestOps set to 0, which means it listens for nothing.

In the doConnect method above, after calling the underlying connect method, interestOps is set to SelectionKey.OP_CONNECT if the connection has not been established immediately.

From here on it is up to the NioEventLoop. Remember NioEventLoop's run method? Once the connect succeeds and the TCP connection is established, the subsequent handling is done by the processSelectedKeys() method in NioEventLoop.run().
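The same sequence (register with interest set 0, start a non-blocking connect, switch to OP_CONNECT if it is still pending, and finish the connection when the selector reports it) can be tried with the JDK alone. The sketch below is a hedged illustration, not Netty code: it assumes the loopback interface is available, connects to a throwaway local server created just for the example, and the class name and five-second deadline are arbitrary choices.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Non-blocking connect driven by a selector, using only java.nio.
final class NonBlockingConnect {

    // Connects to a throwaway local server and reports whether the client ends up connected.
    static boolean demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // As in register: start with an empty interest set.
            SelectionKey key = client.register(selector, 0);

            // false means the connection is still in progress.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                key.interestOps(SelectionKey.OP_CONNECT);
            }

            long deadline = System.nanoTime() + 5_000_000_000L;
            while (!connected && System.nanoTime() < deadline) {
                selector.select(100);
                if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                    // Stop listening for OP_CONNECT, then complete the connection.
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                    connected = client.finishConnect();
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + demo());
    }
}
```

The part inside the while loop is the job that falls to the event loop thread in the flow described above; the code that initiated the connect has long since returned.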

Bind process analysis

With connect out of the way, let’s take a quick look at bind:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // The initAndRegister described earlier
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // register is complete, so go on to bind
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            ......
        }
    }

If you look inside, you can see that the bind operation is also performed by pipeline:

    // AbstractChannel

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

Bind operations, like connect, are Outbound, so they all start with tail:

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

The final bind operation goes to the head, which calls the bind method provided by unsafe:

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

Those who are interested can read further into unsafe on their own. The bind method there is very simple, and the underlying bind is not an asynchronous operation, so we will stop here.

This section was simple. Its purpose was to show the usual pattern that Netty operations follow.