Source address: github.com/shawntime/s…

BIO (Synchronous blocking I/O mode)

Reading and writing data blocks the calling thread until the operation completes; the underlying calls to read(), write(), and accept() all block.

Disadvantages
  • In BIO code, a read is a blocking operation: if the connection has no data to read or write, the thread stays blocked and its resources are wasted
  • Every connection needs its own thread, so many connections mean many threads on the server and heavy load
Implementing a chat room program with BIO
// Chat room program server
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
 * Chat room server built on blocking I/O (BIO).
 *
 * <p>{@link #start()} accepts connections on the calling thread and hands each
 * accepted socket to a worker from a fixed pool. A worker blocks on its client's
 * input and broadcasts every received line to all currently connected clients.
 *
 * <p>Text is read and written with the platform default charset, matching the
 * companion client.
 */
public class Server {

    /**
     * Worker pool. Each connected client occupies one worker for the lifetime of its
     * connection, so at most 10 clients are served at once; further clients are
     * accepted but their handlers wait in the pool's queue.
     */
    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(10);

    /**
     * Connected client sockets. The accept thread adds to this list while worker
     * threads iterate it to broadcast, so a copy-on-write list is used to keep
     * iteration safe against concurrent modification.
     */
    private final List<Socket> sockets;

    private final int port;

    private volatile ServerSocket serverSocket;

    /** Stop flag; volatile because it is written and read by different threads. */
    private volatile boolean isClosed = false;

    /**
     * Creates a server that will listen on the given port once started.
     *
     * @param port TCP port to bind in {@link #start()}
     */
    public Server(int port) {
        this.port = port;
        this.sockets = new java.util.concurrent.CopyOnWriteArrayList<>();
    }

    /**
     * Binds the port and accepts clients until the server is closed or accepting
     * fails. Blocks the calling thread. On exit the listening socket and every
     * client socket are closed.
     */
    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Service started successfully, listening port :" + port);
            while (!isClosed) {
                Socket socket = serverSocket.accept();
                System.out.println(socket.getRemoteSocketAddress() + "Connect online...");
                sockets.add(socket);
                EXECUTOR_SERVICE.submit(new SocketThread(socket));
            }
        } catch (IOException e) {
            // Closing the listening socket via setClosed(true) makes accept() throw;
            // that is the normal shutdown path, not an error.
            if (!isClosed) {
                e.printStackTrace();
            }
        } finally {
            closeQuietly(serverSocket);
            for (Socket socket : sockets) {
                closeQuietly(socket);
            }
            sockets.clear();
        }
    }

    /** Closes the resource if it is non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Per-client worker: announces the client, then relays each line it sends. */
    private class SocketThread implements Runnable {

        private final Socket socket;

        public SocketThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String msg = "Welcome ["
                    + socket.getRemoteSocketAddress() + "】 Enter the chat room! The current chat room has ["
                    + sockets.size() + "】 people";
            sendMsg(msg);
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String line;
                // readLine() returns null once the client closes its side.
                while ((line = reader.readLine()) != null) {
                    sendMsg("【" + socket.getRemoteSocketAddress() + "】:" + line);
                }
            } catch (IOException e) {
                if (!isClosed) {
                    e.printStackTrace();
                }
            } finally {
                // Drop the client so later broadcasts do not target a dead socket.
                sockets.remove(socket);
                closeQuietly(socket);
            }
        }

        /**
         * Broadcasts one line to every connected client. A client whose socket can
         * no longer be written to is removed from the broadcast list.
         */
        private void sendMsg(String msg) {
            for (Socket target : sockets) {
                try {
                    // Not closed on purpose: closing the writer would close the socket.
                    PrintWriter pw = new PrintWriter(target.getOutputStream(), true);
                    pw.println(msg);
                    // PrintWriter never throws on write; checkError() flushes and
                    // reports whether the underlying stream failed.
                    if (pw.checkError()) {
                        sockets.remove(target);
                    }
                } catch (IOException e) {
                    // getOutputStream() fails when the socket is already closed.
                    sockets.remove(target);
                }
            }
        }
    }

    /**
     * @return {@code true} once the server has been asked to stop
     */
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Sets the stop flag. Passing {@code true} also closes the listening socket so
     * that a thread blocked in {@code accept()} inside {@link #start()} returns
     * immediately instead of waiting for one more connection.
     *
     * @param closed new value of the stop flag
     */
    public void setClosed(boolean closed) {
        isClosed = closed;
        if (closed) {
            closeQuietly(serverSocket);
        }
    }
}
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;

