Look at the source
The source address
public static void main(String[] args) throws Exception {
// Configure SSL.
/ / configure SSL
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);// Create a boss thread group for the server to accept client connections
EventLoopGroup workerGroup = new NioEventLoopGroup();// Create worker thread groups for SocketChannel data reads and writes
// Create EchoServerHandler object
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
// Set the EventLoopGroup to use
b.group(bossGroup, workerGroup)
// Get the corresponding Channel factory
.channel(NioServerSocketChannel.class)
// set the NioServerSocketChannel option
.option(ChannelOption.SO_BACKLOG, 100)
// set the NioServerSocketChannel handler
.handler(new LoggingHandler(LogLevel.INFO))
// Set optional subchannels
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(NioChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// Set the handler for the SocketChannel of the Client connected to the server
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if(sslCtx ! =null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(serverHandler); }});// Where to start the service
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}Copy the code
Source code analysis
configuration
ServerBootstrap is a helper class that makes better use of channels. The inheritance relationship of ServerBootstrap is as followsThere is a lot of configuration above, nothing good to say, is to assign values to each member variable, here to help you more intuitive look, respectively, is to assign values to the following member variables, in the face of source code parsing, it is easier to see a little bit.
ServerBootstrap{
/ / bossGroup (AbstractBootstrap)
volatile EventLoopGroup group;
//workerGroup
private volatile EventLoopGroup childGroup;
/ / NioServerSocketChannel. Class encapsulated into ReflectiveChannelFactory
private volatile ChannelFactory<? extends C> channelFactory;
AbstractBootstrap (AbstractBootstrap)
private finalMap<ChannelOption<? >, Object> options =newLinkedHashMap<ChannelOption<? >, Object>();// AbstractBootstrap (AbstractBootstrap)
private volatile ChannelHandler handler;
// Add childOption to the map
private finalMap<ChannelOption<? >, Object> childOptions =newLinkedHashMap<ChannelOption<? >, Object>();//childHandler
private volatile ChannelHandler childHandler;
}
Copy the code
bind()
This method is used to create channels and configure them. Into the AbstractBootstrap: : doBind method, can see the rough
- create
Channel
Is in theinitAndRegister
Or lower level methods, just parsinginitAndRegister
Can be io.netty.channel.Channel
The interior must be maintainedjava.nio.channels.Channel
Because there are operations such as binding.ChannelFuture
Can listen for the event to complete and trigger a callback
private ChannelFuture doBind(final SocketAddress localAddress) {
// Initialize and register a Channel object. Since registration is an asynchronous process, return a ChannelFuture object.
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if(regFuture.cause() ! =null) {
return regFuture;
}
// Because it is asynchronous, there is no guarantee of completion
// Bind the Channel port and register the Channel in the selectionKey
if (regFuture.isDone()) {
// Registration completed
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// The registration has not been completed
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if(cause ! =null) {
// Registration failed on EventLoop, so once we try to access Channel's EventLoop, we will simply fail ChannelPromise to avoid IllegalStateException.
promise.setFailure(cause);
} else {
// Register successfully, so set the correct actuator to use.
// See https://github.com/netty/netty/issues/2586promise.registered(); doBind0(regFuture, channel, localAddress, promise); }}});returnpromise; }}Copy the code
Create and initialize a channel
initAndRegister
First let’s look at the initAndRegister method. The preliminary conclusion is as follows.
- A Channel is created by reflection, and the channelFactory is passed in above
NioServerSocketChannel.class
- Init completes the initialization of a channel
ChannelFuture regFuture = config().group().register(channel);
Looks likeserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
(Unclear read my last article) similar
final ChannelFuture initAndRegister(a) {
Channel channel = null;
try {
// Create a channel object
channel = channelFactory.newChannel();
// Initialize the channel configuration
init(channel);
} catch (Throwable t) {
if(channel ! =null) {
// If newChannel crashes, channel can be null
channel.unsafe().closeForcibly();
// Since the channel is not registered, we need to force GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// Since the channel is not registered, we need to force GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// Get the initialized bossGroup and bind the channel to
ChannelFuture regFuture = config().group().register(channel);
if(regFuture.cause() ! =null) {
if (channel.isRegistered()) {
channel.close();
} else{ channel.unsafe().closeForcibly(); }}/** * If this promise doesn't fail, it must be for the following reasons * 1. If we try to register from the event loop, the registration is now complete. Since channel registration is complete, use bind(),connect () is safe * 2. If we try to register from another thread, Then the registration request has been successfully added to the task queue of the event loop for later execution * since bind(),connect () will be executed after the scheduled task * should register(), bind(), and connect() be bound in the same thread */
return regFuture;
}
Copy the code
NioServerSocketChannel
See here can see, io.net ty. Channel. The channel is the Java. Nio. Channels. The channel of a layer of encapsulation, proved the above conjecture
NioServerSocketChannelConfig is the io.net ty. Channel. The channel and ServerSocket classes
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e); }}private final ServerSocketChannelConfig config;
/** * Create a new instance */
public NioServerSocketChannel(a) {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket); }}}Copy the code
init
It is important to initialize a channel and then add ChannelInitializer to your pipeline, which in turn adds ServerBootstrapAcceptor to your pipeline
void init(Channel channel) {
// Put the previously configured options into the Channel's ChannelConfig
setChannelOptions(channel, newOptionsArray(), logger);
// Put the attr into the Channel's ChannelConfig
setAttributes(channel, newAttributesArray());
// Get the pipeline created during channel initialization
ChannelPipeline p = channel.pipeline();
// Used to initialize socketChannel
/ / the corresponding WorkerGroup
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
// Attribute is converted to this
finalEntry<ChannelOption<? >, Object>[] currentChildOptions = newOptionsArray(childOptions);finalEntry<AttributeKey<? >, Object>[] currentChildAttrs = newAttributesArray(childAttrs);/ * * * in the pipeline in DefaultChannelHandlerContext * ChannelInitializer one-time initialization handler; The ServerBootstrapAcceptor Handler is responsible for the initialization of a client connection after it has been created
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if(handler ! =null) {
pipeline.addLast(handler);
}
// Wait for execution
ch.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }}); }Copy the code
config().group().register(channel);
Config ().group() This is the bossGroup configured earlier. I’m going to focus on the register method
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
Copy the code
In AbstractChannel. AbstractUnsafe: : in the register back to asynchronous execution register0 this method, the core in this method. PS: All the methods in the Unsafe class are very core.
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// omit useless code
// Channel eventLoop binds to the current eventLoop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run(a) { register0(promise); }}); }catch (Throwable t) {
// omit useless code}}}Copy the code
register0
This method is so important that it is worth mentioning separately
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Execute the ChannelInitializer that was added to the pipeline
// Make sure we call handlerAdded(...) before we actually notify the commitment. .
// This is required because the user may have already triggered the event through a pipe in ChannelFutureListener
pipeline.invokeHandlerAddedIfNeeded();
// Check whether you want to print the log
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only channelActive is triggered if the channel has never been registered.
This prevents multiple channel activities from being triggered if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was previously registered with autoRead() set.
// This means we need to start reading again so that we can process the inbound data.
// See https://github.com/netty/netty/issues/4805beginRead(); }}}catch (Throwable t) {
// Close the channel directly to avoid FD leak.closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}Copy the code
doRegister
Perform AbstractNioChannel: : doRegister method, we can see that is right EventLoopGroup is a layer of encapsulation of the Selector, get SelectorKey to Channel maintenance. Ops is 0, but the ops enumerations for SelectorKey are
- public static final int OP_READ = 1 << 0;
- public static final int OP_WRITE = 1 << 2;
- public static final int OP_CONNECT = 1 << 3;
- public static final int OP_ACCEPT = 1 << 4;
This should be no event registered. It’s going to echo the rest of the place.
protected void doRegister(a) throws Exception {
boolean selected = false;
for (;;) {
try {
// Register a channel with a selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0.this);
return;
} catch (CancelledKeyException e) {
if(! selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throwe; }}}}Copy the code
invokeHandlerAddedIfNeeded
. This method will eventually call ChannelHandler handlerAdded eventually call outer (ServerBootstrap) pipeline add ChannelInitializer initChannel execution method.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if(handler ! =null) {
pipeline.addLast(handler);
}
// Wait for execution
ch.eventLoop().execute(new Runnable() {
@Override
public void run(a) {
pipeline.addLast(newServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); }}); }});Copy the code
safeSetSuccess
This method performs ChannelFutureListener outer added, namely AbstractBootstrap: : doBind method
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if(cause ! =null) {
// Registration failed on EventLoop, so once we try to access Channel's EventLoop, we will simply fail ChannelPromise to avoid IllegalStateException.
promise.setFailure(cause);
} else {
// Register successfully, so set the correct actuator to use.
// See https://github.com/netty/netty/issues/2586promise.registered(); doBind0(regFuture, channel, localAddress, promise); }}});Copy the code
Binding port
Finally, we go back to the AbstractChannel::bind method
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// omit useless code
// Whether the Channel is active
boolean wasActive = isActive();
try {
// Bind channel ports
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// After binding, the dish starts to activate
if(! wasActive && isActive()) { invokeLater(new Runnable() {
@Override
public void run(a) { pipeline.fireChannelActive(); }}); } safeSetSuccess(promise); }Copy the code
JavaChannel () is the ServerSocketChannel that gets the channel wrapper
@SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}Copy the code
Binding end port, pipeline. FireChannelActive (); Finally, the doBeginRead() method is invoked to register the actual listening event for the channel.
protected void doBeginRead(a) throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if(! selectionKey.isValid()) {return;
}
readPending = true;
/ / 0
final int interestOps = selectionKey.interestOps();
// Whether to listen to readInterestOp, listen to readInterestOp
if ((interestOps & readInterestOp) == 0) {
//OP_ACCEPT 1<<4 16selectionKey.interestOps(interestOps | readInterestOp); }}Copy the code
The Netty server is successfully started.
conclusion
-
Io.net ty. Channel. The channel is the Java nio. Channels. The channel of a layer of encapsulation
-
Io.net ty.channel. channel also maintains SelectionKey internally
-
The EventLoopGroup is a layer of encapsulation of the Selector
-
When initialized, only bossGroup was used to listen for connection events