Preface
If you have not read the earlier articles in this series, I suggest reading them first for background.
Here is a quick review. The previous article covered the first two steps of starting a Netty server:
Step 1: new NioEventLoopGroup(nThreads)
Step 2: new ServerBootstrap(), followed by the chained calls that assign its properties
NioEventLoopGroup holds the core properties taskQueue, Selector, and Executor.
ServerBootstrap core properties:
- EventLoopGroup ServerBootstrap.childGroup (workerGroup: event reception and processing)
- EventLoopGroup AbstractBootstrap.group (bossGroup: event creation)
- ChannelFactory AbstractBootstrap.channelFactory (reflective channel factory, ultimately used to instantiate the channel)
- Map<ChannelOption<?>, Object> AbstractBootstrap.options (the Bootstrap configuration items)
- ChannelHandler ServerBootstrap.childHandler (channel handler; initializes our custom handler chain)
This article digs into the last step, bootstrap.bind(inetPort).sync(), and explores what Netty does underneath.
Step 3: bootstrap.bind(inetPort).sync()
We start from bind(int inetPort):
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
The call chain ends up in the main doBind method:
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
The core method doBind
We focus on two core methods inside doBind:
- initAndRegister();
- doBind0()
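The branching in doBind can be illustrated without Netty. The sketch below is not Netty code: it uses the JDK's CompletableFuture as a stand-in for ChannelFuture and ChannelPromise, and the names DoBindSketch, doBind and the "bound:" result string are hypothetical. It only shows the pattern: bind immediately if registration has already finished, otherwise bind from a completion callback.

```java
import java.util.concurrent.CompletableFuture;

final class DoBindSketch {
    // 'registration' plays the role of regFuture; the returned future plays the role of the promise.
    static CompletableFuture<String> doBind(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Registration already succeeded: bind right away.
            promise.complete("bound:" + port);
        } else {
            // Registration is pending or failed: decide inside a completion callback.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound:" + port);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> result = doBind(pending, 8080);
        System.out.println(result.isDone()); // registration has not completed yet
        pending.complete(null);              // simulate the registration finishing
        System.out.println(result.join());
    }
}
```

In both branches the caller gets a future back immediately, which is why the article's bind(...) call can be followed by sync() to wait for the result.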
initAndRegister(): initialize and register
Let's take these two methods one at a time, starting with initAndRegister:
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
Set the exception handling aside and go straight to the main path. Only three lines of code really matter:
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
1. Channel channel = channelFactory.newChannel();
The channelFactory used on this line is the ReflectiveChannelFactory that was created when we configured the ServerBootstrap with:
.channel(NioServerSocketChannel.class)
So channelFactory.newChannel() is equivalent to invoking the no-argument constructor of NioServerSocketChannel:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    // (other members omitted)
    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}
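To make the reflection step concrete, here is a minimal JDK-only sketch of the same idea. It is not Netty's class: ReflectiveFactorySketch and its newInstance method are hypothetical names, and StringBuilder merely stands in for a channel class with a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Look up the public no-argument constructor once, when the factory is built.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(clazz + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            // Every call creates a fresh object through the cached constructor.
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<StringBuilder> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        System.out.println(factory.newInstance().append("created"));
    }
}
```

Passing a Class object at configuration time and instantiating later is what lets .channel(NioServerSocketChannel.class) defer channel creation until bind is called.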
This no-argument constructor starts a long chain of super calls. Before following that chain, let's look at newSocket(DEFAULT_SELECTOR_PROVIDER):
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}
This is Netty wrapping plain NIO code: the call is equivalent to ServerSocketChannel.open(), which returns a ServerSocketChannel. If you are interested, you can review the earlier article BIO, NIO Introduction (Netty Guide).
So Netty is a further encapsulation of NIO.
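For comparison, this is roughly what the underlying plain-NIO steps look like when written by hand. It is a sketch using only JDK classes, under the assumption that DEFAULT_SELECTOR_PROVIDER is the system default provider; PlainNioSketch and openAndRegister are hypothetical names, and Netty's own registration sequence differs in detail.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class PlainNioSketch {
    // Opens a server channel, makes it non-blocking and registers interest in OP_ACCEPT.
    // Returns true when the channel ends up non-blocking with OP_ACCEPT as its interest set.
    static boolean openAndRegister() throws IOException {
        SelectorProvider provider = SelectorProvider.provider();
        try (ServerSocketChannel ch = provider.openServerSocketChannel();
             Selector selector = provider.openSelector()) {
            ch.configureBlocking(false); // a channel must be non-blocking before it can be registered
            SelectionKey key = ch.register(selector, SelectionKey.OP_ACCEPT);
            return !ch.isBlocking() && key.interestOps() == SelectionKey.OP_ACCEPT;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(openAndRegister());
    }
}
```

The channel is never bound to a port here, so the sketch only demonstrates creation and configuration, which is the part the constructor chain covers.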
Go back to NioServerSocketChannel
Take a look at the inheritance relationship. The constructor goes through the following abstract superclasses and finally completes initialization:
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
Let's go through these in the order the code actually executes, which means starting from the bottom with protected AbstractChannel(Channel parent):
unsafe = newUnsafe();
Setting a breakpoint shows that the newUnsafe() invoked here is the one defined in AbstractNioMessageChannel:
AbstractNioMessageChannel
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}
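The reason a call written in AbstractChannel lands in AbstractNioMessageChannel is ordinary virtual dispatch: a superclass constructor that calls an overridable method runs the subclass's override. The sketch below shows that mechanism in isolation; BaseChannelSketch and MessageChannelSketch are hypothetical names, and a String stands in for the Unsafe object.

```java
abstract class BaseChannelSketch {
    final String unsafe;

    BaseChannelSketch() {
        // Resolved at runtime against the concrete subclass, not this abstract class.
        this.unsafe = newUnsafe();
    }

    protected abstract String newUnsafe();
}

final class MessageChannelSketch extends BaseChannelSketch {
    @Override
    protected String newUnsafe() {
        // Stand-in for "return new NioMessageUnsafe();"
        return "NioMessageUnsafe";
    }

    public static void main(String[] args) {
        System.out.println(new MessageChannelSketch().unsafe);
    }
}
```

The override must not rely on subclass fields, because it runs before the subclass constructor body has executed.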
Initialize the pipeline: pipeline = newChannelPipeline();
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // (other members omitted)
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}
Here DefaultChannelPipeline builds the pipeline as a doubly linked list and initializes its head and tail nodes:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
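The head/tail wiring above is the classic doubly linked list with two sentinel nodes. A minimal sketch of that structure follows; PipelineSketch, Node and names() are hypothetical, and handlers are reduced to plain strings. It assumes, as in the constructor shown, that head and tail always exist and that new entries go in just before tail.

```java
final class PipelineSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineSketch() {
        // Same wiring as the constructor above: an empty list is head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a new node directly before the tail sentinel.
    PipelineSketch addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walks the list from head to tail and joins the node names.
    String names() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints "head -> initializer -> acceptor -> tail"
        System.out.println(new PipelineSketch().addLast("initializer").addLast("acceptor").names());
    }
}
```

Because the sentinels are always present, addLast never has to special-case an empty list.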
Finally, let’s look at the initialization process as a whole
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
Here SelectionKey.OP_ACCEPT and the ServerSocketChannel created earlier by newSocket(DEFAULT_SELECTOR_PROVIDER) -> provider.openServerSocketChannel() are assigned to member variables of AbstractNioChannel, and the channel is switched to non-blocking mode with ch.configureBlocking(false):
AbstractNioChannel
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
2. init(channel);
Note that the channel passed to init is the NioServerSocketChannel created by channelFactory.newChannel():
channel = channelFactory.newChannel();
init(channel);
This is the second step, init(channel). We are on the server side, so the implementation to look at is the one in ServerBootstrap.
The init method is as follows:
@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
options0() returns the option Map we set during initialization, and setChannelOptions applies it to the channel:
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
    setChannelOptions(channel, options, logger);
}
A handler is then added to the pipeline of the NioServerSocketChannel:
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});
A ChannelInitializer is added to the DefaultChannelPipeline of the NioServerSocketChannel; when it runs, it adds a ServerBootstrapAcceptor. ServerBootstrapAcceptor is an inner class of ServerBootstrap, declared as private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter.
This is only a brief recap of what was covered earlier, to reinforce it.
In summary, init mainly initializes the options and the pipeline of the NioServerSocketChannel.
3. ChannelFuture regFuture = config().group().register(channel);
So we are essentially calling group.register(channel), where group is the bossGroup we initialized earlier, a NioEventLoopGroup.
The register method itself is implemented in its parent class, MultithreadEventLoopGroup:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
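The shape of next().register(channel) is "pick one member of the group, then delegate to it". The article does not show how next() chooses, so the sketch below simply assumes a round-robin policy for illustration; RoundRobinGroupSketch is a hypothetical class, and event loops and channels are reduced to strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinGroupSketch {
    private final List<String> loops;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinGroupSketch(List<String> loops) {
        this.loops = new ArrayList<>(loops);
    }

    // Assumed policy: hand out loops in turn; floorMod keeps the index valid after int overflow.
    String next() {
        return loops.get(Math.floorMod(index.getAndIncrement(), loops.size()));
    }

    // Mirrors "return next().register(channel);": the group delegates to one chosen loop.
    String register(String channel) {
        return channel + "@" + next();
    }

    public static void main(String[] args) {
        RoundRobinGroupSketch group = new RoundRobinGroupSketch(Arrays.asList("loop-0", "loop-1"));
        System.out.println(group.register("serverChannel")); // prints "serverChannel@loop-0"
        System.out.println(group.register("otherChannel"));  // prints "otherChannel@loop-1"
    }
}
```

Whatever the real selection policy is, the important point for this article is that a channel is registered with exactly one event loop of the group.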
Afterword
This article is already long enough, and writing more would only make it tedious, so I have decided to split it into two parts. The next article will follow up on config().group().register(channel) and on the second core method, doBind0(). I hope you found this helpful.
I am Dying stranded, and I am always looking forward to meeting you. Whether you expect it or not, tides come and go; I'm just here…
See you next time