Hi everyone, I am Yigui. Thank you for your thumbs up, collection and comments. I am constantly updating the article and I will see you next time. You can also add my personal VX to communicate: LHJ502819. We will try our best to impact the big factory together. In addition, we have a lot of learning materials to provide to you.
This column is interpreted based on 4.1.73.Final, Github: github.com/lhj502819/n… The example code is in the example module
series
- Do you know what the I/O models are?
- Java NIO three roles Channel, Buffer, Selector
- Scalable IO in Java, translated by Doug Lea
- Reactor model do you know what they are?
- Netty server create source code process analysis
- What exactly is an EventLoopGroup?
- To be continued..
In the last article, we reviewed the process of creating a Netty server. We have seen many components in Netty. Today, we will take a look at one of them, EventLoopGroup.
In Netty, Channel is the abstract class of network operation, and EventLoop is responsible for processing the Channel processing I/O operation registered on it. An EventLoopGroup is a grouping of Eventloops that can retrieve one or more EventLoop objects, so it provides a method for iterating out EventLoop objects.
In our usage example, we use:
- New NioEventLoopGroup(), creates an EventLoopGroup object
- EventLoopGroup# Register (Channel Channel) to register a Channel with the EventLoopGroup
The sections in red are the EventLoopGroup classes, which we will only cover today
EventExecutorGroup
The EventExecutorGroup is responsible for providing EventExecutor for use with its Next () method. In addition, it takes care of their life cycle and allows them to be turned off globally. It inherits ScheduledExecutorService and Iterable
, indicating that it can be timed, and provides EventExecutor storage and traversal. The EventExecutorGroup does not perform tasks itself, but instead assigns tasks submit or schedule to groups of EventExecutors it manages. As for which EventExecutor to submit, An EventExecutor is typically selected through the next() method, but only the behavior is provided, depending on the implementation class.
AbstractEventExecutorGroup
{@link #submit(Runnable)} provides a way to submit a task, but the implementation is implemented by subclasses. Code will not paste, interested in you look at the line ha, very simple.
MultithreadEventExecutorGroup
As the name suggests, it is a multithreaded version of EventExecutor.
variable
/** * EventExecutor collection */
private final EventExecutor[] children;
/** * Read-only EventExecutor array */
private final Set<EventExecutor> readonlyChildren;
/** * Number of terminated executors */
private final AtomicInteger terminatedChildren = new AtomicInteger();
/** * used to terminate the asynchronous Future of EventExecutor */
private finalPromise<? > terminationFuture =new DefaultPromise(GlobalEventExecutor.INSTANCE);
/** * EventExecutor selector */
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
Copy the code
A constructor
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link#newChild(Executor, Object...) } call */
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link#newChild(Executor, Object...) } call */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link#newChild(Executor, Object...) } call */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
/** * If the executor passed is empty, the default executor ThreadPerTaskExecutor */ is used
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
Create an EventExecutor array
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// Create an Executor object
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if(! success) {// If the creation fails, all created EventExecutor is closed
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// Make sure all EventExecutor is closed
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// Create EventExecutor selector
chooser = chooserFactory.newChooser(children);
// Create a listener to listen for EventExecutor termination,
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// Callback logic to notify listeners when all EventExecutor terminates by calling the Promise#setSuccess method
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null); }}};for (EventExecutor e: children) {
// Set listeners to each EventExecutor
e.terminationFuture().addListener(terminationListener);
}
// Create an immutable EventExecutor array
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
Copy the code
Note (here we focus on the final constructor) :
- EventExecutorChooserFactory default is DefaultEventExecutorChooserFactory to choose an Executor in multiple Executor to perform a task
- If we do not specify an executor, ThreadPerTaskExecutor will be used (mentioned below)
Line # 49
Creating an Executor creates an Executor for each thread, which is then accessed through the next() method, and is implemented by subclasses
ThreadPerTaskExecutor
An executor implementation class of one thread per task
EventLoopGroup
EventExecutorGroup, a successor to EventExecutorGroup that provides the ability to register channels.
MultithreadEventLoopGroup
Inherited MultithreadEventExecutorGroup, and realizes the EventLoopGroup, based on MultithreadEventExecutorGroup abstract the registration of the Channel behavior.
static {
/** * When the number of threads is not specified, the default number of threads is CPU core * 2, because the current CPU is hyperthreaded, one CPU can have two threads */
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); }}@Override
public EventLoop next(a) {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
// Get an EventLoop and register the Channel
return next().register(channel);
}
Copy the code
NioEventLoopGroup
Executor implementation based on Java NIO Selector, NioEventLoop instances are created here, Selector is bound to EventLoop here, and JDK bugs using epoll are addressed.
/** * We'll just focus on the arguments to its constructor, and here we see the Selector we're looking for@param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if default one should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param selectorProvider the {@link SelectorProvider} to use.
* @param selectStrategyFactory the {@link SelectStrategyFactory} to use.
* @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use.
* @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#execute(Runnable)},
* or {@code null} if default one should be used.
* @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)},
* or {@code null} if default one should be used.
*/
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectorProvider selectorProvider,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
/** * Create a NioEventLoop to execute the event, where the args is passed from the constructor to the parent class, which in turn calls the subclass implementation
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
/**
* Replaces the current {@link Selector}s of the child event loops with newly created {@linkSelector}s to work * around the infamous epoll 100%CPU bug. NioEventLoop automatically calls this method to rebuild the Selector object to fix the problem */
public void rebuildSelectors(a) {
for (EventExecutor e: this) { ((NioEventLoop) e).rebuildSelector(); }}/**
* Replaces the current {@link Selector} of this event loop with newly created {@linkSelector}s to work * around the infamous poll 100% CPU bug
public void rebuildSelector(a) {
if(! inEventLoop()) { execute(new Runnable() {
@Override
public void run(a) { rebuildSelector0(); }});return;
}
rebuildSelector0();
}
Copy the code
conclusion
EventLoopGroup is the encapsulation of Executor and the implementation of JUC based package. The authors split each working component in a very detailed way for scalability. In the article I did not COPY all the source code out sentence by sentence translation, we do not have to read the source code in the process to understand every line of code, we want to experience the ingenious framework design, to grasp the internal logic of the whole framework.
Stay tuned for an analysis of EventLoop in the next article.