Threading model
The threading model has a major impact on a program's performance. To understand Netty's threading model, we will walk through each common threading model in turn and then look at what advantages Netty's model offers.
The common threading models are:
- Traditional blocking I/O service model
- Reactor model
Depending on the number of Reactors and the number of threads in the pool that does the processing, the Reactor model has three typical implementations:
- Single Reactor, single thread;
- Single Reactor, multiple threads;
- Master-slave Reactor, multiple threads.
Netty's threading model is based mainly on the master-slave Reactor multithreading model, with some improvements. In the master-slave Reactor multithreading model there are multiple Reactors.
Let's see what each threading model looks like!
Traditional blocking I/O service model
Working principle diagram
Model features
- Uses blocking I/O to read input data.
- Each connection requires its own thread to handle data input, business processing, and data return.
Problem analysis
- When the number of concurrent requests is large, a large number of threads are created, consuming a large amount of system resources.
- After the connection is created, if the current thread has no data to read temporarily, the thread will block in the read operation, resulting in a waste of thread resources.
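The thread-per-connection model described above can be sketched as follows. This is a minimal illustration rather than code from the original article: the class name BlockingEchoServer, the echo "business logic", and the use of an ephemeral port in main are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical example class: a thread-per-connection echo server.
class BlockingEchoServer {

    // Accepts connections until the server socket is closed, one new thread per connection.
    static void serve(ServerSocket serverSocket) throws IOException {
        while (true) {
            Socket socket = serverSocket.accept();      // blocks until a client connects
            new Thread(() -> handle(socket)).start();   // dedicated thread for this connection
        }
    }

    // Input, "business processing" (echo) and output all happen on the connection's thread.
    static void handle(Socket socket) {
        try (Socket s = socket) {
            InputStream in = s.getInputStream();
            OutputStream out = s.getOutputStream();
            byte[] buffer = new byte[1024];
            int read;
            // read() blocks while the client is idle, so this thread does nothing useful
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                out.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(0)) { // port 0 = any free port
            Thread acceptor = new Thread(() -> {
                try {
                    serve(serverSocket);
                } catch (IOException e) {
                    // accept() fails once the server socket is closed; stop accepting
                }
            });
            acceptor.setDaemon(true);
            acceptor.start();

            // One client round trip: send a message and print what comes back
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort())) {
                client.getOutputStream().write("ping".getBytes(StandardCharsets.UTF_8));
                byte[] reply = new byte[4];
                int n = client.getInputStream().readNBytes(reply, 0, reply.length);
                System.out.println(new String(reply, 0, n, StandardCharsets.UTF_8));
            }
        }
    }
}
```

Every connected client costs one thread here, and that thread spends most of its life parked inside read(), which is exactly the waste the two bullet points describe.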
Reactor model
The Reactor model addresses the two drawbacks of the traditional blocking I/O service model as follows:
- I/O multiplexing: multiple connections share one blocking object, and the application waits on that single object instead of blocking on every connection. When a connection has new data to process, the operating system notifies the application, and the thread returns from the blocked state to begin business processing.
- Thread reuse through a thread pool: a thread is no longer created for each connection. Once a connection is established, its business processing tasks are handed to threads in the pool, so one thread can serve multiple connections.
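The first point can be illustrated with the Selector from java.nio. In this sketch, two Pipe channels stand in for two client connections (an assumption made so the example runs without a network), and the names MultiplexingSketch and collect are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical example class: one thread waits on many channels through one Selector.
class MultiplexingSketch {

    // Blocks on a single selector until `expected` messages have arrived on any channel.
    static List<String> collect(Selector selector, int expected) throws IOException {
        List<String> messages = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(64);
        while (messages.size() < expected) {
            selector.select(); // the one blocking point shared by all registered channels
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                buffer.clear();
                int read = ((ReadableByteChannel) key.channel()).read(buffer);
                if (read > 0) {
                    String text = new String(buffer.array(), 0, read, StandardCharsets.UTF_8);
                    messages.add(key.attachment() + ":" + text);
                }
            }
        }
        return messages;
    }

    public static void main(String[] args) throws IOException {
        Pipe first = Pipe.open();
        Pipe second = Pipe.open();
        try (Selector selector = Selector.open()) {
            first.source().configureBlocking(false);
            second.source().configureBlocking(false);
            // The attachment is just a label so we can see which "connection" fired
            first.source().register(selector, SelectionKey.OP_READ, "first");
            second.source().register(selector, SelectionKey.OP_READ, "second");

            // Only the second "connection" has data; the waiting thread is woken for it alone
            second.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            System.out.println(collect(selector, 1));
        } finally {
            first.sink().close();
            first.source().close();
            second.sink().close();
            second.source().close();
        }
    }
}
```

The thread never blocks on an individual channel: it sleeps in select() and is woken only when at least one registered channel is ready.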
Working principle diagram
I/O multiplexing combined with a thread pool is the basic design idea of the Reactor model.
Notes on the diagram:
- The Reactor pattern is an event-driven pattern in which one or more inputs are delivered concurrently to a service handler.
- The Reactor pattern is also called the Dispatcher pattern, because the server-side program processes incoming requests and dispatches them synchronously to the appropriate handler thread.
- The Reactor model uses I/O multiplexing to listen for events and, once an event arrives, dispatches it to a thread (or process). This is the key to handling high concurrency in network servers.
Core components of the Reactor pattern
- Reactor: runs in a separate thread, listens for I/O events, and dispatches them to the appropriate handlers. It is like a company's telephone operator, who takes calls from customers and transfers them to the right contact.
- Handlers: perform the actual work that an I/O event requires, like the company staff the customer actually wants to talk to. The Reactor responds to I/O events by dispatching them to the appropriate handlers, which perform non-blocking operations.
Reactor single-threaded
The model illustration
As the diagram shows, several clients can initiate requests and connect to the server. The dispatcher inside the Reactor hands each connection request to the Acceptor, which accepts the connection. Once the connection is established, later events on it are dispatched to a Handler for business processing.
The single-threaded Reactor model is simple enough that, once you understand it, you can write the code straight from the diagram.
Code sample
Based on the diagram, we get the following code:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Reactor single-thread model
 *
 * @author connor
 */
public class SingleThreadReactor implements Runnable {
    /** Server channel, selector, and port */
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private static final int PORT = 6666;

    /**
     * Initializes the fields in the constructor
     *
     * @throws IOException if the channel or selector cannot be opened
     */
    public SingleThreadReactor() throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        // Set non-blocking
        serverSocketChannel.configureBlocking(false);
        // Bind the port
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        selector = Selector.open();
        // Register the channel with the selector, listen for the Accept event, and attach a new Acceptor object
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocketChannel, selector));
    }

    @Override
    public void run() {
        while (true) {
            try {
                int select = selector.select();
                if (select > 0) {
                    // Retrieve the selection keys of the events that occurred
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey selectionKey = keyIterator.next();
                        // Use the dispatcher to distribute the event
                        dispatcher(selectionKey);
                        keyIterator.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Dispatches an event to the Runnable attached to its key
     *
     * @param selectionKey key of the event that occurred
     */
    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }

    public static void main(String[] args) throws IOException {
        SingleThreadReactor str = new SingleThreadReactor();
        str.run();
    }
}
As in the diagram, the Reactor accepts requests from clients and distributes them to the Acceptor and the Handlers through its dispatcher.
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Acceptor
 *
 * @author connor
 */
public class Acceptor implements Runnable {
    // Server channel and selector
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    /**
     * Initializes the fields in the constructor
     *
     * @param serverSocketChannel
     * @param selector
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            // Accept the connection request
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // In non-blocking mode accept() returns null when no connection is pending
                return;
            }
            socketChannel.configureBlocking(false);
            // Register the channel with the selector and listen for read events
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            // Attach a new Handler object
            selectionKey.attach(new Handler(socketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class Handler implements Runnable {
    private SocketChannel socketChannel;

    public Handler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes that were actually read
                System.out.println("Client:" + socketChannel.getRemoteAddress() + "Send a message:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
                socketChannel.write(ByteBuffer.wrap(("From the server:" + socketChannel.getLocalAddress() + "Response successful...").getBytes(StandardCharsets.UTF_8)));
            } else if (read < 0) {
                // The client closed the connection
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The test results are:
Strengths and weaknesses of Reactor’s single-threaded model
The advantage of the single-threaded Reactor model is its simplicity: with no multithreading, there is no contention between threads for shared resources.
Its disadvantages are just as obvious. A single thread cannot take advantage of a multi-core CPU, and while a Handler is busy processing, the whole thread is blocked, so the NIO benefit of one thread serving many clients is lost in practice.
Reactor multithreaded model
The model illustration
The multithreaded Reactor differs in that the Handler no longer does its processing on the Reactor thread; the work is handed to a thread pool instead. Each time there is work to do, a pool thread executes it, so the Handler does not block the Reactor. When processing is finished, the result is written to the channel and returned to the client.
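The effect of moving handler work off a single thread can be seen in a small experiment. This is only a sketch under stated assumptions: executors stand in for "the thread or threads that run handlers", the 300 ms sleep simulates slow business logic, and the names HandlerOffloadSketch and completionOrder are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class: compares one handler thread with a small pool.
class HandlerOffloadSketch {

    // Submits a slow handler, then a fast one, and returns the order in which they finish.
    static List<String> completionOrder(int threads) throws InterruptedException {
        List<String> finished = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        executor.execute(() -> {
            sleepQuietly(300);            // simulated slow business logic
            finished.add("slow");
        });
        executor.execute(() -> finished.add("fast"));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return finished;
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 thread:  " + completionOrder(1));
        System.out.println("2 threads: " + completionOrder(2));
    }
}
```

With one thread the fast handler can only finish after the slow one, which is the stall seen in the single-threaded model. With two threads the fast handler normally finishes first, because the slow work no longer occupies the only thread.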
Code sample
The following code is basically the same as the single-threaded model code, with the addition of a Process class.
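import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Handler
 *
 * @author connor
 */
public class Handler implements Runnable {
    // One pool shared by all handlers, rather than a new pool per connection
    private static final ExecutorService executors = Executors.newFixedThreadPool(2);
    private SocketChannel socketChannel;

    public Handler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            // Read on the Reactor thread so the same read event is not picked up twice
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                byteBuffer.flip();
                executors.execute(new Process(socketChannel, byteBuffer));
            } else if (read < 0) {
                // The client closed the connection
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Business processing
 *
 * @author connor
 */
public class Process implements Runnable {
    private SocketChannel socketChannel;
    private ByteBuffer byteBuffer;

    public Process(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        this.socketChannel = socketChannel;
        this.byteBuffer = byteBuffer;
    }

    @Override
    public void run() {
        try {
            // byteBuffer has already been filled and flipped by the Handler
            System.out.println("Client:" + socketChannel.getRemoteAddress() + "Send a message:" + StandardCharsets.UTF_8.decode(byteBuffer));
            socketChannel.write(ByteBuffer.wrap(("From the server:" + socketChannel.getLocalAddress() + "Response successful...").getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}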
The code results are as follows:
Reactor multithreaded model pros and cons
- Advantages: makes full use of the processing power of a multi-core CPU.
- Disadvantages: sharing and accessing data across threads is complex, and the Reactor still listens for and responds to all events on a single thread, which easily becomes a performance bottleneck under high concurrency.
Reactor master-slave model
The model illustration
- The MainReactor object in the Reactor main thread listens for connection events through select. When an event arrives, it handles the connection event through the Acceptor.
- After the Acceptor has handled the connection event, the MainReactor assigns the connection to a SubReactor.
- The SubReactor adds the connection to its connection queue for listening and creates a handler to process the various events.
- When a new event occurs, the SubReactor calls the corresponding handler to process it.
- The handler reads the data through read and distributes it to a worker thread for processing.
- The worker thread pool allocates an independent worker thread to perform the business processing and return the result.
- After receiving the response result, the handler returns the result to the client through send.
- The Reactor main thread can correspond to multiple Reactor child threads, i.e. one MainReactor can be associated with multiple SubReactors.
Code sample
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    public Reactor(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            // The main Reactor only listens for Accept events
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor(serverSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }

    public static void main(String[] args) {
        Reactor reactor = new Reactor(9090);
        reactor.run();
    }
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    private ServerSocketChannel serverSocketChannel;
    private static final int CORE = 8;
    private int index;
    private SubReactor[] subReactors = new SubReactor[CORE];
    private Thread[] threads = new Thread[CORE];
    private final Selector[] selectors = new Selector[CORE];

    public Acceptor(ServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
        // Start one SubReactor thread per selector
        for (int i = 0; i < CORE; i++) {
            try {
                selectors[i] = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
            subReactors[i] = new SubReactor(selectors[i]);
            threads[i] = new Thread(subReactors[i]);
            threads[i].start();
        }
    }

    @Override
    public void run() {
        try {
            System.out.println("acceptor thread:" + Thread.currentThread().getName());
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // In non-blocking mode accept() returns null when no connection is pending
                return;
            }
            System.out.println("There's a client connecting," + socketChannel.getRemoteAddress());
            socketChannel.configureBlocking(false);
            // Wake the SubReactor out of select() before registering the new channel
            selectors[index].wakeup();
            // Register with the handler already attached, so the SubReactor never sees a key without one
            socketChannel.register(selectors[index], SelectionKey.OP_READ, new WorkHandler(socketChannel));
            // Round-robin: the next connection goes to the next SubReactor
            if (++index == threads.length) {
                index = 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;

public class SubReactor implements Runnable {
    private Selector selector;

    public SubReactor(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                System.out.println("selector:" + selector.toString() + "thread:" + Thread.currentThread().getName());
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    dispatcher(selectionKey);
                    iterator.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void dispatcher(SelectionKey selectionKey) {
        Runnable runnable = (Runnable) selectionKey.attachment();
        runnable.run();
    }
}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class WorkHandler implements Runnable {
    private SocketChannel socketChannel;

    public WorkHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void run() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int read = socketChannel.read(byteBuffer);
            if (read > 0) {
                // Decode only the bytes that were actually read
                String message = new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8);
                System.out.println(socketChannel.getRemoteAddress() + "The message came :" + message);
                socketChannel.write(ByteBuffer.wrap("I got your message.".getBytes(StandardCharsets.UTF_8)));
            } else if (read < 0) {
                // The client closed the connection
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
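One subtle point in the Acceptor above is that it calls wakeup() and then register() from the main Reactor thread while the SubReactor thread owns the selector. Depending on the JDK version, a register() call made while another thread is inside select() on the same selector may block until that selection finishes, and the SubReactor can re-enter select() between the two calls. A common workaround, sketched here as an assumption rather than as the article's method, is to queue the registration and let the SubReactor thread perform it. The class QueueingSubReactor and its register method are hypothetical, and a Pipe stands in for an accepted SocketChannel so the demo needs no network.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of SubReactor: registrations are queued and performed
// by the SubReactor's own thread instead of by the accepting thread.
class QueueingSubReactor implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> pendingRegistrations = new ConcurrentLinkedQueue<>();

    QueueingSubReactor(Selector selector) {
        this.selector = selector;
    }

    // Safe to call from any thread: enqueue the registration, then wake the loop.
    void register(SelectableChannel channel, Runnable handler) {
        pendingRegistrations.add(() -> {
            try {
                channel.register(selector, SelectionKey.OP_READ, handler);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        });
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                // Perform queued registrations on this thread, outside select()
                Runnable registration;
                while ((registration = pendingRegistrations.poll()) != null) {
                    registration.run();
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    ((Runnable) key.attachment()).run();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        QueueingSubReactor subReactor = new QueueingSubReactor(selector);
        Thread thread = new Thread(subReactor, "sub-reactor");
        thread.start();

        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        CountDownLatch handled = new CountDownLatch(1);
        subReactor.register(pipe.source(), () -> {
            try {
                pipe.source().read(ByteBuffer.allocate(16)); // drain so the key is not selected again
                handled.countDown();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

        System.out.println("handled: " + handled.await(5, TimeUnit.SECONDS));
        thread.interrupt(); // select() returns when its thread is interrupted
        thread.join();
        selector.close();
    }
}
```

With this shape the accepting thread never touches the selector's key set directly; it only adds to a thread-safe queue and calls wakeup(), and the selector is only ever modified by the thread that selects on it.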
Strengths and weaknesses of Reactor master-slave model
Advantages: data interaction between the main thread and the child threads is simple, and responsibilities are clearly divided. The main thread only accepts new connections and passes them to a child thread; the child thread completes the subsequent business processing and does not need to return data to the main thread.
Disadvantages: as the code shows, the master-slave model is considerably more complex and requires much more code.