
Netty's reactor thread handles both IO events and asynchronous tasks. This chapter covers the IO events.

One, locating the source code

As we learned in the previous chapter, Netty’s reactor thread listens for IO events and processes them as they occur. The processing starts in this method:

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

In the previous chapter we only gave this code a general overview. This chapter analyzes two parts of it in depth: accepting new connections and reading Channel data.

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ... validation omitted ...
    try {
        int readyOps = k.readyOps();
        // ... handling of other events omitted ...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

The call to focus on is unsafe.read(). It runs for both read events (OP_READ) and new-connection events (OP_ACCEPT).
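
To see which ready sets actually reach unsafe.read(), here is a minimal sketch that isolates the bitmask test from the method above. The class name ReadyOpsSketch and the helper shouldRead are made up for illustration; only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

public class ReadyOpsSketch {
    // Hypothetical helper: the same condition as in processSelectedKey, extracted for testing.
    static boolean shouldRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        // A readable client channel triggers read()
        System.out.println(shouldRead(SelectionKey.OP_READ));                          // true
        // A server channel with a pending connection triggers read() too
        System.out.println(shouldRead(SelectionKey.OP_ACCEPT));                        // true
        // Write-only readiness does not
        System.out.println(shouldRead(SelectionKey.OP_WRITE));                         // false
        // Mixed readiness still triggers read() because the OP_READ bit is set
        System.out.println(shouldRead(SelectionKey.OP_READ | SelectionKey.OP_WRITE));  // true
        // An empty ready set also passes, because of the readyOps == 0 clause
        System.out.println(shouldRead(0));                                             // true
    }
}
```

The readyOps == 0 clause is the surprising one: a key with no ready bits still goes through read(), which as far as we know is there as a guard against the selector spinning on such a key.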

Two, new connection access source code analysis

The channel here is a NioServerSocketChannel, and its Unsafe object is of the NioMessageUnsafe type. If you’re unsure why, review the initialization of NioServerSocketChannel!

So unsafe.read() resolves to the read method of NioMessageUnsafe:

@Override
public void read() {
    // ... unnecessary code omitted ...
    try {
        try {
            do {
                // What is read may be data or a new connection
                int localRead = doReadMessages(readBuf);
                // Nothing was read, so leave the loop
                if (localRead == 0) {
                    break;
                }
                // A negative value indicates that the channel is closed
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // Increase the count of connections read
                allocHandle.incMessagesRead(localRead);
                // By default, at most 16 connections are read per pass
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        // Get the number of connections (or messages) read
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // Start propagating the channelRead event
            pipeline.fireChannelRead(readBuf.get(i));
        }
        // Clear the buffer
        readBuf.clear();
        allocHandle.readComplete();
        // Propagate the read-complete event
        pipeline.fireChannelReadComplete();
        // ... unnecessary code omitted ...
    } finally {
        // ... unnecessary code omitted ...
    }
}

1. Read the new connection

int localRead = doReadMessages(readBuf);

This line of code is the main logic for reading a new connection:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // Call accept on the JDK ServerSocketChannel to get the JDK SocketChannel for the new connection
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // Wrap the client connection in Netty's channel wrapper, NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // ... exception handling omitted ...
    }
    return 0;
}

As you can see, the logic here is simple. First, Netty calls accept on the JDK ServerSocketChannel it saved earlier, which returns the JDK SocketChannel for the new connection.

That native JDK channel is then wrapped in Netty's NioSocketChannel and added to the buffer list. Note that the objects in the list are Netty wrapper objects, not raw JDK channels! After wrapping, the method returns 1. At this point the buffer holds the messages that were read, and each message is a NioSocketChannel object.

Let’s go back to the main flow of the read method. Once the doReadMessages loop finishes, read starts processing the NioSocketChannel objects it collected.
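
The shape of that loop (accept one connection, wrap it, stop when nothing is pending or the per-pass cap is reached) can be sketched without Netty. Everything below is a stand-in: the queue plays the role of the JDK accept backlog, the "wrapped:" string plays the role of NioSocketChannel, and the simple counter is a simplification of what allocHandle.continueReading() checks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class AcceptLoopSketch {
    // Assumption for illustration: mirrors the default cap of 16 mentioned in the code comment.
    static final int MAX_MESSAGES_PER_READ = 16;

    // Stand-in for doReadMessages: take one pending connection, wrap it, report how many were read.
    static int doReadMessages(Queue<String> backlog, List<Object> buf) {
        String raw = backlog.poll();       // like a non-blocking accept(): null when nothing is pending
        if (raw != null) {
            buf.add("wrapped:" + raw);     // like new NioSocketChannel(this, ch)
            return 1;
        }
        return 0;
    }

    // One pass of the read loop: collect wrapped connections until empty or capped.
    static List<Object> readOnce(Queue<String> backlog) {
        List<Object> readBuf = new ArrayList<>();
        int messagesRead = 0;
        do {
            int localRead = doReadMessages(backlog, readBuf);
            if (localRead == 0) {
                break;                     // nothing pending, leave the loop
            }
            messagesRead += localRead;
        } while (messagesRead < MAX_MESSAGES_PER_READ);
        return readBuf;
    }

    public static void main(String[] args) {
        Queue<String> backlog = new ArrayDeque<>();
        for (int i = 0; i < 20; i++) {
            backlog.add("conn-" + i);
        }
        System.out.println(readOnce(backlog).size()); // 16, the cap is hit
        System.out.println(readOnce(backlog).size()); // 4, the backlog runs dry
        System.out.println(readOnce(backlog).size()); // 0, nothing left
    }
}
```

With 20 pending connections, the first pass stops at the cap and the rest are picked up on the next pass, which is why a burst of connections is spread over several reactor loop iterations.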

2. Handle the newly accepted channels

pipeline.fireChannelRead(readBuf.get(i));

From the code, we can see that each NioSocketChannel we just read is propagated as a channelRead event through the pipeline of the NioServerSocketChannel, starting at the head. We covered how events propagate through a pipeline in earlier chapters.

Let’s look at the following code:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
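
To make "propagation starts at head" concrete, here is a toy pipeline, not Netty's implementation. The names PipelineSketch, Handler, Context and Pipeline are all invented for this sketch; the only thing borrowed from the code above is the idea that fireChannelRead on the pipeline enters at the head node and each handler decides whether to pass the message on.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {
    /** Hypothetical handler: may consume the message or pass it to the next node. */
    interface Handler {
        void channelRead(Context ctx, Object msg);
    }

    /** Hypothetical context node: one link in the singly linked chain. */
    static final class Context {
        final String name;
        final Handler handler;
        Context next;

        Context(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        // Forward the message to the next node, if there is one.
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    static final class Pipeline {
        // The head node simply forwards whatever it receives.
        final Context head = new Context("head", (ctx, msg) -> ctx.fireChannelRead(msg));
        Context tail = head;

        Pipeline addLast(String name, Handler handler) {
            Context ctx = new Context(name, handler);
            tail.next = ctx;
            tail = ctx;
            return this;
        }

        // Entry point: propagation always starts at head.
        Pipeline fireChannelRead(Object msg) {
            head.handler.channelRead(head, msg);
            return this;
        }
    }

    // Builds a two-handler pipeline and records the order in which handlers see the message.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        new Pipeline()
            .addLast("logger", (ctx, msg) -> {
                trace.add("logger saw " + msg);
                ctx.fireChannelRead(msg);          // pass it on
            })
            .addLast("acceptor", (ctx, msg) -> trace.add("acceptor handled " + msg))
            .fireChannelRead("child-channel");
        return trace;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
        // logger saw child-channel
        // acceptor handled child-channel
    }
}
```

The last handler does not forward the message, which is the role the acceptor plays on the server channel: it is where a newly accepted channel ends its trip through the pipeline.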

Starting from the head, the event eventually reaches ServerBootstrapAcceptor, whose channelRead method looks like this:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Any msg that reaches this logic must be a Channel, because only the server channel's pipeline contains this handler
    final Channel child = (Channel) msg;
    // Append the childHandler that was passed in when building ServerBootstrap to the child channel's pipeline
    child.pipeline().addLast(childHandler);
    // childOptions were passed in when building ServerBootstrap
    setChannelOptions(child, childOptions, logger);
    // childAttrs were passed in when building ServerBootstrap
    setAttributes(child, childAttrs);
    try {
        // Register the child channel, using the same logic as for NioServerSocketChannel
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

Here we can see two things: first the child handler, options and attributes are applied to the new Channel, and then the Channel is registered. Registration follows the same logic as for NioServerSocketChannel, except that NioSocketChannel is interested in the OP_READ event. As homework, try tracing the creation of NioSocketChannel yourself, then analyze its registration logic and its reactor logic!

Three, client data read source code analysis

So let’s go straight back to:

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ... validation omitted ...
    try {
        int readyOps = k.readyOps();
        // ... handling of other events omitted ...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

This time the channel is a NioSocketChannel, which represents an accepted client connection. Its handling here is largely the same as what we saw for NioServerSocketChannel.

1. Read channel data

NioSocketChannel’s Unsafe is NioByteUnsafe, so we go straight to:

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

Let's see how it reads the data:

@Override
public final void read() {
    // ... omitted ...
    // Get the client channel's pipeline
    final ChannelPipeline pipeline = pipeline();
    // Get the memory allocator
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // Allocate a ByteBuf buffer
            byteBuf = allocHandle.allocate(allocator);
            // Read the channel's data into the buffer
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // Nothing was read, so release the buffer
                byteBuf.release();
                byteBuf = null;
                // A negative value means EOF, so set the close flag
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // Propagate a channelRead event
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        // Propagate a channelReadComplete event
        pipeline.fireChannelReadComplete();
        // Close the channel if EOF was received
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        // ... unnecessary code omitted ...
    }
}

The logic breaks down into the following steps:

  1. Get the memory allocator. Netty has an allocator dedicated to allocating ByteBuf, and this step fetches it.
  2. Allocate a buffer with the allocator obtained in the previous step, ready for the read that follows.
  3. Read the data from the channel and write it into the newly allocated buffer.
  4. After each read, pass the filled buffer to the pipeline through fireChannelRead, which propagates a channelRead event.
  5. When the data in the channel has been processed, propagate a channelReadComplete event.
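
The five steps above can be sketched with plain JDK NIO. This is only an illustration under stated assumptions: ByteBuffer stands in for ByteBuf, an in-memory ReadableByteChannel stands in for the socket, the tiny BUFFER_SIZE and the MAX_READS counter are invented values, and the "events" are just strings collected in a list rather than real pipeline callbacks.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ByteReadLoopSketch {
    static final int BUFFER_SIZE = 8;   // deliberately tiny so the loop runs more than once
    static final int MAX_READS = 16;    // stand-in for the continueReading() limit

    // Reads the channel in chunks and records the events a pipeline would see, in order.
    static List<String> readAll(ReadableByteChannel ch) throws IOException {
        List<String> events = new ArrayList<>();
        boolean close = false;
        int reads = 0;
        do {
            ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);  // step 2: allocate a buffer
            int n = ch.read(buf);                               // step 3: read channel data into it
            if (n <= 0) {
                close = n < 0;                                  // -1 means EOF
                break;
            }
            reads++;
            buf.flip();                                         // switch the buffer to read mode
            // step 4: hand the filled buffer downstream
            events.add("channelRead:" + StandardCharsets.US_ASCII.decode(buf));
        } while (reads < MAX_READS);
        events.add("channelReadComplete");                      // step 5
        if (close) {
            events.add("close");
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello netty!".getBytes(StandardCharsets.US_ASCII); // 12 bytes
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(payload));
        readAll(ch).forEach(System.out::println);
        // channelRead:hello ne
        // channelRead:tty!
        // channelReadComplete
        // close
    }
}
```

Note that one logical message arrives as two channelRead events here. The same thing happens in Netty when the data is larger than the allocated buffer, which is why handlers further down the pipeline usually need some form of frame decoding.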

Four, summary

  1. In Netty, NioServerSocketChannel and NioSocketChannel handle reads differently. NioServerSocketChannel is mainly used to process new connections. During initialization, a ServerBootstrapAcceptor, the handler that takes care of newly accepted connections, is added to its pipeline.

    When NioServerSocketChannel reads, it accepts the new connection and wraps it as a NioSocketChannel object. ServerBootstrapAcceptor then registers that NioSocketChannel with the child event loop group, which starts its reactor thread if it is not already running.

  2. When data is present in the channel, NioSocketChannel allocates a buffer, reads the data into it, and then passes the data down the pipeline by firing a channelRead event.