takeaway
Original article; please credit the source when reproducing.
Project: netty-source-code-analysis
Netty version: 4.1.6.Final. This article uses the annotated Netty source code from that project.
EventLoop is the driving force in Netty. In this article we take NioEventLoopGroup and NioEventLoop as examples to analyze how an EventLoopGroup and an EventLoop are created, along with some important data structures and some of Netty's optimizations.
1 NioEventLoopGroup
Let's take NioEventLoopGroup as an example. NioEventLoopGroup has many constructors, so instead of posting them all, we will look at only the two key ones.
The constructor NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) supplies DefaultSelectStrategyFactory.INSTANCE as the default SelectStrategyFactory. On every iteration of its loop, the EventLoop calls the calculateStrategy method of the SelectStrategy created by this factory to decide what that iteration should do, as we'll see later.
This constructor is eventually reached when we create an EventLoopGroup with either new NioEventLoopGroup() or new NioEventLoopGroup(32). It delegates to another constructor, which we follow next.
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
This constructor calls the constructor of the superclass MultithreadEventLoopGroup. Let's continue.
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
The MultithreadEventLoopGroup constructor is shown below. It supplies a default for nThreads: when nThreads is 0, the static field DEFAULT_EVENT_LOOP_THREADS is used instead. It then calls the constructor of the superclass MultithreadEventExecutorGroup.
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
DEFAULT_EVENT_LOOP_THREADS is assigned in a static initializer block of MultithreadEventLoopGroup. By default its value is twice the number of available processors, and it can be overridden with the io.netty.eventLoopThreads system property.
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    }
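To make the default concrete, here is a minimal sketch of the same calculation using only the JDK. The class and method names are hypothetical, and Integer.getInteger is used as a stand-in for Netty's SystemPropertyUtil.getInt, so treat it as an illustration rather than Netty's code.

```java
final class DefaultThreadsSketch {
    // Hypothetical helper that mirrors the static initializer above.
    // Integer.getInteger is a JDK stand-in for Netty's SystemPropertyUtil.getInt.
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        int configured = Integer.getInteger("io.netty.eventLoopThreads", fallback);
        return Math.max(1, configured);
    }

    // Mirrors "nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads".
    static int resolveThreads(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("nThreads = 0  resolves to " + resolveThreads(0));
        System.out.println("nThreads = 32 resolves to " + resolveThreads(32));
    }
}
```

Running it with -Dio.netty.eventLoopThreads=4 should change the first line, while an explicit non-zero nThreads is always used as given.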
Let's move on to the MultithreadEventExecutorGroup constructor. It delegates to MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args), wrapping the ThreadFactory in a ThreadPerTaskExecutor that serves as the executor.
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }
ThreadPerTaskExecutor is simple: its execute method creates and starts a new thread for each task.
    public final class ThreadPerTaskExecutor implements Executor {
        // threadFactory field and constructor omitted in this excerpt

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
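The behavior is easy to observe in isolation. The following is a self-contained sketch, not Netty's class: PerTaskExecutor is a simplified stand-in written for this example, and distinctThreads is a hypothetical helper that counts how many different threads ran the submitted tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class ThreadPerTaskSketch {
    // Simplified stand-in for Netty's ThreadPerTaskExecutor.
    static final class PerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;

        PerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }

        @Override
        public void execute(Runnable command) {
            // No pooling: every task gets a freshly created thread.
            threadFactory.newThread(command).start();
        }
    }

    // Runs the given number of tasks and returns how many distinct threads ran them.
    static int distinctThreads(int tasks) {
        Executor executor = new PerTaskExecutor(Executors.defaultThreadFactory());
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("3 tasks ran on " + distinctThreads(3) + " distinct threads");
    }
}
```

In Netty this is less wasteful than it looks, because each EventLoop submits only one long-running task to the executor: its own loop.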
We continue with MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args). It calls yet another constructor, MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args), and supplies a default value for chooserFactory. The chooserFactory creates the chooser that picks an EventLoop from the EventLoopGroup in round-robin fashion when a Channel is bound to an EventLoop. We won't go into the details here.
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
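Round-robin selection itself fits in a few lines. The sketch below is a simplified illustration with a hypothetical class name, not the code of DefaultEventExecutorChooserFactory: it keeps an atomic counter and maps it onto the array of children.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooserSketch<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooserSketch(T[] children) {
        this.children = children;
    }

    // Returns the next child in rotation; safe to call from several threads.
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return children[Math.floorMod(idx.getAndIncrement(), children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch<String> chooser =
                new RoundRobinChooserSketch<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int channel = 0; channel < 4; channel++) {
            System.out.println("channel " + channel + " is bound to " + chooser.next());
        }
    }
}
```

With three children, the fourth call wraps around to the first child again, which is how Channels end up spread evenly across the EventLoops.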
Now look at the next constructor, MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args). This is the core logic for creating an EventLoopGroup.
First, an EventExecutor array is created and assigned to the children field. The length of the array equals nThreads.
Then a loop calls the newChild method to fill each element of the array.
Finally, chooserFactory.newChooser(children) is called to assign the chooser field.
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // One EventExecutor per thread.
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // error handling omitted in this excerpt
            } finally {
                // cleanup on failure omitted in this excerpt
            }
        }
        // The chooser later picks an EventExecutor from children.
        chooser = chooserFactory.newChooser(children);
    }
Let's take a look at the newChild method. It is abstract, and NioEventLoopGroup provides the implementation, which calls the NioEventLoop constructor. Let's follow it.
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
2 NioEventLoop
Look at the constructor of NioEventLoop, which stores three fields:
- provider: the selector factory class. A selector can be obtained from the provider.
- selector: the selector obtained by calling provider.openSelector(). Netty applies an optimization here, which we'll discuss later.
- selectStrategy: the return value of its calculateStrategy method determines the EventLoop's next action, which we'll also discuss later.
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }
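The relationship between provider and selector comes straight from the JDK's NIO API. As a minimal sketch (the class and method names are made up for this example), a selector is obtained from a SelectorProvider like this:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

final class OpenSelectorSketch {
    // Opens a selector from the system-wide default provider and checks
    // that the selector reports the provider it came from.
    static boolean opensFromProvider() {
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector()) {
            return selector.isOpen() && selector.provider() == provider;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selector opened from provider: " + opensFromProvider());
    }
}
```

Each NioEventLoop owns one such selector, so a group with N loops holds N selectors.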
Continuing with the constructor of the parent class SingleThreadEventLoop: it initializes a field called tailTasks. We'll come back to it shortly. For now, just remember that there is a task queue called tailTasks.
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        tailTasks = newTaskQueue(maxPendingTasks);
    }
Continue to the constructor of the parent class SingleThreadEventExecutor, which assigns several fields:
- addTaskWakesUp: used to wake up an EventLoop thread that is blocked on taskQueue. In NioEventLoop the EventLoop thread never blocks on taskQueue, so this field is of little use here and we won't study it.
- maxPendingTasks: the maximum length of the task queue.
- executor: the class that actually creates threads. The default is ThreadPerTaskExecutor.
- taskQueue: the task queue. Remember the tailTasks queue above? This is a second queue.
- rejectedExecutionHandler: the rejection policy.
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
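How maxPendingTasks and the rejection policy interact can be sketched as follows. This is an illustration under assumptions, not Netty's implementation: the class name is hypothetical, a LinkedBlockingQueue stands in for the task queue, and throwing RejectedExecutionException stands in for whatever the configured rejectedExecutionHandler does.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

final class BoundedTaskQueueSketch {
    private final Queue<Runnable> taskQueue;

    BoundedTaskQueueSketch(int maxPendingTasks) {
        // Same floor as in the constructor above: never fewer than 16 slots.
        taskQueue = new LinkedBlockingQueue<>(Math.max(16, maxPendingTasks));
    }

    // Enqueues a task, or rejects it when the queue is full.
    void addTask(Runnable task) {
        if (!taskQueue.offer(task)) {
            // Assumed stand-in for the rejection policy.
            throw new RejectedExecutionException("task queue is full");
        }
    }

    int pending() {
        return taskQueue.size();
    }

    public static void main(String[] args) {
        BoundedTaskQueueSketch loop = new BoundedTaskQueueSketch(1); // raised to 16
        for (int i = 0; i < 16; i++) {
            loop.addTask(() -> { });
        }
        try {
            loop.addTask(() -> { });
        } catch (RejectedExecutionException e) {
            System.out.println("task 17 rejected with " + loop.pending() + " tasks pending");
        }
    }
}
```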
Continue to the constructor of the parent class AbstractScheduledEventExecutor. It does nothing except call the superclass constructor. Note, however, that this class has a queue named scheduledTaskQueue which, as the name suggests, holds scheduled tasks. It is not initialized in the constructor; it is created only when the first scheduled task is added.
    public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

        protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
            super(parent);
        }
    }
Continue to the constructor of the parent class AbstractEventExecutor, which assigns parent, the EventLoopGroup to which this EventLoop belongs.
    protected AbstractEventExecutor(EventExecutorGroup parent) {
        this.parent = parent;
    }
3 Some netty optimizations
3.1 Optimization of selector
Let's go back to the openSelector method in the NioEventLoop class. What does this method do? It uses reflection to replace two fields of the sun.nio.ch.SelectorImpl class, selectedKeys and publicSelectedKeys, with an instance of SelectedSelectionKeySet. Why do this? Let's move on.
    private Selector openSelector() {
        final Selector selector;
        try {
            selector = provider.openSelector();
        } catch (IOException e) {
            // handling omitted in this excerpt
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
                } catch (ClassNotFoundException e) {
                    // handling omitted in this excerpt
                } catch (SecurityException e) {
                    // handling omitted in this excerpt
                }
            }
        });

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);
                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    // handling omitted in this excerpt
                } catch (IllegalAccessException e) {
                    // handling omitted in this excerpt
                } catch (RuntimeException e) {
                    // handling omitted in this excerpt
                }
            }
        });

        if (maybeException instanceof Exception) {
            // handling omitted in this excerpt
        } else {
            // Save selectedKeySet to the selectedKeys field
            selectedKeys = selectedKeySet;
        }
        return selector;
    }
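The reflection technique used in openSelector can be tried on a small scale. In the sketch below, FakeSelector is a hypothetical stand-in for sun.nio.ch.SelectorImpl (reflective access to JDK internals may be restricted on recent JDKs, so the example avoids touching them); the swap method replaces a private field's value the same way.

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

final class FieldSwapSketch {
    // Hypothetical stand-in for sun.nio.ch.SelectorImpl.
    static final class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        Set<String> selectedKeys() {
            return selectedKeys;
        }
    }

    // Replaces the private selectedKeys field; returns false if reflection fails,
    // in which case the selector simply keeps its original set.
    static boolean swap(FakeSelector selector, Set<String> replacement) {
        try {
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, replacement);
            return true;
        } catch (ReflectiveOperationException | RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FakeSelector selector = new FakeSelector();
        Set<String> custom = new LinkedHashSet<>();
        System.out.println("swapped: " + swap(selector, custom));
        System.out.println("selector now uses our set: " + (selector.selectedKeys() == custom));
    }
}
```

Because the caller keeps its own reference to the replacement set, it can read the selected keys directly from that object afterwards, which is what the assignment to the selectedKeys field at the end of openSelector is for.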
The selectedKeys and publicSelectedKeys fields of sun.nio.ch.SelectorImpl hold the SelectionKeys that have ready events. publicSelectedKeys is created with Util.ungrowableSet(this.selectedKeys): it is a view of selectedKeys to which elements cannot be added but from which they can be removed. In other words, the JDK uses selectedKeys (note that it is protected) internally for both adding and removing, whereas publicSelectedKeys, which is what the selectedKeys() method exposes to users, only permits removal.
The default implementation is a HashSet:
    public abstract class SelectorImpl extends AbstractSelector {
        protected Set<SelectionKey> selectedKeys = new HashSet();
        protected HashSet<SelectionKey> keys = new HashSet();
        private Set<SelectionKey> publicKeys;
        private Set<SelectionKey> publicSelectedKeys;

        protected SelectorImpl(SelectorProvider var1) {
            super(var1);
            if (Util.atBugLevel("1.4")) {
                this.publicKeys = this.keys;
                this.publicSelectedKeys = this.selectedKeys;
            } else {
                this.publicKeys = Collections.unmodifiableSet(this.keys);
                this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
            }
        }

        public Set<SelectionKey> selectedKeys() {
            if (!this.isOpen() && !Util.atBugLevel("1.4")) {
                throw new ClosedSelectorException();
            } else {
                return this.publicSelectedKeys;
            }
        }
    }
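The "can remove but cannot add" behavior of the public set is visible through the standard API. The following sketch (class and method names are made up for this example) opens a plain JDK selector and tries to add to its selected-key set, which is expected to fail with UnsupportedOperationException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class UngrowableKeysSketch {
    // Returns true if the selected-key set exposed to users refuses additions.
    static boolean addIsRejected() {
        try (Selector selector = Selector.open()) {
            try {
                selector.selectedKeys().add(null);
                return false;
            } catch (UnsupportedOperationException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("add rejected by selectedKeys(): " + addIsRejected());
    }
}
```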
Let's look at SelectedSelectionKeySet. Its underlying data structure is an array, and this is clearly an optimization: appending to and iterating over an array is faster than doing the same with a HashSet. Some readers have asked why there are two arrays. Don't worry about it: I don't understand either why two arrays are needed and used alternately, and later versions of Netty switched to a single array. All you need to know is that Netty uses arrays as the optimization.
So why does the JDK use a HashSet rather than an array or a List? Every call to the selector's select method adds the ready SelectionKeys to selectedKeys. If, after the first select call, the user neither handles the Channel's events nor removes the keys from selectedKeys, the next select call adds the same SelectionKey to selectedKeys again. A Set silently absorbs these duplicates, whereas an array or List would not.
On the other hand, removing an element from a List or an array is also costly.
In Netty, the selected keys are always processed and cleared between two consecutive select calls, so the duplicate problem does not arise. In addition, Netty has direct access to the selectedKeys array, and deleting an element only requires setting the corresponding array slot to null.
    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
        private SelectionKey[] keysA;
        private int keysASize;
        private SelectionKey[] keysB;
        private int keysBSize;
        private boolean isA = true;
    }
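To illustrate the idea, here is a minimal single-array sketch. It is not Netty's SelectedSelectionKeySet: the class name, the generic element type, the reset method, and the iterator are assumptions made for this example. The point is that add is a plain append with no hashing and no duplicate check.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

final class ArrayKeySetSketch<E> extends AbstractSet<E> {
    private Object[] keys = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size * 2); // grow by doubling
        }
        keys[size++] = e; // plain append: no hashing, no duplicate check
        return true;
    }

    // Clears the used slots so the set can be reused for the next select round.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[i++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayKeySetSketch<String> arraySet = new ArrayKeySetSketch<>();
        Set<String> hashSet = new HashSet<>();
        for (String key : new String[] {"key-1", "key-1", "key-2"}) {
            arraySet.add(key);
            hashSet.add(key);
        }
        System.out.println("array-backed size: " + arraySet.size() + ", HashSet size: " + hashSet.size());
    }
}
```

The duplicate "key-1" is kept by the array-backed set and dropped by the HashSet, which is exactly the trade-off discussed above: the array version is only safe when the keys are cleared between select calls.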
3.2 Optimization of task queue
In the constructor of SingleThreadEventExecutor, the taskQueue field is initialized by calling the newTaskQueue method, which NioEventLoop overrides. Let's take a look.
The optimization here is an MpscQueue, short for MultiProducerSingleConsumerQueue, that is, a queue with multiple producers and a single consumer. Interested readers can look at its implementation. Since a dedicated queue was written for this, it presumably performs better than a BlockingQueue in this scenario.
Why is this possible? The queue has exactly one consumer, the EventLoop thread, and multiple producers. Also note the comment "This event loop never calls takeTask()": MultiProducerSingleConsumerQueue has no blocking take method, and NioEventLoop never needs to call one, so the two fit together.
    @Override
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue(maxPendingTasks);
    }
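The access pattern, many threads offering and one thread polling without ever blocking on the queue, can be sketched with JDK classes only. In this example ConcurrentLinkedQueue is merely a stand-in for the MPSC queue, and the class and method names are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

final class MpscUsageSketch {
    // Several producer threads enqueue tasks; the calling thread is the only consumer.
    static int produceAndDrain(int producers, int tasksPerProducer) {
        // JDK stand-in for the queue returned by PlatformDependent.newMpscQueue.
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        CountDownLatch produced = new CountDownLatch(producers);
        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    taskQueue.offer(() -> { });
                }
                produced.countDown();
            }).start();
        }
        try {
            produced.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Single consumer: poll() returns null when the queue is empty, it never blocks.
        int ran = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) {
        System.out.println("tasks run by the single consumer: " + produceAndDrain(4, 100));
    }
}
```

A queue that knows it has only one consumer can skip the coordination that a general-purpose concurrent queue needs on the consuming side, which is where the gain over BlockingQueue is expected to come from.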
Similarly, the tailTasks queue is also an MpscQueue. What about scheduledTaskQueue? The answer is in the AbstractScheduledEventExecutor#scheduledTaskQueue method: it is just an ordinary PriorityQueue, which is neither thread-safe nor blocking.
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
        }
        return scheduledTaskQueue;
    }
Why are tailTasks and taskQueue MpscQueues while scheduledTaskQueue is not thread-safe? Probably because there is no MpscQueue with priority ordering.
Since scheduledTaskQueue is not thread-safe, is there a multithreading problem? No. Look at the AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>) method, where scheduled tasks are added: if the caller is not the EventLoop thread, the call is made asynchronously, and the EventLoop thread itself adds the scheduled task to scheduledTaskQueue. There is one more optimization here: scheduledTaskQueue is lazily initialized.
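This thread-confinement idea can be sketched as follows. The example is a simplified illustration, not Netty's code: the class name is hypothetical, a single-thread ExecutorService plays the role of the EventLoop thread, and plain deadlines (Long values) replace ScheduledFutureTask.

```java
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ScheduledQueueSketch {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private final Thread loopThread;
    // Not thread-safe and created lazily; only ever touched by the loop thread.
    private Queue<Long> scheduledTaskQueue;

    ScheduledQueueSketch() {
        Callable<Thread> whoAmI = Thread::currentThread;
        try {
            loopThread = loop.submit(whoAmI).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    private boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    private Queue<Long> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new PriorityQueue<>();
        }
        return scheduledTaskQueue;
    }

    // Adds a deadline; callers on other threads hand the work to the loop thread.
    void schedule(long deadlineNanos) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(deadlineNanos);
        } else {
            loop.execute(() -> scheduledTaskQueue().add(deadlineNanos));
        }
    }

    // Reads the earliest deadline on the loop thread; call only from other threads.
    Long peekEarliest() {
        Callable<Long> peek = () -> scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
        try {
            return loop.submit(peek).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        loop.shutdown();
    }

    public static void main(String[] args) {
        ScheduledQueueSketch sketch = new ScheduledQueueSketch();
        sketch.schedule(30L);
        sketch.schedule(10L);
        sketch.schedule(20L);
        System.out.println("earliest deadline: " + sketch.peekEarliest());
        sketch.shutdown();
    }
}
```

Because every read and write of the PriorityQueue happens on the same thread, no lock or concurrent data structure is needed for it.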
4 Summary
The two key points of this article:
- Each EventLoop has three queues: tailTasks, taskQueue, and scheduledTaskQueue. In NioEventLoop, the tailTasks and taskQueue queues are optimized as MultiProducerSingleConsumerQueue.
- Netty optimizes the selector's selectedKeys by replacing the HashSet with SelectedSelectionKeySet. SelectedSelectionKeySet is a Set backed by arrays, so adding elements and traversal are more efficient.
About the author
Wang Jianxin, senior Java engineer of Zhuan Architecture Department, mainly responsible for service governance, RPC framework, distributed call tracking, monitoring system, etc. Love technology, love learning, welcome to contact and exchange.