preface
NIO and Netty server startup have been explained in front of xiaofei, this lecture is the Client startup process.
Source series of articles still follow the plain + drawing style to explain, this article Netty source code and later article versions are based on: 4.1.22.Final
This article is to start NettyClient as a starting point, take you step by step into the world of Netty source code.
The Client startup process is revealed
1, explore the entrance: Netty-client Demo
Here’s an example using EchoClient from Netty-ExMaple:
public final class EchoClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(newEchoClientHandler()); }}); ChannelFuture f = b.connect(HOST, PORT).sync(); f.channel().closeFuture().sync(); }finally{ group.shutdownGracefully(); }}}Copy the code
There’s nothing special about the code, and we’ve gone over some of the conventions of Netty network programming in our last article, so I won’t go over them here. (For those who forget, check out the Netty series.)
The above client code is simple, but it shows everything you need to initialize a Netty client:
EventLoopGroup
:Netty
The server or client must be specifiedEventLoopGroup
, the client specifiesNioEventLoopGroup
Bootstrap
:Netty
The client startup class, which is responsible for the client startup and initialization processchannel()
Type: specifiedChannel
Because this is the client, so is usedNioSocketChannel
Is used by the serverNioServerSocketChannel
Handler
: Sets the processor for the databootstrap.connect()
: Client connectionnetty
Methods of service
2. NioEventLoopGroup process parsing
NioEventLoopGroup: NioEventLoopGroup: NioEventLoopGroup: NioEventLoopGroup: NioEventLoopGroup
EventLoop inherits from the EventLoopGroup, so you can guess what the class structure is like. Here some core logic in MultithreadEventExecutorGroup, contain EventLoopGroup creation and initialization operation, etc.
Start with the NioEventLoopGroup constructor and work your way down (the code will only show the important parts, leaving out much of the code that you don’t need to worry about for the moment. The following code follows this principle) :
EventLoopGroup group = new NioEventLoopGroup();
public NioEventLoopGroup(a) {
this(0);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
Copy the code
Here by calling this () and super () method to pass all the way, will construct during some default properties, have been passed to the MultithreadEventExecutorGroup class, then to the west.
2.1, MultithreadEventExecutorGroup
The constructor above has one important parameter passed: DEFAULT_EVENT_LOOP_THREADS, which defaults to the number of CPU cores * 2.
Why pass this parameter? Before we said EventLoopGroup can understand into a thread pool, MultithreadEventExecutorGroup have a thread array EventExecutor [] children attribute, The DEFAULT_EVENT_LOOP_THREADS passed in is the length of the array.
First look at the constructor of the MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
/ /... omit
}
Copy the code
This code execution logic can be understood as:
- through
ThreadPerTaskExecutor
To construct aExecutor
Actuators, which we’ll talk about later, contain things that threads executeexecute()
methods - And I’m gonna create a
EventExecutor
Array object of the size passed inthreads
Quantity, this so-calledEventExecutor
It can be interpreted as oursEventLoop
In this demoNioEventLoop
object - The last call
newChild
Methods are initialized one by oneEventLoopGroup
In theEventLoop
object
Under the above only roughly said MultithreadEventExecutorGroup constructor do, in the later will also in detail one by one, don’t try so hard, the first we’ll have a whole cognition.
Back to MultithreadEventExecutorGroup constructor into the participation of a EventExecutorChooserFactory object, there is a very bright eye of detail design, through which we understand the good intention of Netty.
2.1, window design: DefaultEventExecutorChooserFactory
EventExecutorChooserFactory the function of this class is used to select EventLoop actuators, we know a EventLoopGroup is contains the number of CPU * 2 EventLoop array object, So which of the arrays is selected each time you select an EventLoop to perform the task?
Let’s take a look at the implementation of this class, highlighted in red:
DefaultEventExecutorChooserFactory is a selector factory class, call the inside of the next () method to achieve the goal of a polling option.
The length of the array is length, and for the NTH time, whatever element in the array is mod length
I’m going to use val & -val == val. I’m not going to go into too much detail here. There are a lot of good ways to find out if the length of the array is 2 to the n. I won’t teach you how to suck eggs. (Please refer to leetcode-cn.com/problems/po…)
Of course, I think there’s another algorithm here that’s easier to understand: x & (x-1) == 0 and you can see it in the picture below, which is not extended here:
BUT!!! Why bother saying that the length of the array is 2 to the NTH power?
Do you still remember the HashMap by Daming Lake? Hash n is the length of the array. If the length of the array is 2 to the NTH power, the following formula is equivalent:
n & (length – 1) <=> n % length
Remember that the default array size is CPU * 2, and the average server CPU core number is 2, 4, 8, 16, etc., so this small optimization is very useful.
The nice thing about bitwise is that it’s much more efficient than and, and Netty has optimized for this little detail, which is great.
2.3. ThreadPerTaskExecutor
Next, look at the ThreadPerTaskExecutor thread executor, which creates a thread entity each time a task is executed.
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) { threadFactory.newThread(command).start(); }}Copy the code
The threadFactory passed in is called DefaultThreadFactory, which will construct the NioEventLoop thread named nioEventloop-1-xxx, but we won’t go into that. When the thread executes, the execute() method is called, which creates a FastThreadLocalThread thread.
public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}
protected Thread newThread(Runnable r, String name) {
return newFastThreadLocalThread(threadGroup, r, name); }}Copy the code
This creates a Thread with newThread() and initializes the Thread object data, which will eventually be called in Thread.init().
2.4. Initialize EventLoop
Then continue to read MultithreadEventExecutorGroup constructor:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
/ /... Omit some code}}Copy the code
The last part of the code above is the newChild method, which is an abstract method whose job is to instantiate an EventLoop object. If we trace the code, we can see that this method is implemented in the NioEventLoopGroup class and it is very simple:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
Copy the code
You instantiate a NioEventLoop object and return it. The NioEventLoop constructor holds the provider and event poller selector, creates an MpscQueue in its parent class, and then saves the thread executor.
And if you think about it again, Internal maintains a MultithreadEventExecutorGroup EventExecutor [] array of children, The realization mechanism of Netty EventLoopGroup are based on MultithreadEventExecutorGroup.
Whenever Netty needs an EventLoop, the next() method is called to get an available EventLoop object from the EventLoopGroup array. The next method is implemented through nioeventLoopgroup.next (), which is calculated by polling algorithm as explained above.
To summarize the entire EventLoopGroup initialization process:
EventLoopGroup
(in fact isMultithreadEventExecutorGroup
) internally maintains a type ofEventExecutor children
The array, the array length isnThreads
- If we’re instantiating
NioEventLoopGroup
, if the thread pool size is specifiednThreads
Is the specified value, and vice versaNumber of processor cores x 2
MultithreadEventExecutorGroup
Will callnewChild
Abstract method to initializechildren
An array of- Abstract methods
newChild
Is in theNioEventLoopGroup
Which returns oneNioEventLoop
Instance. NioEventLoop
Properties:-
SelectorProvider provider properties: NioEventLoopGroup constructor by SelectorProvider. The provider () to obtain a SelectorProvider
-
The Selector property: Gets a Selector object in the NioEventLoop constructor by calling Selector = provider.openSelector().
-
2.5, NioSocketChannel
In Netty, a Channel is an abstraction of a Socket. Each time Netty establishes a connection, there is an instance of a Channel corresponding to it.
Niosocketchannel.class = nioSocketChannel.class = NioSocketChannel;
Then analysis code, and when we call biggest hannel (when) will actually enters AbstractBootstrap. Channel () logic, then look at the code in the AbstractBootstrap:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory ! =null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
Copy the code
As you can see, ReflectiveChannelFactory returns the channelClass we specified: NioSocketChannel, and then specify channelFactory in AbstractBootstrap = new ReflectiveChannelFactory().
2.6 Channel initialization process
Now that we know the flow of NioEventLoopGroup and channel(), let’s look at the channel initialization process, which is one of the core processes that Netty clients start:
ChannelFuture f = b.connect(HOST, PORT).sync();
Copy the code
Then we will start from the entrance of b. connection () step by step to see the overall flow of NioSocketChannel construction:
From Connet, the overall process is summarized as follows:
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister(a) {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
Copy the code
In order to make it easier to read, the code has been simplified and only some important code has been preserved.
Then we see channelFactory. NewChannel what he did (), here is channelFactory ReflectiveChannelFactory, we were analyzed in the above chapters:
@Override
public T newChannel(a) {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class "+ clazz, t); }}Copy the code
The clazz here is NioSocketChannel, again described in the previous section, which calls the constructor of NioSocketChannel and initializes a Channel instance.
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel(a) {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e); }}}Copy the code
NIO SocketChannel: SocketChannel: NIO SocketChannel: SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel: NIO SocketChannel
NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
ch.configureBlocking(false); }}Copy the code
This will call the parent constructor argument and pass readInterestOp = selectionkey.op_read:, and one more important thing is to configure Java NIO SocketChannel to be non-blocking, as we explained earlier in the NIO section, I will not repeat it here.
Continuing with AbstractChannel’s constructor:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }}Copy the code
Here, we create a ChannelId, and we create an Unsafe object, which is not Unsafe in Java, and we’ll talk about that later. Then create a ChannelPipeline, which will be covered later. At this point, a complete NioSocketChannel is initialized.
Netty
的SocketChannel
With theJava
The nativeSocketChannel
Bind together;- registers
Read
Events; - Will be for every one
Channel
Assign achannelId
; - Will be for every one
Channel
To create aUnsafe
Object; - Will be for every one
Channel
Assign aChannelPipeline
;
2.7 Channel registration process
Back at the top of the initAndRegister method, we are analyzing the operation of newChannel, which is a process created by NioSocketChannel, and we are continuing with init() and register() :
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B.C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister(a) { Channel channel = channelFactory.newChannel(); init(channel); ChannelFuture regFuture = config().group().register(channel); }}Copy the code
Init () sets the options and attrs parameters to a channel. The most important thing to look at is the register method, which calls in the following chain:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
Finally here to register the unsafe () method, the final call to AbstractNioChannel doRegister () :
@Override
protected void doRegister(a) throws Exception {
boolean selected = false;
for (;;) {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0.this);
return; }}Copy the code
JavaChannel () is a SocketChannel in Java NIO, where a SocketChannel is registered with the selector associated with an eventLoop.
Finally, let’s sort out the overall process of service startup:
initAndRegister()
What to initialize and register?
channelFactory.newChannel()
- Create one by reflection
NioSocketChannel
- will
Java
nativeChannel
Bound to theNettyChannel
中 - registered
Read
The event - for
Channel
distributionid
- for
Channel
createunsafe
object - for
Channel
createChannelPipeline
(the default ishead<=>tail
Bidirectional linked list)
- `init(channel)“
- the
Bootstrap
Set the configuration toChannel
中
register(channel)
- the
Channel
Bind to aEventLoop
上 - the
Java
nativeThe Channel, Netty
的The Channel, the Selector
Bound to theSelectionKey
中 - The trigger
Register
Related events
2.8 Unsafe Initialization
An Unsafe object is created during the initialization of a Channel and bound to it:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
Copy the code
NewUnsafe directly calls the NioSocketChannel method:
@Override
protected AbstractNioUnsafe newUnsafe(a) {
return new NioSocketChannelUnsafe();
}
Copy the code
NioSocketChannelUnsafe is an inner class in NioSocketChannel, which is then inherited from several parent classes, in this case the underlying Socket operations of the relevant Java.
2.9 Pipeline initialization
Back to the pipeline initialization process, let’s look at the implementation of newChannelPipeline() :
protected DefaultChannelPipeline newChannelPipeline(a) {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Copy the code
We call the DefaultChannelPipeline constructor and pass in a channel, which is actually our instantiated NioSocketChannel.
DefaultChannelPipeline stores the NioSocketChannel object in the channel field. DefaultChannelPipeline also has two special fields, head and tail, which are the head and tail of a two-way list. Actually in DefaultChannelPipeline, maintains a AbstractChannelHandlerContext as nodes of two-way linked list, this list is a key to the realization of Pipeline mechanism of Netty.
The bidirectional linked list in DefaultChannelPipeline and what it does will be explained in detail in a later chapter. Here is just a preliminary understanding of pipeline.
The inheritance hierarchy of HeadContext is as follows:
The inheritance hierarchy of TailContext is as follows:
We can see that head is a ChannelOutboundHandler and tail is a ChannelInboundHandler.
3.0 client Connect process
Bootstrap.connect() is the entry method of the client connection, which is also analyzed above. The specific process of the request is as follows:
Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run(a) throws IOException {
returnsocketChannel.connect(remoteAddress); }}); }catch (PrivilegedActionException e) {
throw(IOException) e.getCause(); }}Copy the code
As you can see here, the client connection request is still made using a Connect request sent by a Java NIO SocketChannel.
conclusion
This article uses a Netty Client demo as an entry point, and then analyzes the process of creating NioEventLoopGroup, creating and registering a Channel, and the specific process of the Client initiating connect. There are not many details here. These will be placed in the subsequent source analysis article, please look forward to ~