To read this article, you need a general understanding of NIO, and you should at least be able to write an NIO Hello World.

When NIO and Netty come up, the Reactor model is never far behind, because its architecture is such a classic. Yet many people skip the fundamentals and start straight with Netty, which is sophisticated in every respect, without ever settling down to take a good look at its cornerstone: the Reactor model. This article walks you through the Reactor model so that you come away with a simple, intuitive understanding of it.

Any discussion of Reactor has to mention an article by Doug Lea, the well-known author of Java's concurrency utilities. I will pick out some of its key points and explain the Reactor model based on my own understanding, so we can see just how differently Doug Lea thinks.

Classic service design

This is the most traditional socket service design: multiple clients connect to the server, and the server starts many threads, one thread per client.

In most scenarios, there are several steps to handle a network request:

  1. Read: read data from the socket.
  2. Decode: data travels over the network as bytes, so it must be decoded to obtain the actual request.
  3. Process: run the business logic, that is, do whatever you want with the request.
  4. Encode: likewise, the socket only accepts bytes, so the reply must be encoded before it is sent.
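The four steps can be sketched as plain functions, independent of how the bytes arrive. This is only a minimal illustration, not code from the original article: the class name `RequestPipeline`, its method names and the "echo: " reply are all hypothetical.

```java
import java.nio.charset.StandardCharsets;

class RequestPipeline {
    // Decode: turn the bytes that were read from the socket into a request.
    static String decode(byte[] raw, int length) {
        return new String(raw, 0, length, StandardCharsets.UTF_8);
    }

    // Process: the business logic; here it only echoes the request back.
    static String process(String request) {
        return "echo: " + request.trim();
    }

    // Encode: turn the reply back into bytes before writing it to the socket.
    static byte[] encode(String reply) {
        return reply.getBytes(StandardCharsets.UTF_8);
    }

    // Everything that happens between "read" and "write" for one request.
    static byte[] handle(byte[] raw, int length) {
        return encode(process(decode(raw, length)));
    }

    public static void main(String[] args) {
        byte[] raw = "ping\n".getBytes(StandardCharsets.UTF_8);
        byte[] reply = handle(raw, raw.length);
        System.out.println(new String(reply, StandardCharsets.UTF_8)); // prints "echo: ping"
    }
}
```

Keeping these steps separate from the socket code is what later lets a Reactor decide which thread runs which step.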

Let’s look at traditional BIO code:

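import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class BioServer {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(9696);
            while (true) {
                // accept() blocks until a client connects
                Socket socket = serverSocket.accept();
                // one thread per client
                new Thread(() -> {
                    try {
                        byte[] byteRead = new byte[1024];
                        // read() blocks until the client sends data
                        int length = socket.getInputStream().read(byteRead);
                        if (length < 0) {
                            return; // the client closed the connection
                        }
                        String req = new String(byteRead, 0, length, StandardCharsets.UTF_8); // decode
                        // do something

                        byte[] byteWrite = "Hello".getBytes(StandardCharsets.UTF_8); // encode
                        socket.getOutputStream().write(byteWrite);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}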

This code should need no explanation. If it does, I wonder what has kept you reading this far...

The downside of this approach is obvious at a glance: it requires a large number of threads to be opened.

So we need to improve it, and any improvement needs a goal. What is our goal? No cavities! Hey, buddy, you're on the wrong set.

Our goals are:

  1. Degrade gracefully as the load increases;
  2. Keep improving performance as resources are added;
  3. Also meet availability and performance goals:
     3.1 Low latency
     3.2 Meeting peak demand
     3.3 Tunable quality of service

Let’s think about why traditional sockets have such drawbacks:

  1. Blocking: whether it is waiting for a client to connect or waiting for a client's data, the server blocks. It is a case of "one man guards the pass and ten thousand cannot get through": whenever you decide to connect and whenever you decide to send data, I just sit here waiting for you. Consider this: if accept() and read() were both non-blocking, wouldn't half of the traditional socket problem be solved?
  2. Synchronous: the server has to keep watching the client to see whether it has connected or sent data. If I could drink tea and play games instead, and have the system notify me when a client connects or sends data so that I can go and handle it, that would be much better, and the other half of the traditional socket problem would be solved.
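To make the first point concrete, here is a small sketch of what "non-blocking accept()" means in Java NIO. It is an illustration under stated assumptions rather than part of the servers in this article: the class and method names are hypothetical, and the channel is bound to port 0 (any free port) so that no client is expected to connect.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingAccept {
    // Returns true if accept() came back immediately without a client.
    static boolean acceptReturnsImmediately() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port; nobody is expected to connect to it.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            // In non-blocking mode accept() returns null instead of waiting for a client.
            SocketChannel client = server.accept();
            return client == null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("returned without a client: " + acceptReturnsImmediately());
    }
}
```

With a blocking `ServerSocket`, the same call would park the thread until a client arrived, which is exactly the behaviour the list above complains about.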

So God said NIO, and NIO came.

NIO

What does NIO stand for? Many people read it as Non-blocking IO, but I think New IO is the better reading, at least in the Java world. Which reading you prefer is up to you, dear reader.

NIO solves the traditional Socket problem:

  1. One thread can monitor multiple sockets, so it is no longer "one man guards the pass and ten thousand cannot get through";
  2. Event-driven: when an event occurs, the system notifies me and I handle it.
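Both points can be seen in a few lines. The sketch below is an assumption-laden illustration, not the article's server: it uses two `java.nio.channels.Pipe` objects as stand-ins for client sockets so that it runs without a network, and the class name and the "pipe-A"/"pipe-B" labels are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class OneThreadManyChannels {
    // Registers two channels with one Selector, writes to the second one,
    // and returns the labels of the channels reported as readable.
    static List<String> readyChannels() {
        try (Selector selector = Selector.open()) {
            Pipe a = Pipe.open();
            Pipe b = Pipe.open();
            a.source().configureBlocking(false);
            b.source().configureBlocking(false);
            // The third argument is the attachment, used here only as a label.
            a.source().register(selector, SelectionKey.OP_READ, "pipe-A");
            b.source().register(selector, SelectionKey.OP_READ, "pipe-B");

            // Only pipe B receives data.
            b.sink().write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));

            List<String> ready = new ArrayList<>();
            selector.select(1000); // one thread waits on both channels, for at most one second
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isReadable()) {
                    ready.add((String) key.attachment());
                }
            }
            a.sink().close();
            a.source().close();
            b.sink().close();
            b.source().close();
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readyChannels());
    }
}
```

The thread never polls either channel itself: it sleeps in `select` and is told which channel has something to read.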

I won't explain more NIO concepts here; this section was only written to introduce today's protagonist: Reactor.

Reactor

Before discussing the Reactor model, here is the client code I will use to test the Reactor implementations:

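import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket();
            socket.connect(new InetSocketAddress("localhost", 9090));
            // reader thread: print whatever the server sends back
            new Thread(() -> {
                while (true) {
                    try {
                        InputStream inputStream = socket.getInputStream();
                        byte[] bytes = new byte[1024];
                        int length = inputStream.read(bytes);
                        if (length < 0) {
                            break; // the server closed the connection
                        }
                        System.out.println(new String(bytes, 0, length, StandardCharsets.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                        break;
                    }
                }
            }).start();

            // main thread: send every line typed on the console
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                socket.getOutputStream().write(s.getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}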

Single-reactor single-thread model

This is the simplest Reactor model: multiple clients connect to a single Reactor, and the Reactor contains a dispatcher.

When a connection request arrives, the Reactor dispatches it to the Acceptor. When an I/O read or write event arrives, the Reactor dispatches it to the corresponding Handler.

Here, a single Reactor handles both connection requests and read/write requests. Connection requests are generally quick to handle, but read/write requests involve business logic and are comparatively slow. While the Reactor is processing one read/write request, all other requests have to wait until it finishes.

Voiceover: When I was studying NIO and Reactor, there was one thing I couldn't figure out: NIO is supposed to be so powerful that one server can handle multiple clients at the same time without starting extra threads, so why can the next request only be processed after the current one finishes? I don't know whether anyone else had the same question; I hope I'm not the only one... The answer is that "handling multiple clients at the same time without extra threads" means that one thread can monitor the connection, read and write events of many clients. The actual business processing is still "one man guards the pass and ten thousand cannot get through": one request at a time.

The single-Reactor single-thread model is simple to program and suits scenarios where every request completes quickly, but it cannot take advantage of multi-core CPUs. In general, it is rarely used.

Many things only stick once you have actually practiced them, and the Reactor model is no exception. If you only look at the diagrams, then however thoroughly you think you understand them at the time, you will have forgotten everything within half a month. So you still need to get your hands on the keyboard and implement a single-Reactor single-thread model yourself.

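import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {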
    ServerSocketChannel serverSocketChannel;
    Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(selector, serverSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}

This defines a Reactor class.

In the constructor, the connection event is registered, and an Acceptor object is attached to the resulting selectionKey; Acceptor is the class that handles connection requests.

The Reactor class implements the Runnable interface. Its run method listens for events and calls the dispatcher method, which retrieves the object attached to the selectionKey and calls that object's run method. Note that run is called directly: this does not start a thread, it is just an ordinary method call.
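The difference between calling `run()` and starting a thread is easy to check. The following is a small stand-alone sketch; the class `RunVersusStart` and the thread name "worker" are hypothetical and not part of the Reactor code.

```java
class RunVersusStart {
    // Runs the task either directly or on a new thread named "worker",
    // and returns the name of the thread the task actually ran on.
    static String threadUsed(boolean startNewThread) {
        String[] seen = new String[1];
        Runnable task = () -> seen[0] = Thread.currentThread().getName();
        if (startNewThread) {
            Thread worker = new Thread(task, "worker");
            worker.start();
            try {
                worker.join(); // wait so that seen[0] is set and visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else {
            task.run(); // a plain method call on the current thread
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("run():   " + threadUsed(false));
        System.out.println("start(): " + threadUsed(true));
    }
}
```

This is why everything dispatched by the Reactor above, Acceptor and WorkHandler alike, executes on the one thread that runs the select loop.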

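import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {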
    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }

    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("There's a client connecting," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            selectionKey.attach(new WorkHandler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

At this point, any event that occurs must be a connection event, because the Reactor constructor registers only the connection event, not read or write events.

When a connection event occurs, the Reactor's dispatcher method retrieves the attached Acceptor and calls its run method, which registers a read event and attaches a WorkHandler to the new selectionKey.

Once the Acceptor's run method finishes, control returns to the run method of the Reactor class, which continues listening for events.

Now the Reactor listens for two kinds of events: connection events and read events.

When a client writes data, the Reactor calls the dispatcher method again. This time the attached object is the WorkHandler, so execution goes into the WorkHandler's run method.

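import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class WorkHandler implements Runnable {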
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int length = socketChannel.read(byteBuffer);
            if (length < 0) {
                // the client closed the connection; closing the channel also cancels its key
                socketChannel.close();
                return;
            }
            // decode only the bytes that were actually read
            String message = new String(byteBuffer.array(), 0, length, StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "The message came :" + message);
            socketChannel.write(ByteBuffer.wrap("I got your message.".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The WorkHandler is what actually handles the data that clients write.

public class Main {
    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}

Now we can test:

/127.0.0.1:63912
/127.0.0.1:63912
/127.0.0.1:63912
/127.0.0.1:63912
/127.0.0.1:63912
/127.0.0.1:63912
/127.0.0.1:49290 send message is: I am not good
/127.0.0.1:49428 send message is: hee hee hee

Voice-over: The purpose of this article is to make the Reactor model easier to understand, so I have left out a lot of things, such as registering write events, switching between read and write, wakeup, and so on. Adding these details would likely lead readers astray and leave them tangled in questions like: why register a write event when writing works without registering one? Why wake up the selector when new events can still be detected without it? None of that has much to do with the Reactor model itself.

Single-reactor multithreaded model

We know the shortcoming of the single-Reactor single-thread model, so we can address it directly. To recap: while one client request is being processed, all other requests have to wait.

So why not simply bring in multithreading? Exactly, and that gives us the single-Reactor multithreaded model.

As you can see, the Reactor is still responsible for both connection events and client write events; the only addition is a thread pool.

When a client initiates a connection request, the Reactor hands the task to the Acceptor. When a client sends data, the Reactor hands the task to the thread pool, so that one server can serve N clients at the same time.
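The effect of handing work to a pool can be checked without any sockets. In the sketch below, which is an illustration with hypothetical names rather than part of the server, a single-thread executor stands in for "everything runs on the Reactor thread" and a fixed pool of two threads stands in for the thread pool. Each of the two tasks waits up to one second for the other at a `CyclicBarrier`, so they can only meet if they really run at the same time.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PoolConcurrency {
    // Submits two tasks that each wait (up to one second) for the other to arrive.
    // Returns true only if the executor ran both tasks at the same time.
    static boolean runsTwoTasksAtOnce(ExecutorService executor) {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Callable<Integer> meet = () -> barrier.await(1, TimeUnit.SECONDS);
        Future<Integer> first = executor.submit(meet);
        Future<Integer> second = executor.submit(meet);
        try {
            first.get();
            second.get();
            return true;
        } catch (Exception e) {
            // With one thread the first task times out, because the second cannot start yet.
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "single thread: false" after about one second
        System.out.println("single thread: " + runsTwoTasksAtOnce(Executors.newSingleThreadExecutor()));
        // prints "pool of two:   true"
        System.out.println("pool of two:   " + runsTwoTasksAtOnce(Executors.newFixedThreadPool(2)));
    }
}
```

The second case is the situation the multithreaded model aims for: a slow request occupies one pool thread while another request is processed on a different one.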

Let’s go ahead and implement a single-reactor multithreaded model:

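import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {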

    ServerSocketChannel serverSocketChannel;

    Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(9090));
            serverSocketChannel.configureBlocking(false);
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(serverSocketChannel, selector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    ServerSocketChannel serverSocketChannel;

    Selector selector;

    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }


    @Override
    public void run() {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("There's a client connecting," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
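            System.out.println("acceptor thread:" + Thread.currentThread().getName());
            selectionKey.attach(new WorkHandler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkHandler implements Runnable {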

    static ExecutorService pool = Executors.newFixedThreadPool(2);

    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
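            System.out.println("workHandler thread:" + Thread.currentThread().getName());
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            if (socketChannel.read(buffer) < 0) {
                // the client closed the connection; closing the channel also cancels its key
                socketChannel.close();
                return;
            }
            // hand the business logic to the thread pool
            pool.execute(new Process(socketChannel, buffer));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class Process implements Runnable {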

    private SocketChannel socketChannel;

    private ByteBuffer byteBuffer;

    public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            System.out.println("process thread:" + Thread.currentThread().getName());
            // the buffer has not been flipped, so position() is the number of bytes read
            String message = new String(byteBuffer.array(), 0, byteBuffer.position(), StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "The message came :" + message);
            socketChannel.write(ByteBuffer.wrap("I got your message.".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        // 9090 is the port the Client connects to
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}

The single-Reactor multithreaded code differs very little from the single-threaded code; it just adds multithreading.

Let’s test it again:

/127.0.0.1:55789
acceptor thread:main
/127.0.0.1:56681
acceptor thread:main
/127.0.0.1:55789
acceptor thread:main
/127.0.0.1:56850
acceptor thread:main
workHandler thread:main
process thread:pool-1-thread-1
/127.0.0.1:55789: I am client 1
workHandler thread:main
process thread:pool-1-thread-2
/127.0.0.1:56681: I am client 2
workHandler thread:main
process thread:pool-1-thread-1
/127.0.0.1:56850

It is clear that the Acceptor and the WorkHandler still run on the main thread, but Process runs on the thread pool.

The single-Reactor multithreaded model looks good, but it still has a drawback: one Reactor is responsible for both connection requests and read/write requests. Connection requests are quick, and a client usually connects only once, but it may send many write requests. It would be better to have multiple Reactors, one handling connection events and the others handling client write events, which also fits the single-responsibility principle. Hence the master-slave Reactor model was born.

Master-slave Reactor model

Next, let's implement a master-slave Reactor model. Note that my implementation differs from the one in the picture: the picture shows one master and one slave, whereas I implement one master and eight slaves; and in the picture there is a thread pool under the subReactor, whereas my SubReactors have no thread pool. Despite these differences, the core idea is the same.

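import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {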
    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(serverSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    private ServerSocketChannel serverSocketChannel;
    private final int CORE = 8;

    private int index;

    private SubReactor[] subReactors = new SubReactor[CORE];
    private Thread[] threads = new Thread[CORE];
    private final Selector[] selectors = new Selector[CORE];

    public Acceptor(ServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
        for (int i = 0; i < CORE; i++) {
            try {
                selectors[i] = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
            subReactors[i] = new SubReactor(selectors[i]);
            threads[i] = new Thread(subReactors[i]);
            threads[i].start();
        }
    }

    @Override
    public void run() {
        try {
            System.out.println("acceptor thread:" + Thread.currentThread().getName());
            SocketChannel socketChannel = serverSocketChannel.accept();
            System.out.println("There's a client connecting," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            selectors[index].wakeup();
            SelectionKey selectionKey = socketChannel.register(selectors[index], SelectionKey.OP_READ);
            selectionKey.attach(new WorkHandler(socketChannel));
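            // round-robin: move on to the next SubReactor, wrapping around at the end
            if (++index == threads.length) {
                index = 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class SubReactor implements Runnable {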
    private Selector selector;

    public SubReactor(Selector selector) {
        this.selector = selector;
    }


    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                System.out.println("selector:" + selector.toString() + "thread:" + Thread.currentThread().getName());
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class WorkHandler implements Runnable {
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int length = socketChannel.read(byteBuffer);
            if (length < 0) {
                // the client closed the connection; closing the channel also cancels its key
                socketChannel.close();
                return;
            }
            // decode only the bytes that were actually read
            String message = new String(byteBuffer.array(), 0, length, StandardCharsets.UTF_8);
            System.out.println(socketChannel.getRemoteAddress() + "The message came :" + message);
            socketChannel.write(ByteBuffer.wrap("I got your message.".getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}

The biggest difference is in the Acceptor constructor, where I create eight threads, eight SubReactors and eight Selectors. As soon as the application starts, the eight threads execute the run method defined in SubReactor and listen for events. Read events are registered in the Acceptor's run method, so the run method defined in SubReactor listens for read events.
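The Acceptor also decides which of the eight SubReactors gets each new connection by stepping through them in turn. Pulled out on its own, that selection logic looks roughly like the sketch below; `RoundRobin` is a hypothetical helper written for this illustration and does not appear in the code above.

```java
class RoundRobin {
    private final int size;
    private int index;

    RoundRobin(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Returns the current slot, then advances and wraps around at the end.
    // Not synchronized: like the Acceptor, it is meant to be called from one thread only.
    int next() {
        int current = index;
        if (++index == size) {
            index = 0;
        }
        return current;
    }

    public static void main(String[] args) {
        RoundRobin picker = new RoundRobin(3);
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            order.append(picker.next());
        }
        System.out.println(order); // prints 0120120
    }
}
```

Round-robin spreads connections evenly by count, not by load; a SubReactor that happens to receive the busiest clients still does more work than the others.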

Let’s test it out:

acceptor thread:main
client connection came up, /127.0.0.1:57986
selector:sun.nio.ch.WindowsSelectorImpl@94f1d6thread:Thread-0
acceptor thread:main
client connection came up, /127.0.0.1:58142
selector:sun.nio.ch.WindowsSelectorImpl@1819b93thread:Thread-1
acceptor thread:main
client connection came up, /127.0.0.1:58183
selector:sun.nio.ch.WindowsSelectorImpl@1d04799thread:Thread-2
selector:sun.nio.ch.WindowsSelectorImpl@94f1d6thread:Thread-0
/127.0.0.1:57986 sent message is: 1
selector:sun.nio.ch.WindowsSelectorImpl@1819b93thread:Thread-1
/127.0.0.1:58142 sent message is: 2
selector:sun.nio.ch.WindowsSelectorImpl@1d04799thread:Thread-2
/127.0.0.1:58183 sent message is: 3
acceptor thread:main
client connection came up, /127.0.0.1:59462
selector:sun.nio.ch.WindowsSelectorImpl@11d3ebfthread:Thread-3
selector:sun.nio.ch.WindowsSelectorImpl@11d3ebfthread:Thread-3
/127.0.0.1:59462 sent message is: 1111

It is clear that the Acceptor runs on the main thread from start to finish, while different SubReactors and Selectors handle the clients' write requests.
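One detail of the Acceptor is worth a closer look: it calls wakeup() and then register() from the main thread while the SubReactor thread is inside its own select loop, so the hand-over depends on timing between the two threads. A common alternative, which is not what the code above does, is to queue the registration and let the SubReactor thread perform it itself. The sketch below shows that idea under clear assumptions: `QueueingSubReactor` and all of its methods are hypothetical, and a `Pipe` stands in for a client socket so that the example runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class QueueingSubReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    QueueingSubReactor() throws IOException {
        selector = Selector.open();
    }

    // Safe to call from any thread: the registration itself happens later,
    // on the thread that runs the select loop.
    void register(SelectableChannel channel, Runnable handler) {
        pending.add(() -> {
            try {
                channel.register(selector, SelectionKey.OP_READ, handler);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        selector.wakeup(); // make select() return so the queue gets drained
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (running) {
                selector.select();
                Runnable task;
                while ((task = pending.poll()) != null) {
                    task.run(); // perform queued registrations on this thread
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    ((Runnable) key.attachment()).run();
                }
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo: register a pipe from the calling thread, then send it one message.
    static String demo() {
        try {
            QueueingSubReactor reactor = new QueueingSubReactor();
            Thread thread = new Thread(reactor, "sub-reactor");
            thread.start();
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            String[] received = new String[1];
            CountDownLatch done = new CountDownLatch(1);
            reactor.register(pipe.source(), () -> {
                try {
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    int n = pipe.source().read(buffer);
                    if (n > 0) {
                        received[0] = new String(buffer.array(), 0, n, StandardCharsets.UTF_8);
                        done.countDown();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            done.await(2, TimeUnit.SECONDS);
            reactor.stop();
            thread.join();
            pipe.sink().close();
            pipe.source().close();
            return received[0];
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because only the select-loop thread ever touches the Selector's registrations, the caller never has to guess whether that thread is currently inside select().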

Reactor model structure diagram

After reading about the three Reactor models, we should also look at the structure diagram of the Reactor model, which comes from the paper widely recognized in the industry as the best one on the subject.

It looks a little complicated, so let's go through it piece by piece.

  • Synchronous Event Demultiplexer: listens for various events. When the caller invokes its listening method, the call blocks until an event occurs. On Linux, the synchronous event demultiplexer corresponds to the I/O multiplexing mechanisms such as epoll and poll. In Java NIO, the corresponding component is the Selector, and the corresponding blocking method is select.
  • Handle: essentially a file descriptor. It is an abstract concept that can be understood as a source of events. The events can be external, such as client connection events and client write events, or internal, such as timer events generated by the operating system.
  • Event Handler: essentially a callback method. When an event occurs on a Handle, the framework invokes the corresponding callback. In most cases it is a virtual function, so users must implement the interface and its specific methods.
  • Concrete Event Handler: a concrete implementation of the Event Handler.
  • Initiation Dispatcher: this is effectively the Reactor role. It provides methods for registering and removing Event Handlers, calls the Synchronous Event Demultiplexer to listen for events, and invokes the corresponding Event Handler when an event occurs.
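One possible way to map these roles onto Java NIO is sketched below. It is an interpretation for illustration, not code from the paper or from the implementations above: every type name here is hypothetical, a `Pipe` plays the part of the handle (the file-descriptor-like object) so that the example runs without a network, and the Selector plays the Synchronous Event Demultiplexer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Event Handler: the callback interface that users implement.
interface EventHandler {
    void handleEvent(SelectionKey key) throws IOException;
}

// Initiation Dispatcher: registers and removes handlers, waits for events, calls back.
class InitiationDispatcher {
    // Synchronous Event Demultiplexer: the Selector.
    private final Selector selector;

    InitiationDispatcher() throws IOException {
        selector = Selector.open();
    }

    // The channel is the handle; the handler travels with the key as its attachment.
    SelectionKey registerHandler(SelectableChannel handle, int ops, EventHandler handler) throws IOException {
        return handle.register(selector, ops, handler);
    }

    void removeHandler(SelectionKey key) {
        key.cancel();
    }

    // One round: block until an event occurs (or the timeout expires), then dispatch.
    int handleEvents(long timeoutMillis) throws IOException {
        selector.select(timeoutMillis);
        int dispatched = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            ((EventHandler) key.attachment()).handleEvent(key);
            dispatched++;
        }
        return dispatched;
    }

    void close() throws IOException {
        selector.close();
    }
}

// Concrete Event Handler: collects whatever arrives on its channel.
class CollectingHandler implements EventHandler {
    final StringBuilder received = new StringBuilder();

    @Override
    public void handleEvent(SelectionKey key) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        int n = ((ReadableByteChannel) key.channel()).read(buffer);
        if (n > 0) {
            received.append(new String(buffer.array(), 0, n, StandardCharsets.UTF_8));
        }
    }
}

class ReactorRolesDemo {
    static String demo() {
        try {
            InitiationDispatcher dispatcher = new InitiationDispatcher();
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            CollectingHandler handler = new CollectingHandler();
            SelectionKey key = dispatcher.registerHandler(pipe.source(), SelectionKey.OP_READ, handler);
            pipe.sink().write(ByteBuffer.wrap("event".getBytes(StandardCharsets.UTF_8)));
            dispatcher.handleEvents(1000);
            dispatcher.removeHandler(key);
            pipe.sink().close();
            pipe.source().close();
            dispatcher.close();
            return handler.received.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Seen this way, the Reactor classes earlier in the article are Initiation Dispatchers whose Event Handlers happen to be plain Runnables stored as key attachments.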

That’s the end of this article.