Scalable IO in Java is a classic set of slides by Doug Lea, author of the java.util.concurrent package, on analyzing and building scalable, high-performance IO services. The original address: gee.cs.oswego.edu/dl/cpjslide… It mainly traces the evolution of network server design. The Reactor pattern it describes served as a reference for Netty. The sample code in the slides runs after a little completion.

Classic Service Designs

In the traditional design, each handler may be started in its own thread.

Sample code:

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class Server implements Runnable {
        private static final int PORT = 8899;
        private static final int MAX_INPUT = 4096;

        public void run() {
            try (ServerSocket ss = new ServerSocket(PORT)) {
                while (!Thread.interrupted())
                    new Thread(new Handler(ss.accept())).start();
                // or, single-threaded, or a thread pool
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        static class Handler implements Runnable {
            final Socket socket;

            Handler(Socket s) { socket = s; }

            public void run() {
                try {
                    byte[] input = new byte[MAX_INPUT];
                    // read() blocks until data arrives and returns the number of bytes read
                    int n = socket.getInputStream().read(input);
                    if (n > 0) {
                        byte[] output = process(input, n);
                        socket.getOutputStream().write(output);
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    try { socket.close(); } catch (IOException ignored) { }
                }
            }

            // Decode only the bytes actually received, not the whole buffer
            private byte[] process(byte[] cmd, int len) {
                String msg = new String(cmd, 0, len);
                System.out.println(msg);
                return ("hello " + msg).getBytes();
            }
        }
    }

Start the server from a main method with new Thread(new Server()).start(); then connect to it by running nc localhost 8899.
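The comment in the listing mentions a thread pool as an alternative to one thread per connection. The following is a minimal sketch of that variant, not code from the slides: the class name PooledServer, the pool size, the use of an ephemeral port, and the demo() helper are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical thread-pool variant of the classic blocking server.
class PooledServer implements Runnable {
    private static final int MAX_INPUT = 4096;
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    PooledServer(int port, int poolSize) throws IOException {
        serverSocket = new ServerSocket(port);          // port 0 picks a free port
        pool = Executors.newFixedThreadPool(poolSize);  // bounded number of handler threads
    }

    int port() { return serverSocket.getLocalPort(); }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                Socket s = serverSocket.accept();       // still a blocking accept
                pool.execute(() -> handle(s));          // reuse pooled threads
            }
        } catch (IOException ex) {
            // accept() fails once shutdown() closes the server socket
        } finally {
            pool.shutdown();
        }
    }

    void shutdown() {
        try { serverSocket.close(); } catch (IOException ignored) { }
    }

    private static void handle(Socket socket) {
        try (Socket s = socket) {
            byte[] input = new byte[MAX_INPUT];
            int n = s.getInputStream().read(input);     // blocking read
            if (n > 0) {
                String msg = new String(input, 0, n, StandardCharsets.UTF_8);
                s.getOutputStream().write(("hello " + msg).getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    // Starts a server, sends one message as a client, and returns the reply.
    static String demo() {
        try {
            PooledServer server = new PooledServer(0, 4);
            new Thread(server).start();
            try (Socket s = new Socket(InetAddress.getLoopbackAddress(), server.port())) {
                s.getOutputStream().write("world".getBytes(StandardCharsets.UTF_8));
                s.shutdownOutput();
                return new String(s.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
            } finally {
                server.shutdown();
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A pool caps the number of threads, but each handler still blocks on read and write, so a slow client ties up a pooled thread for the whole exchange.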

Build high-performance scalable IO services

In building high-performance scalable IO services, we want to achieve the following goals:

  1. Degrade gracefully as load (the number of client connections) increases;
  2. Keep improving performance as hardware resources (CPU, memory, disk, bandwidth) increase;
  3. Deliver low latency, high throughput, and tunable quality of service.

Dividing the work into small tasks that are dispatched when events occur is usually the best way to achieve these goals. The approach rests on the following mechanisms:

  1. Break the whole process down into small tasks;
  2. Each task performs its action without blocking;
  3. A task is executed only when it is enabled. For example, a read is triggered only when there is data to read.

In service development, an IO event usually serves as the trigger that enables a task, so handlers are written mainly around IO events.



The java.nio package implements the above mechanism nicely:

  1. Non-blocking reads and writes
  2. Dispatching the tasks associated with sensed IO events
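Both mechanisms can be seen in a few lines. The sketch below is an illustration rather than code from the slides: it uses a java.nio.channels.Pipe in place of a socket so that it is self-contained, and the class name NonBlockingDemo and its demo() method are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: a non-blocking read plus readiness sensing via a Selector.
class NonBlockingDemo {
    // Returns "<bytes from first read>,<ready keys>,<text read after select>".
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            Pipe.SourceChannel source = pipe.source();
            Pipe.SinkChannel sink = pipe.sink();
            try {
                source.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(64);

                // 1. Non-blocking read: nothing has been written yet,
                //    so read() returns 0 instead of waiting.
                int first = source.read(buf);

                // 2. Sensing IO events: register interest in "readable",
                //    write some data, then let the selector report readiness.
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                int ready = selector.select();   // blocks until a channel is ready

                source.read(buf);                // data is available now
                buf.flip();
                String text = StandardCharsets.UTF_8.decode(buf).toString();
                return first + "," + ready + "," + text;
            } finally {
                source.close();
                sink.close();
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The same two calls, configureBlocking(false) and register(selector, ops), are what a socket-based server relies on; only the channel type differs.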

Combining these mechanisms yields a whole family of event-driven designs, which gives the architecture of a high-performance IO service a great deal of room to scale.

Reactor model

Also known as the Reactor pattern, it has the following characteristics:

  1. The Reactor responds to IO events by dispatching the appropriate handler, similar to the AWT event dispatch thread.
  2. Each handler performs non-blocking actions, similar to AWT ActionListeners.
  3. Events are managed by binding handlers to them, similar to adding event listeners with AWT addActionListener.

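In java.nio, Channels are connections to files, sockets and so on that support non-blocking reads and writes; Buffers are array-like objects that Channels read from and write to directly; Selectors tell which of a set of Channels have IO events; SelectionKeys maintain the status and bindings of IO events.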
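The Reactor code calls flip() on its buffers, so it helps to see how a Buffer's position and limit move. This is a small illustrative sketch; the class name BufferFlipDemo and its helper methods are made up for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the ByteBuffer write -> flip -> read -> clear cycle.
class BufferFlipDemo {
    static String describe(ByteBuffer b) {
        return b.position() + "/" + b.limit() + "/" + b.capacity();
    }

    // Returns the position/limit/capacity after each step, then the decoded text.
    static String demo() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        StringBuilder log = new StringBuilder();

        // Writing into the buffer (as channel.read(buf) would) advances position.
        buf.put("hello".getBytes(StandardCharsets.US_ASCII));
        log.append(describe(buf)).append(' ');

        // flip(): limit = old position, position = 0, ready to be read back.
        buf.flip();
        log.append(describe(buf)).append(' ');

        // Only the bytes in [0, limit) are valid data.
        String text = new String(buf.array(), 0, buf.limit(), StandardCharsets.US_ASCII);

        // clear(): position = 0, limit = capacity, ready for the next write.
        buf.clear();
        log.append(describe(buf)).append(' ').append(text);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Decoding the whole backing array instead of the range up to limit would include the unused bytes at the end of the array, which is why the valid range matters when printing received data.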

Sample server code based on the Reactor pattern:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;

        Reactor(int port) throws IOException {
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            serverSocket.configureBlocking(false);
            SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select(); // block until at least one channel has an event
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        void dispatch(SelectionKey k) {
            // Each SelectionKey carries a Runnable attachment:
            // the Acceptor for the server channel, a Handler for each client channel.
            Runnable r = (Runnable) k.attachment();
            if (r != null) {
                r.run();
            }
        }

        // Inner class: accepts new connections and creates a Handler for each
        class Acceptor implements Runnable {
            public void run() {
                try {
                    SocketChannel c = serverSocket.accept();
                    if (c != null) {
                        new Handler(selector, c);
                    }
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    public class Handler implements Runnable {
        private static final int MAXIN = 4096;
        private static final int MAXOUT = 4096;
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        Handler(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = socket.register(sel, 0);
            sk.attach(this);                       // bind this Handler to the SelectionKey
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();                          // wake up the Reactor's blocked select()
        }

        boolean inputIsComplete() { return true; }

        boolean outputIsComplete() { return true; }

        void process() {
            input.flip();
            // Decode only the bytes actually read, not the whole backing array
            System.out.println("receive: " + new String(input.array(), 0, input.limit()));
        }

        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            String msg = "hello xxx\n";
            output.put(msg.getBytes());
            output.flip();
            socket.write(output);
            if (outputIsComplete()) {
                sk.cancel();
            }
            output.clear();
        }
    }
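In the Handler above, inputIsComplete() simply returns true, so every read is treated as a whole request. What counts as complete depends on the protocol, which the listing does not define. As one possible sketch, assuming newline-terminated messages (an assumption made here, not something the slides specify), a check could look like this; the class name LineFraming is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical completeness check for a newline-terminated protocol.
class LineFraming {
    // The buffer is expected in write mode: bytes [0, position) came from the channel.
    static boolean inputIsComplete(ByteBuffer input) {
        for (int i = 0; i < input.position(); i++) {
            if (input.get(i) == '\n') {     // absolute get: does not move position
                return true;
            }
        }
        // A full buffer with no newline is treated as complete
        // so that the handler never waits forever.
        return !input.hasRemaining();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put("hel".getBytes(StandardCharsets.US_ASCII));
        System.out.println(inputIsComplete(buf));   // partial line so far
        buf.put("lo\n".getBytes(StandardCharsets.US_ASCII));
        System.out.println(inputIsComplete(buf));   // newline has arrived
    }
}
```

With a check like this, read() would leave the state as READING until the terminator arrives, and the Reactor would call the handler again on the next read event.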

Multithreaded mode

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Handler2 implements Runnable {
        private static final int MAXIN = 4096;
        private static final int MAXOUT = 4096;
        // Worker pool that runs process() off the Reactor thread
        private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
        final SocketChannel socket;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAXIN);
        ByteBuffer output = ByteBuffer.allocate(MAXOUT);
        static final int READING = 0, SENDING = 1;
        static final int PROCESSING = 3;
        int state = READING;

        Handler2(Selector sel, SocketChannel c) throws IOException {
            socket = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = socket.register(sel, 0);
            sk.attach(this);                       // bind this Handler to the SelectionKey
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();                          // wake up the Reactor's blocked select()
        }

        boolean inputIsComplete() { return true; }

        boolean outputIsComplete() { return true; }

        void process() {
            input.flip();
            // Decode only the bytes actually read, not the whole backing array
            System.out.println("receive: " + new String(input.array(), 0, input.limit()));
        }

        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        synchronized void read() throws IOException {
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                executorService.execute(new Processer()); // hand off to a worker thread
            }
        }

        void send() throws IOException {
            String msg = "hello xxx\n";
            output.put(msg.getBytes());
            output.flip();
            socket.write(output);
            if (outputIsComplete()) {
                sk.cancel();
            }
            output.clear();
        }

        class Processer implements Runnable {
            public void run() {
                processAndHandOff();
            }
        }

        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interestOps(SelectionKey.OP_WRITE);
            // The change is only seen by the next select(), so wake the Reactor up
            sk.selector().wakeup();
        }
    }
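The hand-off at the end of Handler2 deserves a closer look: a worker thread changes the key's interest set while the Reactor thread may be blocked in select(). The sketch below isolates that step. It is an illustration under stated assumptions, not code from the slides: it uses a Pipe instead of a socket to stay self-contained, and the class name HandOffDemo is hypothetical.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a worker thread enables OP_WRITE and wakes the selector.
class HandOffDemo {
    // Returns true once the selecting thread has seen the key become writable.
    static boolean demo() {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                Pipe.SinkChannel sink = pipe.sink();
                sink.configureBlocking(false);
                // Registered with no interest, like the handlers' register(sel, 0).
                SelectionKey key = sink.register(selector, 0);

                workers.execute(() -> {
                    // ... processing would happen here, off the selecting thread ...
                    key.interestOps(SelectionKey.OP_WRITE);
                    // An interest change is only picked up by the next selection
                    // operation, so interrupt the one that may be in progress.
                    selector.wakeup();
                });

                // The "Reactor" loop: keep selecting until the key is reported writable.
                while (true) {
                    selector.select();
                    if (selector.selectedKeys().remove(key) && key.isWritable()) {
                        break;
                    }
                }
                workers.shutdown();
                workers.awaitTermination(5, TimeUnit.SECONDS);
                return key.isWritable();
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException | InterruptedException ex) {
            throw new IllegalStateException(ex);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the wakeup() call, the selecting thread could stay blocked in select() indefinitely, because nothing else would cause it to return and notice the new interest set.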