This is the 15th day of my participation in the August More Text Challenge.
Preface
When it comes to NIO and Netty, there is no getting around the Reactor model. It is a classic architecture and the cornerstone of Netty, so let's take a look at it.
This article walks through the Reactor model to give you a basic, intuitive understanding of it.
Design models for traditional services
In the most traditional socket server design, multiple clients connect to the server, and the server starts many threads, one thread serving each client.
In most scenarios, processing a network request involves the following steps:
- read: reads data from the socket.
- decode: data on the network is transmitted as bytes, so it must be decoded to obtain the actual request.
- compute: the business processing itself. You can do whatever you want here.
- encode: likewise, data on the network is transmitted as bytes (sockets only accept bytes), so the response must be encoded.
For more on the pitfalls of this model, read the previous article: An In-depth Analysis of Java IO (ii) BIO
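The thread-per-client design and the four steps described above can be sketched roughly as follows. This is a minimal illustration, not code from the original article: the class name `BlockingEchoServer`, the line-based text protocol, and the upper-casing "business logic" are all assumptions made for the example.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical sketch of the traditional thread-per-connection design.
class BlockingEchoServer {

    // read -> decode -> compute -> encode -> write, all on the calling thread.
    static void handle(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String request = reader.readLine();                 // read + decode: bytes -> one text line
        if (request == null) {
            return;                                         // peer closed without sending anything
        }
        String response = request.toUpperCase(Locale.ROOT); // compute: placeholder business logic
        out.write((response + "\n").getBytes(StandardCharsets.UTF_8)); // encode: text -> bytes
        out.flush();
    }

    // One thread per accepted client; every thread blocks on its own socket.
    static void serve(ServerSocket server) throws IOException {
        while (!server.isClosed()) {
            Socket client = server.accept();                // blocks until a client connects
            new Thread(() -> {
                try (Socket s = client) {
                    handle(s.getInputStream(), s.getOutputStream());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // Convenience wrapper that exercises handle() without a network connection.
    static String handleInMemory(String requestLine) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] input = (requestLine + "\n").getBytes(StandardCharsets.UTF_8);
            handle(new ByteArrayInputStream(input), out);
            return new String(out.toByteArray(), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `BlockingEchoServer.handleInMemory("hello")` returns `"HELLO"`. With `serve`, every connected client occupies a thread even while it is idle, which is the cost this design pays.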
NIO distribution model
NIO solves the problems of the traditional socket design well:
- One thread can monitor multiple sockets, so a thread is no longer tied up guarding a single connection.
- It is event-driven: when an event occurs, the system notifies the application, which then handles it.
I won’t go into too much detail here, but see my previous article: An In-depth Analysis of Java IO (3) NIO
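As a small illustration of the event-driven idea, the sketch below registers a channel with a `Selector` and only reads once the selector reports it as readable. It uses an in-process `Pipe` instead of a socket so that it runs without a network peer; the class name `SelectorPipeDemo`, the one-second timeout, and the 64-byte buffer are assumptions made for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo: one thread waits on a selector and reads only when notified.
class SelectorPipeDemo {

    // Writes a short message (at most 64 bytes) into a pipe and returns what the selector loop reads.
    static String waitAndRead(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                Pipe.SourceChannel source = pipe.source();
                source.configureBlocking(false);                 // required before registering
                source.register(selector, SelectionKey.OP_READ); // "tell me when data can be read"
                pipe.sink().write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                StringBuilder received = new StringBuilder();
                if (selector.select(1000) > 0) {                 // wait up to 1 s for a ready event
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(64);
                            ((Pipe.SourceChannel) key.channel()).read(buffer);
                            buffer.flip();
                            received.append(StandardCharsets.UTF_8.decode(buffer));
                        }
                    }
                }
                return received.toString();
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The same selector could have many channels registered with it, which is how one thread ends up serving many sockets.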
Reactor model
The Reactor model, also known as the Reactor pattern, has the following characteristics:
- It responds to I/O events by dispatching them to the appropriate handlers.
- Each handler performs non-blocking operations.
- Handlers are managed by binding them to events.
The Reactor model combines the advantages of the distribution model and the event-driven approach, and is especially suitable for handling massive numbers of I/O events in high-concurrency scenarios.
1. How the Reactor processes requests
Request processing in the Reactor model is divided into read operations and write operations.
For a read operation, the flow is as follows:
- The application registers a read-ready event together with the associated event handler.
- The event dispatcher waits for events to occur.
- When a read-ready event occurs, the event dispatcher calls the event handler registered in the first step.
A write operation is similar, except that the first step registers a write-ready event.
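The register-then-dispatch flow above can be sketched without any I/O at all. In this minimal illustration the names `MiniDispatcher` and `EventType` are hypothetical, and events are delivered by calling `dispatch` directly rather than by waiting on a selector.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory event dispatcher illustrating "register, wait, call back".
class MiniDispatcher {
    enum EventType { READ_READY, WRITE_READY }

    private final Map<EventType, Consumer<String>> handlers = new EnumMap<>(EventType.class);

    // Step 1: the application registers an event type together with its handler.
    void register(EventType type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    // Step 3: when an event occurs, call the handler registered for that event type.
    // Returns false if nobody registered interest in the event.
    boolean dispatch(EventType type, String payload) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

A caller would register a handler once, for example `dispatcher.register(MiniDispatcher.EventType.READ_READY, data -> System.out.println(data))`, and the waiting step (step 2) is whatever loop eventually calls `dispatch`. In the NIO examples later in the article that loop is `selector.select()`.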
2. The three roles in the Reactor model
Three roles are defined in the Reactor model:
- Reactor: listens for and dispatches events, sending each I/O event to the corresponding Handler. Events include connection ready, read ready, write ready, and so on.
- Acceptor: handles new client connections and dispatches requests to the handler chain.
- Handler: binds itself to an event and performs non-blocking read/write tasks. It reads from the channel and, once the business logic has been processed, writes the results back to the channel. Handlers can be managed with a resource pool.
According to different application scenarios, the Reactor model can be divided into single Reactor single-thread model, single Reactor multi-thread model and master-slave Reactor multi-thread model.
Single Reactor single thread model
The following figure shows the single-threaded Reactor design. The Reactor thread multiplexes the sockets, and the Acceptor accepts new connections and dispatches requests to the Handler.
1. Message processing flow
The message processing flow of the single Reactor single-thread model is as follows:
- The Reactor object monitors events through select and forwards them through dispatch.
- If the event is a connection establishment event, the Acceptor accepts the connection and creates a Handler to handle subsequent events.
- If it is not a connection establishment event, the Reactor dispatches it to the Handler bound to that connection.
- The Handler completes the whole read, decode, compute, encode, and send sequence.
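The flow above might look like this in a compact form. This is a sketch under assumptions, not the article's implementation: the class name `SingleThreadReactor`, the upper-casing "business logic", and the `roundTrip` helper (which starts a reactor on a free local port and talks to it with a blocking client) are all invented for illustration, and resource cleanup is omitted for brevity.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;

// Hypothetical single-threaded Reactor: select, dispatch, accept and handle share one thread.
class SingleThreadReactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel server;

    SingleThreadReactor(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", port)); // port 0 lets the OS pick a free port
        server.configureBlocking(false);
        // The attachment plays the Acceptor role for connection events.
        server.register(selector, SelectionKey.OP_ACCEPT, (Runnable) this::accept);
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();                           // wait for ready events
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    ((Runnable) key.attachment()).run();     // dispatch to the Acceptor or a Handler
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void accept() {
        try {
            SocketChannel channel = server.accept();
            if (channel == null) return;
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
            key.attach((Runnable) () -> handle(key));        // create the Handler for this connection
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Handler: read, decode, compute, encode and send, all on the Reactor thread.
    private void handle(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            if (channel.read(buffer) < 0) {                  // read; -1 means the peer closed
                key.cancel();
                channel.close();
                return;
            }
            buffer.flip();
            String request = StandardCharsets.UTF_8.decode(buffer).toString(); // decode
            String response = request.toUpperCase(Locale.ROOT);                // compute
            channel.write(StandardCharsets.UTF_8.encode(response));            // encode + send
        } catch (IOException e) {
            key.cancel();
            try { channel.close(); } catch (IOException ignored) { }
        }
    }

    // Starts a reactor on a free port, sends one message with a blocking client, returns the reply.
    static String roundTrip(String message) {
        try {
            SingleThreadReactor reactor = new SingleThreadReactor(0);
            Thread thread = new Thread(reactor);
            thread.setDaemon(true);
            thread.start();
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            InetSocketAddress address =
                    new InetSocketAddress("127.0.0.1", reactor.server.socket().getLocalPort());
            try (SocketChannel client = SocketChannel.open(address)) {
                client.write(ByteBuffer.wrap(payload));
                ByteBuffer reply = ByteBuffer.allocate(payload.length);
                while (reply.hasRemaining() && client.read(reply) >= 0) { }
                reply.flip();
                return StandardCharsets.UTF_8.decode(reply).toString();
            } finally {
                thread.interrupt();                          // makes the reactor loop exit
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`SingleThreadReactor.roundTrip("ping")` returns `"PING"`. Because `handle` runs on the same thread as `select`, a slow compute step would stall every other connection.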
2. Advantages and disadvantages
The single Reactor single-thread model only separates the components in code. Everything still runs on a single thread, so it cannot make full use of hardware resources, and the Handler's business processing is not asynchronous.
This model is adequate for some small-capacity applications, but it is not suitable for high-load, high-concurrency scenarios, mainly for the following reasons:
- A single Reactor thread cannot keep up with the read, decode, compute, encode, and send work of massive numbers of messages, even if its CPU load reaches 100%.
- When the Reactor thread is overloaded, processing slows down and a large number of client connections time out. Clients usually resend after a timeout, which adds even more load to the Reactor thread. Eventually messages pile up and time out in large numbers, and the thread becomes the performance bottleneck of the system.
- If the Reactor thread terminates unexpectedly or enters an infinite loop, the communication module of the whole system becomes unavailable and can no longer receive or process external messages, so the node fails.
To solve these problems, the single Reactor multi-thread model was introduced.
Single Reactor multi-threaded model
The following figure shows the multi-threaded design with a single Reactor. This model uses multiple threads (a thread pool) for the event Handler part.
1. Message processing flow
The message processing flow of the single Reactor multi-thread model is as follows:
- The Reactor object uses select to monitor client request events and dispatches each event it receives.
- If the event is a connection establishment request, the Acceptor accepts the connection and then creates a Handler object to handle subsequent events on that connection.
- If it is not a connection establishment event, the Reactor dispatches it to the Handler bound to that connection.
- The Handler only responds to events and does not do the actual business processing. After reading the data, it hands the data to the Worker thread pool for business processing.
- The Worker thread pool assigns a separate thread to do the actual business processing and passes the response result back to the Handler.
- After receiving the response result, the Handler sends it to the client.
Compared with the first model, the business logic is handled by the thread pool, and the Handler sends the response back to the client. This reduces the work done on the Reactor thread, so the Reactor can concentrate on event dispatching, which improves the throughput of the whole application.
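The hand-off from the Handler to the Worker thread pool described above can be sketched with a plain `ExecutorService`. The class name `WorkerPoolHandler`, the pool size of 4, and the upper-casing "business logic" are assumptions for the example. The network read and write are left out so that only the hand-off is shown.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical Handler that offloads business processing to a worker pool.
class WorkerPoolHandler {
    // The pool size is an arbitrary choice for this sketch.
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    // Called on the Reactor thread once the request has been read and decoded.
    // It returns immediately; the computation itself runs on a worker thread.
    CompletableFuture<String> onRead(String request) {
        return CompletableFuture.supplyAsync(() -> compute(request), workers);
    }

    // Placeholder for the real business logic.
    private String compute(String request) {
        return request.toUpperCase(Locale.ROOT);
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

A Handler would typically chain the send step onto the result, for example `handler.onRead(request).thenAccept(response -> send(response))`, where `send` is a hypothetical method that writes the response to the channel.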
2. Advantages and disadvantages
The single Reactor multi-thread model has the following problems:
- Sharing and accessing data across threads is complex. If a worker thread finishes the business processing and passes the result to the main (Reactor) thread for sending, the shared data must be protected by mutual exclusion.
- A single Reactor monitors and responds to all events and runs only on the main thread, which can become a performance problem. Examples are millions of concurrent client connections, or a server that must authenticate each client during the handshake when that authentication is itself expensive.
To solve these performance problems, a third model was developed: the master-slave Reactor multi-thread model.
Master-slave Reactor multithreaded model
Compared with the single Reactor multi-threaded model, the master-slave Reactor multi-threaded model divides the Reactor into two parts.
- The MainReactor (the main Reactor) monitors the server socket, handles connection establishment events, and registers each established SocketChannel with a SubReactor.
- The SubReactor (the sub Reactor) handles data exchange and event processing for the connected sockets registered with it. Generally, the number of SubReactors equals the number of CPUs.
Nginx, Swoole, Memcached, and Netty have all adopted this implementation.
The message processing flow of the master-slave Reactor multi-thread model is as follows:
- A Reactor thread is randomly selected from the main thread pool to act as the Acceptor thread, which binds the listening port and accepts client connections.
- After accepting a client connection request, the Acceptor thread creates a new SocketChannel and registers it with another Reactor thread in the main thread pool, which is responsible for access authentication, IP blacklist and whitelist filtering, and the handshake.
- Once these steps are complete, the business-layer link is established. The SocketChannel is removed from the multiplexer of the Reactor thread in the main thread pool and re-registered with a thread in the sub thread pool, and a Handler is created to handle its events.
- When a new event occurs, the SubReactor calls the Handler bound to that connection.
- After the Handler reads the data, it hands the data to the Worker thread pool for business processing.
- The Worker thread pool assigns a separate thread to do the actual business processing and passes the response result back to the Handler.
- After receiving the response result, the Handler sends it to the client.
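One simple way to spread accepted connections over a fixed set of SubReactors is round-robin selection, which is also what the Acceptor in the example below does with its `next` index. The sketch here is a thread-safe variant. The class name `RoundRobinChooser` is hypothetical and is not taken from the article or from any library.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser for picking the next SubReactor index.
class RoundRobinChooser {
    private final int size;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooser(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Returns 0, 1, ..., size - 1, 0, 1, ... and stays non-negative even after int overflow.
    int next() {
        return Math.floorMod(counter.getAndIncrement(), size);
    }
}
```

With `new RoundRobinChooser(3)`, successive calls to `next()` return 0, 1, 2, 0, and so on, so each new connection goes to the next SubReactor in turn.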
Example of master-slave Reactor multithreaded model
1. Reactor
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException {
        selector = Selector.open(); // Open a Selector
        serverSocketChannel = ServerSocketChannel.open(); // Create the server channel
        serverSocketChannel.socket().bind(new InetSocketAddress(port)); // Bind the service port
        serverSocketChannel.configureBlocking(false); // Channels used with a selector must be non-blocking
        // The Reactor is the entry point; the first event registered on the server channel is accept
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // Attach the Acceptor, which handles accept events
        sk.attach(new Acceptor(serverSocketChannel));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                int count = selector.select(); // Block until at least one event is ready
                if (count == 0) {
                    continue;
                }
                Set<SelectionKey> selected = selector.selectedKeys(); // Ready events found by this select
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    // Task dispatch happens here
                    dispatch(it.next());
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        // The attached object is the Acceptor
        Runnable r = (Runnable) (k.attachment());
        // Invoke the previously registered callback object
        if (r != null) {
            r.run();
        }
    }
}
This class has two core steps, select and dispatch: it listens for ready events and dispatches them. Here the attached object that it dispatches to is the Acceptor.
2. Acceptor
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    private final ServerSocketChannel serverSocketChannel;
    private final int coreNum = Runtime.getRuntime().availableProcessors(); // Number of CPU cores
    private final Selector[] selectors = new Selector[coreNum]; // One selector per SubReactor
    private int next = 0; // Index of the SubReactor that receives the next connection
    private SubReactor[] reactors = new SubReactor[coreNum]; // The SubReactors
    private Thread[] threads = new Thread[coreNum]; // Threads that run the SubReactors

    Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
        this.serverSocketChannel = serverSocketChannel;
        // Initialization
        for (int i = 0; i < coreNum; i++) {
            selectors[i] = Selector.open();
            reactors[i] = new SubReactor(selectors[i], i); // Initialize the SubReactor
            threads[i] = new Thread(reactors[i]); // Initialize the thread that runs the SubReactor
            threads[i].start(); // Start it (this executes SubReactor.run)
        }
    }

    @Override
    public void run() {
        SocketChannel socketChannel;
        try {
            socketChannel = serverSocketChannel.accept(); // Accept the connection
            if (socketChannel != null) {
                System.out.println(String.format("accpet %s", socketChannel.getRemoteAddress()));
                socketChannel.configureBlocking(false);
                // Registering with a selector that is blocked in select() can stall,
                // so the SubReactor's select loop is paused first.
                // registering(true) plus wakeup() do this; see SubReactor.run().
                reactors[next].registering(true);
                selectors[next].wakeup(); // Make a blocked select() return immediately
                SelectionKey selectionKey =
                        socketChannel.register(selectors[next], SelectionKey.OP_READ); // Register a read event
                selectors[next].wakeup(); // Make a blocked select() return immediately
                // Registration is done, so let the SubReactor resume selecting:
                // set the flag back to false (see SubReactor.run()).
                reactors[next].registering(false);
                // Bind the Handler
                selectionKey.attach(new AsyncHandler(socketChannel, selectors[next], next));
                if (++next == selectors.length) {
                    next = 0; // Wrap around to the first SubReactor
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
This class handles the connection-ready event: it initializes a batch of SubReactors, accepts the client's SocketChannel, assigns it to one of the SubReactors, and binds a Handler so that the subsequent read and write tasks can proceed.
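The Acceptor above pauses the SubReactor with a flag plus `wakeup()` before registering a channel from another thread. A common alternative, shown here as a sketch and not as the article's approach, is to queue the registration and let the SubReactor thread perform it itself after `select()` returns. The class name `QueueingSubReactor` and the `demo` helper (which uses an in-process `Pipe` instead of a socket) are hypothetical. Whether a cross-thread `register()` actually blocks during `select()` depends on the JDK version, so treat this as one defensive option.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical SubReactor that performs all registrations on its own thread.
class QueueingSubReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    QueueingSubReactor() throws IOException {
        selector = Selector.open();
    }

    // Safe to call from any thread. The channel must already be in non-blocking mode.
    void register(SelectableChannel channel, int ops, Runnable handler) {
        pending.add(() -> {
            try {
                channel.register(selector, ops, handler);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        });
        selector.wakeup(); // make a blocked select() return so the queue is drained promptly
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Runnable task;
                while ((task = pending.poll()) != null) {
                    task.run();                          // perform the queued registrations
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    ((Runnable) key.attachment()).run(); // dispatch to the attached handler
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Registers a pipe from the calling thread and reports whether its handler ran.
    static boolean demo() {
        try {
            QueueingSubReactor reactor = new QueueingSubReactor();
            Thread thread = new Thread(reactor);
            thread.setDaemon(true);
            thread.start();
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            CountDownLatch handled = new CountDownLatch(1);
            reactor.register(pipe.source(), SelectionKey.OP_READ, () -> {
                try {
                    pipe.source().read(ByteBuffer.allocate(8)); // consume the byte so the key stops firing
                } catch (IOException e) {
                    e.printStackTrace();
                }
                handled.countDown();
            });
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
            boolean ok = handled.await(2, TimeUnit.SECONDS);
            thread.interrupt();
            return ok;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this arrangement the accepting thread never touches the selector's key set directly, so no pause flag is needed. The price is one extra queue per SubReactor.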
3. SubReactor
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class SubReactor implements Runnable {
    private final Selector selector;
    private volatile boolean register = false; // Registration switch; set by the Acceptor thread, hence volatile
    private int num; // Index assigned by the Acceptor when it creates this SubReactor

    SubReactor(Selector selector, int num) {
        this.selector = selector;
        this.num = num;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            System.out.println(String.format("NO %d SubReactor waitting for register...", num));
            while (!Thread.interrupted() && !register) {
                try {
                    if (selector.select() == 0) {
                        continue;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    dispatch(it.next());
                    it.remove();
                }
            }
        }
    }

    private void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());
        if (r != null) {
            r.run();
        }
    }

    void registering(boolean register) {
        this.register = register;
    }
}
This class selects the events of the channels that the Acceptor has registered with it, which in this example are the read and send events.
4. AsyncHandler
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncHandler implements Runnable {
    private final Selector selector;
    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
    private final static int READ = 0; // Read ready
    private final static int SEND = 1; // Response ready
    private final static int PROCESSING = 2; // Processing
    private volatile int status = READ; // Every connection starts with a read; volatile because worker threads update it
    private int num; // Index of the owning SubReactor
    // Thread pool for asynchronous processing (5 worker threads)
    private static final ExecutorService workers = Executors.newFixedThreadPool(5);

    AsyncHandler(SocketChannel socketChannel, Selector selector, int num) throws IOException {
        this.num = num; // Records which SubReactor triggers this Handler
        this.socketChannel = socketChannel; // The accepted client connection
        this.socketChannel.configureBlocking(false); // Set to non-blocking mode
        selectionKey = socketChannel.register(selector, 0); // Register the client channel with the selector
        selectionKey.attach(this); // Attach this Handler as the callback object
        selectionKey.interestOps(SelectionKey.OP_READ); // The connection is established, so the next step is to read
        this.selector = selector;
        this.selector.wakeup();
    }

    @Override
    public void run() {
        // If a task is being processed asynchronously (PROCESSING), run() does nothing.
        // read() and send() only do the simple data reading and responding;
        // business processing never blocks here.
        switch (status) {
            case READ:
                read();
                break;
            case SEND:
                send();
                break;
            default:
        }
    }

    private void read() {
        if (selectionKey.isValid()) {
            try {
                readBuffer.clear();
                // Once read() returns, "read ready" has become "read done", which ends this ready event
                int count = socketChannel.read(readBuffer);
                if (count > 0) {
                    status = PROCESSING; // Mark as processing
                    workers.execute(this::readWorker); // Process asynchronously
                } else {
                    selectionKey.cancel();
                    socketChannel.close();
                    System.out.println(String.format("NO %d SubReactor read closed", num));
                }
            } catch (IOException e) {
                System.err.println("An exception occurred while processing a read service! Exception message:" + e.getMessage());
                selectionKey.cancel();
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    System.err.println("An exception occurred while closing the channel for read service! Exception message:" + e1.getMessage());
                }
            }
        }
    }

    void send() {
        if (selectionKey.isValid()) {
            status = PROCESSING; // Mark as processing
            workers.execute(this::sendWorker); // Process asynchronously
            selectionKey.interestOps(SelectionKey.OP_READ); // Switch interest back to read
        }
    }

    // Business processing after the data has been read
    private void readWorker() {
        try {
            // Simulate a time-consuming operation
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            // Decode only the bytes that were actually read, not the whole backing array
            System.out.println(String.format("NO %d %s -> Server: %s",
                    num, socketChannel.getRemoteAddress(),
                    new String(readBuffer.array(), 0, readBuffer.position())));
        } catch (IOException e) {
            System.err.println("An exception occurred while processing read services asynchronously! Exception message:" + e.getMessage());
        }
        status = SEND;
        selectionKey.interestOps(SelectionKey.OP_WRITE); // Register interest in write events
        this.selector.wakeup(); // Wake up the thread blocked in select()
    }

    private void sendWorker() {
        try {
            sendBuffer.clear();
            sendBuffer.put(String.format("NO %d SubReactor recived %s from %s", num,
                    new String(readBuffer.array(), 0, readBuffer.position()),
                    socketChannel.getRemoteAddress()).getBytes());
            sendBuffer.flip();
            // Once write() returns, the write is complete, which ends this event
            int count = socketChannel.write(sendBuffer);
            if (count < 0) {
                // Defensive check only: write() does not return a negative count;
                // a disconnected client normally surfaces as an IOException instead
                selectionKey.cancel();
                socketChannel.close();
                System.out.println(String.format("%d SubReactor send closed", num));
            }
            // If the connection is still open, switch back to reading
            status = READ;
        } catch (IOException e) {
            System.err.println("An exception occurred during asynchronous send processing! Exception message:" + e.getMessage());
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e1) {
                System.err.println("An exception occurred when the send service was closed! Exception message:" + e1.getMessage());
            }
        }
    }
}
AsyncHandler takes care of the subsequent read and write operations.
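One detail in the handlers is easy to get wrong: converting a `ByteBuffer` to text. Calling `new String(buffer.array())` decodes the whole backing array, including the unused bytes after the data. The sketch below decodes only the bytes written so far. The class name `BufferDecodeDemo` is hypothetical, and UTF-8 is an assumed encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing how to decode only the filled part of a buffer.
class BufferDecodeDemo {

    // Decodes the bytes between 0 and the buffer's current position,
    // leaving the original buffer's position and limit untouched.
    static String decodeFilled(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // shares content, has its own position and limit
        view.flip();                          // limit = bytes written so far, position = 0
        return StandardCharsets.UTF_8.decode(view).toString();
    }
}
```

For a 1024-byte buffer that holds only `"msg is 1"`, `decodeFilled` returns the 8-character message, while `new String(buffer.array(), StandardCharsets.UTF_8)` returns 1024 characters, most of them NUL.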
5. MainSubReactorDemo
import java.io.IOException;
public class MainSubReactorDemo {
    public static void main(String[] args) throws IOException {
        new Thread(new Reactor(2333)).start();
    }
}
The client
1. Connector
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Connector implements Runnable {
    private final Selector selector;
    private final SocketChannel socketChannel;

    Connector(SocketChannel socketChannel, Selector selector) {
        this.socketChannel = socketChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            if (socketChannel.finishConnect()) {
                // The connection is established (the three-way handshake with the server has completed)
                System.out.println(String.format("connected to %s", socketChannel.getRemoteAddress()));
                // From here on, the Handler takes over the subsequent actions (read, write, and so on)
                new Handler(socketChannel, selector);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
2. Handler
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

public class Handler implements Runnable {
    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;
    private ByteBuffer readBuffer = ByteBuffer.allocate(2048);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    private final static int READ = 0;
    private final static int SEND = 1;
    private int status = SEND; // Unlike the server, the client starts by sending data
    private AtomicInteger counter = new AtomicInteger();

    Handler(SocketChannel socketChannel, Selector selector) throws IOException {
        this.socketChannel = socketChannel; // The channel connected to the server
        this.socketChannel.configureBlocking(false); // Set to non-blocking mode
        selectionKey = socketChannel.register(selector, 0); // Register the channel with the selector
        selectionKey.attach(this); // Attach this Handler as the callback object
        selectionKey.interestOps(SelectionKey.OP_WRITE); // The connection is established; the next step is to write
        selector.wakeup(); // Wake up a blocked select()
    }

    @Override
    public void run() {
        try {
            switch (status) {
                case SEND:
                    send();
                    break;
                case READ:
                    read();
                    break;
                default:
            }
        } catch (IOException e) {
            // The client is writing data to, or reading data from, the server at this point.
            // If the server disconnects suddenly because of the network or for other reasons,
            // the client should close itself and exit
            System.err.println("An exception occurred while sending or reading! Exception message:" + e.getMessage());
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e2) {
                System.err.println("An exception occurred while closing the channel! Exception message:" + e2.getMessage());
                e2.printStackTrace();
            }
        }
    }

    void send() throws IOException {
        if (selectionKey.isValid()) {
            sendBuffer.clear();
            int count = counter.incrementAndGet();
            if (count <= 10) {
                sendBuffer.put(String.format("msg is %s", count).getBytes());
                sendBuffer.flip(); // Switch the buffer to read mode so the channel can read from it
                socketChannel.write(sendBuffer);
                // Switch back to reading in order to receive the server's response
                status = READ;
                selectionKey.interestOps(SelectionKey.OP_READ);
            } else {
                selectionKey.cancel();
                socketChannel.close();
            }
        }
    }

    private void read() throws IOException {
        if (selectionKey.isValid()) {
            readBuffer.clear(); // Switch the buffer to write mode so the channel can write into it
            socketChannel.read(readBuffer);
            // Decode only the bytes that were actually read
            System.out.println(String.format("Server -> Client: %s",
                    new String(readBuffer.array(), 0, readBuffer.position())));
            // After receiving the server's response, continue sending data to the server
            status = SEND;
            selectionKey.interestOps(SelectionKey.OP_WRITE); // Register interest in write events
        }
    }
}
3. NIOClient
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOClient implements Runnable {
    private Selector selector;
    private SocketChannel socketChannel;

    NIOClient(String ip, int port) {
        try {
            selector = Selector.open(); // Open a Selector
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false); // Set to non-blocking mode
            socketChannel.connect(new InetSocketAddress(ip, port)); // Connect to the server
            // Entry point: the first event registered on the client channel is the connect event
            SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_CONNECT);
            // Attach the handler class; the first one is the Connector, which handles connect-ready
            sk.attach(new Connector(socketChannel, selector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Block until a ready event arrives
                selector.select();
                // Ready events found by this select
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    // Task dispatch happens here
                    dispatch(it.next());
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        // The attached object is the Connector at first, and the Handler once connected
        Runnable r = (Runnable) (k.attachment());
        // Invoke the previously registered callback object
        if (r != null) {
            r.run();
        }
    }
}
4. ClientDemo
public class ClientDemo {
    public static void main(String[] args) {
        new Thread(new NIOClient("127.0.0.1", 2333)).start();
        new Thread(new NIOClient("127.0.0.1", 2333)).start();
    }
}
5. Test
Run the server and the client above. The console prints output like the following:
NO 2 SubReactor waitting for register...
NO 1 SubReactor waitting for register...
NO 3 SubReactor waitting for register...
NO 0 SubReactor waitting for register...
accpet /127.0.0.1:63223
NO 0 SubReactor waitting for register...
accpet /127.0.0.1:63226
NO 1 SubReactor waitting for register...
NO 0 /127.0.0.1:63223 -> Server: MSG1
NO 1 /127.0.0.1:63226 -> Server: MSG1
NO 0 /127.0.0.1:63223 -> Server: MSG2
NO 1 /127.0.0.1:63226 -> Server: MSG2
NO 0 /127.0.0.1:63223 -> Server: MSG3
NO 1 /127.0.0.1:63226 -> Server: MSG3
Conclusion
That concludes this detailed introduction to the Reactor model. I hope you now have a working understanding of the Reactor model and a better understanding of the Netty architecture. In the next article we will dig into the Netty source code.
At the end
I'm a programmer who keeps getting beaten up by code but is still working hard to move forward. If this article helped you, remember to like and follow. Thank you!