Threading model

The threading model has a major impact on a program's performance. To understand Netty's threading model, we will walk through each threading model in turn and then look at what advantages the Netty model offers.

The current thread models are:

  • Traditional blocking I/O service model
  • Reactor model

Depending on the number of Reactors and the number of threads in the pool that handles the work, the Reactor model has three typical implementations:

  • Single Reactor, single thread;
  • Single Reactor, multiple threads;
  • Master-slave Reactor, multiple threads.

Netty's threading model is based mainly on the master-slave Reactor multithreading model, with some improvements. In the master-slave Reactor multithreading model there are multiple Reactors.

Let’s see what the threading model looks like!

Traditional blocking I/O service model

Working principle diagram

Model features

  1. Blocking I/O is used to read the input data.
  2. Each connection needs its own thread to read the data, run the business processing, and return the result.

Problem analysis

  1. When the number of concurrent requests is large, a large number of threads are created, consuming a large amount of system resources.
  2. After the connection is created, if the current thread has no data to read temporarily, the thread will block in the read operation, resulting in a waste of thread resources.
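The two problems above are easier to see in code. The following is a minimal sketch of a thread-per-connection echo server, not code from the original example: the class name BlockingEchoServer, the echo "business logic", and the roundTrip helper are all assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical thread-per-connection echo server, for illustration only. */
class BlockingEchoServer {

    /** Accept loop: blocks in accept(), then dedicates one new thread to each connection. */
    static void serve(ServerSocket serverSocket) {
        try {
            while (true) {
                Socket socket = serverSocket.accept();   // blocks until a client connects
                Thread worker = new Thread(() -> handle(socket));
                worker.setDaemon(true);
                worker.start();
            }
        } catch (IOException e) {
            // accept() fails once the server socket is closed; the loop ends here
        }
    }

    /** Runs on the connection's own thread: read, process, reply. */
    static void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line;
            while ((line = in.readLine()) != null) {     // the thread blocks here while the client is idle
                out.println(line);                       // the "business processing" is a plain echo
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the server on a free port, sends one line, and returns the echoed reply. */
    static String roundTrip(String message) {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            Thread acceptor = new Thread(() -> serve(serverSocket));
            acceptor.setDaemon(true);
            acceptor.start();
            try (Socket client = new Socket("localhost", serverSocket.getLocalPort());
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                out.println(message);
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Every accepted connection costs one thread, and that thread sits blocked in readLine() whenever its client has nothing to send, which is exactly the waste described above.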

Reactor model

The Reactor model addresses the two drawbacks of the traditional blocking I/O service model as follows:

  1. I/O multiplexing: multiple connections share one blocking object, so the application waits on a single blocking object instead of blocking on every connection. When a connection has new data to process, the operating system notifies the application, and the thread returns from the blocked state and starts the business processing.
  2. Reusing threads through a thread pool: instead of creating a thread for every connection, the business processing that follows a completed read is assigned to a pool thread, so one thread can handle the business of multiple connections.

Working principle diagram

I/O multiplexing combined with a thread pool is the basic design idea of the Reactor model.
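The "one blocking object for many connections" idea can be sketched with a java.nio Selector. In the sketch below, in-process Pipe channels stand in for network connections so that the example runs on its own; the class name MultiplexingSketch, the method readyAfterWrite, and the "connection-N" labels are assumptions for illustration, not part of the original example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: one thread waits on several channels through a single Selector. */
class MultiplexingSketch {

    /** Registers pipeCount channels, writes to one of them, and returns the labels reported ready. */
    static List<String> readyAfterWrite(int pipeCount, int writeTo) {
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < pipeCount; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                pipe.source().configureBlocking(false);
                // The attachment labels the "connection"; a Reactor would attach a handler instead
                pipe.source().register(selector, SelectionKey.OP_READ, "connection-" + i);
            }
            // Only one of the "connections" receives data
            pipes.get(writeTo).sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));

            // The single blocking point: returns once at least one registered channel is ready
            while (selector.select() == 0) {
                // woken up without a ready channel; wait again
            }
            List<String> ready = new ArrayList<>();
            for (SelectionKey key : selector.selectedKeys()) {
                ready.add((String) key.attachment());
            }
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe pipe : pipes) {
                closeQuietly(pipe);
            }
        }
    }

    private static void closeQuietly(Pipe pipe) {
        try {
            pipe.sink().close();
            pipe.source().close();
        } catch (IOException ignored) {
            // nothing useful to do while cleaning up the demo
        }
    }

    public static void main(String[] args) {
        System.out.println(readyAfterWrite(3, 1));
    }
}
```

The calling thread blocks once, in select(), no matter how many channels are registered, and it learns from the selected keys which channel needs attention.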

Notes on the diagram:

  1. The Reactor pattern is an event-driven pattern in which one or more inputs are delivered concurrently to a service handler.
  2. The Reactor pattern is also called the Dispatcher pattern, because the server-side program processes the incoming requests and dispatches them synchronously to the appropriate processing thread.
  3. The Reactor pattern uses I/O multiplexing to listen for events and, once an event arrives, dispatches it to a thread (or process). This is the key to handling high concurrency in network servers.

Core components of the Reactor pattern

  1. Reactor: runs in a separate thread, listens for I/O events, and dispatches them to the appropriate handlers. It is like a company's telephone operator, who takes calls from customers and transfers each one to the right contact.
  2. Handlers: perform the actual work that an I/O event requires, like the employees in the company whom the customer actually wants to talk to. The Reactor responds to I/O events by dispatching them to the appropriate handlers, and the handlers perform non-blocking operations.

Reactor single-threaded

The model illustration

As the diagram shows, the application has multiple clients. A client initiates a request to connect to the server. The Dispatcher inside the Reactor passes the connection request to the Acceptor, which establishes the connection. Once the connection is established, later events on it are dispatched to a Handler for business processing.

The single-threaded Reactor model is straightforward enough that, once you understand the diagram, you can write the code directly from it.

Code sample

Based on the diagram, we can write the following code:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Reactor single-thread model
 *
 * @author connor
 */
public class SingleThreadReactor implements Runnable {

    /** Server channel, selector, and port */
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private static final int PORT = 6666;

    /**
     * Initializes the fields in the constructor
     *
     * @throws IOException
     */
    public SingleThreadReactor() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        // Set non-blocking
        serverSocketChannel.configureBlocking(false);
        // Bind the port
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        selector = Selector.open();
        // Register the channel with the selector, listen for the Accept event, and attach a new Acceptor object
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocketChannel, selector));
    }

    @Override
    public void run() {
        while (true) {
            try {
                int select = selector.select();
                if (select > 0) {
                    // Retrieve the selection keys of the events that occurred
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey selectionKey = keyIterator.next();
                        // Use the dispatcher to distribute the event
                        dispatcher(selectionKey);
                        keyIterator.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Dispatches the event to the object attached to the key
     *
     * @param selectionKey
     */
    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }

    public static void main(String[] args) throws IOException {
        SingleThreadReactor str = new SingleThreadReactor();
        str.run();
    }
}

As in the diagram, the Reactor accepts client requests and distributes them to the Acceptor and the Handlers through the dispatcher.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Acceptor
 *
 * @author connor
 */
public class Acceptor implements Runnable {
    // Server channel and selector
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    /**
     * Initializes the fields in the constructor
     *
     * @param serverSocketChannel
     * @param selector
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            // Accept the connection request
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept() returns null when no connection is pending
                return;
            }
            socketChannel.configureBlocking(false);
            // Register the channel with the selector and listen for read events
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            // Attach a new Handler object
            selectionKey.attach(new Handler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
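import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class Handler implements Runnable {

    private SocketChannel socketChannel;

    public Handler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes actually read, not the whole backing array
                String message = new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8);
                System.out.println("Client: " + socketChannel.getRemoteAddress() + " sent a message: " + message);
                socketChannel.write(ByteBuffer.wrap(("From the server: " + socketChannel.getLocalAddress() + " Response successful...").getBytes(StandardCharsets.UTF_8)));
            } else if (read < 0) {
                // The client closed the connection; closing the channel also cancels its key
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}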
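To try the server, any TCP client will do. The following is a minimal sketch of one, assuming the SingleThreadReactor above is running locally on port 6666; the class name ReactorTestClient and the method sendOnce are hypothetical and not part of the original example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical test client; not part of the original example. */
class ReactorTestClient {

    /** Connects, sends one message, and returns whatever the first read delivers. */
    static String sendOnce(String host, int port, String message) {
        try (SocketChannel channel = SocketChannel.open(new InetSocketAddress(host, port))) {
            // The channel is in blocking mode by default, so write() sends the whole buffer
            channel.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
            ByteBuffer reply = ByteBuffer.allocate(1024);
            // Blocking read of the server's response; a long reply could need more than one read
            int read = channel.read(reply);
            return read > 0 ? new String(reply.array(), 0, read, StandardCharsets.UTF_8) : "";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the server started, calling ReactorTestClient.sendOnce("localhost", 6666, "hello") should make the server log the message and should return the server's response string.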

The test results are:

Strengths and weaknesses of Reactor’s single-threaded model

The advantage of the single-threaded Reactor model is its simplicity: there is only one thread, so there is no contention between threads for shared resources.

The disadvantages are just as clear. A single thread cannot take advantage of a multi-core CPU, and while a Handler is processing, the whole thread is occupied, so no other connection event can be handled in the meantime. This loses the real benefit of NIO, which is one thread serving many clients.
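The stalling effect can be reproduced without any networking. In the sketch below a single-thread executor stands in for the Reactor thread, and two tasks stand in for a slow Handler and a fast Handler; the class name SingleThreadStall and the method fastHandlerDelayMillis are assumptions for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a slow handler delays every handler queued behind it on the same thread. */
class SingleThreadStall {

    /** Returns how long (in ms) a trivial handler had to wait behind a slow one. */
    static long fastHandlerDelayMillis(long slowHandlerMillis) {
        // One thread plays the role of the single Reactor thread
        ExecutorService reactorThread = Executors.newSingleThreadExecutor();
        try {
            long start = System.nanoTime();
            // First "client": its handler takes a long time
            reactorThread.execute(() -> sleep(slowHandlerMillis));
            // Second "client": its handler is trivial, but it cannot start until the first one is done
            Future<Long> fast = reactorThread.submit(() -> System.nanoTime() - start);
            return TimeUnit.NANOSECONDS.toMillis(fast.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            reactorThread.shutdown();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast handler waited about " + fastHandlerDelayMillis(200) + " ms");
    }
}
```

The second task does almost no work, yet it finishes only after roughly the full duration of the first one, which is what a second client experiences when the single Reactor thread is busy in another client's Handler.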

Reactor multithreaded model

The model illustration

In the multithreaded Reactor model, the Handler no longer does the processing on the Reactor thread; the work is handed to a thread pool instead. Whenever there is work to do, a pool thread executes it, so the Reactor thread does not block on the Handler. When the processing finishes, the result is written to the channel and returned to the client.

Code sample

The following code is basically the same as the single-threaded model code, with the addition of a Process class.

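import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Handler
 *
 * @author connor
 */
public class Handler implements Runnable {

    // One pool shared by all handlers, so that a new pool is not created for every connection
    private static final ExecutorService EXECUTORS = Executors.newFixedThreadPool(2);

    private SocketChannel socketChannel;

    public Handler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // Hand the read and the business processing over to the thread pool
        EXECUTORS.execute(new Process(socketChannel, byteBuffer));
    }
}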
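import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
 * Business processing task
 *
 * @author connor
 */
public class Process implements Runnable {

    private SocketChannel socketChannel;
    private ByteBuffer byteBuffer;

    public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        this.socketChannel = socketChannel;
        this.byteBuffer = byteBuffer;
    }

    @Override
    public void run() {
        try {
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes actually read, not the whole backing array
                String message = new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8);
                System.out.println("Client: " + socketChannel.getRemoteAddress() + " sent a message: " + message);
                socketChannel.write(ByteBuffer.wrap(("From the server: " + socketChannel.getLocalAddress() + " Response successful...").getBytes(StandardCharsets.UTF_8)));
            } else if (read < 0) {
                // The client closed the connection
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}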

The code results are as follows:

Reactor multithreaded model pros and cons

  1. Advantages: it makes full use of the processing power of a multi-core CPU.
  2. Disadvantages: sharing and accessing data across threads is complex. The Reactor still monitors and responds to all events on a single thread, which can become a performance bottleneck under high concurrency.

Reactor master-slave model

The model illustration

  1. The MainReactor object in the Reactor main thread listens for connection events through select. When an event arrives, it handles the connection event through the Acceptor.
  2. After the Acceptor has handled the connection event, the MainReactor assigns the connection to a SubReactor.
  3. The SubReactor adds the connection to its connection queue for listening and creates a handler to process the various events.
  4. When a new event occurs, the SubReactor calls the corresponding handler to process it.
  5. The handler reads the data through read and distributes it to a worker thread for processing.
  6. The worker thread pool assigns an independent worker thread to perform the business processing and return the result.
  7. After receiving the result, the handler returns it to the client through send.
  8. The Reactor main thread can have multiple Reactor sub-threads; that is, one MainReactor can be associated with multiple SubReactors.
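Steps 2 to 4 (hand a connection to a SubReactor, queue it for listening, then call its handler) can be sketched as follows. This is one possible arrangement under stated assumptions, not the code of the original example: the class QueueingSubReactor, its methods assign and demo, and the use of an in-process Pipe in place of an accepted socket are all hypothetical. The registration is queued and performed on the SubReactor's own thread, so the main Reactor thread never touches the selector apart from waking it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sub-Reactor that registers handed-over channels on its own thread. */
class QueueingSubReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> pendingRegistrations = new ConcurrentLinkedQueue<>();

    QueueingSubReactor() throws IOException {
        this.selector = Selector.open();
    }

    /** Called from the main Reactor thread: queue the registration, then wake the selector. */
    void assign(SelectableChannel channel, Runnable handler) {
        pendingRegistrations.add(() -> {
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ, handler);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                // Register the connections that were handed over since the last pass
                Runnable registration;
                while ((registration = pendingRegistrations.poll()) != null) {
                    registration.run();
                }
                // Dispatch the ready keys to their attached handlers
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    ((Runnable) key.attachment()).run();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hands one pipe to a sub-Reactor and reports whether its handler ran within two seconds. */
    static boolean demo() {
        try {
            QueueingSubReactor subReactor = new QueueingSubReactor();
            Thread thread = new Thread(subReactor, "sub-reactor-0");
            thread.setDaemon(true);
            thread.start();

            Pipe pipe = Pipe.open();
            CountDownLatch handled = new CountDownLatch(1);
            subReactor.assign(pipe.source(), () -> {
                try {
                    pipe.source().read(ByteBuffer.allocate(16));  // drain the data so the key is not reported again
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                handled.countDown();
            });
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
            boolean ok = handled.await(2, TimeUnit.SECONDS);
            thread.interrupt();  // select() returns on interrupt and the loop ends
            return ok;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("handler invoked: " + demo());
    }
}
```

A main Reactor with several such sub-Reactors would pick one per accepted connection, for example in round-robin order, and call assign on it.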

Code sample

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {
    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            // The main Reactor only listens for Accept events; the Acceptor is the attached object
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(serverSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }

    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    private ServerSocketChannel serverSocketChannel;
    // Number of SubReactors, each with its own selector and thread
    private static final int CORE = 8;

    // Index of the SubReactor that receives the next connection
    private int index;

    private SubReactor[] subReactors = new SubReactor[CORE];
    private Thread[] threads = new Thread[CORE];
    private final Selector[] selectors = new Selector[CORE];

    public Acceptor(ServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
        for (int i = 0; i < CORE; i++) {
            try {
                selectors[i] = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
            subReactors[i] = new SubReactor(selectors[i]);
            threads[i] = new Thread(subReactors[i]);
            threads[i].start();
        }
    }

    @Override
    public void run() {
        try {
            System.out.println("acceptor thread: " + Thread.currentThread().getName());
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept() returns null when no connection is pending
                return;
            }
            System.out.println("There's a client connecting, " + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            // Wake up the SubReactor's selector before registering the new channel with it
            selectors[index].wakeup();
            SelectionKey selectionKey = socketChannel.register(selectors[index], SelectionKey.OP_READ);
            selectionKey.attach(new WorkHandler(socketChannel));
            // Move on to the next SubReactor in round-robin order
            if (++index == threads.length) {
                index = 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class SubReactor implements Runnable {
    private Selector selector;

    public SubReactor(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                System.out.println("selector: " + selector.toString() + " thread: " + Thread.currentThread().getName());
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}
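import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class WorkHandler implements Runnable {
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes actually read, not the whole backing array
                String message = new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8);
                System.out.println(socketChannel.getRemoteAddress() + " sent the message: " + message);
                socketChannel.write(ByteBuffer.wrap("I got your message.".getBytes(StandardCharsets.UTF_8)));
            } else if (read < 0) {
                // The client closed the connection
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}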

Strengths and weaknesses of Reactor master-slave model

Advantages: the responsibilities of the parent thread and the child threads are clearly separated, and the data exchange between them is simple. The parent thread only accepts new connections and passes them to a child thread; the child thread completes the subsequent business processing and does not need to return any data to the parent.

Disadvantages: as the code shows, the master-slave model is considerably more complex and requires much more code.