What is the Reactor thread model

Threading models in Java fall roughly into four categories:

  1. Single-threaded model
  2. Multithreaded model
  3. Thread pool model (Executor)
  4. Reactor threading model

In the single-threaded model, the server uses one thread to process all requests, so requests are handled strictly one after another, which is inefficient. In the multithreaded model, the server allocates a new thread to each request. This is more efficient than the single-threaded model, but its drawback is just as obvious: if the number of requests is large, a large number of threads is created and the server can run out of memory. Threads are expensive objects in Java, and their number should not grow without bound. Beyond a certain point, adding threads no longer improves throughput and actually reduces it, because of context-switching overhead.

Thread pools grew out of the idea of “object reuse”. A thread pool usually holds a fixed number of threads, and when the number of requests exceeds the number of threads, the extra requests wait in a queue. This keeps the number of threads from growing indefinitely. I’m not going to talk too much about Java thread pools here, but if you’re interested, check out my previous article: Java Thread Pools.
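To make the thread pool idea concrete, here is a minimal sketch using java.util.concurrent. The class name BoundedPoolDemo, the pool size of 4, and the queue capacity of 100 are assumptions chosen for illustration, not values from the article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: many "requests" are served by a few reused threads.
class BoundedPoolDemo {

    // Runs `requests` tasks on a pool of `poolSize` threads and returns
    // how many distinct threads actually did the work.
    static int distinctWorkerThreads(int poolSize, int requests) {
        ExecutorService pool = new ThreadPoolExecutor(
                poolSize, poolSize,               // the thread count never exceeds poolSize
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100));   // excess requests wait here (assumed capacity)
        Set<String> workers = ConcurrentHashMap.newKeySet();
        List<Future<?>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                pending.add(pool.submit(() -> {
                    workers.add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> f : pending) {
                f.get();                          // wait for every request to finish
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return workers.size();
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + distinctWorkerThreads(4, 20));
    }
}
```

However many requests are submitted, the number of threads that serve them never exceeds the pool size; the rest simply wait in the queue.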

Reactor is, first of all, a processing model. It is a common model for concurrent I/O and is used with synchronous I/O. The core idea is to register every I/O event of interest with a central I/O multiplexer, on which the main thread or process blocks. Once an I/O event arrives or a resource becomes ready (a file descriptor or socket becomes readable or writable), the multiplexer returns and dispatches the event to the handler that was registered for it.

Reactor is also an implementation mechanism. It is event-driven, which differs from ordinary function calls: instead of the application calling an API, the Reactor inverts the flow of control. The application provides handler interfaces and registers them with the Reactor; when an event occurs, the Reactor calls the registered interfaces, also known as “callback functions”. The Hollywood principle fits Reactor perfectly: don’t call us, we’ll call you.
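The register-then-get-called-back idea can be shown without any real I/O. The following is only a toy sketch: MiniReactor, its string event types, and its method names are hypothetical and exist purely to show the inversion of control.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy reactor: no sockets, only the register/dispatch inversion.
class MiniReactor {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    // The application registers a callback for an event type up front.
    void register(String eventType, Consumer<String> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    // When an event occurs, the reactor calls the application, not the reverse.
    void dispatch(String eventType, String payload) {
        List<Consumer<String>> registered =
                handlers.getOrDefault(eventType, Collections.emptyList());
        for (Consumer<String> h : registered) {
            h.accept(payload);
        }
    }

    public static void main(String[] args) {
        MiniReactor reactor = new MiniReactor();
        reactor.register("read", data -> System.out.println("got: " + data));
        reactor.dispatch("read", "hello");
    }
}
```

In a real Reactor the events come from the I/O multiplexer rather than from explicit dispatch calls, but the application code has the same shape: it only supplies callbacks.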

Why do we need the Reactor model

The Reactor model is essentially a wrapper around I/O multiplexing. In theory, I/O multiplexing is already efficient enough, so why is a Reactor model needed? The answer is that raw I/O multiplexing, while fast, is too complex to program against productively. Hence the Reactor model.

A single network request may involve several I/O operations. Compared with the traditional approach, in which one thread handles a request through its whole life cycle, programming directly against I/O multiplexing does not match how people naturally think. Suppose a programmer is handling request A, and A needs several I/O operations A1 to An, possibly with long gaps between them. After each I/O operation, the next call to the multiplexer will most likely return an event for request B rather than A. Handling of request A is constantly interrupted by B, and B in turn by C. Programming in this style is error-prone.
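A small simulation shows what this interleaving forces on the programmer: the partial state of every request has to be kept somewhere explicit, because the events of A, B, and C arrive mixed together. InterleavingDemo and its "requestId:chunk" event format are hypothetical, made up only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation of readiness events that arrive interleaved.
class InterleavingDemo {

    // Each event is "requestId:chunk". Returns the reassembled data per request.
    static Map<String, String> replay(String... events) {
        // Per-request state that a thread-per-request design would keep on the stack.
        Map<String, StringBuilder> partial = new LinkedHashMap<>();
        for (String event : events) {
            String[] parts = event.split(":", 2);
            partial.computeIfAbsent(parts[0], k -> new StringBuilder()).append(parts[1]);
        }
        Map<String, String> complete = new LinkedHashMap<>();
        partial.forEach((id, sb) -> complete.put(id, sb.toString()));
        return complete;
    }

    public static void main(String[] args) {
        // A's two reads are separated by an event that belongs to B.
        System.out.println(replay("A:he", "B:wo", "A:llo", "B:rld"));
    }
}
```

The Reactor model hides this bookkeeping behind one handler object per connection, so each handler only ever sees its own events.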

Reactor model

Generally speaking, there are five steps to process a network request:

  1. Read the request data (read)
  2. Decode the request (decode)
  3. Compute and generate the response (compute)
  4. Encode the response (encode)
  5. Send the response data (send)
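The five steps above can be sketched as a tiny pipeline. The "protocol" here (UTF-8 text in, upper-cased text out) and the class name RequestPipeline are assumptions for illustration; reading from and writing to the socket are left to the caller.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical pipeline covering steps 2 to 4; steps 1 and 5 are the socket I/O around it.
class RequestPipeline {

    // Step 2: decode the raw bytes that step 1 read from the socket.
    static String decode(ByteBuffer raw) {
        return StandardCharsets.UTF_8.decode(raw).toString().trim();
    }

    // Step 3: compute the response (assumed business logic: upper-case the text).
    static String compute(String request) {
        return request.toUpperCase(Locale.ROOT);
    }

    // Step 4: encode the response; step 5 would write this buffer to the socket.
    static ByteBuffer encode(String response) {
        return ByteBuffer.wrap((response + "\n").getBytes(StandardCharsets.UTF_8));
    }

    static ByteBuffer handle(ByteBuffer raw) {
        return encode(compute(decode(raw)));
    }
}
```

The threading models below differ mainly in which thread runs each of these steps.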

The Reactor model has three threading models:

  1. Single-threaded model
  2. Multithreaded model (Single Reactor)
  3. Multithreaded model (Multiple Reactors)

Single-threaded Reactor model

In the single-threaded model, one Reactor thread both accepts new connections and dispatches I/O events to the handlers, which run on that same thread. This model is rarely used in practice because a single thread limits throughput: one slow handler stalls every other connection.

The following is an implementation based on the Java NIO single-thread Reactor model:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException { // Reactor setup
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select(); // block until at least one event is ready
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext())
                    dispatch(it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    // Run whatever was attached to the key: the Acceptor or a Handler.
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }

    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }
}

final class Handler implements Runnable {
    static final int READING = 0, SENDING = 1;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(1024);
    ByteBuffer output = ByteBuffer.allocate(1024);
    int state = READING;

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }

    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }
}

Multithreaded model (Single Reactor)

This model keeps a single Reactor thread for accepting connections and for reading and writing, but hands the work in the handler chain to a thread pool. It is a common model for back-end programs.
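The hand-off between the Reactor thread and the pool can be simulated without sockets. This is only a sketch under stated assumptions: PooledHandlerDemo is a hypothetical class, the Reactor is stood in for by a single-thread executor, and the pool size of 4 is arbitrary.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical simulation: no real I/O, only the thread hand-off between
// the single Reactor thread and the worker pool.
class PooledHandlerDemo {

    // Returns the names of the threads that read, computed, and sent, in that order.
    static String[] handleOnce(String request) {
        ExecutorService reactor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "reactor"));
        ExecutorService workers =
                Executors.newFixedThreadPool(4, r -> new Thread(r, "worker"));
        String[] trace = new String[3];
        try {
            CompletableFuture
                .supplyAsync(() -> {              // read on the Reactor thread
                    trace[0] = Thread.currentThread().getName();
                    return request.trim();
                }, reactor)
                .thenApplyAsync(decoded -> {      // business logic on a worker thread
                    trace[1] = Thread.currentThread().getName();
                    return decoded.toUpperCase(Locale.ROOT);
                }, workers)
                .thenAcceptAsync(response -> {    // send back on the Reactor thread
                    trace[2] = Thread.currentThread().getName();
                }, reactor)
                .join();
        } finally {
            reactor.shutdown();
            workers.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", handleOnce("query")));
    }
}
```

The point of the split is that a slow computation only occupies a worker thread, so the Reactor thread stays free to serve I/O events for other connections.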

Multithreaded model (Multiple Reactors)

Compared with the multithreaded single-Reactor model, this model splits the Reactor into two parts. The mainReactor listens for and accepts new connections, then hands each established socket to a subReactor. Each subReactor multiplexes its connected sockets and reads and writes network data. Business processing is handed over to a worker thread pool. In general, the number of subReactors can equal the number of CPUs.
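The mainReactor/subReactor split can be sketched with plain Java NIO. Everything here is an illustrative assumption rather than a reference implementation: the class names MainReactor and SubReactor, the echo behaviour, round-robin assignment, and port 9000 are all made up. To stay short, the mainReactor uses a blocking accept() instead of its own Selector, and the echo runs inline instead of on a worker pool.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical subReactor: owns one Selector and echoes whatever it reads.
class SubReactor implements Runnable {
    private final Selector selector;
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();

    SubReactor() throws IOException { selector = Selector.open(); }

    // Called by the mainReactor thread; the registration itself runs on this thread.
    void assign(SocketChannel ch) {
        pending.add(ch);
        selector.wakeup();
    }

    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (!Thread.interrupted()) {
                selector.select();
                for (SocketChannel ch; (ch = pending.poll()) != null; ) {
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    echo((SocketChannel) key.channel(), buf);
                }
            }
        } catch (IOException ex) {
            // Sketch only: a selector failure stops this subReactor.
        }
    }

    private static void echo(SocketChannel ch, ByteBuffer buf) {
        try {
            buf.clear();
            if (ch.read(buf) < 0) { ch.close(); return; }
            buf.flip();
            while (buf.hasRemaining()) ch.write(buf); // simplification: spins if the peer is slow
        } catch (IOException ex) {
            try { ch.close(); } catch (IOException ignored) { }
        }
    }
}

// Hypothetical mainReactor: accepts connections and assigns them round-robin.
class MainReactor implements Runnable {
    private final ServerSocketChannel server;
    private final SubReactor[] subs;
    private int next;

    MainReactor(int port, int subCount) throws IOException {
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port));
        subs = new SubReactor[subCount];
        for (int i = 0; i < subCount; i++) {
            subs[i] = new SubReactor();
            Thread t = new Thread(subs[i], "sub-reactor-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    int port() { return server.socket().getLocalPort(); }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                SocketChannel ch = server.accept(); // blocking accept, see the note above
                subs[next].assign(ch);
                next = (next + 1) % subs.length;
            }
        } catch (IOException ex) {
            // Sketch only: stop accepting on error.
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9000 is arbitrary; one subReactor per CPU, as suggested above.
        new MainReactor(9000, Runtime.getRuntime().availableProcessors()).run();
    }
}
```

New channels are queued and registered by the subReactor's own thread, with wakeup() used to interrupt a pending select(), so the mainReactor never touches a Selector that another thread is blocked on.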

Netty threading model

Netty’s threading model is similar to the Reactor model. In Netty, a server is created with ServerBootstrap.

Here is a complete Netty server example:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Date;

public class TimeServer {
    public void bind(int port) {
        // Netty's multiple-Reactor threading model: bossGroup accepts connections,
        // workGroup is the pool of worker threads that process business.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // Bind the port and wait for the bind to complete.
            ChannelFuture f = b.bind(port).sync();
            // Wait for the server listening port to close.
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }
}

// Netty 4.x: inbound events are handled by extending ChannelInboundHandlerAdapter.
class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Cast msg to a ByteBuf.
        ByteBuf buf = (ByteBuf) msg;
        try {
            // Copy the readable bytes into an array.
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, StandardCharsets.UTF_8);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                    ? new Date(System.currentTimeMillis()).toString()
                    : "BAD ORDER";
            ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes(StandardCharsets.UTF_8));
            // Queue the response in the channel's outbound buffer.
            ctx.write(resp);
        } finally {
            buf.release(); // the inbound buffer is reference-counted
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Called after the read completes: flush the queued writes to the channel.
        ctx.flush();
    }
}
