A list,

1. What is Reactor?

A Reactor Pattern is an event processing pattern used to process service requests that are delivered to the server simultaneously through one or more inputs. The service handler reuses incoming requests and dispatches them synchronously to the associated handler. Key points:

(1) Event driven

(2) Processing multiple inputs

(3) Use multiplexing to distribute the event to the corresponding Handler for processing

2. Reactor main components

(1) Reactor

Responsible for responding to events that bind event distribution to Handler handling of the event. Nioeventloop.run (),processSelectedKeys() for Netty.

(2) Handler

Event handler, bound to a certain type of event, responsible for the execution of the corresponding event task to process the event. Corresponding to netty IdleStateHandler.

(3) Acceptor

Acceptors are a special class of handlers that, in isolation, are reactor’s event receiver class that initializes the selector and receives the buffer queue. ServerBootstrapAcceptor corresponding to netty.

Second, the process

1. Create a mainReactor thread pool and a subReactor thread pool

	bossGroup = new NioEventLoopGroup();
	workGroup = new NioEventLoopGroup(4);
Copy the code
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { children = new SingleThreadEventExecutor[nThreads]; .for(int i = 0; i < nThreads; i ++) { ... children[i] = newChild(threadFactory, args); . }}Copy the code
@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
Copy the code

Here the mainReactor and subReactor thread pools are created, and the eventLoop thread is created

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}
Copy the code

Each eventLoop thread has its own selector. The eventLoop thread has not been started yet, and when it is started, it will perform the selector in Run ().

2. The mainReactor binds selector to OP_ACCEPT and loops selector. Select ();

ChannelFuture regFuture = group().register(channel); The group() here is the bossGroup

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
Copy the code

Next ()

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
Copy the code
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    @Override
    public EventExecutor next() {
        returnchildren[childIndex.getAndIncrement() & children.length - 1]; }}Copy the code

Retrieve the first eventLoop from the thread pool

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
     ...
    channel.unsafe().register(this, promise);
    return promise;
}
Copy the code
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
    try {
        eventLoop.execute(new OneTimeTask() {
            @Override
            public void run() { register0(promise); }}); } catch (Throwable t) { }Copy the code

Here the mainReactor’s eventLoop is bound to the server’s NioServerSocketChannel. Since the main thread was initially started, eventloop. execute was executed, where mainReactor only started one thread.

@Override
public void execute(Runnable task) {
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else{ startThread(); addTask(task); . }... }Copy the code

Start the mainReactor thread loop by executing startThread() in execute and adding the Task register0(Promise) to the taskQueue to execute the mainReactor thread loop.

private void register0(ChannelPromise promise) {
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    if(firstRegistration && isActive()) { pipeline.fireChannelActive(); }}Copy the code
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for(;;) {... selectionKey = javaChannel().register(eventLoop().selector, 0, this); . }}Copy the code

Register the selector for eventLoop in the mainReactor with an action listening bit of 0. Bind the server socketchannel to the main Reactor thread in doBind()–>doBind0 –>channel.bind()–>… – > next. InvokeBind () – > HeadContext. The Bind () – > unsafe. The Bind () – > pipeline. FireChannelActive () – > channel. The read () – >… –>doBeginRead() changed to OP_ACCEPT(16) operation listening bit.

@Override
protected void doBeginRead() throws Exception {... final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp); }}Copy the code
From there, mainReactor's eventLoop loops selector. Select from run. Note: The value of readInterestOp comes from the constructor that created NioServerSocketChannelCopy the code
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
Copy the code

3. SubReactor registers the OP_READ event

After receiving a client connection, the client Channel will be registered with the subReactor thread in the ServerBootstrapAcceptor and bound to the Selector of the subReactor thread. Listen for the OP_READ event on the client channel

if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! = 0 || readyOps == 0) { unsafe.read(); }Copy the code

When listening on a client connection, execute read() on the server AbstractNioUnsafe;

@Override
public void read() {... intlocalRead = doReadMessages(readBuf); .for (int i = 0; i < size; i ++) {
        pipeline.fireChannelRead(readBuf.get(i)); }... pipeline.fireChannelReadComplete(); . }Copy the code

(1) doReadMessage

@Override
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); . buf.add(new NioSocketChannel(this, ch)); . }Copy the code
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}
Copy the code
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}
Copy the code

Set client channel listening bit to OP_READ(1)

(2) pipeline.fireChannelRead()

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
   final Channel child = (Channel) msg;
   child.pipeline().addLast(childHandler);

   for(Entry<ChannelOption<? >, Object> e: childOptions) { try {if(! child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e);
           }
       } catch (Throwable t) {
           logger.warn("Failed to set a channel option: "+ child, t); }}for(Entry<AttributeKey<? >, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(newChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if(! future.isSuccess()) { forceClose(child, future.cause()); }}}); } catch (Throwable t) { forceClose(child, t); }}}Copy the code

The ServerBootstrapAcceptor not only binds the subReactor to the client channel, but also initializes some parameters for the client channel

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
Copy the code

Same as register above, but change the mainReactor thread pool to subReactor thread pool. Here you bind a thread’s selector from the subReactor thread pool to a client channel and listen for the client 0 event.

(3) pipeline.fireChannelReadComplete()

@Override
public ChannelPipeline fireChannelReadComplete() {
    head.fireChannelReadComplete();
    if (channel.config().isAutoRead()) {
        read(a); }return this;
}
Copy the code

Read () – > tail. The read () – > next. InvokeRead () – > HeadContext. Read () – >… –> doBeginRead()

@Override
protected void doBeginRead() throws Exception {
    ...
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp); }}Copy the code

I’m gonna change the listening bit here to OP_READ(1)

4. SubReactor processes the READ event

	if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! = 0 || readyOps == 0) { unsafe.read(); . }Copy the code

Go to NioByteUnsafe’s read() method

@Override
public final void read() {... final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); . byteBuf = allocHandle.allocate(allocator); . pipeline.fireChannelRead(byteBuf); . }Copy the code
@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
Copy the code
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); }}Copy the code
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerB: "+ msg); super.channelRead(ctx, msg); }}Copy the code

This is where client message processing is received

conclusion

It creates as many selectors as there are threads in the Reactor thread pool. MainReactor’s eventLoop is bound to the server’s channel and only looks at the ACCEPT event of the server’s channel. The eventLoop of the subReactor is bound to the client channel and only looks at the READ events of the client channel.

The mainReactor and the subReactor cycle their respective selectors, the mainReactor cycle the ACCEPT event selector, the subReactor cycle the READ event selector, After the mainReactor receives a client connection, it executes ServerBootstrapAcceptor’s channelRead method to bind the client connection to the subReactor.