Netty threading model
Netty implements the Reactor thread model, which has four parts:
- Resources: Resources, tasks, are requests from clients
- Synchronous event multiplexer: Event polling, where the selector of the boss thread polls for the client’s events
- The boss thread will assign the client request to the worker thread for I/O processing
- Request handler, which handles client I/O requests
Code sample
static final int PORT = Integer.parseInt(System.getProperty("port"."8099"));
public static void main(String[] args) {
/ / create EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler handler = new EchoServerHandler();
try {
// Create an initiator
ServerBootstrap bootstrap = new ServerBootstrap();
// Configure the initiator
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(handler); }});// Bind ports to start
ChannelFuture f = bootstrap.bind(PORT).sync();
// Close the initiator
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}Copy the code
NioEventLoopGroup initialization process
== New a NioEventLoopGroup
EventLoopGroup workerGroup = new NioEventLoopGroup()
Copy the code
NioEventLoopGroup constructor (NioEventLoopGroup constructor)
public NioEventLoopGroup(a) {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor)null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
}
Copy the code
NThreads: The number of threads to be created. NThreads has the value 0 if the argument constructor is used, and nThreads has the value Executor passed in: Can define your own, if he defines, behind is not initialized, if not defined, the default is null, will be initialized in the rear SelectorProvider: through SelectorProvider. The provider () to create; SelectorProvider is just to create DatagramChannel, Pipe, Selector, ServerSocketChannel, SocketChannel, System. InheritedChannel () selectStrategyFactory: Factory RejectedExecutionHandlers DefaultSelectStrategyFactory INSTANCE, selection strategy. Reject () : A thread pool reject policy that is executed when adding tasks to the thread pool if the pool task is already full
Step 3: = = into MultithreadEventLoopGroup class This is the step of the parent class constructor, Can see selectorProvider selectStrategyFactory, RejectedExecutionHandlers. Reject () in the Object… Args is in the array. One thing this constructor does is use the default DEFAULT_EVENT_LOOP_THREADS if the previous nThreads pass is 0, which is twice the number of CPU cores, or the number of threads passed if the previous nThreads pass is not 0.
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
Copy the code
Step 4: = = into MultithreadEventExecutorGroup class DefaultEventExecutorChooserFactory INSTANCE, is the event to do factory, Through the new DefaultEventExecutorChooserFactory () object is created
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
Copy the code
It will then enter the following method. Because this part of the code is quite long, the following source code has been deleted:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
this.terminatedChildren = new AtomicInteger();
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
} else {
if (executor == null) {
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
this.children = new EventExecutor[nThreads];
int j;
for(int i = 0; i < nThreads; ++i) {
boolean success = false;
boolean var18 = false;
try {
var18 = true;
this.children[i] = this.newChild((Executor)executor, args);
success = true;
var18 = false;
} catch (Exception var19) {
throw new IllegalStateException("failed to create a child event loop", var19);
} finally{}this.chooser = chooserFactory.newChooser(this.children); }}Copy the code
Enclosing newDefaultThreadFactory () will create a thread factory, its function is used to create a thread; New ThreadPerTaskExecutor creates a thread executor; this.children = new EventExecutor[nThreads]; Create an array children, nThreads, NioEventLoop; this.children[i] = this.newChild((Executor)executor, args); Create a NioEventLoop through a loop; this.chooser = chooserFactory.newChooser(this.children); Creates a selector for the thread executor
1: newDefaultThreadFactory
protected ThreadFactory newDefaultThreadFactory(a) {
return new DefaultThreadFactory(this.getClass());
}
Copy the code
NewDefaultThreadFactory implements the ThreadFactory interface. When ThreadFactory’s new Thread() is called, a Thread is created and given the name nioEventloop-x-x, The first x indicates which thread group, such as bossGroup or workerGroup, and the second X indicates the serial number of the thread under the current thread group.
2: ThreadPerTaskExecutor
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
} else {
this.threadFactory = threadFactory; }}public void execute(Runnable command) {
this.threadFactory.newThread(command).start();
}
Copy the code
ThreadPerTaskExecutor implements the Executor interface. When its execute method is called, a thread is created and started, and only one thread is created for each NioEventLoop.
3: Create one NioEventLoop after another
private final EventExecutor[] children;
private final EventExecutorChooser chooser;
this.children = new EventExecutor[nThreads];
int j;
for(int i = 0; i < nThreads; ++i) {
boolean success = false;
boolean var18 = false;
try {
var18 = true;
this.children[i] = this.newChild((Executor)executor, args);
success = true;
var18 = false;
} catch (Exception var19) {
throw new IllegalStateException("failed to create a child event loop", var19);
} finally{}this.chooser = chooserFactory.newChooser(this.children);
}
Copy the code
Create an EventExecutor array whose size is the nThreads value passed in. Inside the array are nioEventLoops. Initialize each NioEventLoop in the array:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
}
Copy the code
NewChild (NioEventLoopGroup) returns NioEventLoop (NioEventLoop constructor);
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
} else if (strategy == null) {
throw new NullPointerException("selectStrategy");
} else {
this.provider = selectorProvider;
NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
this.selectStrategy = strategy; }}Copy the code
SelectorTuple. Selector creates a netty modified selector, whereby the multiplexing selector calls its parent class again, tracing it to see:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
Copy the code
In SingleThreadEventExecutor again:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.threadLock = new Semaphore(0);
this.shutdownHooks = new LinkedHashSet();
this.state = 1;
this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = (Executor)ObjectUtil.checkNotNull(executor, "executor");
this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Copy the code
Create two queues: tailTasks and taskQueue. The taskQueue is the queue that holds the tasks and the tailTasks are the queue that holds the end work
4: ChooserFactory. NewChooser (enclosing the children) thread actuator selector, there is a group of NioEventLoop in NioEventLoopGroup, namely has a set of threads, when a client connect to come over, need I/O operations on it, The selector of the thread executor does just that: polling. When a client comes, it fetches the first thread in the thread group. When another client comes, it fetches the second thread in the thread group. For each client channel, first obtain the value of the counter, and then use the value of the counter to take the module of the array, and then add the counter one, when the number of threads is 2 integer power, Netty will use the bit operation to take the module operation; When the number of threads is not an integer power of 2, Netty uses modulo to calculate the number.
Take a look at the overall class diagram: Step summary: 1: first go to the constructor, determine the number of threads, the default is not specified number of CPU cores *2 3: build thread executor, initialize thread executor 4: create one EventLoop thread 5: create Selector Selector 6: create taskQueue queue
NioEventLoopGroup startup process
NioEventLoopGroup start, actually is NioEventLoopGroup threads in start, namely NioEventLoop start, NioEventLoop start there are two ways: NioEventLoop is triggered when the server is started, and when the client is connected, NioEventLoop is also triggered. The following uses the server as an example.
public ChannelFuture register(Channel channel) {
return this.next().register(channel);
}
Copy the code
Next is a method of the Chooser that returns a NioEventLoop and then calls the registration method to continue tracking the code:
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
Copy the code
eventLoop.execute(new Runnable() {
public void run(a) {
AbstractUnsafe.this.register0(promise); }});Copy the code
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
} else {
// Check whether the current thread is an EventLoop thread
boolean inEventLoop = this.inEventLoop();
// Add tasks to the queue
this.addTask(task);
if(! inEventLoop) {// Start the thread
this.startThread();
if (this.isShutdown()) {
boolean reject = false;
try {
if (this.removeTask(task)) {
reject = true; }}catch (UnsupportedOperationException var5) {
}
if(reject) { reject(); }}}if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop); }}}Copy the code
When you start the server, you start the main thread, so you go to the following method
private void startThread(a) {
if (this.state == 1 && STATE_UPDATER.compareAndSet(this.1.2)) {
try {
this.doStartThread();
} catch (Throwable var2) {
STATE_UPDATER.set(this.1); PlatformDependent.throwException(var2); }}}Copy the code
private void doStartThread(a) {
assert this.thread == null;
this.executor.execute(new Runnable() {
public void run(a) {
SingleThreadEventExecutor.this.thread = Thread.currentThread();
if (SingleThreadEventExecutor.this.interrupted) {
SingleThreadEventExecutor.this.thread.interrupt();
}
boolean success = false;
SingleThreadEventExecutor.this.updateLastExecutionTime();
boolean var112 = false;
int oldState;
label1907: {
try {
var112 = true;
/ / start NioEventLoop
SingleThreadEventExecutor.this.run();
success = true;
var112 = false;
break label1907;
} catch (Throwable var119) {
} finally{}}}); }Copy the code
Execute requests to execute the task. AddTask adds the task to the task queue. 3 Check whether the EventLoop is called
Initialization of a channel
Take a channel on the server as an example: the channel type is NioServerSocketChannel
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(handler); }});Copy the code
Trace AbstractBootstrap next to a channel method:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
} else {
return this.channelFactory((io.netty.channel.ChannelFactory)(newReflectiveChannelFactory(channelClass))); }}Copy the code
A ReflectiveChannelFactory is created and assigned to the channelFactory, which is the factory used to produce channels.
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
} else if (this.channelFactory ! =null) {
throw new IllegalStateException("channelFactory set already");
} else {
this.channelFactory = channelFactory;
return this.self(); }}Copy the code
Here is the source code traced to the server binding port: it mainly initializes the channel, as well as the binding of the port
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = this.initAndRegister();
final Channel channel = regFuture.channel();
if(regFuture.cause() ! =null) {
return regFuture;
} else if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if(cause ! =null) {
promise.setFailure(cause);
} else{ promise.registered(); AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); }}});returnpromise; }}Copy the code
Look at the initAndRegister method: this is used to initialize a channel and register a channel with a selector
final ChannelFuture initAndRegister(a) {
Channel channel = null;
try {
// instantiate channel
channel = this.channelFactory.newChannel();
// Call the initialization method
this.init(channel);
} catch (Throwable var3) {
if(channel ! =null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
// Call the register method to register a channel with the selector
ChannelFuture regFuture = this.config().group().register(channel);
if(regFuture.cause() ! =null) {
if (channel.isRegistered()) {
channel.close();
} else{ channel.unsafe().closeForcibly(); }}return regFuture;
}
Copy the code
Entering newChannel, constructive.newinstance () is called again
public T newChannel(a) {
try {
return (Channel)this.constructor.newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2); }}Copy the code
With this call, the NioServerSocketChannel’s no-argument constructor is called:
public NioServerSocketChannel(a) {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
Copy the code
NewSocket is then called:
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2); }}Copy the code
Finally trace all the way to the parent class method:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// Set to non-blocking mode
ch.configureBlocking(false);
} catch (IOException var7) {
try {
ch.close();
} catch (IOException var6) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", var6); }}throw new ChannelException("Failed to enter non-blocking mode.", var7); }}Copy the code
The code above looks familiar, registering the event and setting it to non-blocking mode