1. What is Reactor?

A Reactor Pattern is an event processing pattern used to process service requests that are delivered to the server simultaneously through one or more inputs. The service handler reuses incoming requests and dispatches them synchronously to the associated handler. Key points:

(1) Event driven

(2) Processing multiple inputs

(3) Use multiplexing to distribute the event to the corresponding Handler for processing

2. Reactor main components

(1) Reactor

Responsible for responding to events that bind event distribution to Handler handling of the event. Nioeventloop.run (),processSelectedKeys() for Netty.

(2) Handler

Event handler, bound to a certain type of event, responsible for the execution of the corresponding event task to process the event. Corresponding to netty IdleStateHandler.

(3) Acceptor

Acceptors are a special class of handlers that, in isolation, are reactor’s event receiver class that initializes the selector and receives the buffer queue. ServerBootstrapAcceptor corresponding to netty.

Second, the process

1. Create a mainReactor thread pool and a subReactor thread pool

	bossGroup = new NioEventLoopGroup();
	workGroup = new NioEventLoopGroup(4);
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { children = new SingleThreadEventExecutor[nThreads]; .for(int i = 0; i < nThreads; i ++) { ... children[i] = newChild(threadFactory, args); . }}Copy the code
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
Here the mainReactor and subReactor thread pools are created, and the eventLoop thread is created

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    provider = selectorProvider;
    selector = openSelector();
Each eventLoop thread has its own selector. The eventLoop thread has not been started yet, and when it is started, it will perform the selector in Run ().

2. The mainReactor binds selector to OP_ACCEPT and loops selector. Select ();

ChannelFuture regFuture = group().register(channel); The group() here is the bossGroup

public ChannelFuture register(Channel channel) {
    return next().register(channel);
Next ()

public EventLoop next() {
    return (EventLoop) super.next();
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    public EventExecutor next() {
Retrieve the first eventLoop from the thread pool

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    channel.unsafe().register(this, promise);
    return promise;
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
    } else {
    try {
        eventLoop.execute(new OneTimeTask() {
Here the mainReactor’s eventLoop is bound to the server’s NioServerSocketChannel. Since the main thread was initially started, eventloop. execute was executed, where mainReactor only started one thread.

public void execute(Runnable task) {
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
Start the mainReactor thread loop by executing startThread() in execute and adding the Task register0(Promise) to the taskQueue to execute the mainReactor thread loop.

private void register0(ChannelPromise promise) {
    neverRegistered = false;
    registered = true;
protected void doRegister() throws Exception {
    boolean selected = false;
Register the selector for eventLoop in the mainReactor with an action listening bit of 0. Bind the server socketchannel to the main Reactor thread in doBind()–>doBind0 –>channel.bind()–>… – > next. InvokeBind () – > HeadContext. The Bind () – > unsafe. The Bind () – > pipeline. FireChannelActive () – > channel. The read () – >… –>doBeginRead() changed to OP_ACCEPT(16) operation listening bit.

protected void doBeginRead() throws Exception {... final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
3. SubReactor registers the OP_READ event

After receiving a client connection, the client Channel will be registered with the subReactor thread in the ServerBootstrapAcceptor and bound to the Selector of the subReactor thread. Listen for the OP_READ event on the client channel

if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! = 0 || readyOps == 0) { unsafe.read(); }Copy the code

When listening on a client connection, execute read() on the server AbstractNioUnsafe;

public void read() {... intlocalRead = doReadMessages(readBuf); .for (int i = 0; i < size; i ++) {
(1) doReadMessage

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
Set client channel listening bit to OP_READ(1)

(2) pipeline.fireChannelRead()

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
   final Channel child = (Channel) msg;

   for(Entry<ChannelOption<? >, Object> e: childOptions) { try {if(! child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e);
       } catch (Throwable t) {
           logger.warn("Failed to set a channel option: "+ child, t); }}for(Entry<AttributeKey<? >, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(newChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
The ServerBootstrapAcceptor not only binds the subReactor to the client channel, but also initializes some parameters for the client channel

public ChannelFuture register(Channel channel) {
    return next().register(channel);
Same as register above, but change the mainReactor thread pool to subReactor thread pool. Here you bind a thread’s selector from the subReactor thread pool to a client channel and listen for the client 0 event.

(3) pipeline.fireChannelReadComplete()

public ChannelPipeline fireChannelReadComplete() {
    if (channel.config().isAutoRead()) {
        read(a); }return this;
Read () – > tail. The read () – > next. InvokeRead () – > HeadContext. Read () – >… –> doBeginRead()

protected void doBeginRead() throws Exception {
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
I’m gonna change the listening bit here to OP_READ(1)

4. SubReactor processes the READ event

Go to NioByteUnsafe’s read() method

public final void read() {... final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); . byteBuf = allocHandle.allocate(allocator); . pipeline.fireChannelRead(byteBuf); . }Copy the code
public ChannelPipeline fireChannelRead(Object msg) {
    return this;
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); }}Copy the code
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerB: "+ msg); super.channelRead(ctx, msg); }}Copy the code

This is where client message processing is received


It creates as many selectors as there are threads in the Reactor thread pool. MainReactor’s eventLoop is bound to the server’s channel and only looks at the ACCEPT event of the server’s channel. The eventLoop of the subReactor is bound to the client channel and only looks at the READ events of the client channel.

The mainReactor and the subReactor cycle their respective selectors, the mainReactor cycle the ACCEPT event selector, the subReactor cycle the READ event selector, After the mainReactor receives a client connection, it executes ServerBootstrapAcceptor’s channelRead method to bind the client connection to the subReactor.