1. Overview

NIO (non-blocking I/O, also known as New I/O in the Java world) is a synchronous, non-blocking I/O model and the basis of I/O multiplexing. It is increasingly used in large application servers as an effective way to handle high concurrency and large numbers of connections.

Core components mainly include:

  • Channel
  • Buffer
  • Selector

Java NIO: Channels and Buffers

While standard IO operates on byte streams and character streams, NIO operates on channels and buffers. Data is always read from a Channel into a Buffer, or written from a Buffer into a Channel.

Java NIO: Non-blocking IO

Java NIO lets you perform IO without blocking. For example, a thread can ask a channel to read data into a buffer; if no data is available yet, the call returns immediately and the thread can do other things instead of waiting. Once data has been read into the buffer, the thread can go on to process it. Writing from a buffer to a channel works the same way.
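The behaviour described above can be sketched as follows. This is only an illustration: it uses java.nio.channels.Pipe as a stand-in for a network connection so that it runs without a server, and the class and method names (NonBlockingReadDemo, readOnce, readBytes) are made up for this example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {

    /** Performs one read on a non-blocking channel and reports how many bytes arrived. */
    static int readOnce(Pipe.SourceChannel source) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(16);
        return source.read(buf); // returns immediately, even when nothing is available
    }

    /** Polls the channel until count bytes have arrived, then decodes them as UTF-8. */
    static String readBytes(Pipe.SourceChannel source, int count) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(count);
        while (buf.hasRemaining()) {
            if (source.read(buf) < 0) {
                break; // the writing end was closed
            }
            Thread.yield(); // a real program would do useful work here instead
        }
        buf.flip();
        return StandardCharsets.UTF_8.decode(buf).toString();
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);

        System.out.println("bytes read before any write: " + readOnce(pipe.source()));

        pipe.sink().write(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));
        System.out.println("after write: " + readBytes(pipe.source(), 2));

        pipe.sink().close();
        pipe.source().close();
    }
}
```

Because nothing has been written when readOnce is called, the read reports 0 bytes instead of waiting. Polling in a loop, as readBytes does, is only for demonstration; a Selector is the usual way to find out when a channel has data.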

Java NIO: Selectors

Java NIO introduces the concept of a selector, an object that monitors multiple channels for events (e.g., a connection being opened, data arriving). Thus, a single thread can monitor multiple channels.

2. Basic components

2.1 Channel

A Channel is similar to a Stream, with a few differences:

  • You can both read from and write to a Channel, whereas a Stream is typically one-way (either read or write)
  • Channels can be read and written without blocking (for selectable channels such as SocketChannel)
  • Channels always read into, or write from, a Buffer
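As a minimal sketch of the first and last points, the example below writes to a FileChannel and then reads the same bytes back through the same channel, always going through a ByteBuffer. The class name ChannelRoundTrip and the temporary file are assumptions made for this illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ChannelRoundTrip {

    /** Writes text through a channel, then reads it back through the same channel. */
    static String writeThenRead(Path file, String text) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Buffer -> Channel
            ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                channel.write(out);
            }

            // Channel -> Buffer, starting again from the beginning of the file
            channel.position(0);
            ByteBuffer in = ByteBuffer.allocate((int) channel.size());
            while (in.hasRemaining()) {
                if (channel.read(in) == -1) {
                    break; // end of file
                }
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("nio-demo", ".txt");
        try {
            System.out.println(writeThenRead(tmp, "hello nio"));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```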

Some concrete implementations of Channel in Java NIO:

  • FileChannel: reads data from and writes data to files
  • DatagramChannel: reads and writes data over the network via UDP
  • SocketChannel: reads and writes data over the network via TCP
  • ServerSocketChannel: listens for incoming TCP connections; a SocketChannel is created for each accepted connection

FileChannel Demo

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

ByteBuffer buf = ByteBuffer.allocate(48);

// read() returns the number of bytes read, or -1 at end of file
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
    System.out.println("Read " + bytesRead);
    buf.flip(); // switch the buffer from writing to reading

    while (buf.hasRemaining()) {
        System.out.print((char) buf.get());
    }

    buf.clear(); // make the buffer ready for the next read
    bytesRead = inChannel.read(buf);
}
aFile.close();

2.2 Buffer

A Buffer is essentially a block of memory into which you can write data that can then be read again at a later time. This memory block is wrapped in a NIO Buffer object, which provides a set of methods that make it easier to use this part of the data.

A Buffer has three main properties:

  • capacity: the fixed size of the Buffer
  • position: the index of the next element to be read or written
  • limit: the index of the first element that must not be read or written; while writing it equals the capacity, and after flip() it marks the end of the written data

Common methods:

  • flip(): called after writing data; sets limit to the current position (the end of the data) and resets position to 0, ready for reading
  • rewind(): resets position to 0 and leaves limit unchanged, so the same data can be read again
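The effect of these methods on position and limit can be traced with a small sketch. BufferStateDemo and its state helper are names invented for this example; the values in the comments follow from the documented behaviour of java.nio.Buffer.

```java
import java.nio.ByteBuffer;

class BufferStateDemo {

    /** Formats the three properties of a buffer for printing. */
    static String state(ByteBuffer b) {
        return "pos=" + b.position() + " lim=" + b.limit() + " cap=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        System.out.println("new:    " + state(buf)); // pos=0 lim=8 cap=8

        buf.put((byte) 'a').put((byte) 'b').put((byte) 'c');
        System.out.println("put x3: " + state(buf)); // pos=3 lim=8 cap=8

        buf.flip(); // limit moves to the end of the data, position back to 0
        System.out.println("flip:   " + state(buf)); // pos=0 lim=3 cap=8

        buf.get();
        buf.get();
        System.out.println("get x2: " + state(buf)); // pos=2 lim=3 cap=8

        buf.rewind(); // position back to 0, limit unchanged
        System.out.println("rewind: " + state(buf)); // pos=0 lim=3 cap=8

        buf.clear(); // ready for writing again; the bytes are not erased
        System.out.println("clear:  " + state(buf)); // pos=0 lim=8 cap=8
    }
}
```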

Some concrete implementations of Buffer in Java NIO

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

2.3 Selector

The advantage of a Selector is that a single thread can manage multiple channels. Switching between threads is expensive for the operating system, and each thread also consumes resources (such as memory), so using fewer threads is generally better.

Note that a channel must be in non-blocking mode to be registered with a Selector. This means a Selector cannot be used with FileChannel, which cannot be switched to non-blocking mode.

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

The four event types a channel can be registered for:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE
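Several events can be combined with a bitwise OR, for example SelectionKey.OP_READ | SelectionKey.OP_WRITE. The sketch below shows one thread watching two channels through a single Selector. It again uses Pipe so that it runs without a network peer; SelectorDemo, register, readyLabels and the labels "a" and "b" are illustrative names, not part of the RPC code that follows.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SelectorDemo {

    /** Opens a pipe and registers its readable end, attaching a label to the key. */
    static Pipe register(Selector selector, String label) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false); // required before register()
        pipe.source().register(selector, SelectionKey.OP_READ, label);
        return pipe;
    }

    /** Blocks until at least one channel is ready and returns the labels of the readable ones. */
    static List<String> readyLabels(Selector selector) throws IOException {
        List<String> labels = new ArrayList<>();
        selector.select();
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // handled keys must be removed from the selected set
            if (key.isReadable()) {
                labels.add((String) key.attachment());
            }
        }
        return labels;
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe a = register(selector, "a");
            Pipe b = register(selector, "b");

            // Only pipe "b" receives data, so only its key should be selected
            b.sink().write(ByteBuffer.wrap(new byte[] {1}));
            System.out.println("readable: " + readyLabels(selector));

            a.sink().close();
            a.source().close();
            b.sink().close();
            b.source().close();
        }
    }
}
```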

3. Implementing RPC communication with NIO

3.1 RpcServer

1. Create the ServerSocketChannel and register it with the Selector

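import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class RPCServer {
    // Service name -> service instance, looked up when a request arrives
    private Map<String, Object> services = new HashMap<>();
    private Selector selector;
    private ServerSocketChannel ssc;

    public RPCServer() {
        try {
            ssc = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(3003);
            ssc.configureBlocking(false);
            ssc.bind(address);
            selector = Selector.open();
            // Ask the selector to report incoming connections
            ssc.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}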

2. The HelloService interface and its implementation

public interface HelloService {

    String sayHello();

    void sayHi(String temp);
}

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello() {
        return "jiuling";
    }

    @Override
    public void sayHi(String temp) {
        System.out.println(temp);
    }
}

3. ServerSocketChannel listens for requests

    public void start() {
        System.out.println("----- start listening for requests ------");
        try {
            while (selector.select() > 0) {
                // Use an explicit Iterator so that handled keys can be removed safely;
                // removing from the set inside a for-each loop can throw
                // ConcurrentModificationException
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    it.remove();
                    if (sk.isAcceptable()) {
                        // Accept the connection to obtain the corresponding SocketChannel
                        SocketChannel sc = ssc.accept();
                        // Set it to non-blocking mode
                        sc.configureBlocking(false);
                        // Register the new channel for read events
                        sc.register(selector, SelectionKey.OP_READ);
                        sk.interestOps(SelectionKey.OP_ACCEPT);
                    } else if (sk.isReadable()) {
                        SocketChannel sc = (SocketChannel) sk.channel();
                        try {
                            // Parse the request and invoke the target method via reflection
                            remoteHandMethod(sk, sc);
                        } catch (Exception e) {
                            // Remove this SelectionKey from the Selector and close the channel
                            sk.cancel();
                            if (sk.channel() != null) {
                                sk.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

4. remoteHandMethod: reflective invocation

private void remoteHandMethod(SelectionKey sk, SocketChannel sc) throws Exception {
        //1. Read data from the channel
        ByteBuffer buff = ByteBuffer.allocate(1024);
        sc.read(buff);

        int position = buff.position();// Number of bytes actually read
        byte[] data = buff.array();
        // Message format: ClassName/methodName(paramType:paramValue,paramType:paramValue)
        String message = new String(data, 0, position);
        message = message.trim();

        //2. Parse the class name, method name, and parameters
        String[] clazzInfo = message.split("/");
        String className = clazzInfo[0];
        String methodName = clazzInfo[1].substring(0,
                clazzInfo[1].indexOf("("));
        String temp = clazzInfo[1].substring(
                clazzInfo[1].indexOf("(") + 1,
                clazzInfo[1].indexOf(")"));
        String[] typeValues = decodeParamsTypeAndValue(temp);
       
        //3. Look up the service object and invoke the method via reflection
        Object object = services.get(className);
        Class<?> clazz = object.getClass();
        Object result = null;
     
     if (typeValues == null) {
            Method method = clazz.getDeclaredMethod(methodName,
                    null);
            result = method.invoke(object, null);
        } else {
            Class[] types = new Class[typeValues.length];
            Object[] values = new Object[typeValues.length];
            for (int i = 0; i < typeValues.length; i++) {
                String[] tv = typeValues[i].split(":");
                String type = tv[0];
                String value = tv[1];
                types[i] = Class.forName(type);
                if (type.contains("Integer") || type.contains("int"))
                    values[i] = Integer.parseInt(value);
                else if (type.contains("Float") || type.contains("float"))
                    values[i] = Float.parseFloat(value);
                else if (type.contains("Double") || type.contains("double"))
                    values[i] = Double.parseDouble(value);
                else if (type.contains("Long") || type.contains("long"))
                    values[i] = Long.parseLong(value);
                else
                    values[i] = value;
            }
            Method method = clazz.getDeclaredMethod(methodName,
                    types);
            result = method.invoke(object, values);
        }
        
        
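        //4. Write the result back as "type:value" (or "null"), the format the client parses
        String response = (result == null)
                ? "null"
                : result.getClass().getName() + ":" + result;
        sc.write(ByteBuffer.wrap(response.getBytes()));
        sk.interestOps(SelectionKey.OP_READ);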
                                  
}


    // Splits the parameter list into "type:value" entries; returns null if there are no parameters
    private String[] decodeParamsTypeAndValue(String params) {
        if (params == null || params.equals(""))
            return null;
        if (params.indexOf(",") < 0)
            return new String[]{params};
        return params.split(",");
    }

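The request handling above mixes network IO with parsing and reflection. To look at the wire format in isolation, here is a hedged sketch that takes a request string, dispatches it via reflection, and returns a response in the type:value form that the client code in 3.2 parses. DispatchSketch, its register method, and the Calculator service are hypothetical and exist only for this example.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class DispatchSketch {

    /** Hypothetical service used only for this example. */
    public static class Calculator {
        public Integer add(Integer a, Integer b) { return a + b; }
        public String name() { return "calc"; }
    }

    private final Map<String, Object> services = new HashMap<>();

    /** Makes a service available under the name clients will send. */
    void register(String name, Object service) {
        services.put(name, service);
    }

    /** Handles "Service/method(type:value,...)" and returns "type:value" or "null". */
    String handle(String message) throws Exception {
        String[] parts = message.trim().split("/", 2);
        String call = parts[1];
        String methodName = call.substring(0, call.indexOf('('));
        String params = call.substring(call.indexOf('(') + 1, call.lastIndexOf(')'));

        String[] pairs = params.isEmpty() ? new String[0] : params.split(",");
        Class<?>[] types = new Class<?>[pairs.length];
        Object[] values = new Object[pairs.length];
        for (int i = 0; i < pairs.length; i++) {
            String[] tv = pairs[i].split(":", 2);
            types[i] = Class.forName(tv[0]);
            values[i] = convert(types[i], tv[1]);
        }

        Object service = services.get(parts[0]);
        if (service == null) {
            throw new IllegalArgumentException("unknown service: " + parts[0]);
        }
        Method method = service.getClass().getDeclaredMethod(methodName, types);
        Object result = method.invoke(service, values);
        return result == null ? "null" : result.getClass().getName() + ":" + result;
    }

    /** Converts the textual value to the declared wrapper type; anything else stays a String. */
    static Object convert(Class<?> type, String text) {
        if (type == Integer.class) return Integer.valueOf(text);
        if (type == Long.class) return Long.valueOf(text);
        if (type == Float.class) return Float.valueOf(text);
        if (type == Double.class) return Double.valueOf(text);
        return text;
    }

    public static void main(String[] args) throws Exception {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register("Calculator", new Calculator());
        System.out.println(dispatcher.handle(
                "Calculator/add(java.lang.Integer:2,java.lang.Integer:3)"));
        System.out.println(dispatcher.handle("Calculator/name()"));
    }
}
```

One limitation of this text format is worth keeping in mind: a String argument that contains a comma or a closing parenthesis cannot be told apart from the separators, so it only suits simple values.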

3.2 RpcClient

1. Create a client

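import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class RPCClient {
    private SocketChannel channel;
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    private Selector selector = null;

    public RPCClient(String serverIp) {
        try {
            System.out.println("------ client is about to start --------");
            selector = Selector.open();
            InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);
            // Open the socket channel and connect to the server
            channel = SocketChannel.open(isa);
            // Switch to non-blocking mode before registering with the selector
            channel.configureBlocking(false);
            // Ask the selector to report when a response can be read
            channel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}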

2. Get the proxy class

   // Get the proxy
    public Object getRemoteProxy(final Class clazz) {
// Dynamically generate the implementation class

        return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {


            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {

                String methodName = method.getName();
                String clazzName = clazz
                        .getSimpleName();
                Object result = null;
                if (args == null || args.length == 0) {
                    // No arguments: send "InterfaceName/methodName()"
                    channel.write(ByteBuffer
                            .wrap((clazzName + "/" + methodName + "()")
                                    .getBytes()));
                } else {
                    int size = args.length;
                    String[] types = new String[size];
                    StringBuffer content = new StringBuffer(clazzName)
                            .append("/").append(methodName).append("(");
                    for (int i = 0; i < size; i++) {
                        types[i] = args[i].getClass().getName();
                        content.append(types[i]).append(":").append(args[i]);
                        if (i != size - 1)
                            content.append(",");
                    }
                    content.append(")");
                    channel.write(ByteBuffer
                            .wrap(content.toString().getBytes()));
                }
                // Read the server's response and return the decoded result
                result = getresult();
                return result;
            }
        });
    }
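To see what the proxy puts on the wire without starting a server, the sketch below builds the same kind of dynamic proxy but records each encoded request in a list instead of writing it to a channel. ProxyEncodingDemo, recordingProxy, and the Greeter interface are hypothetical names introduced for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxyEncodingDemo {

    /** Hypothetical interface used only for this example. */
    public interface Greeter {
        void greet(String name, Integer times);
        void ping();
    }

    /** Returns a proxy that records the request string for each call instead of sending it. */
    static <T> T recordingProxy(Class<T> iface, List<String> sent) {
        InvocationHandler handler = (proxy, method, args) -> {
            StringBuilder sb = new StringBuilder(iface.getSimpleName())
                    .append('/').append(method.getName()).append('(');
            if (args != null) { // args is null for methods without parameters
                for (int i = 0; i < args.length; i++) {
                    if (i > 0) {
                        sb.append(',');
                    }
                    sb.append(args[i].getClass().getName()).append(':').append(args[i]);
                }
            }
            sent.add(sb.append(')').toString());
            return null; // acceptable here because both Greeter methods return void
        };
        return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        Greeter greeter = recordingProxy(Greeter.class, sent);
        greeter.greet("bob", 2);
        greeter.ping();
        sent.forEach(System.out::println);
    }
}
```

The caller only sees the Greeter interface; every call is turned into a string by the InvocationHandler, which is the same idea getRemoteProxy uses before writing to the SocketChannel.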

3. Get the returned result

    private Object getresult() {
        try {
            while (selector.select() > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    it.remove();
                    if (sk.isReadable()) {
                        SocketChannel sc = (SocketChannel) sk.channel();
                        buffer.clear();
                        sc.read(buffer);
                        int position = buffer.position();
                        String result = new String(buffer.array(), 0, position);
                        result = result.trim();
                        buffer.clear();
                        if (result.endsWith("null") || result.endsWith("NULL"))
                            return null;
                        // The response has the form "type:value"
                        String[] typeValue = result.split(":");
                        String type = typeValue[0];
                        String value = typeValue[1];
                        if (type.contains("Integer") || type.contains("int"))
                            return Integer.parseInt(value);
                        else if (type.contains("Float") || type.contains("float"))
                            return Float.parseFloat(value);
                        else if (type.contains("Long") || type.contains("long"))
                            return Long.parseLong(value);
                        else
                            return value;
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }
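The decoding step can also be tried on its own. The following sketch assumes responses of the form type:value, where type is a fully qualified class name such as java.lang.Integer; ResponseDecodeDemo and decode are illustrative names, and matching on exact class names is a choice made for this example rather than what the client above does.

```java
class ResponseDecodeDemo {

    /** Decodes a "type:value" response; "null" (in any case) decodes to null. */
    static Object decode(String response) {
        String text = response.trim();
        if (text.equalsIgnoreCase("null")) {
            return null;
        }
        // Split only at the first colon so that values may themselves contain ':'
        String[] typeValue = text.split(":", 2);
        if (typeValue.length < 2) {
            return text; // no type prefix: treat the whole response as a String
        }
        switch (typeValue[0]) {
            case "java.lang.Integer": return Integer.valueOf(typeValue[1]);
            case "java.lang.Long":    return Long.valueOf(typeValue[1]);
            case "java.lang.Float":   return Float.valueOf(typeValue[1]);
            case "java.lang.Double":  return Double.valueOf(typeValue[1]);
            default:                  return typeValue[1];
        }
    }

    public static void main(String[] args) {
        System.out.println(decode("java.lang.Integer:42"));
        System.out.println(decode("java.lang.String:a:b"));
        System.out.println(decode("null"));
    }
}
```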
