Have feelings, have dry goods, wechat search [three prince Aobin] concern about this is not the same programmer.
This article has been included on GitHub github.com/JavaFamily.
High warning, I started writing this article a month ago, so the content will be very long, of course, very hardcore, dubbo source series after I wanted to write a netty series, but netty source concept is very much, so I write now.
I believe that 90% of the readers will not read it in one breath, because it is too long, so long that my current MBP typing editor box is stuck, but I hope you want to see Netty in the future or need to know before the interview friends to turn over enough, then I write this article has the meaning.
Also not much BB, direct open whole.
NIO basic concepts
Block vs. Non-block
Blocking and non-blocking is a way of processing whether the data is ready when the process accesses it, when the data is not ready.
Blocking: It is often necessary to wait for the data in the buffer to be ready before doing anything else, otherwise it will stay there.
Non-blocking: When our process accesses our data buffer, it returns if the data is not ready, without waiting. If the data is ready, return it directly.
Blocking I/o:
Non-blocking IO:
Synchronous vs. Asynchronous
Both synchronous and asynchronous are based on how applications and operating systems process IO events. Such as
** Synchronization: ** is an application that directly participates in IO read/write operations.
** Asynchronous: ** All I/O reads and writes are left to the operating system. Applications only need to wait for notifications.
In synchronous mode, I/O events must be processed by blocking a method and waiting for the I/O event to complete (blocking or polling the I/O event). In asynchronous mode, all I/O reads and writes are handed over to the operating system. At this point, we can do other things, and we don’t need to do the actual IO operation, and when the operation completes the IO, we will give our application a notification.
Therefore, the direct benefit of asynchrony over synchronization is that when we process IO data, we can use the resources consumed by the wait to process other transactions, improving the performance of our service itself.
Synchronous IO:
Asynchronous I/o:
Java BIO vs. NIO
BIO (traditional IO) :
BIO is a synchronous and blocking IO pattern, a traditional Java. IO package that is implemented based on the stream model and provides some of the most familiar IO functions, such as File abstraction, input/output streams, and so on. The interaction is synchronous, blocking, that is, when reading an input stream or writing to an output stream, the threads block until the read or write action is complete, and the calls between them are in a reliable linear order.
NIO (non-blocking /New I/O)
NIO is a synchronous non-blocking I/O model introduced in Java 1.4. It corresponds to the Java. NIO package, providing abstractions such as channels, selectors, buffers, and so on. The N in NIO can be interpreted as non-blocking, not just New. It supports a buffer-oriented, channel-based approach to I/O operations. NIO provides two different Socket channel implementations, SocketChannel and ServerSocketChannel, as opposed to Socket and ServerSocket in the traditional BIO model. Both support both blocking and non-blocking modes. For high-load, high-concurrency (network) applications, NIO’s non-blocking mode should be used
BIO versus NIO
IO model | BIO | NIO |
---|---|---|
communication | Facing the flow | Facing the buffer |
To deal with | Blocking IO | Non-blocking IO |
The trigger | There is no | The selector |
NIO’s simple model for Server communication:
BIO’s simple model for Server communication:
NIO features:
- One thread can handle multiple channels, reducing the number of threads created;
- Non-blocking reading and writing, saving resources: When no data is available to read or write, no thread resources are wasted due to blocking
Reactor model
Single thread Reactor model
Multithreaded Reactor model
Multithreaded master-slave Reactor model
Basic concepts of Netty
Introduction of Netty
Netty is a NIO client server framework that makes it quick and easy to develop network applications such as protocol servers and clients. It greatly simplifies and simplifies network programming, such as TCP and UDP socket servers.
“Fast and easy” does not mean that the resulting application will suffer from maintainability or performance issues. Netty is well designed, combining experience with many protocols such as FTP, SMTP, HTTP, and various older protocols based on binary and text. As a result, Netty succeeded in finding a way to easily achieve development, performance, stability, and flexibility without compromising.
Netty Execution process
Netty core components
Channel
Channel is a basic construct of Java NIO. It can be regarded as a carrier of incoming or outgoing data. Therefore, it can be turned on or off, connected or disconnected.
EventLoop and EventLoopGroup
EventLoop defines Netty’s core abstraction for handling events that occur during the lifetime of a connection. Internally, an EventLoop is assigned to each Channel.
An EventLoopGroup is an EventLoop pool that contains many Eventloops.
Netty assigns an EventLoop to each Channel, which handles all events such as user connection requests, processing user requests, and so on. EventLoop itself is merely a thread-driven process that binds only one thread to handle all IO events for a Channel during its lifetime.
Once a Channel is bound to an EventLoop, it cannot be changed throughout the lifetime of the Channel. An EventLoop can be bound to multiple channels. That is, the relationship between Channel and EventLoop is N :1, and the relationship between EventLoop and thread is 1:1.
ServerBootstrap with the Bootstrap
Bootstarp and ServerBootstrap are referred to as bootstrap classes and refer to the process of configuring an application and getting it running. Netty handles boot by isolating your application from the network layer.
Bootstrap is the client Bootstrap class that creates a new Channel when calling bind() (to connect to UDP) and connect() (to connect to TCP). Create a single Channel that has no parent Channel to implement all network switching.
ServerBootstrap is the bootstrap class on the server side. ServerBootstarp creates a ServerChannel to accept connections from clients when it calls bind(). And the ServerChannel manages multiple sub-channels for communication with clients.
ChannelHandler and ChannelPipeline
ChannelHandler is a handler for data in a Channel. These handlers can be either system-defined codecs or user-defined. These processors are added to the object of a ChannelPipeline, and the data in the Channel is processed in the order in which they are added.
ChannelFuture
All I/O operations in Netty are asynchronous, that is, operations do not immediately return results, so Netty defines a ChannelFuture object as the “spokesperson” for this asynchronous operation, representing the asynchronous operation itself. If you want to get the return value of the asynchronous operation, you can add a NIO Netty listener to the asynchronous operation by using the addListener() method of the asynchronous operation object, and register a callback for it: call execution as soon as the result is available.
Netty’s asynchronous programming model is based on the concepts of Futures and callbacks.
Netty source reading
Source code reading, it is best to Debug the case, it is easier to help understand, so in the analysis of Netty before I prepare a client and server code.
Netty-server code
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(newSomeSocketServerHandler()); }}); ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("Server started...");
future.channel().closeFuture().sync();
} finally{ parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); }}}Copy the code
The Server end Handler:
public class DemoSocketServerHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("Client Address ====== " + ctx.channel().remoteAddress());
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
ctx.fireChannelActive();
TimeUnit.MILLISECONDS.sleep(500);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
Netty-client code
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(newDemoSocketClientHandler()); }}); ChannelFuture future = bootstrap.connect("localhost".8888).sync();
future.channel().closeFuture().sync();
} finally {
if(eventLoopGroup ! =null) { eventLoopGroup.shutdownGracefully(); }}}}Copy the code
The Client end Handler:
public class DemoSocketClientHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println(msg);
ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(5000);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
ctx.channel().writeAndFlush("From Client: Begin talking");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
NioEventLoopGroup initialization analysis
Firstly, the initialization process of NioEventLoopGroup is analyzed according to the Server code. Before analyzing NioEventLoopGroup, it is necessary to briefly say NioEventLoopGroup and NioEventLoop, to facilitate the subsequent source understanding.
NioEventLoop source analysis before understanding
Inheritance system of NioEventLoop
From the inheritance system of NioEventLoop, we can see that NioEventLoop itself is an Executor, and is a single-threaded Executor. Executor must have an execute (Runnable command) the implementation of the method, and NioEventLoop the execute () method in its parent class SingleThreadEventExecutor, find a specific code:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if(! inEventLoop) { startThread();if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true; }}catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if(reject) { reject(); }}}if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Copy the code
I won’t go into detail here, but the main reason I post this code is to introduce startThread(); This code, as you’ll see next, ends up calling a member Executor of NioEventLoop to execute the current member’s execute() method. Corresponding member of the io.net ty. Util. Concurrent. SingleThreadEventExecutor# executor
The initialization of an executor member is also an anonymous executor that is created when the current code is executed, that is, the method is created and executes the current anonymous executr() method.
Conclusion:
- NioEventLoop itself is an Executor.
- NioEventLoop encapsulates this new thread Executor member internally.
- NioEventLoop there are two
execute
Method, in addition to its ownexecute()
The method also corresponds to the member attribute Executorexecute()
Methods.
Note: Since there are four executors, we give them new names to distinguish them:
NioEventLoop itself Executor: NioEventLoop
NioEventLoop member Executor: subexecutor
NioEventLoopGroup itself Executor: NioEventLoopGroup
NioEventLoopGroup construction parameter Executor: chief Executor
Inheritance system of NioEventLoopGroup
Looking at the inheritance system, you can see directly that NioEventLoopGroup is also an Executor, and that it is a thread pool Executor, so it also has an execute() method. The realization of the corresponding to its parent class: io.net ty. Util. Concurrent. AbstractEventExecutorGroup# execute
Here also need to say to the point that: in the construction of NioEventLoopGroup, to its parent class MultithreadEventExecutorGroup structure introduced a new Executor again,
This Executor is mentioned here because the corresponding execute() is called when execute() is executed by a member Executor in the NioEventLoop. This is the corresponding code call below. io.netty.util.internal.ThreadExecutorMap#apply(java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutor)
If you don’t understand this point, it’s ok, because it’s just to introduce two executors for NioEventLoopGroup and NioEventLoop, and two execute() methods for each Executor. More on that later.
Conclusion:
- NioEventLoopGroup is a thread pool thread Executor.
- NioEventLoopGroup also encapsulates a thread Executor.
- NioEventLoopGroup also has two
execute()
Methods.
NioEventLoopGroup initialization code analysis
Above said the basic understanding of the content, the following specific analysis, from NioEventLoopGroup initialization into the source analysis.
For entry we go directly to the no-argument construct of NioEventLoopGroup.
public NioEventLoopGroup(a) {
this(0);
}
Copy the code
public NioEventLoopGroup(int nThreads) {
// The second argument is the executor that the group contains
this(nThreads, (Executor) null);
}
Copy the code
public NioEventLoopGroup(int nThreads, Executor executor) {
// The third argument is provider, which provides the channel of selectable and selector,
// This is the only singleton provider in the current JVM
this(nThreads, executor, SelectorProvider.provider());
}
Copy the code
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// The fourth argument is a selection policy factory instance
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
Copy the code
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
Copy the code
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
Copy the code
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// The third argument is the selector factory instance
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
Copy the code
Following this, we can see that the basic parameters of the no-argument construct are initialized, nThreads: DEFAULT_EVENT_LOOP_THREADS / / by default, twice the number of the current CPU core logic selectorProvide: SelectorProvider. The provider () / / the current JVM the only provider of a singleton, SelectStrategyFactory: DefaultSelectStrategyFactory INSTANCE / / factory as an example, the default selection strategy ChooserFactory: DefaultEventExecutorChooserFactory INSTANCE / / selector factory INSTANCE. Here is just the basic initialization parameter, the structure of the key methods for MultithreadEventExecutorGroup method. The following key analysis:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// This executor is the executor contained in the group, which will create one thread for each eventLoop it contains
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
/ / create eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// If one of these eventloops fails to be created, all previously created eventloops will be closed
if(! success) {// Close all previously created eventloops
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// Terminate all tasks performed on eventLoop
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// Create a selector
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null); }}};for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
Copy the code
Following directly with the no-argument construct, you can see that the core is in the last superclass construct. Is io.net ty. Util. Concurrent. MultithreadEventExecutorGroup# MultithreadEventExecutorGroup (int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…) .
I’m going to initialize the entire NioEventLoopGroup instance here, and I’m going to analyze it here, and then I’m going to draw a picture to review it.
Initialize the Executor parameter in the construct parameter, initializing it when it is empty
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
Copy the code
NewDefaultThreadFactory ()) creates the default thread factory. Then create the ThreadPerTaskExecutor thread Executor object. (PS: The Executor created here is an Executor object within the NioEventLoopGroup, not the current NioEventLoopGroup itself, which can be called a chief Executor).
You can then see that an array of children has been created, with as many arrays as you need to create.
children = new EventExecutor[nThreads];
Copy the code
Since each NioEventLoopGroup is a collection of NioEventLoops, the children array here is the NioEventLoop for the current NioEventLoopGroup. So the NioEventLoop is created when the NioEventLoopGroup is initialized. Let’s look at NioEventLoop initialization:
// Create nioEventLoop instances one by one
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
/ / create eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// If one of these eventloops fails to be created, all previously created eventloops will be closed
if(! success) {// Close all previously created eventloops
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// Terminate all tasks performed on eventLoop
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
Copy the code
If an exception occurs, all nioEventLoops that have been created will be closed. The important code is the creation of NioEventLoop. Children [I] = newChild(executor, args); Go down, directly find io.net ty. Channel. Nio. NioEventLoopGroup# newChild, because the current is NioEventLoopGroup created, so know find a subclass newChild implementation.
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
Copy the code
The args parameter is strongly reversed to follow up the NioEventLoop construction:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// Create a binary group of selectors
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
Copy the code
Let’s take a look at the whole thing here and initialize the previous default parameter to the NioEventLoop property. There are two: openSelector() and super(parent, executor, False, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler). Here’s a look at the superclass construct:
With down, direct is SingleThreadEventLoop – > SingleThreadEventExecutor initialization, these can also be in NioEventLoop inheritance system can see:
// io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// Create an endpoint queue
tailTasks = newTaskQueue(maxPendingTasks);
}
// io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// This is the executor that NioEventLoop currently contains
this.executor = ThreadExecutorMap.apply(executor, this);
// Create a task queue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Copy the code
Here is first created SingleThreadEventExecutor, here the key code is need to pay attention to:
this.executor = ThreadExecutorMap.apply(executor, this);
Copy the code
This is a NioEventLoop, so this. Executor is an executor in a NioEventLoop. We call this a subexecutor. This corresponds to NioEventLoopGroup).
In this case, the subexecutor is initialized with an Executor parameter, which is the same Executor that the NioEventLoopGroup constructor has been putting in. Let’s go ahead and see how this subexecutor is initialized.
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// The executor created here is a subexecutor
return new Executor() {
// This execute() is the subexecutor's execute()
@Override
public void execute(final Runnable command) {
// Execute () is called for the executor contained in the NioEventLoopGroup
// Execute () is called for the total executorexecutor.execute(apply(command, eventExecutor)); }}; }Copy the code
A closer look at this code shows that the creation of the subexecutor created here is the creation of a thread, but the important thing is that the execute() method of the thread Executor does only one thing: it calls the execute() method of the chief Executor passed in. So what the sub-executor does here is call the chief Executor’s execute(). Don’t think this is confusing, because this is just initialization, and it’s going to get even more confusing. [Hand cover face crying]
The apply(command, eventExecutor) function will record the current thread when the execute() function is executed, and delete the current record value when the execute() function is completed.
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run(a) {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null); }}}; }Copy the code
TaskQueue = newTaskQueue(this.maxPendingTasks); taskQueue = newTaskQueue(this.maxPendingTasks); TailTasks = newTaskQueue(maxPendingTasks); . We’ll talk about these queues later. This continues with the NioEventLoop main process initialization.
Now let’s go back and look at openSelector(), and here we need to know SelectorTuple:
private static final class SelectorTuple {
final Selector unwrappedSelector; // NIO native selector
final Selector selector; // The optimized selector
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector; }}Copy the code
SelectorTuple is just an inner class that contains two selectors to encapsulate the Selector before and after optimization. The openSelector() method returns a Selector and determines whether the current Selector needs to be optimized based on the configuration. Here’s the code:
And the actual optimization process, if you’re interested in it, you can see for yourself, just know that if you disable optimization, then the optimized Selector for SelectorTuple and the optimized Selector for SelectorTuple are Nio native selectors.
The io.net ty. Util. Concurrent. MultithreadEventExecutorGroup# MultithreadEventExecutorGroup (int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…) After the NioEventLoop array is created, there are selectors to create and close the listener binding, and so on. You can see for yourself, but I will not cover it here.
To this NioEventLoop to create the process of the code is all over. I think if only look at this is definitely still a little muddled, source code this thing needs to follow up to see, debug a little bit with, follow the running code to think why so implementation, but here I also draw a diagram, let you more intuitive understanding of the NioEventLoopGroup creation process and the main operation.
I think you combined with this diagram, combined with the above analysis process, it is best to find their own source code, follow again, should be able to understand the creation of NioEvnetLoopGroup.
Configuration analysis of ServerBootstrap and ServerBootstrap properties
Inheritance system:
Entry code:
//2. Create the ServerBootstrap class ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3. Configure two large thread groups for the bootstrap class, and determine the thread model
b.group(bossGroup, workerGroup)
// (Non-mandatory) Prints logs
.handler(new LoggingHandler(LogLevel.INFO))
// 4. Specify the I/O model
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5. You can customize the business processing logic of client messages
p.addLast(newHelloServerHandler()); }}); Bootstrap bootstrap =new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(newSomeSocketClientHandler()); }});Copy the code
ServerBootstrap and Bootstrap are both Bootstrap classes. The only difference is that ServerBootstrap is a server class and Bootstrap is a client class that binds to the EventLoopGroup we created. It is called a configuration class because it assigns values to ServerBootstrap and Bootstrap properties. Take a look inside the group() method:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup ! =null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
Copy the code
Other methods are the same, interested can go in to see. This is just initialization, but it’s all in preparation for later operations.
Server – side bind method serverBootstrap.bind () source code parsing
Here we enter from here:
b.bind(port).sync();
Copy the code
Follow directly from the bind() method:
// io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// Follow up
public ChannelFuture bind(SocketAddress localAddress) {
// Verify that group and channelFactory are null
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// This is the key logic
return doBind(localAddress);
}
Copy the code
Check whether the Bootstrap group is successfully bound to channelFactory. Then follow up with the doBind() method:
private ChannelFuture doBind(final SocketAddress localAddress) {
// Create, initialize, and register a channel with a selector, returning an asynchronous result
final ChannelFuture regFuture = initAndRegister();
// Get the channel from the asynchronous result
final Channel channel = regFuture.channel();
// If an exception occurs during the execution of the asynchronous operation, return the asynchronous object directly.
if(regFuture.cause() ! =null) {
return regFuture;
}
// Handle the case when the asynchronous operation completes (it may end normally, or an exception occurs, or the task is cancelled, which are all consequences)
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// Bind the specified port
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // Handle the case where the asynchronous operation has not yet produced a result
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// Add listeners for asynchronous operations
regFuture.addListener(new ChannelFutureListener() {
// Triggers execution of this method if the asynchronous operation has a result (completion)
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if(cause ! =null) { // An error occurred during the execution of the asynchronous operation
promise.setFailure(cause);
} else { // The result of asynchronous operation is normal
promise.registered();
// Bind the specified portdoBind0(regFuture, channel, localAddress, promise); }}});returnpromise; }}Copy the code
First of all, let’s figure out the overall logic of this method, and then study the specific operation of each step. Draw a picture to understand what this method does first:
You can combine the code in the diagram to see the big picture of dobind(), but there are a lot of important details to follow up on, namely Tag 1 and Tag 2. To make it easier to follow the code and come back, here is the Tag, and then analyze the source code of the Tag:
Add Tag 0:
I know ChannelPromise and ChannelFuture.
The Tag 1:
Create, initialize, and register a channel with a selector asynchronously
final ChannelFuture regFuture = initAndRegister();
The Tag # 2:
Bind the specified port number:
doBind0(regFuture, channel, localAddress, promise);
Added Tag 0: ChannelPromise and ChannelFuture
ChannelPromise is a special ChannelFuture, is a modified ChannelFuture. Provides an internal method to modify the current Future state. On the basis of ChannelFuture, the modification method of setting the final state is implemented.
ChannelFuture can only query the result of the current asynchronous operation, and cannot modify the Future of the current asynchronous result. The important thing to know here is that ChannelPromise can change the state of the current asynchronous result and will trigger the listener if the state is changed. In doBind, this function is used to assign an exception to the ChannelPromise and return an exception if there is an exception to the asynchronous result.
Tag 1: initAndRegister() initializes and registers the Channel
First find the code:
final ChannelFuture initAndRegister(a) {
Channel channel = null;
try {
/ / create the channel
channel = channelFactory.newChannel();
// Initialize the channel
init(channel);
} catch (Throwable t) {
if(channel ! =null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// Register a channel with a selector
ChannelFuture regFuture = config().group().register(channel);
if(regFuture.cause() ! =null) {
if (channel.isRegistered()) {
channel.close();
} else{ channel.unsafe().closeForcibly(); }}return regFuture;
}
Copy the code
Huh? ! Code meaning a look, zha so little, also did three things, but each of these three things to do is not a code can be completed. Here we analyze one by one. In addition to these three things, the rest is the processing logic after the exception. Therefore, the main process is the following three sentences of code, which is also marked to follow up:
1.1 create Tag channel channel = channelFactory. NewChannel ();
Tag 1.2 initializes channel init(channel);
Tag 1.3 Register channel with selector ChannelFuture regFuture = config().group().register(channel);
For all three, we need to analyze them one by one.
The Tag. 1.1 channelFactory newChannel () to create a Channel
To find the corresponding code: io.net ty. Channel. ReflectiveChannelFactory# newChannel
@Override
public T newChannel(a) {
try {
// Create a channel by calling the constructor with no parameters
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class "+ constructor.getDeclaringClass(), t); }}Copy the code
The ReflectiveChannelFactory method is used to set the channel between ServerBootstrap and Bootstrap. Follow to find the assignment code for the channelFactory property:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
Copy the code
The new ReflectiveChannelFactory class is ReflectiveChannelFactory. The constructor of ReflectiveChannelFactory is as follows:
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// Initialize NioServerSocketChannel's parameterless constructor to constructor
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e); }}Copy the code
The ReflectiveChannelFactory initializes its constructor property when it is created. The constructor of the passed channel class clazz is assigned to the Constructor property of the ReflectiveChannelFactory.
And we again incoming Server side channel class for NioServerSocketChannel. The class, so the above constructor. The newInstance (); The corresponding is the no-argument construct of NioServerSocketChannel. So let’s follow up on NioServerSocketChannel:
// Provider in NIO, which is used to create selectors and channels. And they're singletons
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel(a) {
// DEFAULT_SELECTOR_PROVIDER Static variable
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
Copy the code
Follow up with newSocket() :
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// Create NIO native channel => ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e); }}Copy the code
It returns a Java NIO native Channel, and then wraps the NIO native Channel into a NioServerSocketChannel, Follow this(newSocket(DEFAULT_SELECTOR_PROVIDER)) to find the constructor code:
public NioServerSocketChannel(ServerSocketChannel channel) {
// Parameter 1: parent channel
// Parameter 2: NIO native channel
// Parameter 3: specifies that the event concerned by the current channel is the accept connection
super(null, channel, SelectionKey.OP_ACCEPT);
// Set of attributes used to configure a channel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Copy the code
There are two main things to do here: 1. Call the superclass construct, and 2. Configure the property set for the channel.
Here said the first new NioServerSocketChannelConfig (), the operation is to give the current Channel assignment config, used to save the current collection of the attribute of the Channel configuration. Super (null, channel, selectionKey.op_accept)
// io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
// io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
// This. Ch is the NIO native channel
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// NIO, non-blocking
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2); }}throw new ChannelException("Failed to enter non-blocking mode.", e); }}Copy the code
Find the AbstractNioChannel parent constructor directly. This first step is also to call the parent constructor super(parent); Remember, first look at what else is being done besides calling the superclass constructor:
- Construct super(parent) by calling the parent class;
- Copy the previously created native Channel to the property this.ch = ch;
- This. ReadInterestOp = readInterestOp; // Selectionkey. OP_ACCEPT Accept event
- Set the NIO native Channel to non-blocking ch.configureBlocking(false);
The AbstractNioChannel constructor does four things. The main thing to say is what it does when it calls the parent constructor. Find the code:
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// Generate an ID for the channel, consisting of five parts
id = newId();
// Create an underlying operation object, unsafe
unsafe = newUnsafe();
// Create a channel pipeline bound to this channel
pipeline = newChannelPipeline();
}
Copy the code
There are three main things that are done in the AbstractChannel construct:
- Generates an ID for the current Channel
newId()
If you are interested, you can follow and have a look.- Unsafe generates an underlying action object, called by the I/O thread, which cannot be called by user code.
newUnsafe()
- Creating a channelPipeline bound to this channel is also a key operation, but I will not go into detail here. The code of the Channel Pipeline will be separately followed later.
**Tag 1: initAndRegister() **Tag 1.1 newChannel() ** create Channel NewChannel () Tag 1.1 newChannel()
According to the figure, in combination with the above code analysis, it is best to follow the code again, I think this piece of understanding is no problem. I’m just creating a Channel up here. Tag 1.1 Channel creation is finished, followed by Tag 1.2 init(Channel).
Tag 1.2 init(channel) Initializes a channel
The us from the doBind ServerBootstrap into here, so here directly find io.net ty. The bootstrap. ServerBootstrap# init
void init(Channel channel) throws Exception {
// Obtain the options property in serverBootstrap
finalMap<ChannelOption<? >, Object> options = options0();// Set the options property to the channel
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// Obtain the attrs attribute in serverBootstrap
finalMap<AttributeKey<? >, Object> attrs = attrs0();synchronized (attrs) {
// Iterate over the attrs attribute
for(Entry<AttributeKey<? >, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
// Initialize the currently traversed ATTR to the channelchannel.attr(key).set(e.getValue()); }}// Get the pipeline for the channel
ChannelPipeline p = channel.pipeline();
// Write all serverBootstrap properties starting with child to a local variable,
// Then initialize them into childChannel
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
finalEntry<ChannelOption<? >, Object>[] currentChildOptions;finalEntry<AttributeKey<? >, Object>[] currentChildAttrs;synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if(handler ! =null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
// Add ServerBootstrapAcceptor processor to pipeline
// The ServerBootstrapAcceptor handler is used to receive property values in ServerBootstrap.
// We usually call this a connection processor
pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code
So there’s a lot of things that we’re doing here, the basic operations that I’ve highlighted in the comments above, and the main operations that we need to follow up on, so let’s just Tag and go ahead. Options () and attrs () values are set by ServerBootstrap and attr() before doBind() is called. The options attribute is set to the config attribute of the Channel, and the ATTRs attribute is directly set to the Channel.
After setting the options and attrs properties, we get the pipeline for the current channel, and then we get the property values that we set before doBind(). The value of the property set by the method childOption() starting with child and childAttr().
Here we’re using local variables to record all the values associated with the Child currentChildGroup, currentChildHandler, currentChildOptions, CurrentChildAttrs is used to initialize the properties of childChannel, new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, CurrentChildAttrs)) primarily creates the connection handler.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if(handler ! =null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
// Add ServerBootstrapAcceptor processor to pipeline
// The ServerBootstrapAcceptor handler is used to receive property values in ServerBootstrap.
// We usually call this a connection processor
pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }});Copy the code
The first thing you want to do here is bind the pipeline of the current channel to a ChannelInitializer. Because it is an abstract class, you need to implement the initChannel method anonymously. The main operation is to initialize the channel in the childGroup. I just want to focus on what the connection handler ServerBootstrapAcceptor does. The rest will be covered later in the handler and pipeline.
* * added: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap After the parentGroup receives the connection, it simply transfers the current to childGroup to handle subsequent operations. ChildGroup is designed to handle the operation after the connection and does not care about the connection task of the channel. This is actually the processing logic of the Netty-Server Reactor thread pool model.
The connection handler is ServerBootstrapAcceptor.
ServerBootstrapAcceptor(
finalChannel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<? >, Object>[] childOptions, Entry<AttributeKey<? >, Object>[] childAttrs) {this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run(a) {
channel.config().setAutoRead(true); }}; }Copy the code
The ServerBootstrapAcceptor construct simply saves the Child property Settings configured in ServerBootstrap. And this keeps saying this is the connection handler, because when the client connection is sent to the server, this handler will receive the client connection and process it. The main processing method is the implementation of channelRead:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// MSG indicates the data sent by the client, which is NioSocketChannel
final Channel child = (Channel) msg;
// Initialize the child starting property from ServerBootstrap to childChannel (childHandler, childOptions, childAttrs)
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for(Entry<AttributeKey<? >, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); }try {
// register the childChannel with a selector. Note that the selector is not the same one registered by the parent channel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(! future.isSuccess()) { forceClose(child, future.cause()); }}}); }catch(Throwable t) { forceClose(child, t); }}Copy the code
There are mainly two things done here:
- Initialize childChannel
- Registers the channel successfully connected from the client to the selector.
The netty thread model uses the “Server listening thread” and “I/O thread” separation mode for Server side processing. So the channelRead method is used to bind the currently connected I/O thread to childChannel and register it with the Selector in ChildGroup when the client requests to connect to the server. Thread, model can refer to the following figure:
Ok, so we’re done with the Tag 1.2 initChannel code, and there’s a little bit of pipeline, handler, selector that I’m not going to talk about because I’m going to talk about it separately, and I’m not going to expand it directly here.
Let me draw a picture here, too: when the picture is put together, now the analysis process is like breaking the whole into pieces, and then putting the whole together.
In addition to the init(channel) method, we mainly talk about the ServerBootstrapAcceptor connection handler. In fact, the main netty-Server thread model and code combined understanding.
This post is too long and will exceed the word limit on most blog sites, so I’ll post it
I’m Aobin, the more you know, the more you don’t know, thanks for your likes, favorites and comments, we’ll see you next time!
The article continues to update, can wechat search a search “three prince Aobin” the first time to read, reply [data] I prepared for the first line of dafang interview data and resume template, this article GitHub github.com/JavaFamily has been included, there are dafang interview complete test point, welcome to Star.