This column is based on Netty 4.1.73.Final; the source code is at github.com/lhj502819/n…
Creating a Netty server should look familiar. Here is a simple server example:
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
// Create two EventLoopGroup objects
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // Boss thread group: accepts client connections on the server side
EventLoopGroup workerGroup = new NioEventLoopGroup(); // Worker thread group: reads and writes SocketChannel data and runs business logic
// Create the handler
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// Create ServerBootstrap object
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // Set the EventLoopGroups
.channel(NioServerSocketChannel.class) // Set the Channel class to instantiate: NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG, 100) // Set an option on the NioServerSocketChannel
.handler(new LoggingHandler(LogLevel.INFO)) // Set the handler for the NioServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { // Set the handler for the SocketChannel of each connected client
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
// Bind the port and wait synchronously for the bind to succeed, i.e. start the server
ChannelFuture f = b.bind(PORT).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("Callback after successful Channel binding test");
}
}).sync();
// Wait until the server socket is closed.
// Block here until the listening Channel is closed
// This line does not shut the server down; it only waits for the listening Channel to close
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
// Gracefully shut down both EventLoopGroup objects
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
The example above involves several classes: ServerBootstrap, NioServerSocketChannel, EventLoop, NioEventLoop, ChannelHandler, and ChannelPipeline. Today we focus on ServerBootstrap and how it relates to the other classes.
Sequence diagram
The sequence diagram for the entire server is as follows:
The class diagram below shows the three classes whose names end in Bootstrap. ServerBootstrap and Bootstrap are the server-side and client-side bootstrap classes respectively. Both extend AbstractBootstrap and share most of their methods and responsibilities, so let's focus on AbstractBootstrap.
Main methods of AbstractBootstrap
// B extends AbstractBootstrap and represents the concrete bootstrap type itself
// C extends Channel and represents the type of Channel to create
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// ... some code omitted ...
/** EventLoopGroup object */
volatile EventLoopGroup group;
/** Channel factory, used to create Channel object */
@SuppressWarnings("deprecation")
private volatile ChannelFactory<? extends C> channelFactory;
/** Local address */
private volatile SocketAddress localAddress;
// The order in which ChannelOptions are applied is important they may depend on each other for validation
// purposes.
/** Option map */
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
/** Attribute map; comparable to the attachment of a java.nio.channels.SelectionKey, but stored as a Map */
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
/** Handler */
private volatile ChannelHandler handler;
AbstractBootstrap() {
// Disallow extending from a different package.
}
AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
group = bootstrap.group;
channelFactory = bootstrap.channelFactory;
handler = bootstrap.handler;
localAddress = bootstrap.localAddress;
// Why synchronize on options? Because another thread may modify them at the same time,
// for example through {@link #option(ChannelOption, Object)}
synchronized (bootstrap.options) {
options.putAll(bootstrap.options);
}
attrs.putAll(bootstrap.attrs);
}
// ... some code omitted ...
}
group()
// Sets the EventLoopGroup; also called by the subclass ServerBootstrap. The EventLoopGroup handles all events and I/O for ServerChannel and Channel.
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
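The `B` type parameter and the `return self()` in `group()` are what let a call chain keep the concrete subclass type. Here is a minimal JDK-only sketch of that idea. `BaseBuilder`, `ServerBuilder`, and the `String` fields are hypothetical stand-ins for illustration, not Netty classes.

```java
// Hypothetical stand-in for AbstractBootstrap<B, C>: B is the concrete subclass type.
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    private String group;

    // Returns this instance typed as the concrete subclass.
    @SuppressWarnings("unchecked")
    protected B self() {
        return (B) this;
    }

    // May be set only once, mirroring the check in group() above.
    public B group(String group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    public String group() {
        return group;
    }
}

// Hypothetical stand-in for ServerBootstrap: adds a method the base class does not have.
final class ServerBuilder extends BaseBuilder<ServerBuilder> {
    private String childHandler;

    public ServerBuilder childHandler(String childHandler) {
        this.childHandler = childHandler;
        return this;
    }

    public String childHandler() {
        return childHandler;
    }
}

final class SelfTypeDemo {
    public static void main(String[] args) {
        // group() is declared in the base class but returns ServerBuilder,
        // so the subclass-only childHandler() can follow it in the chain.
        ServerBuilder b = new ServerBuilder().group("boss").childHandler("echo");
        System.out.println(b.group() + " / " + b.childHandler());
    }
}
```

If `group()` returned the base type instead, the chained `childHandler(...)` call would not compile without a cast.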
channel(Class)
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
Sets the Channel class to instantiate. Internally the class is wrapped in a ChannelFactory.
ChannelFactory
The factory class used to create channels. ReflectiveChannelFactory uses reflection to create channels, as shown below.
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
/** Look up the Channel's no-arg constructor once, at construction time, to reduce the overhead of creating Channel instances */
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
// Invoke the Channel's no-arg constructor
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
}
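The same cache-the-constructor idea can be tried without Netty on the classpath. The sketch below is an assumption-laden illustration: `ReflectiveFactory` is a hypothetical class, and `StringBuilder` merely stands in for a Channel class that has a public no-arg constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical JDK-only analogue of ReflectiveChannelFactory.
final class ReflectiveFactory<T> {
    // Looked up once in the constructor, reused for every newInstance() call.
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}

final class ReflectiveFactoryDemo {
    public static void main(String[] args) {
        ReflectiveFactory<StringBuilder> factory = new ReflectiveFactory<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        // Each call creates a fresh object from the cached constructor.
        System.out.println(first != second);
    }
}
```

A class without a public no-arg constructor, such as `Integer`, is rejected when the factory is built rather than when the first instance is requested.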
option(ChannelOption option, T value)
Sets an option for the Channel. Options are stored in a Map, and a null value removes the option.
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
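Note that `option()` only records the value; nothing is applied until a Channel exists. A rough JDK-only sketch of this record-now, apply-later pattern follows. `DeferredOptions` is a hypothetical class, and `java.net.SocketOption` with a plain `ServerSocketChannel` stands in for Netty's `ChannelOption` and Channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical class: records options first, applies them to a channel later.
final class DeferredOptions {
    // LinkedHashMap keeps insertion order, so options are applied in the order they were set.
    private final Map<SocketOption<?>, Object> options = new LinkedHashMap<>();

    // Records the option; a null value removes a previously recorded one.
    <T> DeferredOptions option(SocketOption<T> option, T value) {
        synchronized (options) {
            if (value == null) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        return this;
    }

    int size() {
        synchronized (options) {
            return options.size();
        }
    }

    // Applies every recorded option to an existing channel and returns how many were applied.
    @SuppressWarnings("unchecked")
    int applyTo(ServerSocketChannel channel) throws IOException {
        Map<SocketOption<?>, Object> snapshot;
        synchronized (options) {
            snapshot = new LinkedHashMap<>(options);
        }
        int applied = 0;
        for (Map.Entry<SocketOption<?>, Object> e : snapshot.entrySet()) {
            if (channel.supportedOptions().contains(e.getKey())) {
                channel.setOption((SocketOption<Object>) e.getKey(), e.getValue());
                applied++;
            } else {
                System.err.println("Unknown option " + e.getKey() + " for " + channel);
            }
        }
        return applied;
    }

    // Records three calls (one of them a removal), then applies what is left to a new channel.
    static int demo() {
        DeferredOptions opts = new DeferredOptions()
                .option(StandardSocketOptions.SO_REUSEADDR, true)
                .option(StandardSocketOptions.SO_RCVBUF, 64 * 1024)
                .option(StandardSocketOptions.SO_RCVBUF, null); // null removes the entry
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            return opts.applyTo(channel);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("options applied: " + demo());
    }
}
```

Taking a snapshot under the lock and applying it outside the lock keeps the critical section short, which is one reasonable way to handle concurrent `option()` calls.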
attr(AttributeKey key, T value)
Sets an attribute on the Channel, similar to the attachment of a SelectionKey. A null value removes the attribute.
public <T> B attr(AttributeKey<T> key, T value) {
ObjectUtil.checkNotNull(key, "key");
if (value == null) {
attrs.remove(key);
} else {
attrs.put(key, value);
}
return self();
}
handler(ChannelHandler handler)
Sets the Channel’s handler
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
validate
Called by bind() to verify that the configuration is complete.
/**
 * Validate all the parameters. Sub-classes may override this, but should
 * call the super method in that case.
 */
public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
clone
AbstractBootstrap implements Cloneable and declares clone() as an abstract method, so each subclass (ServerBootstrap and Bootstrap) provides its own implementation.
public abstract B clone();
config
config() returns the AbstractBootstrapConfig for this bootstrap. It is abstract in AbstractBootstrap; ServerBootstrap returns a ServerBootstrapConfig.
// In ServerBootstrap: ServerBootstrapConfig holds the ServerBootstrap as one of its own fields
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
setChannelOptions
Applies the given options to the Channel passed in. There is also the option method; the difference is that setChannelOptions sets options on a Channel that has already been created, while option records options for a Channel that has not been created yet.
static void setChannelOptions(
Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
for (Map.Entry<ChannelOption<?>, Object> e: options) {
setChannelOption(channel, e.getKey(), e.getValue(), logger);
}
}
private static void setChannelOption(
Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
try {
if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
}
} catch (Throwable t) {
logger.warn(
"Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
}
}
Process for binding ports and starting servers
Both AbstractBootstrap and ServerBootstrap take part in this process. Because part of it runs asynchronously, the code jumps around a lot. Read it carefully, and step through it in a debugger if anything is unclear.
bind
AbstractBootstrap
This method binds the port and starts the server.
// Bind the port and wait synchronously for the bind to succeed, i.e. start the server
ChannelFuture f = b.bind(PORT).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("Callback after successful Channel binding test");
}
}).sync();
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
// Validate the configuration needed to start the server
validate();
// Bind the local address
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
public ChannelFuture bind(String inetHost, int inetPort) {
return bind(SocketUtils.socketAddress(inetHost, inetPort));
}
public ChannelFuture bind(InetAddress inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
All of these overloads return a ChannelFuture. Like the JDK's java.util.concurrent.Future, Netty's ChannelFuture represents the result of an asynchronous operation; in addition, it lets you register listeners that are called back when the operation completes.
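To get a feel for the "attach a callback, then block until done" usage without Netty, here is a small sketch built on the JDK's `CompletableFuture`. This is an analogy under stated assumptions, not the ChannelFuture API: `whenComplete` plays the role of `addListener`, `join` plays the role of `sync`, and `FutureListenerDemo` and `bindAndWait` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class FutureListenerDemo {
    // Starts a fake asynchronous "bind", attaches a completion callback, then waits for both.
    static String bindAndWait(int port) {
        ExecutorService loop = Executors.newSingleThreadExecutor(); // stands in for an event loop
        AtomicReference<String> callbackResult = new AtomicReference<>();
        try {
            // Pretend the bind happens on the loop thread and yields the bound port.
            CompletableFuture<Integer> bindFuture = CompletableFuture.supplyAsync(() -> port, loop);
            // The callback runs when the operation completes, whether it succeeded or failed.
            CompletableFuture<Integer> withListener = bindFuture.whenComplete((boundPort, cause) ->
                    callbackResult.set(cause == null ? "bound to " + boundPort : "failed: " + cause));
            // Block until the operation and its callback have both finished.
            withListener.join();
            return callbackResult.get();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(bindAndWait(8007));
    }
}
```

Waiting on the stage returned by `whenComplete`, rather than on `bindFuture` itself, is what guarantees the callback has already run when `join()` returns.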
doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
// Initialize and register a Channel object. Registration is asynchronous, so initAndRegister returns a ChannelFuture
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
// Registration failed: return the failed future directly
return regFuture;
}
// Registration is asynchronous, so check whether it has already completed
if (regFuture.isDone()) {
// Registration is complete: call doBind0 to bind the Channel's port
// (the OP_ACCEPT interest is set on the SelectionKey afterwards, once the Channel becomes active)
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration is not complete yet: add a listener with ChannelFuture#addListener and bind once registration finishes
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
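The core of doBind is a two-way branch: if registration has already finished, bind right away; otherwise bind from a listener once it finishes. The sketch below shows that branch with `CompletableFuture` as a stand-in for ChannelFuture and ChannelPromise. All names are hypothetical, and the "bind" is just a string result.

```java
import java.util.concurrent.CompletableFuture;

final class BindAfterRegister {
    // Returns a promise that completes once the (fake) bind has run.
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Registration already finished: bind immediately.
            doBind0(regFuture, port, promise);
        } else {
            // Registration still pending: bind from a callback when it finishes.
            regFuture.whenComplete((ignored, cause) -> doBind0(regFuture, port, promise));
        }
        return promise;
    }

    // Binds only if registration succeeded; otherwise fails the promise.
    private static void doBind0(CompletableFuture<Void> regFuture, int port,
                                CompletableFuture<String> promise) {
        if (regFuture.isCompletedExceptionally()) {
            promise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            promise.complete("bound:" + port);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = doBind(pending, 8007);
        System.out.println("done before registration: " + promise.isDone());
        pending.complete(null); // registration finishes later
        System.out.println(promise.join());
    }
}
```

Either path ends in the same `doBind0` call, so callers only ever deal with the returned promise and never need to know which branch was taken.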
initAndRegister
/**
 * Initializes and registers a Channel object, returning a ChannelFuture.
 */
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// Create the Channel object. The ChannelFactory was set by the channel(Class) call, which only records the NioServerSocketChannel class; the actual instantiation happens here
channel = channelFactory.newChannel();
// Initialize the Channel configuration; ServerBootstrap and Bootstrap each implement init
init(channel);
} catch (Throwable t) {
// An exception occurred
if (channel != null) { // Close the Channel if it has already been created
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// Return the DefaultChannelPromise object with an exception and bind a FailedChannel
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// Register a Channel with the EventLoopGroup. Inside the method, the EventLoopGroup assigns an EventLoop object to the Channel and registers the Channel with it
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// If registration failed but the Channel is marked as registered, call close() to close it, which fires the Channel close events
if (channel.isRegistered()) {
channel.close();
} else {
// If registration failed and the Channel was never registered, call closeForcibly(). It fires no events, because nothing was registered
channel.unsafe().closeForcibly(); // Force the Channel closed
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
Creating a Channel object
NioServerSocketChannel
Here we take NioServerSocketChannel as an example. The NioServerSocketChannel class diagram is shown below and will be used later.
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// Equivalent to ServerSocketChannel#open:
// public static ServerSocketChannel open() throws IOException {
//     return SelectorProvider.provider().openServerSocketChannel();
// }
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
/** The Channel's configuration object. Each Channel implementation has a matching ChannelConfig implementation; for NioServerSocketChannel it is ServerSocketChannelConfig */
private final ServerSocketChannelConfig config;
/** Create a new instance */
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
// Call the parent constructor with SelectionKey.OP_ACCEPT, meaning the event of interest is OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
// Initialize the config field
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
The newSocket method returns a java.nio.channels.ServerSocketChannel, which shows that a Netty Channel is built on top of a Java NIO channel.
The constructor that takes a ServerSocketChannel calls the parent class constructor and passes in the event it is interested in, just as in server-side Java NIO programming.
AbstractNioMessageChannel
// Base class for Channels that operate on messages
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
/**
 * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
 */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
}
AbstractNioChannel
/** Essentially encapsulates the Selector-related logic */
public abstract class AbstractNioChannel extends AbstractChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
// The Java native NIO channel held by this Netty NIO Channel
private final SelectableChannel ch;
// The interest op for read events: SelectionKey.OP_ACCEPT for AbstractNioMessageChannel (as passed in by NioServerSocketChannel), SelectionKey.OP_READ for AbstractNioByteChannel
protected final int readInterestOp;
volatile SelectionKey selectionKey;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// Set the event of interest
this.readInterestOp = readInterestOp;
try {
// Set it to non-blocking
ch.configureBlocking(false);
} catch (IOException e) {
// Close the NIO Channel if an exception occurs
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
}
AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
// Parent Channel; null for NioServerSocketChannel
private final Channel parent;
// The Channel's ID
private final ChannelId id;
private final Unsafe unsafe;
// Default ChannelPipeline
private final DefaultChannelPipeline pipeline;
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
// Create the Channel ID
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
Initialize the Channel configuration
ServerBootstrap and Bootstrap each provide their own implementation of init. We use ServerBootstrap as the example.
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
// Capture the current child settings
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
// Add a ChannelInitializer to the pipeline; it runs later and adds the real ChannelHandlers to the pipeline
// Why use a ChannelInitializer instead of adding the handlers directly?
// Because the Channel is not yet registered with an EventLoop at this point, and calling eventLoop().execute(...) now would throw
// java.lang.IllegalStateException: channel not registered to an event loop
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// Add the configured ChannelHandler to the pipeline
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// Add ServerBootstrapAcceptor to the pipeline
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Register a Channel into the EventLoopGroup
Registration starts with EventLoopGroup#register and ends up in AbstractChannel.AbstractUnsafe#register, shown below. AbstractUnsafe is a superclass of NioMessageUnsafe (via AbstractNioUnsafe).
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// Check that EventLoop is not empty
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// Check that the Channel and the EventLoop are compatible, since both come in several implementation types
// A NIO Channel requires a NioEventLoop
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// Set the Channel's EventLoop property
AbstractChannel.this.eventLoop = eventLoop;
// Run the registration logic on the EventLoop thread
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
// Force the Channel closed
closeForcibly();
// Mark the closeFuture as closed
closeFuture.setClosed();
// Fail the promise to report the exception
safeSetFailure(promise, t);
}
}
}
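The `inEventLoop()` check above decides between running the work inline and handing it to the loop thread. A minimal sketch of that decision, using a single-thread executor as a stand-in for an EventLoop, could look like this. `MiniEventLoop` and `runInLoop` are hypothetical names, not Netty API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an EventLoop: one thread, plus a way to ask "am I on it?".
final class MiniEventLoop {
    private volatile Thread thread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-loop");
        thread = t; // remember the loop thread so inEventLoop() can compare against it
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    // Same shape as register(): run inline on the loop thread, otherwise submit a task.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // Records what happens when a task is submitted from outside and from inside the loop.
    static List<String> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        log.add("caller inEventLoop=" + loop.inEventLoop());
        loop.runInLoop(() -> {
            log.add("task inEventLoop=" + loop.inEventLoop());
            loop.runInLoop(() -> log.add("nested task ran inline"));
            log.add("task end");
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because every state change happens on the one loop thread, the code that runs there needs no locking, which is the main reason for routing the registration through the EventLoop.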
register0
//AbstractChannel.java
/** * Register logic */
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) { // Make sure the Channel is still open
return;
}
// Remember whether this is the first registration
boolean firstRegistration = neverRegistered;
// Perform the registration logic
doRegister();
// Mark the Channel as having been registered at least once
neverRegistered = false;
// Mark the Channel as registered
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// Trigger the ChannelInitializer so that it initializes the handlers
pipeline.invokeHandlerAddedIfNeeded();
// Complete the promise successfully; this notifies the ChannelFutureListener that doBind added to regFuture
safeSetSuccess(promise);
// Fire the channelRegistered event
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
// Same handling as in register() when an exception occurs
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doRegister: registering with the Selector
//AbstractNioChannel.java
/**
 * Registers the NIO Channel with the Selector of the EventLoop.
 * @throws Exception
 */
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// unwrappedSelector() returns the Selector of the EventLoop; each EventLoop has its own Selector
// Why is the interest set registered as 0? The second edition of Netty's Definitive Guide gives two reasons:
// (1) The register method is polymorphic: NioServerSocketChannel uses it to listen for client connections, and SocketChannel uses it to listen for network reads and writes
// (2) SelectionKey#interestOps(int ops) makes it easy to change the interest set later, so registration only needs to obtain the SelectionKey and store it in the selectionKey field of AbstractNioChannel
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
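The register call in doRegister can be reproduced with plain Java NIO. The sketch below is an illustration under assumptions, not Netty code: it opens a server channel the same way newSocket does, switches it to non-blocking mode, and registers it with an interest set of 0 and an attachment. `RegisterWithZeroOps` and the string attachment are hypothetical; in Netty the attachment is the Channel itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class RegisterWithZeroOps {
    // Registers an unbound, non-blocking server channel with interest set 0 and reports the key state.
    static String registerDemo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = SelectorProvider.provider().openServerSocketChannel()) {
            // A channel must be non-blocking before it can be registered with a Selector.
            channel.configureBlocking(false);
            Object attachment = "stand-in for the Netty Channel";
            // Interest set 0: the key exists, but no event is selected for it yet.
            SelectionKey key = channel.register(selector, 0, attachment);
            return "interestOps=" + key.interestOps()
                    + " attachmentKept=" + (key.attachment() == attachment);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerDemo());
    }
}
```

Registering with 0 first separates "attach this channel to the selector" from "start delivering events", so the real interest can be switched on later through the returned key.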
doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// Run the port-binding logic on the Channel's EventLoop
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// Registration succeeded: bind the port
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
// Registration failed: fail the promise with the cause
promise.setFailure(regFuture.cause());
}
}
});
}
bind
Channel#bind eventually reaches io.netty.channel.AbstractChannel.AbstractUnsafe#bind:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// Assert that the current thread is the thread of the Channel's EventLoop
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// Remember whether the Channel was active before binding
boolean wasActive = isActive();
try {
// Bind the Channel's port; this delegates to the Java native channel, see NioServerSocketChannel#doBind
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// If the bind made the Channel active, fire the channelActive event; normally this condition is true
if (!wasActive && isActive()) {
// Submit a task so the event is fired asynchronously
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// Complete the promise successfully; this notifies the ChannelFutureListener added in the server example:
// ChannelFuture f = b.bind(PORT).addListener(new ChannelFutureListener() {
//     @Override
//     public void operationComplete(ChannelFuture future) throws Exception {
//         System.out.println("Callback after successful Channel binding test");
//     }
// }).sync();
safeSetSuccess(promise);
}
io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// Call the Java native ServerSocketChannel to bind the IP address and port
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
private void invokeLater(Runnable task) {
try {
// This method is used by outbound operation implementations to trigger an inbound event later.
// They do not trigger an inbound event immediately because an outbound operation might have been
// triggered by another inbound event handler method. If fired immediately, the call stack
// will look like this for example:
//
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
// -> handlerA.ctx.close()
// -> channel.unsafe.close()
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
//
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
eventLoop().execute(task);
} catch (RejectedExecutionException e) {
logger.warn("Can't invoke task later as EventLoop rejected it", e);
}
}
beginRead
io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
If the port is bound successfully in io.netty.channel.AbstractChannel.AbstractUnsafe#bind, the channelActive event is fired. Handling that event leads to beginRead, which runs the following code:
//AbstractNioMessageChannel
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
//AbstractNioChannel
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
// Get the SelectionKey bound to this Channel
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// When the Channel was registered with the EventLoop (doRegister), its interest set was 0
// 0 & anything is 0, so the condition below is true. readInterestOp was initialized to OP_ACCEPT, so OP_ACCEPT is added to the interest set
if ((interestOps & readInterestOp) == 0) {
// Add OP_ACCEPT, the value chosen when NioServerSocketChannel was constructed, to the interest set; from now on the server can handle client connection requests
selectionKey.interestOps(interestOps | readInterestOp);
}
}
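The bit test in doBeginRead is easy to check in isolation. The following sketch pulls the check into a pure function; `InterestOpsDemo` and `beginRead` are hypothetical names, and only the `SelectionKey` constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

final class InterestOpsDemo {
    // Adds readInterestOp to interestOps only when that bit is not already set.
    static int beginRead(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        int afterRegister = 0; // the interest set passed to register(...) in doRegister
        int afterBeginRead = beginRead(afterRegister, SelectionKey.OP_ACCEPT);
        System.out.println(afterBeginRead == SelectionKey.OP_ACCEPT); // prints "true"
    }
}
```

Using a bitwise OR instead of a plain assignment keeps any other interest bits that are already set, for example OP_WRITE on a client channel.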
Conclusion
In this article we walked through how a Netty server is created and met the components involved in the process. Next we will study each component in depth. If you are interested, follow me for further updates. Thank you for reading.