// Chat room program client
/**
 * Chat room client built on blocking I/O (BIO).
 *
 * <p>{@link #start()} connects to the server, prints every line received from it on
 * the calling thread, and runs a background task that forwards lines typed on
 * standard input to the server.
 *
 * <p>Text is read and written with the platform default charset, matching the
 * companion server. An instance is meant to be started once.
 */
public class Client {

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);

    private final String serverIp;

    private final int serverPort;

    private volatile Socket socket;

    /** Stop flag; volatile because the input task and the receive loop run on different threads. */
    private volatile boolean isClosed;

    /**
     * @param serverIp   host name or IP address of the chat server
     * @param serverPort TCP port of the chat server
     */
    public Client(String serverIp, int serverPort) {
        this.serverIp = serverIp;
        this.serverPort = serverPort;
    }

    /**
     * Connects and prints incoming lines until the server closes the connection, an
     * I/O error occurs, or {@link #setClosed(boolean) setClosed(true)} is called.
     * Blocks the calling thread. On exit the client is marked closed and the socket
     * is closed.
     */
    public void start() {
        try {
            socket = new Socket(serverIp, serverPort);
            EXECUTOR_SERVICE.submit(new ClientThread(socket));
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String line;
                // readLine() returns null once the server closes the connection.
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            }
        } catch (IOException e) {
            // Covers UnknownHostException too. Closing the socket through
            // setClosed(true) makes readLine() throw; that is a normal shutdown.
            if (!isClosed) {
                e.printStackTrace();
            }
        } finally {
            // Lets the input task stop after its current blocking read returns.
            isClosed = true;
            closeSocket();
        }
    }

    /** Closes the socket if one was opened, reporting (not propagating) any failure. */
    private void closeSocket() {
        Socket current = socket;
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Background task that sends each line typed on standard input to the server. */
    private class ClientThread implements Runnable {

        private final Socket socket;

        public ClientThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // The scanner is not closed: closing it would close System.in.
            Scanner scanner = new Scanner(System.in);
            System.out.println("Please leave a message:");
            try {
                // One writer for the whole session; it is not closed here because
                // that would close the socket out from under the receive loop.
                PrintWriter pw = new PrintWriter(socket.getOutputStream(), true);
                // hasNextLine() returns false at end of input instead of throwing.
                while (!isClosed && scanner.hasNextLine()) {
                    pw.println(scanner.nextLine());
                    // PrintWriter never throws on write; stop once the stream has failed.
                    if (pw.checkError()) {
                        break;
                    }
                }
            } catch (IOException e) {
                if (!isClosed) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * @return {@code true} once the client has been closed or its connection has ended
     */
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Sets the stop flag. Passing {@code true} also closes the socket so that the
     * receive loop in {@link #start()} stops waiting for the server.
     *
     * @param closed new value of the stop flag
     */
    public void setClosed(boolean closed) {
        isClosed = closed;
        if (closed) {
            closeSocket();
        }
    }
}

NIO (Synchronous non-blocking IO mode)

In this server model a single thread can handle multiple requests (connections). Every client connection is registered with a multiplexer (the selector), which polls the connections and processes those that have I/O ready.

NIO implements a chat room
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Chat room server built on non-blocking I/O (NIO).
 *
 * <p>A single thread runs a {@link Selector} loop that accepts new clients and reads
 * from existing ones; every received chunk is broadcast to all connected clients.
 * All state is touched only from the thread that runs {@link #start()}.
 *
 * <p>Bytes are converted with the platform default charset, matching the companion
 * client.
 */
public class NIOServer {

    private final int port;

    private ServerSocketChannel serverSocketChannel;

    private Selector selector;

    private volatile boolean isClosed;

    /** Connected clients; the broadcast targets. */
    private final List<SocketChannel> socketChannels = new ArrayList<>();

    /**
     * @param port TCP port to bind in {@link #start()}
     */
    public NIOServer(int port) {
        this.port = port;
    }

    /**
     * Binds the port and runs the selector loop on the calling thread until the
     * stop flag is set or a server-level I/O error occurs. On exit all client
     * channels, the selector and the listening channel are closed.
     */
    public void start() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            // Bind the port
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            // Set non-blocking
            serverSocketChannel.configureBlocking(false);
            selector = Selector.open();
            // Register the listening channel; the selector reports incoming connections
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (!isClosed) {
                System.out.println("Wait for client to link...");
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey selectionKey = keyIterator.next();
                    // Selected keys must be removed by hand or they are reported again.
                    keyIterator.remove();
                    handler(selectionKey);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            for (SocketChannel channel : socketChannels) {
                closeQuietly(channel);
            }
            socketChannels.clear();
            closeQuietly(selector);
            closeQuietly(serverSocketChannel);
        }
    }

    /** Dispatches one ready key to the accept or read handler. */
    private void handler(SelectionKey selectionKey) throws IOException {
        // A key is invalid once its channel was closed, e.g. by a failed broadcast
        // earlier in the same pass; querying readiness on it would throw.
        if (!selectionKey.isValid()) {
            return;
        }
        if (selectionKey.isAcceptable()) {
            acceptHandler(selectionKey);
        } else if (selectionKey.isReadable()) {
            readHandler(selectionKey);
        }
    }

    /** Accepts a pending connection, registers it for reads and announces it. */
    private void acceptHandler(SelectionKey selectionKey) throws IOException {
        System.out.println("New client link...");
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = server.accept();
        // A non-blocking accept() returns null when no connection is pending.
        if (socketChannel == null) {
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
            socketChannels.add(socketChannel);
            String msg = "Welcome ["
                    + socketChannel.getRemoteAddress() + "】 Enter the chat room! The current chat room has ["
                    + socketChannels.size() + "】 people";
            print(msg);
        } catch (IOException e) {
            // A client that fails during setup must not take the server down.
            disconnect(socketChannel);
        }
    }

    /** Reads what a client sent and broadcasts it; drops the client on EOF or error. */
    private void readHandler(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int len;
        try {
            len = socketChannel.read(byteBuffer);
        } catch (IOException e) {
            // e.g. connection reset: handle like a normal disconnect.
            len = -1;
        }
        if (len < 0) {
            // End of stream. Without closing the channel the selector would report
            // it as readable forever and the loop would spin.
            disconnect(socketChannel);
            return;
        }
        if (len == 0) {
            return;
        }
        String line = "【" + socketChannel.getRemoteAddress() + "】:" + new String(byteBuffer.array(), 0, len);
        System.out.println(line);
        print(line);
    }

    /**
     * Sends the text to every connected client. A client whose channel cannot be
     * written to is dropped; the remaining clients still receive the message.
     *
     * <p>Each client gets a single non-blocking write, so a client whose send buffer
     * is full may receive only part of the message; this keeps one slow client from
     * stalling the selector thread.
     */
    private void print(String line) throws IOException {
        byte[] payload = line.getBytes();
        Iterator<SocketChannel> targets = socketChannels.iterator();
        while (targets.hasNext()) {
            SocketChannel channel = targets.next();
            try {
                channel.write(ByteBuffer.wrap(payload));
            } catch (IOException e) {
                targets.remove();
                closeQuietly(channel);
            }
        }
    }

    /** Removes a client from the broadcast list and closes it (which cancels its key). */
    private void disconnect(SocketChannel channel) {
        socketChannels.remove(channel);
        closeQuietly(channel);
    }

    /** Closes the resource if it is non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NIOServer server = new NIOServer(9023);
        server.start();
    }
}
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ExecutorService;

/**
 * Chat room client built on non-blocking I/O (NIO).
 *
 * <p>{@link #start()} connects to the server and runs a {@link Selector} loop on the
 * calling thread that prints whatever the server sends. Once the connection is
 * established a background task forwards lines typed on standard input.
 *
 * <p>Bytes are converted with the platform default charset, matching the companion
 * server.
 */
public class NIOClient {

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);

    private final String ip;

    private final int port;

    private volatile boolean isClosed;

    private SocketChannel socketChannel;

    /**
     * @param ip   host name or IP address of the chat server
     * @param port TCP port of the chat server
     */
    public NIOClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Connects and processes selector events until the server closes the connection
     * or an I/O error occurs. Blocks the calling thread. On exit the selector and
     * the channel are closed and the input executor is shut down; the input task
     * itself ends after its current blocking read of standard input returns.
     */
    public void start() {
        Selector selector = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            selector = Selector.open();
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_CONNECT);
            // A non-blocking connect() may finish at once (e.g. on loopback); in
            // that case no connect event is needed to proceed.
            if (socketChannel.connect(new InetSocketAddress(ip, port))) {
                onConnected(key);
            }

            while (!isClosed) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    handler(selectionKey);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            isClosed = true;
            closeQuietly(selector);
            closeQuietly(socketChannel);
            EXECUTOR_SERVICE.shutdown();
        }
    }

    /**
     * Switches the key to read interest and starts the input task. The task is
     * started only here so it can never write to a channel that is not yet connected.
     */
    private void onConnected(SelectionKey selectionKey) {
        selectionKey.interestOps(SelectionKey.OP_READ);
        EXECUTOR_SERVICE.submit(new ChatThread());
    }

    /** Handles one ready key: completes the connection or prints received data. */
    private void handler(SelectionKey selectionKey) throws IOException {
        if (!selectionKey.isValid()) {
            return;
        }
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (selectionKey.isConnectable()) {
            if (channel.isConnectionPending() && !channel.finishConnect()) {
                // Not connected yet; wait for the next connect event.
                return;
            }
            onConnected(selectionKey);
        }
        if (selectionKey.isReadable()) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int len = channel.read(byteBuffer);
            if (len < 0) {
                // Server closed the connection. Leaving the channel open would make
                // the selector report it as readable forever.
                isClosed = true;
                channel.close();
                return;
            }
            if (len > 0) {
                String line = new String(byteBuffer.array(), 0, len);
                System.out.println(line);
            }
        }
    }

    /** Background task that sends each line typed on standard input to the server. */
    private class ChatThread implements Runnable {

        @Override
        public void run() {
            // The scanner is not closed: closing it would close System.in.
            Scanner scanner = new Scanner(System.in);
            System.out.println("Please leave a message:");
            // hasNextLine() returns false at end of input instead of throwing.
            while (!isClosed && scanner.hasNextLine()) {
                String line = scanner.nextLine();
                ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                try {
                    // A non-blocking write may accept only part of the buffer.
                    while (byteBuffer.hasRemaining()) {
                        socketChannel.write(byteBuffer);
                    }
                } catch (IOException e) {
                    if (!isClosed) {
                        e.printStackTrace();
                    }
                    // The channel is unusable; stop instead of failing on every line.
                    break;
                }
            }
        }
    }

    /** Closes the resource if it is non-null, reporting (not propagating) any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        NIOClient client = new NIOClient("127.0.0.1", 9023);
        client.start();
    }
}

AIO (Asynchronous Non-blocking IO)

Asynchronous I/O is built on events and callbacks: an I/O call returns immediately, and the operating system notifies the corresponding thread to carry on with the follow-up work once the operation has completed in the background.

AIO implements chat room function
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Chat room server built on asynchronous I/O (AIO).
 *
 * <p>Connections and reads are handled through {@link CompletionHandler} callbacks
 * that run on the channel group's thread pool. Every received message is broadcast
 * to all connected clients; a client leaves by sending {@code quit} or by closing
 * its connection.
 */
public class AIOServer {

    private final Charset charset = Charset.forName("utf-8");

    private final int port;

    private AsynchronousChannelGroup channelGroup;

    private AsynchronousServerSocketChannel serverSocketChannel;

    /**
     * Connected clients. Completion handlers run on several pool threads and both
     * modify and iterate this list, so a copy-on-write list is used.
     */
    private final List<AsynchronousSocketChannel> socketChannels;

    private boolean isClosed;

    /**
     * @param port TCP port to bind when the server starts
     */
    public AIOServer(int port) {
        this.port = port;
        socketChannels = new java.util.concurrent.CopyOnWriteArrayList<>();
    }

    /**
     * Binds the port, arms the first asynchronous accept and then blocks the
     * calling thread on standard input.
     *
     * @throws IOException if the channel group or server channel cannot be opened
     *                     or bound, or if reading standard input fails
     */
    private void start() throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
        serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
        serverSocketChannel.bind(new InetSocketAddress(port));
        System.out.println("Start server, listen on port :" + port);
        serverSocketChannel.accept(null, new AcceptHandler());
        // accept() returns immediately; block here so the calling thread stays alive
        // while the callbacks do the work.
        System.in.read();
    }

    /**
     * Sends the text to every connected client. Clients whose write fails are
     * dropped; the remaining clients still receive the message.
     *
     * <p>An asynchronous channel rejects a write while another is outstanding
     * (WritePendingException), and several handlers may broadcast at the same time.
     * Writes to one channel are therefore serialized on the channel's monitor and
     * each write is awaited before the next one starts.
     */
    private void print(String line) throws IOException {
        for (AsynchronousSocketChannel channel : socketChannels) {
            ByteBuffer buffer = send(line);
            try {
                synchronized (channel) {
                    while (buffer.hasRemaining()) {
                        channel.write(buffer).get();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (java.util.concurrent.ExecutionException e) {
                // The write failed, e.g. because the peer is gone.
                drop(channel);
            }
        }
    }

    /** Removes a client from the broadcast list and closes its channel. */
    private void drop(AsynchronousSocketChannel channel) {
        socketChannels.remove(channel);
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Read callback for one client; re-arms itself after every message. */
    private class ClientHandler implements CompletionHandler<Integer, ByteBuffer> {

        private final AsynchronousSocketChannel socketChannel;

        public ClientHandler(AsynchronousSocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            // A negative result means end of stream: the peer closed the connection.
            // Re-arming the read would complete immediately with -1 again, forever.
            boolean endOfStream = result < 0;
            buffer.flip();
            try {
                String receive = endOfStream ? "" : receive(buffer);
                SocketAddress remoteAddress = socketChannel.getRemoteAddress();
                if (endOfStream || "quit".equalsIgnoreCase(receive)) {
                    System.out.println(remoteAddress + "Offline...");
                    socketChannels.remove(socketChannel);
                    String msg = "【"
                            + remoteAddress + "Quit the chat room! The current chat room has ["
                            + socketChannels.size() + "】 people";
                    print(msg);
                    socketChannel.close();
                    return;
                }
                String line = "【" + remoteAddress + "】:" + receive;
                System.out.println(line);
                print(line);
                buffer.clear();
                socketChannel.read(buffer, buffer, this);
            } catch (IOException e) {
                e.printStackTrace();
                drop(socketChannel);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            // The read failed, so no further callback will arrive for this client.
            drop(socketChannel);
        }
    }

    /** Accept callback; re-arms the accept and starts reading from the new client. */
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {

        @Override
        public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
            // Each accept() delivers one connection, so arm the next one first.
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.accept(null, this);
            }
            if (socketChannel.isOpen()) {
                socketChannels.add(socketChannel);
                try {
                    String msg = "Welcome ["
                            + socketChannel.getRemoteAddress() + "】 Enter the chat room! The current chat room has ["
                            + socketChannels.size() + "】 people";
                    print(msg);
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    ClientHandler clientHandler = new ClientHandler(socketChannel);
                    socketChannel.read(buffer, buffer, clientHandler);
                } catch (IOException e) {
                    e.printStackTrace();
                    drop(socketChannel);
                }
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("error");
        }
    }

    /** Encodes a message with the server charset. */
    private ByteBuffer send(String msg) {
        return charset.encode(msg);
    }

    /** Decodes the remaining bytes of the buffer with the server charset. */
    private String receive(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);
        return String.valueOf(charBuffer);
    }

    public static void main(String[] args) {
        AIOServer server = new AIOServer(9090);
        try {
            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

/**
 * Chat room client built on asynchronous I/O (AIO).
 *
 * <p>{@link #start()} connects to the server and prints incoming data on the calling
 * thread, while a background task forwards lines typed on standard input. Typing
 * {@code quit} sends it to the server and then shuts the client down.
 */
public class AIOClient {

    private final Charset charset = Charset.forName("utf-8");

    private static final ExecutorService EXECUTOR_SERVICE = newFixedThreadPool(1);

    private final String ip;

    private final int port;

    private volatile boolean isClosed;

    private AsynchronousSocketChannel socketChannel;

    /**
     * @param ip   host name or IP address of the chat server
     * @param port TCP port of the chat server
     */
    public AIOClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * Connects and prints received data until the server closes the connection, the
     * user quits, or an error occurs. Blocks the calling thread. The channel is
     * always closed and the input executor always shut down on exit; the input task
     * itself ends after its current blocking read of standard input returns.
     */
    public void start() {
        try {
            socketChannel = AsynchronousSocketChannel.open();
            socketChannel.connect(new InetSocketAddress(ip, port)).get();
            EXECUTOR_SERVICE.submit(new ChatThread(Thread.currentThread()));
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (!isClosed) {
                buffer.clear();
                int length = socketChannel.read(buffer).get();
                if (length < 0) {
                    // End of stream: the server closed the connection. Reading again
                    // would return -1 immediately and spin.
                    break;
                }
                if (length > 0) {
                    buffer.flip();
                    System.out.println(receive(buffer));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.out.println("Thread interrupts exit...");
            // The input task interrupts this thread as its quit signal (isClosed is
            // set first). An interrupt from anywhere else is passed on to the caller.
            if (!isClosed) {
                Thread.currentThread().interrupt();
            }
        } catch (ExecutionException e) {
            if (!isClosed) {
                e.printStackTrace();
            }
        } finally {
            // Runs on every exit path, including when the loop ends through the
            // isClosed check without ever seeing the interrupt.
            isClosed = true;
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            EXECUTOR_SERVICE.shutdown();
        }
    }

    /** Background task that sends each line typed on standard input to the server. */
    private class ChatThread implements Runnable {

        private final Thread mainThread;

        public ChatThread(Thread mainThread) {
            this.mainThread = mainThread;
        }

        @Override
        public void run() {
            // The scanner is not closed: closing it would close System.in.
            Scanner scanner = new Scanner(System.in);
            System.out.println("Please leave a message:");
            // hasNextLine() returns false at end of input instead of throwing.
            while (!isClosed && scanner.hasNextLine()) {
                String line = scanner.nextLine();
                boolean writeFailed = false;
                try {
                    ByteBuffer payload = send(line);
                    // Await each write: the channel rejects a write while another is
                    // outstanding, and "quit" must be sent before the channel closes.
                    while (payload.hasRemaining()) {
                        socketChannel.write(payload).get();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (ExecutionException e) {
                    if (!isClosed) {
                        e.printStackTrace();
                    }
                    writeFailed = true;
                }
                if (writeFailed || "quit".equalsIgnoreCase(line)) {
                    isClosed = true;
                    // Wakes the receive loop if it is blocked waiting for data.
                    mainThread.interrupt();
                }
            }
        }
    }

    /** Encodes a message with the client charset. */
    private ByteBuffer send(String msg) {
        return charset.encode(msg);
    }

    /** Decodes the remaining bytes of the buffer with the client charset. */
    private String receive(ByteBuffer buffer) {
        CharBuffer charBuffer = charset.decode(buffer);
        return String.valueOf(charBuffer);
    }

    public static void main(String[] args) {
        AIOClient client = new AIOClient("127.0.0.1", 9090);
        client.start();
    }
}