Netty source code analysis series
- Netty source code parsing series – server start process parsing
- Netty source code parsing series – client connection access and read I/O parsing
- 5 minutes to understand pipeline model -Netty source code parsing
1. Server startup example (based on 4.0.31.final)
public class Server {
private ServerBootstrap serverBootstrap;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
public static void main(String[] args) throws InterruptedException {
System.out.println("Service startup");
Server server = new Server();
server.start();
}
private void start() throws InterruptedException {
try {
serverBootstrap=new ServerBootstrap();
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup(4);
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new InitHandler())
.childHandler(new IOChannelInitialize());
ChannelFuture future = serverBootstrap.bind(8802).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class IOChannelInitialize extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel"); ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0)); ch.pipeline().addLast(new IOHandler()); }}}Copy the code
Step-by-step instructions
-
1.1 Creating ServerBootstrap Instance, a netty bootstrap helper class that provides a series of methods for setting server boot-related parameters. The bottom layer abstracts and encapsulates various capabilities through the facade mode, so that users do not need to deal with too many bottom API as far as possible, reducing the difficulty of user development
-
1.2 NioEventLoopGroup is the Netty Reactor thread pool. BossGroup monitors the connection between the ACCEPT client and workGroup processes the I/O, codec
-
1.3 binding server NioServerSocketChannel
-
1.4 Setting Some Parameters
-
1.5 Initialize pipeline and bind handler. A pipeline is a chain of responsibilities responsible for handling network events, managing and executing ChannelHandler, and setting up the IdleStateHandler and custom IOHandler provided by the system
-
1.6 serverBootstrap.bind(8802) This is where the server binding port is started
-
1.7 future. Channel (.) closeFuture (). The sync (); Wait for the server to shut down
-
1.8 Graceful Closing
2. Source code analysis
2.1 NioEventLoopGroup
NioEventLoopGroup is not only an I/O thread, but also responsible for system tasks and scheduled tasks in addition to I/O reading and writing
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
Copy the code
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
Copy the code
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
Copy the code
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
Copy the code
Moving on, here’s the simplified code
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
...
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for(int i = 0; i < nThreads; i ++) { ... children[i] = newChild(threadFactory, args); . }Copy the code
MultithreadEventExecutorGroup realized thread thread creation and choice, we see the newChild method (NioEventLoopGroup class method), newChild threads instantiated
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
Copy the code
A NioEventLoop is created
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
Copy the code
Along with the super
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
Copy the code
Code has been streamlined. Go ahead
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() { SingleThreadEventExecutor.this.run(); }}});Copy the code
Instantiate a thread here, and in the run call SingleThreadEventExecutor run method, where the thread is started, we continue to look down to summarize: NioEventLoopGroup is actually a Reactor thread pool, which schedules and executes client access, network read and write events, user-defined tasks, and scheduled tasks.
2.2 ServerBootstrap
ServerBootstrap is the server startup helper class. The parent class is AbstractBootstrap, and the corresponding client startup helper class is Bootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress; private final Map<ChannelOption<? >, Object> options = new LinkedHashMap<ChannelOption<? >, Object>(); private final Map<AttributeKey<? >, Object> attrs = new LinkedHashMap<AttributeKey<? >, Object>(); private volatile ChannelHandler handler; }Copy the code
2.2.1 Setting the BooSS and Work thread Pools
BossGroup is passed to the parent class, and workGroup is assigned to serverBootstrap’s childGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if(this.childGroup ! = null) { throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
Copy the code
2.2.2 setting NioServerSocketChannel to process connection requests
serverBootstrap.channel(NioServerSocketChannel.class)
Copy the code
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
Copy the code
Continue with new BootstrapChannelFactory
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class "+ clazz, t); }}}Copy the code
BootstrapChannelFactory is an inner class that inherits ChannelFactory. As the name suggests, it is a channel factory class that overrides the parent’s newChannel method. Create an instance of NioServerSocketChannel with reflection, and it’ll tell you where it was called, right
2.2.3 Setting the value of channel Channel block
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
Copy the code
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if(value == null) { synchronized (options) { options.remove(option); }}else{ synchronized (options) { options.put(option, value); }}return (B) this;
}
Copy the code
The option method here is the parent class AbstractBootstrap, the method of the options is an orderly not thread safe two-way linked list, add lock
2.2.4 serverBootstrap.childOption
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if(value == null) { synchronized (childOptions) { childOptions.remove(childOption); }}else{ synchronized (childOptions) { childOptions.put(childOption, value); }}return this;
}
Copy the code
ServerBootstrap childOption is a method subclass serverBootstrap. Set ServerChannel’s options childOption: The main set of options for ServerChannel’s subchannels is option for the boss thread and childOption for the Work thread pool
2.2.5 setting the server NioServerSocketChannel Handler
serverBootstrap.handler(new InitHandler())
Copy the code
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
Copy the code
2.2.6 serverBootstrap. ChildHandler ()
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
Copy the code
Handler belongs to the server NioServerSocketChannel, it’s only created once. ChildHandler belongs to every new NioSocketChannel, This is called whenever a connection comes up
2.2.7 The actual startup process is performed here, let’s look at the bind() method
serverBootstrap.bind(8802).sync()
Copy the code
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
Copy the code
- (1)Create one by port numberInetSocketAddressAnd continue tobind
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
Copy the code
- (2) validate()Method to perform some parameter verification, we directly look atdoBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if(regFuture.cause() ! = null) {return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if(cause ! = null) { promise.setFailure(cause); }else {
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise); }});returnpromise; }}Copy the code
- (3.1)See firstinitAndRegister ( AbstractBootstrapClass), leaving out some unimportant ones
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
init(channel);
ChannelFuture regFuture = group().register(channel);
return regFuture;
}
Copy the code
ChannelFactory is created at serverBootstrap.channel(), where reflection is called to create an instance of NioServerSocketChannel
- (3.2.1)Take a look atinit(channel)Methods (ServerBootstrapClass)
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<? >, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<? >, Object> attrs = attrs(); synchronized (attrs) {for(Entry<AttributeKey<? >, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<? >, Object>[] currentChildOptions; final Entry<AttributeKey<? >, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler();if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
Copy the code
Options () is an AbstractBootstrap class assigned by ServerBootstrap.option (). P.addlast () adds a new handler to NioServerSocketChannel, where pipeline is like a Servlet filter, Managing all handlers
- (3.2.2)Take a look atgroup().register()methods
Here,group 是 BossGroup (NioEventLoopGroup – spending MultithreadEventLoopGroup), multiple jumps toSingleThreadEventLoopOf the classregister()methods
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
Copy the code
- (3.2.3) Clear out some unimportant code, below is the real registration
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() { register0(promise); }}); } catch (Throwable t) { } } }Copy the code
Eventloop.ineventloop () is used to check whether the starting thread and the current thread are the same. If they are the same, they are started. If they are different, they are not started or the thread is different.
- (3.2.4)The thread hasn’t started yet. Let’s goeventLoop.execute()theexecute()Method isSingleThreadEventExecutorOf the class
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if(isShutdown() && removeTask(task)) { reject(); }}if(! addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); }}Copy the code
- (3.2.5) Start thread
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if(STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { thread.start(); }}}Copy the code
We in the beginning of 2.1 SingleThreadEventExecutor thread within the constructor is start here, we go back to 2.1
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() { SingleThreadEventExecutor.this.run(); }}});Copy the code
- (3.2.6)Open theSingleThreadEventExecutor.this.run() ;
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
Copy the code
This is done asynchronously, polling the SELECT client for Accept, and runAllTasks for all tasks
- (3.3)We’ll see(3.1)The inside of theChannelFuture regFuture = group().register(channel);Jump toSingleThreadEventLoop 的 registermethods
@Override
public ChannelFuture register(Channel channel) {
...
channel.unsafe().register(this, promise);
return promise;
}
Copy the code
Here is the simplified code (AbstractUnsafe inner class in the AbstractChannel class)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
eventLoop.execute(new OneTimeTask() {
@Override
public void run() { register0(promise); }}); . }Copy the code
private void register0(ChannelPromise promise) {
...
doRegister(); .if(firstRegistration && isActive()) { pipeline.fireChannelActive(); }... }Copy the code
Continue (in AbstractNioChannel class)
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for(;;) {... selectionKey = javaChannel().register(eventLoop().selector, 0, this); . }}Copy the code
Register NioServerSocketChannel with the Selector of the boss thread pool NioEventLoop. Here we should register OP_ACCEPT(16) to register 0 on the multiplexer: (1) The registration method is polymorphic, which can be used by NioServerSocketChannel to listen for client connection access. The interestOps(int OPS) method of the SelectionKey allows you to easily modify the interestOps bit
Look at pipeline. FireChannelActive ()
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
Copy the code
@Override
public Channel read() {
pipeline.read();
return this;
}
Copy the code
@Override
public ChannelPipeline read() {
tail.read();
return this;
}
Copy the code
@Override
public ChannelHandlerContext read() {... next.invokeRead(); . }Copy the code
private void invokeRead() { try { ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); }}Copy the code
Go into the read of HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
Copy the code
@Override public final void beginRead() { … doBeginRead(); . }
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if(! selectionKey.isValid()) {return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp); }}Copy the code
Finally, change the selectionKey’s listening bit to OP_READ here
- (4)Take a look atdoBind0( )methods
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else{ promise.setFailure(regFuture.cause()); }}}); }Copy the code
The method is put into the REACTOR thread pool task queue, which determines whether the registration is successful, and then the bind method is continued
- (5)performbind( )methods
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
Copy the code
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
Copy the code
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { ... final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); . next.invokeBind(localAddress, promise); . }Copy the code
Since the bind event is an outbound event, look for an outbound handler and execute the invokeBind() method
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); }}Copy the code
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
Copy the code
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
doBind(localAddress); . }Copy the code
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
Copy the code
ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap: ServerBootstrap The reactor thread pool, the server Channel, and some configuration parameters are configured. The client is connected to the handler. Register OP_ACCEPT to the multiplexer (3) start reactor thread pool, listen for connections continuously, process task (4) bind port