Netty handles IO events and async tasks.
One, locating the source code
As we learned in the previous chapter, Netty's reactor thread listens for IO events and, when one occurs, handles it in:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
The previous chapter gave only a general overview of this code. This chapter analyzes in detail how new connections are accepted and how Channel data is read.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ... validation omitted ...
    try {
        int readyOps = k.readyOps();
        // ... handling of other events omitted ...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
The call we care about most is unsafe.read(), which runs when either a read event or a new-connection (accept) event is ready.
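To make the condition concrete, here is a minimal sketch that isolates the bitmask test. The class and method names (ReadyOpsCheck, triggersRead) are made up for illustration; only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

class ReadyOpsCheck {
    // Mirrors the condition in processSelectedKey: read() is called when
    // OP_READ or OP_ACCEPT is set, or when readyOps is 0.
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println(triggersRead(SelectionKey.OP_READ));                          // true
        System.out.println(triggersRead(SelectionKey.OP_ACCEPT));                        // true
        System.out.println(triggersRead(SelectionKey.OP_WRITE));                         // false
        System.out.println(triggersRead(SelectionKey.OP_READ | SelectionKey.OP_WRITE));  // true
        System.out.println(triggersRead(0));                                             // true
    }
}
```

One check covers both channel types: a server channel reports OP_ACCEPT and a client channel reports OP_READ, and which read() implementation runs depends on the channel's Unsafe. As far as I recall, Netty's own source comment describes the readyOps == 0 case as a workaround for a possible JDK bug that could otherwise cause a spin loop.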
Two, source code analysis of new connection access
For a new connection, the channel is the NioServerSocketChannel, and its Unsafe object is of type NioMessageUnsafe. If you are unsure why, review the initialization of NioServerSocketChannel.
So unsafe.read() here resolves to the read method of NioMessageUnsafe:
@Override
public void read() {
    // ... unnecessary code omitted ...
    try {
        try {
            do {
                // What is read here may be data or a new connection
                int localRead = doReadMessages(readBuf);
                // Nothing was read, so leave the loop
                if (localRead == 0) {
                    break;
                }
                // A negative value indicates that the channel is closed
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // Increase the count of connections read
                allocHandle.incMessagesRead(localRead);
                // By default, at most 16 connections are read per pass
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        // Number of connections (or messages) that were read
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // Propagate a channelRead event for each one
            pipeline.fireChannelRead(readBuf.get(i));
        }
        // Clear the buffer
        readBuf.clear();
        allocHandle.readComplete();
        // Propagate the read-complete event
        pipeline.fireChannelReadComplete();
        // ... unnecessary code omitted ...
    } finally {
        // ... unnecessary code omitted ...
    }
}
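The control flow of this method can be modeled without Netty. The sketch below is a simplified stand-in, not Netty's code: MessageReadLoop, drain, and the Queue that plays the role of the pending-connection backlog are hypothetical, and the limit of 16 simply mirrors the default mentioned in the comment above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class MessageReadLoop {
    // Assumed per-pass limit, mirroring the default of 16 mentioned above.
    static final int MAX_MESSAGES_PER_READ = 16;

    // Stand-in for doReadMessages: moves at most one pending item into buf.
    static int doReadMessages(Queue<String> pending, List<Object> buf) {
        String next = pending.poll();
        if (next == null) {
            return 0; // nothing to read
        }
        buf.add(next);
        return 1;
    }

    // Phase 1: collect messages until the source is empty or the limit is hit.
    // Phase 2: hand every collected message to the consumer ("fireChannelRead").
    static int drain(Queue<String> pending, Consumer<Object> fireChannelRead) {
        List<Object> readBuf = new ArrayList<>();
        int totalMessages = 0;
        do {
            int localRead = doReadMessages(pending, readBuf);
            if (localRead == 0) {
                break;
            }
            totalMessages += localRead;
        } while (totalMessages < MAX_MESSAGES_PER_READ);

        for (Object msg : readBuf) {
            fireChannelRead.accept(msg);
        }
        readBuf.clear();
        return totalMessages;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 20; i++) {
            pending.add("conn-" + i);
        }
        List<Object> seen = new ArrayList<>();
        System.out.println(drain(pending, seen::add)); // 16
        System.out.println(drain(pending, seen::add)); // 4
        System.out.println(seen.size());               // 20
    }
}
```

Capping each pass keeps one busy channel from monopolizing the reactor thread. In this toy model, the leftover items are picked up by the next call to drain.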
1. Read the new connection
int localRead = doReadMessages(readBuf);
This line of code is the main logic for reading a new connection:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // Call accept() on the JDK ServerSocketChannel to obtain the new connection as a JDK SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // Wrap the client connection in Netty's channel wrapper, NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // ... exception handling omitted ...
    }
    return 0;
}
As you can see, the logic here is simple. First, Netty uses the JDK ServerSocketChannel it saved earlier to call accept() and obtain the JDK SocketChannel of the new connection.
It then wraps this JDK NIO native channel in a Netty NioSocketChannel and adds the wrapper to the buffer. Note that the object in the buffer is the Netty wrapper, not the JDK channel. After wrapping, the method returns 1, and the buffer now holds the message that was read: the NioSocketChannel object.
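The same accept-and-wrap step can be tried with plain JDK NIO. This is a sketch under assumptions: AcceptSketch, WrappedChannel (a stand-in for NioSocketChannel), and readOneConnection are hypothetical names, and the server binds to an ephemeral loopback port only for the demo.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class AcceptSketch {
    // Hypothetical stand-in for Netty's NioSocketChannel wrapper.
    static final class WrappedChannel {
        final SocketChannel jdkChannel;
        WrappedChannel(SocketChannel jdkChannel) { this.jdkChannel = jdkChannel; }
    }

    // Same shape as doReadMessages: returns 1 if a connection was accepted
    // and buffered, 0 if nothing was pending.
    static int readOneConnection(ServerSocketChannel server, List<Object> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(new WrappedChannel(ch));
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            List<Object> buf = new ArrayList<>();
            System.out.println("before connect: " + readOneConnection(server, buf));

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int read = 0;
                // Retry briefly in case the connection is not yet visible to accept().
                for (int i = 0; i < 100 && read == 0; i++) {
                    read = readOneConnection(server, buf);
                    if (read == 0) {
                        Thread.sleep(10);
                    }
                }
                System.out.println("after connect: " + read + ", buffered: " + buf.size());
            }
            for (Object o : buf) {
                ((WrappedChannel) o).jdkChannel.close();
            }
        }
    }
}
```

Because the server channel is non-blocking, accept() returns null instead of waiting, which is why the method can report 0 and let the read loop exit.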
Let's go back to the main flow of the read method. Once doReadMessages has returned, processing of the NioSocketChannel objects in the buffer begins.
2. Handle the newly connected channel
pipeline.fireChannelRead(readBuf.get(i));
The NioSocketChannel we just read is passed along as the message of a channelRead event, which propagates down the pipeline of the NioServerSocketChannel. We covered how events propagate through a pipeline in earlier chapters.
Let’s look at the following code:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
The event starts at the head node and eventually reaches ServerBootstrapAcceptor, whose channelRead method looks like this:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // The msg that reaches this logic must be a Channel, because only the server channel's pipeline has this handler
    final Channel child = (Channel) msg;
    // Append the childHandler that was passed in when building ServerBootstrap
    child.pipeline().addLast(childHandler);
    // Passed in when building ServerBootstrap
    setChannelOptions(child, childOptions, logger);
    // Passed in when building ServerBootstrap
    setAttributes(child, childAttrs);
    try {
        // Start registering, with the same logic as NioServerSocketChannel
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
As we can see, the acceptor applies the client-side handler, options, and attributes to the child Channel and then registers it. Registration follows the same logic as for NioServerSocketChannel, except that NioSocketChannel is interested in the OP_READ event. As an exercise, readers can analyze the creation of NioSocketChannel, along with its registration logic and reactor logic.
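The order of steps in channelRead can be modeled in a few lines of plain Java. Everything in this sketch is hypothetical (AcceptorSketch, ChildChannel, ChildGroup), and registration is synchronous here, whereas the real code registers asynchronously and reacts to failure in a listener.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AcceptorSketch {
    // Hypothetical, heavily simplified child channel: a handler list plus options.
    static final class ChildChannel {
        final List<String> pipeline = new ArrayList<>();
        final Map<String, Object> options = new LinkedHashMap<>();
        boolean registered;
        boolean closed;
    }

    // Hypothetical stand-in for the child event loop group.
    interface ChildGroup {
        void register(ChildChannel child) throws Exception;
    }

    private final String childHandler;
    private final Map<String, Object> childOptions;
    private final ChildGroup childGroup;

    AcceptorSketch(String childHandler, Map<String, Object> childOptions, ChildGroup childGroup) {
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childGroup = childGroup;
    }

    // Same order of steps as the channelRead method shown above.
    void channelRead(ChildChannel child) {
        child.pipeline.add(childHandler);    // corresponds to addLast(childHandler)
        child.options.putAll(childOptions);  // corresponds to setChannelOptions(...)
        try {
            childGroup.register(child);      // hand the child over to the worker group
            child.registered = true;
        } catch (Exception e) {
            child.closed = true;             // corresponds to forceClose(child, cause)
        }
    }

    public static void main(String[] args) {
        Map<String, Object> opts = Collections.<String, Object>singletonMap("TCP_NODELAY", true);

        ChildChannel ok = new ChildChannel();
        new AcceptorSketch("echoHandler", opts, child -> { }).channelRead(ok);
        System.out.println(ok.pipeline + " " + ok.options + " registered=" + ok.registered);

        ChildChannel failed = new ChildChannel();
        new AcceptorSketch("echoHandler", opts, child -> {
            throw new IllegalStateException("group shut down");
        }).channelRead(failed);
        System.out.println("registered=" + failed.registered + " closed=" + failed.closed);
    }
}
```

The point of the model is the hand-off: the server side only configures the child and passes it on, and a child that cannot be registered is closed rather than left half-initialized.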
Three, source code analysis of client data reading
So let's go straight back to:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ... validation omitted ...
    try {
        int readyOps = k.readyOps();
        // ... handling of other events omitted ...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
NioSocketChannel represents the channel of a client's new connection, and its read handling is about 90% similar to that of NioServerSocketChannel.
1. Read channel data
NioSocketChannel’s Unsafe is NioByteUnsafe, so we go straight to:
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
Let's see how it reads the data:
@Override
public final void read() {
    // ... omitted ...
    // Get the client channel's pipeline
    final ChannelPipeline pipeline = pipeline();
    // Get a memory allocator
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // Allocate a ByteBuf buffer
            byteBuf = allocHandle.allocate(allocator);
            // Read the channel data into the buffer
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // Nothing was read, so release the buffer
                byteBuf.release();
                byteBuf = null;
                // A negative value means EOF, so set the close flag
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }
            allocHandle.incMessagesRead(1);
            readPending = false;
            // Propagate a channelRead event
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());
        allocHandle.readComplete();
        // Propagate a channelReadComplete event
        pipeline.fireChannelReadComplete();
        // Close the channel
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        // ... unnecessary code omitted ...
    }
}
The logic can be divided into the following steps:
- Get a memory allocator. Netty has a memory allocator dedicated to allocating ByteBuf, and here it is obtained from the channel config.
- Allocate a buffer with the allocator obtained in the previous step, for use in the following steps.
- Read the data from the channel and write it into the allocated buffer.
- After reading, pass the filled buffer to the pipeline, which propagates a channelRead event.
- When the readable data in the channel has been processed, propagate a channelReadComplete event.
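The steps above can be sketched with plain JDK NIO. This is an illustration under assumptions, not Netty's implementation: a heap ByteBuffer stands in for ByteBuf and its allocator, a List of decoded chunks stands in for the channelRead events, and ByteReadLoopSketch, ReadResult, and readOnce are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ByteReadLoopSketch {
    // Result of one read pass: the chunks that were "fired" and whether EOF was seen.
    static final class ReadResult {
        final List<String> chunks = new ArrayList<>();
        boolean close;
    }

    static ReadResult readOnce(ReadableByteChannel ch, int bufferSize, int maxReads) throws IOException {
        ReadResult result = new ReadResult();
        int reads = 0;
        do {
            ByteBuffer buf = ByteBuffer.allocate(bufferSize); // allocate a buffer
            int lastBytesRead = ch.read(buf);                 // channel -> buffer
            if (lastBytesRead <= 0) {
                result.close = lastBytesRead < 0;             // -1 means EOF
                break;
            }
            buf.flip();
            // Stand-in for fireChannelRead; the demo data is ASCII, so chunk
            // boundaries never split a character.
            result.chunks.add(StandardCharsets.UTF_8.decode(buf).toString());
            reads++;
        } while (reads < maxReads);
        return result;
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        pipe.sink().write(ByteBuffer.wrap("hello netty".getBytes(StandardCharsets.UTF_8)));

        ReadResult first = readOnce(pipe.source(), 4, 16);
        System.out.println(first.chunks + " close=" + first.close);

        pipe.sink().close(); // the writer goes away, so the next read reports EOF
        ReadResult second = readOnce(pipe.source(), 4, 16);
        System.out.println(second.chunks + " close=" + second.close);
        pipe.source().close();
    }
}
```

The sketch keeps the same distinction as the code above: a read of 0 bytes means "nothing more for now", while a negative result means the peer has closed and the channel should be closed after the read-complete step.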
Four, summary
- In Netty, NioServerSocketChannel and NioSocketChannel handle reads differently. NioServerSocketChannel mainly handles new connections, and a ServerBootstrapAcceptor that takes over new connections is added to its pipeline during initialization. When NioServerSocketChannel reads, it accepts the new connection and wraps it as a NioSocketChannel object. ServerBootstrapAcceptor then registers that NioSocketChannel with the child event loop group, which starts the reactor thread.
- NioSocketChannel allocates a buffer when data is readable in the channel, reads the data into that buffer, and then passes the data down the pipeline by triggering events.