This article has participated in the “Digitalstar Project” and won a creative gift package to challenge the creative incentive money.
The concept of Reactor network programming model and three Reactor network programming models are introduced in detail.
Before this, we introduced four common network IO models in Java: four common Network IO models in Java and a brief introduction to select, poll, epoll functions. Now let’s look at the Reactor network programming model evolved from IO multiplexing.
1 Introduction to Reactor model
The common network programming model is not the most basic four network IO models, because it involves the writing of the underlying code, the big guys on the basis of the basic network IO model to adopt the object-oriented way for further encapsulation. The Reactor, Proactor, Acceptor-Connector and other programming models are easier to understand. Reactor model is the most common network programming model. Well-known frameworks or software such as Netty and Tomcat use Reactor model to achieve high concurrency and high performance network communication.
The father of concurrent Java programming Doug Lea as early as in the years before the Reactor model has carried on the detailed elaboration: gee.cs.oswego.edu/dl/cpjslide… The Selector in the NIO package is implemented based on the simplest Reactor model, and some use cases are presented in Doug Lea’s article.
The IO multiplexing model has two main advantages: first, the model can simultaneously listen to multiple IO requests in the same thread, the system does not have to create a large number of threads, thus greatly reducing the system overhead. Second, the blocking mode can reduce invalid system calls and reduce the consumption of CPU resources.
The Reactor model is a further object-oriented encapsulation of the IO multiplexing model, which allows users to write application code without considering the details of the underlying network API. A Reactor is a reaction to events: When an IO multiplexer listens and receives an event notification, it assigns it to different processors based on the event type. Therefore, the Reactor model is also called a Dispatcher model, or an event-driven model.
The Reactor model abstracts two important components:
- Reactor is designed to monitor and respond to various IO events, such as ACCEPT, READ, WRITE, etc. When a new event is detected, it sends it to the corresponding Handler for processing.
- Handler, which is used to handle specific events, such as reading data, executing business logic, writing responses, etc.
Up to now, the Reactor model has been developed, including various implementations, including single Reactor single-thread model, single Reactor multi-thread model and multiple Reactor multi-thread model.
2 Single-reactor Single-thread mode
The flow chart for this pattern in Doug Lea’s article is as follows:
The Reactor listens for VARIOUS I/O events and dispatches them to a specific Handler. The Accepter component handles connection establishment events and is considered a special Handler.
The overall process is:
- The server-side Reactor thread object listens for VARIOUS IO events through a circular SELECT call (IO multiplexing) and registers an accepter event handler to the Reactor, which is dedicated to connection establishment events.
- The Reactor listens for an ACCEPT event and dispatches it to an Accepter component. The Accepter establishes a SocketChannel connection with the client using the ACCEPT () method. The READ event that the connection cares about and the corresponding READ event handler are then registered with the Reactor, which listens for READ events for the connection.
- When the Reactor listens for a read or write event for the connection, it sends the event to the corresponding processor for processing. For example, the read handler reads data directly through the read() method of a SocketChannel, and then performs various services. When sending data to a client, the read handler registers the connection’s WRITE event and its corresponding handler. When the channel is writable, Write data through SocketChannel’s Wtite () method.
- Each time the Reactor thread processes all of the ready I/O events, it blocks again by performing select() and waits for a new event to be ready and dispatch it to the appropriate processor for processing.
Single-reactor single-thread mode means that all operations of the above Reactor and Hander are done in the same thread. The above calls to SELECT, Accept, read, wtite, and so on, as well as business logic processing, are all done in the same thread.
Single Reactor single thread mode is the most basic Reactor model, which is relatively simple to implement. Since it is a single thread, business code writing does not need to consider any concurrency problem. In fact, the Selector base of Java NIO mode is the simplest single Reactor single thread mode.
However, single-reactor single-thread mode only has one thread, which cannot fully utilize the performance of modern multi-core CPU. Moreover, if the business logic of one client takes a long time, the subsequent requests of other clients will be blocked.
Because the business processing of Redis is mainly done in memory, the memory operation speed is very fast, the performance bottleneck of Redis is not on CPU (network IO consumption and memory), and this mode is easy to implement, so the command execution before Redis 6 was also a single-reactor single-thread model.
However, after Redis 6, the introduction of multi-threading mechanism (multi-threading is really sweet), but the multi-threading of Redis is only used in network IO data reading and writing time-consuming operations, reducing the performance loss caused by network IO, but the actual command execution (Handler) is still a single thread sequence. Therefore, there is no need to worry about Redis thread safety.
2.1 pseudo code
The pseudo-code for the single-reactor single-thread pattern presented in Doug Lea’s article is as follows:
Reactor:
/** * Reactor * Listens and distributes events */
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException { / / Reactor initialization
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
// The socket is set to non-blocking
serverSocket.configureBlocking(false);
// Register to listen for accept events
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// Register an Acceptor as a callback to the Accept event
sk.attach(new Acceptor());
}
@Override
public void run(a) {
try {
while(! Thread.interrupted()) {// loop through select until the event is ready
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//Reactor is responsible for events received by Dispatchdispatch((SelectionKey) (it.next())); } selected.clear(); }}catch (IOException ex) { / *... * /}}void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
// Call the previously registered callback object
if(r ! =null) { r.run(); }}/** * Acceptor * a callback to accept events */
class Acceptor implements Runnable {
@Override
public void run(a) {
try {
SocketChannel channel = serverSocket.accept();
if(channel ! =null) {
newHandler(selector, channel); }}catch (IOException ex) { / *... * /}}}}Copy the code
Handler:
class Handler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final int MAXIN = 2048;
final int MAXOUT = 2048;
// Allocate buffers
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
// No events are registered by default
// 0 indicates that the channel is not interested in any events, which will result in the channel never being selected
sk = channel.register(selector, 0);
// Use the current Handler object as a callback object when the event is ready
sk.attach(this);
// Register Read ready events
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete(a) {
/ *... * /
return false;
}
boolean outputIsComplete(a) {
/ *... * /
return false;
}
void process(a) {
/ *... * /
return;
}
@Override
public void run(a) {
try {
// Handle read ready events
if (state == READING) {
read();
// Handle write ready events
} else if(state == SENDING) { send(); }}catch (IOException ex) { / *... * /}}/** * handle read ready event */
void read(a) throws IOException {
// Read data
channel.read(input);
if (inputIsComplete()) {
// Process data
process();
state = SENDING;
// Write data
// Start listening for write ready eventssk.interestOps(SelectionKey.OP_WRITE); }}/** * handle write ready event */
void send(a) throws IOException {
channel.write(output);
// Write the SelectionKey
if(outputIsComplete()) { sk.cancel(); }}}Copy the code
3 Single Reactor multithreaded model
In order to overcome the problems that multi-core CPU can not be utilized under single-reactor single-thread model and the I/O block of subsequent requests may be caused by the long time of executing a certain request, the single-reactor multi-thread model is developed.
The flow chart for this pattern in Doug Lea’s article is as follows:
A single Reactor thread listens for VARIOUS I/O events and dispatches to a specific Handler. This is the same as the single-reactor single-thread model, except that a worker thread pool is added. Move non-IO operations (business operations other than read and SEND calls) from the Reactor thread to the worker thread pool for concurrent execution.
The overall process is:
- The server-side Reactor thread object listens for VARIOUS IO events through a circular SELECT call (IO multiplexing) and registers an accepter event handler to the Reactor, which is dedicated to connection establishment events.
- The Reactor listens for an ACCEPT event and dispatches it to an Accepter component. The Accepter establishes a SocketChannel connection with the client using the ACCEPT () method. The READ event that the connection cares about and the corresponding READ event handler are then registered with the Reactor, which listens for READ events for the connection.
- When the Reactor listens for a read or write event for the connection, it sends the event to the corresponding processor for processing. For example, the read handler reads data directly through the read() method of a SocketChannel, and then performs various services. When sending data to a client, the read handler registers the connection’s WRITE event and its corresponding handler. When the channel is writable, Write data through SocketChannel’s Wtite () method.
- The difference between this model and the single-reactor single-thread model is that the Reactor thread is only responsible for network IO calls in Hander, namely read reads data and SEND sends data calls. After data is read, such as deserialization, business logic execution, serialization and other operations are carried out in parallel through a thread pool.
- Each time the Reactor thread processes all of the ready I/O events, it blocks again by performing select() and waits for a new event to be ready and dispatch it to the appropriate processor for processing.
In this mode, all business logic except read and SEND calls are executed by multithreading during Handler processing, which makes the Reactor thread perform the next select operation faster and improves the IO response speed to the request. Do not delay the processing of subsequent IO requests because of some time-consuming business logic.
This mode can make full use of the performance of multi-core CPU, but it will bring the problem of multi-thread concurrency, and the writing of business logic needs to pay special attention to the processing of shared data.
In addition, although asynchronous execution is used for business processing in this mode, the efficiency is improved, but a single Reactor thread is still used to monitor all events and basic IO operations, such as Accept, read, send and connect. In the face of high concurrency scenarios with hundreds or thousands of connections coming in an instant, Still a performance bottleneck.
3.1 pseudo code
The pseudo-code for the single-reactor multithreaded model presented in Doug Lea’s article is as follows:
The Reactor class did not change much. In acceptors, new Handler was changed to new MthreadHandler:
class MthreadHandler implements Runnable {
final SocketChannel channel;
final SelectionKey selectionKey;
final int MAXIN = 2048;
final int MAXOUT = 2048;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
/** * set a static thread pool */
static ExecutorService pool = Executors.newFixedThreadPool(2);
static final int PROCESSING = 3;
MthreadHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
selectionKey = channel.register(selector, 0);
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
boolean inputIsComplete(a) {
/ *... * /
return false;
}
boolean outputIsComplete(a) {
/ *... * /
return false;
}
@Override
public void run(a) {
try {
if (state == READING) {
read();
} else if(state == SENDING) { send(); }}catch (IOException ex) { / *... * /}}synchronized void read(a) throws IOException {
// Accept data
channel.read(input);
if (inputIsComplete()) {
state = PROCESSING;
/* * Processes data asynchronously using threads in the thread pool, executing the business logic * * The Reactor thread can return immediately after the call is made, without waiting for the process to complete */
pool.execute(newProcesser()); }}void send(a) throws IOException {
channel.write(output);
if(outputIsComplete()) { selectionKey.cancel(); }}/** * asynchronous task */
class Processer implements Runnable {
@Override
public void run(a) { processAndHandOff(); }}synchronized void processAndHandOff(a) {
// Perform business
process();
state = SENDING;
// Write data
// Start listening for write ready events
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
void process(a) {
/ *... * /
return; }}Copy the code
4 Multiple Reactor Multithreading mode
In order to prevent a single Reactor from becoming a performance bottleneck, we can continue to transform the function of a Reactor into two parts: “connecting to the client” and “communicating with the client”, which are jointly completed by different Reactor instances (multiple Reactor threads). This is the multi-reactor and multi-thread mode. Also known as Reactor master-slave pattern.
The flow chart for this pattern in Doug Lea’s article is as follows:
The mainReactor has its own Selector, which uses select to monitor connection events. When the events are ready, the Acceptor object checks the connection to the client, and then assigns the new connection to a subReactor. The subReactor also has its Selector, which continues to listen for the connection and perform other events, such as read and write ready events, in the subReactor. This splits the Reactor’s work into two parts, which can be executed in separate threads to further improve performance.
The overall process is:
- The server mainReactor thread listens for connection establishment events through a circular SELECT call (IO multiplexing). It also registers an Accepter event handler to the Reactor, which is dedicated to connection establishment events.
- The client initiates a connection request. The Reactor listens for an ACCEPT event and dispatches it to an Accepter component. The Accepter establishes a SocketChannel connection with the client using the ACCEPT () method. This connection is then assigned to a subReactor. The mainReactor thread then returns to continue the next round of SELECT listening.
- The subReactor also has its Selector, which registers the READ event of interest to the connection and the corresponding READ event handler and listens for the READ event of the connection through select.
- When subReactor detects that a read or write event has occurred, it sends the related event to the corresponding processor for processing. For example, the read handler reads data directly through the read() method of a SocketChannel, and then performs various services. When sending data to a client, the read handler registers the connection’s WRITE event and its corresponding handler. When the channel is writable, Write data through SocketChannel’s Wtite () method.
- The subReactor thread is only responsible for the NETWORK IO call in Hander, namely read reads data and SEND sends data. The processing after reading data, such as deserialization, business logic execution, serialization and other operations are carried out in parallel through a thread pool.
- Each time all ready IO events are processed, the subReactor thread blocks again by performing select() to wait for the new event to be ready and dispatch it to the corresponding processor for processing.
In multi-reactor and multi-threading mode, there can be multiple mainReactor and subReactor, each of which has its own Selector and works in an independent thread. In this way, the multi-threading advantage of multi-core CPU is further utilized, so that the Reactor will not easily become a performance bottleneck. Improves the connection speed and I/O reading and writing speed.
However, the multi-reactor and multi-thread model still cannot solve the impact of time-consuming I/O operations on other clients from the root, because one subReactor may correspond to multiple client connections. Therefore, True asynchronous IO can be implemented using the Proactor pattern, a design pattern evolved from the true asynchronous IO model.
Netty and Memcached are both multi-reactor and multi-threaded models. Nginx also uses multiple reactors and processes. In fact, Netty’s multi-reactor and multi-thread model is easier to implement. SubReactor processes READ, write and other I/O operations as well as business execution, that is, Thread Pool is removed. Or SubReactor and Worker threads are in the same thread pool:
- The mainReactor corresponds to the BossGroup thread group configured in Netty and is responsible for establishing client connections. Generally, only one service port is exposed, and the BossGroup thread group usually works on one thread
- SubReactor corresponds to the WorkerGroup configured in Netty. After receiving and establishing the client connection, the BossGroup submits the network socket to the WorkerGroup, and then selects a thread from the WorkerGroup. I/O processing. WorkerGroup A thread group mainly processes I/ OS. Generally, the number of threads is 2 x CPU cores.
Netty supports both single-threaded and multithreaded data bases (Reactor) by default.
4.1 pseudo code
The pseudo-code for the multireactor multithreading pattern presented in Doug Lea’s article is as follows:
class MthreadReactor implements Runnable {
/** * subReactor set. A selector represents a subReactor */
Selector[] selectors = new Selector[2];
int next = 0;
final ServerSocketChannel serverSocket;
/** * mainSelector */
final Selector selector;
MthreadReactor(int port) throws IOException {
selector = Selector.open();
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
// Listen for the accept event
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//Acceptor is used to establish connections
sk.attach(new Acceptor());
}
@Override
public void run(a) {
try {
while(! Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator();while (it.hasNext()) {
//Reactor is responsible for events received by Dispatchdispatch((SelectionKey) (it.next())); } selected.clear(); }}catch (IOException ex) { / *... * /}}void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
// Call the previously registered callback object
if(r ! =null) { r.run(); }}class Acceptor implements Runnable { // ...
@SneakyThrows
@Override
public synchronized void run(a) {
//mainSelector is responsible for accept to establish a connection
try (SocketChannel connection = serverSocket.accept()) {
// After the connection is established, it passes the connection to a subSelector that listens for read and write events
if(connection ! =null) {
new Handler(selectors[next], connection); // Select a subReactor to be responsible for the received connection}}if (++next == selectors.length) {
next = 0; }}}}Copy the code
References:
- Scalable IO in Java (Doug Lea)
- Reactor model details
- You can learn waste Reactor and Proactor from 8 diagrams
If you need to communicate, or the article is wrong, please leave a message directly. In addition, I hope to like, collect, pay attention to, I will continue to update a variety of Java learning blog!