Preface
Before we start, let's review what the previous article covered.
In the first part, we took a brief look at the core classes in the Netty source code. They fall into these categories:
The facade (the initiator), represented by AbstractBootstrap; the transport pipeline, represented by ChannelPipeline; Netty's custom thread pool, represented by EventLoop; and the context, represented by AbstractChannel and AbstractChannelHandlerContext.
Netty's main flow: bootstrap.bind(port)
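import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 0 means "use the default number of worker threads".
        EventLoopGroup workerGroup = new NioEventLoopGroup(0);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // MyNettyServerHandler is the application's own handler class.
                            socketChannel.pipeline().addLast(new MyNettyServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}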
The Netty architecture diagram and the sample template code are posted here for reference. For Netty's main flow, we only need to focus on three core lines of code:
new NioEventLoopGroup(1);
new ServerBootstrap();
bootstrap.bind(9000).sync();
Everything else is peripheral assignment code. The most central line is:
bootstrap.bind(9000).sync();
This article revolves around the three lines of code above.
Part 1: new NioEventLoopGroup(nThreads)
When we write a Netty program, we always declare a bossGroup and a workerGroup first. So what does new NioEventLoopGroup(nThreads) actually do for us?
Its inheritance chain, which the constructors below walk through, is NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup -> AbstractEventExecutorGroup.
The NioEventLoopGroup constructors:
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
After this chain of overloads, the call finally reaches the superclass constructor in MultithreadEventLoopGroup:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
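To make the nThreads == 0 branch concrete, here is a minimal stand-alone sketch of how such a default could be resolved. It assumes, as in Netty 4.1, that DEFAULT_EVENT_LOOP_THREADS is the larger of 1 and the io.netty.eventLoopThreads system property, falling back to twice the number of available processors. The class name DefaultThreads and the method resolve are hypothetical, and the sketch uses plain JDK calls rather than Netty's own utilities.

```java
// Hypothetical stand-alone sketch; not Netty's actual class.
final class DefaultThreads {
    // Assumed default: max(1, -Dio.netty.eventLoopThreads, else CPU cores * 2).
    static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
            Integer.getInteger("io.netty.eventLoopThreads",
                    Runtime.getRuntime().availableProcessors() * 2));

    // Mirrors the ternary in the MultithreadEventLoopGroup constructor.
    static int resolve(int nThreads) {
        return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("nThreads=1 -> " + resolve(1));
        System.out.println("nThreads=0 -> " + resolve(0));
    }
}
```

This is why new NioEventLoopGroup(0) in the sample server does not fail the nThreads <= 0 check further up the chain: the zero has already been replaced by the time that check runs.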
Continuing up to MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
The real logic finally executes in the following MultithreadEventExecutorGroup constructor. Its Javadoc reads:
Create a new instance.
Params: nThreads - the number of threads that will be used by this instance. executor - the Executor to use, or null if the default should be used. chooserFactory - the EventExecutorChooserFactory to use. args - arguments which will be passed to each newChild(Executor, Object...) call.
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    chooser = chooserFactory.newChooser(children);
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
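The chooser created by chooserFactory.newChooser(children) decides which child handles the next request. Below is a minimal sketch of round-robin selection, assuming the behavior of Netty 4.1's DefaultEventExecutorChooserFactory (a bit mask when the array length is a power of two, a modulo otherwise). RoundRobinChooser and its methods are hypothetical names, and plain strings stand in for EventExecutor instances.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin selection over a fixed array of children.
final class RoundRobinChooser<T> {
    private final AtomicInteger idx = new AtomicInteger();
    private final T[] children;
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
        // A positive length is a power of two when exactly one bit is set.
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // Masking avoids a division and stays non-negative after counter overflow.
            return children[i & (children.length - 1)];
        }
        // Math.abs guards against a negative remainder once the counter overflows.
        return children[Math.abs(i % children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int n = 0; n < 4; n++) {
            System.out.println(chooser.next());
        }
    }
}
```

The atomic counter lets many threads ask for the next child concurrently without a lock, which matters because channels can be registered from any thread.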
Let's focus on the initialization code for children:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
    ...
    children = new EventExecutor[nThreads];
    ...
    children[i] = newChild(executor, args);
}
The key line is children[i] = newChild(executor, args). Because we started from NioEventLoopGroup, the newChild call here resolves to NioEventLoopGroup's implementation:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
The NioEventLoop constructor it calls:
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
}
The super call eventually reaches the SingleThreadEventExecutor constructor:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
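One line here deserves special attention: the initialization of taskQueue.
private final Queue<Runnable> taskQueue;
...
taskQueue = newTaskQueue(this.maxPendingTasks);
The default implementation in SingleThreadEventExecutor creates a LinkedBlockingQueue. (In Netty 4.1, NioEventLoop overrides newTaskQueue to return a multi-producer, single-consumer queue from PlatformDependent instead, so check which override applies in the version you are reading.)
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}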
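To see why the task queue matters, here is a minimal sketch of the idea behind a single-threaded executor: callers on any thread enqueue tasks, and one dedicated thread drains the queue in order. MiniEventLoop is a hypothetical name and this is not Netty's implementation, which also handles selectors, scheduled tasks, lazy thread start, and shutdown states. The sketch assumes a bounded LinkedBlockingQueue like the default newTaskQueue, and reuses the Math.max(16, maxPendingTasks) floor from the constructor.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch: one thread, one bounded task queue.
final class MiniEventLoop implements Executor {
    private final BlockingQueue<Runnable> taskQueue;
    private final Thread thread;

    MiniEventLoop(int maxPendingTasks) {
        taskQueue = new LinkedBlockingQueue<>(Math.max(16, maxPendingTasks));
        // Started eagerly here for simplicity.
        thread = new Thread(this::run, "mini-event-loop");
        thread.setDaemon(true);
        thread.start();
    }

    // The loop body: block until a task arrives, run it, repeat.
    private void run() {
        try {
            while (true) {
                taskQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit on shutdown
        }
    }

    @Override
    public void execute(Runnable task) {
        // A full queue rejects the task, similar in spirit to RejectedExecutionHandlers.reject().
        if (!taskQueue.offer(task)) {
            throw new RejectedExecutionException("task queue is full");
        }
    }

    void shutdown() {
        thread.interrupt();
    }

    // Submits three tasks and returns the order in which the loop thread ran them.
    static List<Integer> demo() {
        MiniEventLoop loop = new MiniEventLoop(16);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            final int n = i;
            loop.execute(() -> {
                seen.add(n);
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one thread ever runs the tasks, code executed inside the loop needs no locking against other tasks on the same loop, which is the property Netty relies on for each channel.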
And the Selector initialization code:
/**
 * The NIO {@link Selector}.
 */
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
...
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
From the diagram above and the MultithreadEventExecutorGroup source, we can see that the elements of EventExecutor[] children are ultimately NioEventLoop instances.
The inheritance structure is as follows: NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor.
Let's go back and refine the architecture diagram step by step.
As an asynchronous, high-performance framework, Netty internally uses many threads to do its work. We have seen that bossGroup and workerGroup each hold an array of children. From now on we know that the children in both groups are NioEventLoop instances, and that each NioEventLoop is a SingleThreadEventExecutor backed by a single thread.
Note that when we create the bossGroup we usually pass the value 1, that is, new NioEventLoopGroup(1), so the bossGroup is typically served by a single NioEventLoop thread.
Part 2: Initialization and assignment logic
Let's see what the second piece of code does with ServerBootstrap. It is a chain of fluent calls, and its whole purpose is to assign initial values to our new ServerBootstrap() instance.
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new MyNettyServerHandler());
            }
        });
The method .group(bossGroup, workerGroup): Netty names these parameters parentGroup and childGroup, that is, bossGroup = parentGroup and workerGroup = childGroup. childGroup is assigned directly to this.childGroup, while parentGroup is passed to the parent-class method via super.group(parentGroup).
/**
 * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
 * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
 * {@link Channel}'s.
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}
The call super.group(parentGroup) assigns the field volatile EventLoopGroup group in AbstractBootstrap. The Javadoc makes its purpose clear:
The {@link EventLoopGroup} which is used to handle all the events for the to-be-created {@link Channel}.
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    volatile EventLoopGroup group;
    ...
    /**
     * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
     * {@link Channel}
     */
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }
}
So our logical bossGroup and workerGroup correspond to two fields of ServerBootstrap: group (inherited from AbstractBootstrap) and childGroup.
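The return self() in AbstractBootstrap.group is what lets a method defined in the parent class keep the chain typed as the subclass. Here is a minimal sketch of that self-typed generic pattern. The names BaseConfig and ServerConfig are hypothetical, and plain strings stand in for EventLoopGroup instances.

```java
// Hypothetical sketch of the self-typed fluent pattern; not Netty code.
abstract class BaseConfig<B extends BaseConfig<B>> {
    String group; // stands in for AbstractBootstrap.group

    B group(String group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    @SuppressWarnings("unchecked")
    B self() {
        return (B) this;
    }
}

final class ServerConfig extends BaseConfig<ServerConfig> {
    String childGroup; // stands in for ServerBootstrap.childGroup

    ServerConfig group(String parentGroup, String childGroup) {
        super.group(parentGroup); // the parent stores its own field
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        this.childGroup = childGroup;
        return this;
    }

    public static void main(String[] args) {
        ServerConfig config = new ServerConfig().group("boss", "worker");
        System.out.println(config.group + " / " + config.childGroup);
        // The inherited one-argument method still returns ServerConfig, not BaseConfig.
        ServerConfig single = new ServerConfig().group("boss");
        System.out.println(single.group);
    }
}
```

Without the type parameter B, the inherited group method would return the base type, and subclass-only methods such as childHandler could no longer be chained after it.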
The method .channel(NioServerSocketChannel.class):
/**
 * The {@link Class} which is used to create {@link Channel} instances from.
 * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
 * {@link Channel} implementation has no no-args constructor.
 */
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
This assigns the channelFactory field that ServerBootstrap inherits from AbstractBootstrap. It instantiates a new ReflectiveChannelFactory<C>(channelClass) from the class we pass in. ReflectiveChannelFactory is actually very simple, so let's look only at its two core methods.
public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}
@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
The constructor stores this.constructor = clazz.getConstructor(), and newChannel() returns constructor.newInstance(). In other words, we only capture the constructor now; at some later point it is used to create the instance through a call to this.channelFactory.newChannel(). For now it is enough to keep this in mind.
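The capture-now, instantiate-later idea can be sketched with nothing but the JDK reflection API. ReflectiveFactory below is a hypothetical stand-in for ReflectiveChannelFactory, and StringBuilder stands in for a channel class with a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical sketch: look up the no-arg constructor once, invoke it on demand.
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Fails fast at configuration time if the class cannot be created this way.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<StringBuilder> factory = new ReflectiveFactory<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        System.out.println("distinct instances: " + (first != second));
    }
}
```

Doing the constructor lookup in the factory's own constructor means a misconfigured class is reported when the factory is built, rather than later when the first instance is requested.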
By now you have probably noticed the pattern: this fluent chain simply assigns member variables, either in ServerBootstrap itself or in its abstract parent class AbstractBootstrap. There is nothing difficult here, and it should be easy to follow.
The method .option(ChannelOption.SO_BACKLOG, 1024): this populates the Map<ChannelOption<?>, Object> options field that configures our ServerBootstrap. The predefined options are listed in the ChannelOption class.
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    /**
     * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
     * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
     */
    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }
}
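The put-or-remove behavior of option can be tried out in isolation. The sketch below uses hypothetical classes OptionKey and Options in place of ChannelOption and AbstractBootstrap. It keeps the two ideas visible in the source: a typed key so the value type is checked at compile time, and a null value meaning "remove this option".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical typed key, standing in for ChannelOption<T>.
final class OptionKey<T> {
    final String name;

    OptionKey(String name) {
        this.name = name;
    }
}

// Hypothetical holder, standing in for the options map in AbstractBootstrap.
final class Options {
    static final OptionKey<Integer> SO_BACKLOG = new OptionKey<>("SO_BACKLOG");

    private final Map<OptionKey<?>, Object> options = new LinkedHashMap<>();

    <T> Options option(OptionKey<T> key, T value) {
        if (key == null) {
            throw new NullPointerException("option");
        }
        synchronized (options) {
            if (value == null) {
                options.remove(key); // null removes a previously set option
            } else {
                options.put(key, value);
            }
        }
        return this;
    }

    @SuppressWarnings("unchecked")
    <T> T get(OptionKey<T> key) {
        synchronized (options) {
            return (T) options.get(key);
        }
    }

    int size() {
        synchronized (options) {
            return options.size();
        }
    }

    public static void main(String[] args) {
        Options opts = new Options().option(SO_BACKLOG, 1024);
        System.out.println("backlog = " + opts.get(SO_BACKLOG));
        opts.option(SO_BACKLOG, null);
        System.out.println("options left = " + opts.size());
    }
}
```

The generic method signature is what stops a caller from pairing SO_BACKLOG with, say, a String value, even though the map itself stores plain Object values.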
Finally, .childHandler(...) sets up our custom ChannelPipeline handler chain. The way it is written is basically boilerplate, and in the end it just assigns the childHandler field:
/**
 * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
 */
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}
Finally, a summary of the key variable assignments performed:
EventLoopGroup ServerBootstrap.childGroup (workerGroup: event receiving and processing)
EventLoopGroup AbstractBootstrap.group (bossGroup: event creation)
ChannelFactory AbstractBootstrap.channelFactory (reflective channel factory, eventually used to instantiate C)
Map<ChannelOption<?>, Object> AbstractBootstrap.options (bootstrap configuration items)
ChannelHandler ServerBootstrap.childHandler (channel handler, which initializes our custom handler chain)
Closing
This article walked through the first two parts of the Netty server source code: the NioEventLoopGroup initialization process, and the assignment of ServerBootstrap's key attributes along with what each one does. The next article covers ChannelFuture cf = bootstrap.bind(9000).sync(), the port-binding procedure. I hope this article was helpful, and I'll see you in Netty (3).
I am Dying stranded, and I am always looking forward to meeting you. Whether you expect it or not, tides rise and fall, and I'm right here...