Architecture Design: Inter-System Communication (3) — IO Communication Model and JAVA Practice (1)

4. Multiplexing IO model

In the previous article, we touched on the problem of using multiple threads to handle high-concurrency scenarios. This article begins with the multiplexing IO model.

4-1. Realistic scenarios

Consider this realistic scenario:

In a restaurant that seats 100 guests at a time, the first thing each guest does is order. But there is a problem: to save labor costs, the owner employs only one waiter, who carries a single menu, to serve all of the guests.

The dumbest (but easiest) approach (method A): no matter how many guests are waiting to order, the waiter hands the only menu to one guest and stands beside that guest until he or she finishes ordering. After recording the order, the waiter hands the order slip to the chef in the kitchen. Then comes the second guest, then the third, and so on. Obviously, only a boss with something wrong in his head would set up a service process like this, because the later guests (say, the last 80) will give up and leave once the wait becomes too long.

Another option (method B) is to hire 99 more waiters and print 99 more menus. Each waiter, holding a menu, takes charge of one guest (both are needed: a guest cannot order without a menu). After the guest orders, the waiter records the order and hands it to the kitchen (for efficiency, of course, it is best to have 100 chefs as well). This way every guest enjoys VIP service and nobody leaves, but the labor cost is enormous.

A third way (method C) is to improve the ordering process itself. When guests arrive, each asks for a menu. Once a guest has decided what to order, he or she calls the waiter, who comes over and writes down the order. The hand-off to the kitchen is improved too: rather than delivering each order as soon as it is recorded, the waiter can record several orders and hand them to the chef together. For the boss, this has the lowest labor cost. For the guests, although they no longer get VIP service and have to wait a little, the wait is acceptable. As for the waiter, practically none of her time is wasted: the boss squeezes every last drop of effort out of her.

If you were the boss, which approach would you adopt?

The elements of this scenario map onto a computer system as follows:

  • Guest arrivals: concurrency. When business is slow, one waiter with one menu is of course enough, so different bosses choose the number of waiters and menus flexibly to suit the occasion.
  • Guest: client request
  • Order content: the actual data sent by the client
  • Boss: operating system
  • Labor cost: system resources
  • Menu: file descriptor. The operating system limits how many file descriptors a process can hold at the same time. On Linux you can check this limit and, where needed, adjust it through the kernel parameters.
  • Waiter: threads used by the operating system kernel for IO operations (kernel threads)
  • Chef: application threads (and the kitchen, of course, is the application process)
  • Order delivery mode: blocking or non-blocking
  • Method A: blocking/non-blocking synchronous IO
  • Method B: blocking/non-blocking synchronous IO, with multiple threads for processing
  • Method C: blocking/non-blocking multiplexing IO

4-2. Typical multiplexing IO implementations

At present there are four main implementations of multiplexing IO: select, poll, epoll and kqueue. The table below compares some of their key features:

| IO model | Relative performance | Key idea | Operating system | JAVA support |
|---|---|---|---|---|
| select | fairly high | Reactor | Windows/Linux | Supported; follows the Reactor design pattern. select is used by default on Linux 2.4 kernels. On Windows, synchronous IO is currently supported through the select model. |
| poll | fairly high | Reactor | Linux | Used by the JAVA NIO framework on Linux kernels earlier than 2.6. Also follows the Reactor pattern. |
| epoll | high | Reactor/Proactor | Linux | Used on Linux 2.6 and later kernels; earlier kernels fall back to poll. Note also that because Linux has no true asynchronous IO support comparable to Windows IOCP, asynchronous IO on Linux is simulated with epoll. |
| kqueue | high | Proactor | FreeBSD/macOS | Not available on Windows or Linux, since kqueue is a BSD facility; JDK builds for macOS use a kqueue-based selector. |

Multiplexing IO is best suited to “high concurrency” scenarios, meaning that at least a thousand connection requests are ready at the same time within one millisecond. In other cases multiplexing IO cannot show its advantages. On the other hand, implementing a feature with JAVA NIO is more complex than with the traditional Socket API, so in practice you should choose the technology according to your own business needs.

5. JAVA support for multiplexing IO

5-1. Important Concept: Channel

Channel: an established passage through which an application and the operating system exchange events and transfer content (note: the connection is to the operating system). Each channel has its own file descriptor. Because content is transferred through the operating system, the application can both read data from the channel and write data to the operating system through it.

A Channel in the JDK API is described as:

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.
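The open/closed life cycle quoted above can be observed with any channel. The following is a minimal sketch, assuming a `java.nio.channels.Pipe` as a convenient channel that needs no network; the class name `ChannelLifecycleSketch` and its helper method are illustrative and not part of the original article.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;

final class ChannelLifecycleSketch {

    /** Returns true if the channel was open at creation, closed after close(), and then rejected a write. */
    static boolean writeAfterCloseFails() throws IOException {
        Pipe pipe = Pipe.open();
        try {
            Pipe.SinkChannel sink = pipe.sink();
            boolean openAtCreation = sink.isOpen();    // a channel is open upon creation
            sink.close();                               // once closed it remains closed
            boolean closedAfterClose = !sink.isOpen();
            try {
                sink.write(ByteBuffer.wrap(new byte[] {1}));
                return false;                           // a write on a closed channel should not succeed
            } catch (ClosedChannelException expected) {
                return openAtCreation && closedAfterClose;
            }
        } finally {
            pipe.source().close();
            pipe.sink().close();                        // closing an already closed channel has no effect
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("write after close rejected: " + writeAfterCloseFails());
    }
}
```

The same checks apply to the socket channels used later in this article: a server that closes a client channel must not touch it again.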

The JAVA NIO framework provides its own Channel classes:

  • All channels registered with a Selector must be subclasses of the SelectableChannel class.

  • ServerSocketChannel: the listening channel of a server application. Only through this channel can an application register a port listener that supports “multiplexing IO” with the operating system. It listens for TCP connections only; UDP is handled by DatagramChannel.

  • SocketChannel: the channel of a TCP socket. A socket corresponds to one communication connection from a client IP address:port to a server IP address:port.

  • DatagramChannel: the channel for UDP datagrams.

5-2. Important concepts: Buffer

Data buffer: to keep reads and writes on each channel fast, the JAVA NIO framework integrates Buffer support into every channel that needs to read and write data.

How should this be understood? For example, ServerSocketChannel only supports listening for OP_ACCEPT events, so it cannot read or write network data directly, and therefore has no integrated Buffer.

A Buffer works in two modes: write mode and read mode. In read mode the application should only read data from the Buffer, not write to it. In write mode, however, nothing stops the application from reading, which means it can read stale or not-yet-written data (a dirty read). So once you decide to read from a Buffer, be sure to switch it to read mode first by calling flip().

A Buffer's state is described by three properties:

  • Position: the location in the buffer of the data element currently being operated on

  • Limit: the maximum position that can be operated on in the buffer. Switching between the read and write state is effectively controlled through this property.

  • Capacity: the maximum capacity of the buffer, specified when the buffer is created. Because the number of channels tends to be large under high concurrency, it is best not to make each buffer too large.

    In the code examples of the JAVA NIO framework below, we demonstrate how to operate a Buffer.
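Before the full server code, the effect of flip() and clear() on the three properties can be seen in isolation. This is a minimal sketch; the class name `BufferModesSketch`, the helper `state` and the 8-byte capacity are illustrative choices, not taken from the article.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferModesSketch {

    /** Formats the three properties as "position/limit/capacity". */
    static String state(ByteBuffer buffer) {
        return buffer.position() + "/" + buffer.limit() + "/" + buffer.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);          // write mode: position 0, limit = capacity
        System.out.println("new:   " + state(buffer));

        buffer.put("abc".getBytes(StandardCharsets.UTF_8));  // writing advances position
        System.out.println("put:   " + state(buffer));

        buffer.flip();                                        // read mode: limit = old position, position = 0
        System.out.println("flip:  " + state(buffer));

        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);                                     // reading advances position up to limit
        System.out.println("read:  " + new String(data, StandardCharsets.UTF_8) + " " + state(buffer));

        buffer.clear();                                       // back to write mode; the bytes are not erased
        System.out.println("clear: " + state(buffer));
    }
}
```

Reading without flip() would start at the current position and run up to the capacity, which is exactly the dirty read described above.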

5-3. Important concepts: Selector

Selector literally means “selector”, but depending on the responsibilities described below you could also call it a “polling agent”, an “event subscriber” or a “channel container manager”.

  • Event subscription and Channel management: the application registers with the Selector object which channels it wants to watch and which IO events each Channel is interested in. The Selector also maintains a container of the registered channels. The following code, from the WindowsSelectorImpl implementation class, shows the container that manages registered channels:

    // Initial capacity of the poll array
    private final int INIT_CAP = 8;
    // Maximum number of sockets for select().
    // Should be INIT_CAP times a power of 2
    private final static int MAX_SELECTABLE_FDS = 1024;

    // The list of SelectableChannels serviced by this Selector. Every mod
    // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
    // array, where the corresponding entry is occupied by the wakeupSocket
    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];

  • Polling agent: the application layer no longer asks the operating system directly, in blocking or non-blocking mode, whether an event has occurred; the Selector asks on its behalf.

  • Support for different operating system implementations: as mentioned earlier, multiplexing IO requires the support of the operating system. Its distinguishing feature is that the operating system can watch the events of many different network connections on the same port at the same time. The JVM, as the layer above, therefore has to provide different code for the multiplexing IO implementation of each operating system. My test environment is Windows, where the corresponding implementation class is sun.nio.ch.WindowsSelectorImpl.
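The three responsibilities above can be observed through the public Selector API. The following is a minimal sketch: the class name `SelectorRegistrationSketch`, the helper method and the `"listener"` attachment are illustrative, and the implementation class it prints depends on the operating system and JDK it runs on.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class SelectorRegistrationSketch {

    /** Subscribes the channel to OP_ACCEPT and returns the key the selector keeps for it. */
    static SelectionKey registerForAccept(Selector selector, ServerSocketChannel channel) throws IOException {
        channel.configureBlocking(false);   // only non-blocking channels can be registered
        // The third argument is an arbitrary attachment; "listener" is just a label here
        return channel.register(selector, SelectionKey.OP_ACCEPT, "listener");
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            SelectionKey key = registerForAccept(selector, channel);
            // Which class appears here depends on the platform
            System.out.println("selector implementation: " + selector.getClass().getName());
            // keys() is the selector's container of registered channels
            System.out.println("registered channels:     " + selector.keys().size());
            System.out.println("interested in OP_ACCEPT: "
                    + ((key.interestOps() & SelectionKey.OP_ACCEPT) != 0));
            System.out.println("attachment:              " + key.attachment());
        }
    }
}
```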

5-4. Brief design and analysis of JAVA NIO framework

From the description above, we know that multiplexing IO is implemented by the operating system kernel, and that the implementations differ between operating systems and even between versions of the same operating system family. So how does the cross-platform JVM adapt to this variety of multiplexing IO implementations? This is where object orientation shows its strength: whichever implementation is used, it has the same operational elements (“selector”, “channel” and “buffer”), so a unified set of abstractions can be defined and then implemented separately for each operating system. JAVA NIO's support for the various multiplexing IO technologies is built mainly on the abstract class java.nio.channels.spi.SelectorProvider, whose main abstract methods include:

  • public abstract DatagramChannel openDatagramChannel(): creates a UDP channel implementation that matches the operating system.

  • public abstract AbstractSelector openSelector(): creates a NIO selector that matches the operating system. As mentioned above, different operating systems and different versions support different NIO models by default.

  • public abstract ServerSocketChannel openServerSocketChannel(): creates a server-side channel that matches the NIO model.

  • public abstract SocketChannel openSocketChannel(): creates a TCP socket channel that matches the NIO model (representing a client's TCP connection).
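To see which concrete classes the provider hands out on a given machine, the four factory methods listed above can be called directly. This is a small sketch; the class name `SelectorProviderSketch` and the helper `describe` are illustrative, and the printed class names vary by operating system and JDK.

```java
import java.io.IOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;

final class SelectorProviderSketch {

    /** Opens one of each element through the provider and returns the concrete class names chosen. */
    static String[] describe(SelectorProvider provider) throws IOException {
        try (AbstractSelector selector = provider.openSelector();
             ServerSocketChannel server = provider.openServerSocketChannel();
             SocketChannel socket = provider.openSocketChannel();
             DatagramChannel datagram = provider.openDatagramChannel()) {
            return new String[] {
                provider.getClass().getName(),
                selector.getClass().getName(),
                server.getClass().getName(),
                socket.getClass().getName(),
                datagram.getClass().getName()
            };
        }
    }

    public static void main(String[] args) throws IOException {
        // SelectorProvider.provider() returns the system-wide default provider of this JVM
        for (String name : describe(SelectorProvider.provider())) {
            System.out.println(name);
        }
    }
}
```

Application code normally never touches the provider: Selector.open() and ServerSocketChannel.open() delegate to the default provider internally.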

Because the overall design of the JAVA NIO framework is large, we can only reconstruct the parts we care about. Here we look at how the JAVA NIO framework instantiates selectors for the different multiplexing IO technologies, as a small window onto the overall design.

Clearly, different SelectorProvider implementations correspond to different Selectors, each created by its concrete SelectorProvider implementation. As an aside, the underlying NIO model of Netty builds on this same design, which we will cover later when we discuss Netty. The following snippet is the core code that instantiates NioServerSocketChannel in Netty 4.0:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

5-5. JAVA examples

Here we use the JAVA NIO framework to implement a server that supports multiplexing IO (whether the client also uses multiplexing IO is in fact irrelevant to the performance of the overall system architecture):

package testNSocket;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer1 {

    static {
        BasicConfigurator.configure();
    }

    /**
     * Logger
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open();
        // Note: a server channel can only register the SelectionKey.OP_ACCEPT event
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        try {
            while(true) {
                // If this condition holds, this query of the selector returned no ready events of interest.
                // Java's multiplexing IO support also comes in blocking and non-blocking forms:
                // select() blocks, select(timeout) blocks for at most the timeout, selectNow() returns at once.
                if(selector.select(100) == 0) {
                    //================================================
                    //      Depending on the business, other work can be done here
                    //================================================
                    continue;
                }
                // The keys of the "events of interest" obtained from the operating system by this query (each channel is independent)
                Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();

                while(selectionKeys.hasNext()) {
                    SelectionKey readyKey = selectionKeys.next();
                    // A readyKey that has been handled must be removed. Otherwise it stays in the selector.selectedKeys set
                    // and is handled again the next time selector.select() > 0
                    selectionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel();
                    if(readyKey.isValid() && readyKey.isAcceptable()) {
                        SocketServer1.LOGGER.info("======server channel is ready to accept=======");
                        /*
                         * Once the server socket channel is ready, a socket channel can be obtained from it.
                         * The first thing to do with the new socket channel is to register its events of interest with the selector;
                         * otherwise data arriving on this socket channel cannot be detected.
                         * */
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel , selector);

                    } else if(readyKey.isValid() && readyKey.isConnectable()) {
                        SocketServer1.LOGGER.info("======socket channel connected=======");
                    } else if(readyKey.isValid() && readyKey.isReadable()) {
                        SocketServer1.LOGGER.info("======socket channel data is ready, reading=======");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch(Exception e) {
            SocketServer1.LOGGER.error(e.getMessage() , e);
        } finally {
            serverSocket.close();
        }
    }

    /**
     * After the server socket channel has received/prepared a new TCP connection,
     * it returns a new socketChannel to the program.<br>
     * This new socket channel is not yet registered with the selector ("selector/agent"),
     * so the program cannot yet be notified of its events through the selector.
     * The first thing to do with a new socket channel is therefore to register
     * its events of interest with the selector.
     * @param socketChannel the new socket channel
     * @param selector the selector ("selector/agent")
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
        socketChannel.configureBlocking(false);
        // A socket channel can register three, and only three, kinds of events: SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
        socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(2048));
    }

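    /**
     * Reads the information sent by the client and checks whether the message,
     * which may arrive over several transmissions, is complete.
     * The message counts as complete once the "over" keyword has been received.
     * @param readyKey the selection key whose socket channel is ready to be read
     * @throws Exception
     */
    private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel();
        // Get the port used by the client
        InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
        Integer sourcePort = sourceSocketAddress.getPort();

        // Get the buffer attached to this socket channel and prepare to read data.
        // What matters about a buffer are its three properties: capacity, position and limit.
        ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment();
        // Write the channel's data into the buffer (note: INTO the buffer).
        // The ByteBuffer was allocated with 2048 bytes, so a message may not fit.
        // That is fine for now, the code is adjusted later; here we assume one buffer is enough
        int realLen = -1;
        try {
            realLen = clientSocketChannel.read(contextBytes);
        } catch(Exception e) {
            // An exception here usually means the client terminated for some reason, so just close the channel
            SocketServer1.LOGGER.error(e.getMessage());
            clientSocketChannel.close();
            return;
        }

        // read() returns -1 when the client has closed the connection (end of stream)
        if(realLen == -1) {
            SocketServer1.LOGGER.warn("====client closed the connection====");
            clientSocketChannel.close();
            return;
        }

        // Switch the buffer from write mode to read mode:
        // limit becomes the number of bytes received so far and position becomes 0
        contextBytes.flip();
        // Beware of garbled Chinese characters; my personal preference is to encode/decode with URLDecoder/URLEncoder.
        // The java nio framework also provides its own charset encoders/decoders, so this is a matter of taste.
        // Only the bytes actually received are converted: array() exposes the whole 2048-byte backing array
        String messageEncode = new String(contextBytes.array() , 0 , contextBytes.limit() , "UTF-8");

        // The buffer is cleared and a reply is sent only once the "over" keyword has arrived;
        // otherwise the buffer is kept and put back into "write mode".
        // "over" is searched for in the encoded text, because decoding a partial message
        // could fail on a truncated %XX escape
        if(messageEncode.indexOf("over") != -1) {
            String message = URLDecoder.decode(messageEncode, "UTF-8");
            // Discard what has been read and switch back to write mode (note: clear() resets position and limit, it does not erase data)
            contextBytes.clear();
            SocketServer1.LOGGER.info("port:" + sourcePort + " message from client======message : " + message);

            //======================================================
            //   With the message complete, the business processing can start here
            //======================================================

            // Send the reply, then close the channel
            ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("processing result", "UTF-8").getBytes("UTF-8"));
            while(sendBuffer.hasRemaining()) {
                clientSocketChannel.write(sendBuffer);
            }
            clientSocketChannel.close();
        } else {
            SocketServer1.LOGGER.info("port:" + sourcePort + " client message not complete yet, still receiving======message : " + messageEncode);
            // Back to write mode: position returns to the end of the data received so far, limit equals capacity
            contextBytes.position(contextBytes.limit());
            contextBytes.limit(contextBytes.capacity());
        }
    }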
}
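To try the server out, a plain blocking client is enough. The sketch below is not from the original article: the class name `SocketClientSketch`, the `send` helper and the 5-second read timeout are assumptions, and the wire format (URL-encoded text terminated by the keyword "over", with a URL-encoded reply followed by the server closing the connection) is inferred from the server code above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class SocketClientSketch {

    /** Sends one URL-encoded message ending in "over" and returns the decoded reply. */
    static String send(String host, int port, String text) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(5000); // arbitrary read timeout so a silent server cannot block forever

            OutputStream out = socket.getOutputStream();
            out.write(URLEncoder.encode(text, "UTF-8").getBytes(StandardCharsets.UTF_8));
            out.write("over".getBytes(StandardCharsets.UTF_8)); // end-of-message keyword the server looks for
            out.flush();

            // The server closes the channel after replying, so read until end of stream
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) {
                reply.write(chunk, 0, n);
            }
            return URLDecoder.decode(reply.toString("UTF-8"), "UTF-8");
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 83 matches the bind() call in the server listing
        System.out.println(send("localhost", 83, "hello from the client"));
    }
}
```

Starting many of these clients at once (for example from a loop of threads) is a simple way to watch a single selector thread serve all of them.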

The comments in the server code above are fairly clear, but a few key points deserve attention:

  • serverChannel.register(Selector sel, int ops, Object att): the register(Selector sel, int ops, Object att) method is actually provided by AbstractSelectableChannel, a parent class of ServerSocketChannel. This means that any subclass of AbstractSelectableChannel can be registered with a selector.

  • SelectionKey.OP_ACCEPT: different Channel objects can register different “events of interest”. For example, ServerSocketChannel may not register interest in any event other than OP_ACCEPT (otherwise an exception is thrown at runtime). The following table lists the events that the commonly used AbstractSelectableChannel subclasses can register:

| Channel class | Channel function | Events of interest |
|---|---|---|
| ServerSocketChannel | Server-side channel | SelectionKey.OP_ACCEPT |
| DatagramChannel | UDP channel | SelectionKey.OP_READ, SelectionKey.OP_WRITE |
| SocketChannel | TCP channel | SelectionKey.OP_READ, SelectionKey.OP_WRITE, SelectionKey.OP_CONNECT |

In fact, the public final int validOps() method implemented by each AbstractSelectableChannel subclass shows which IO events that channel can register interest in.
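The event table can be checked by asking each channel class for its validOps(). This is a minimal sketch; the class name `ValidOpsSketch` and both helper methods are illustrative.

```java
import java.io.IOException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class ValidOpsSketch {

    /** Renders an operation bit set such as OP_READ | OP_WRITE as text. */
    static String describe(int ops) {
        StringBuilder names = new StringBuilder();
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.append("OP_ACCEPT ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.append("OP_CONNECT ");
        if ((ops & SelectionKey.OP_READ) != 0) names.append("OP_READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.append("OP_WRITE ");
        return names.toString().trim();
    }

    /** Returns true if registering a server channel for OP_READ is rejected at runtime. */
    static boolean serverChannelRejectsRead() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            try {
                server.register(selector, SelectionKey.OP_READ);
                return false;
            } catch (IllegalArgumentException expected) {
                return true; // OP_READ is not in the server channel's validOps()
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel socket = SocketChannel.open();
             DatagramChannel datagram = DatagramChannel.open()) {
            System.out.println("ServerSocketChannel: " + describe(server.validOps()));
            System.out.println("SocketChannel:       " + describe(socket.validOps()));
            System.out.println("DatagramChannel:     " + describe(datagram.validOps()));
        }
        System.out.println("server channel rejects OP_READ: " + serverChannelRejectsRead());
    }
}
```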

  • selector.selectedKeys().iterator(): when the Selector receives IO events from the operating system, its selectedKeys set receives the keys for those events on the next polling operation (the same kind of event on different channels is stored as separate key objects). Every key that has been processed must be removed from the set; otherwise the same event will be processed again in the next poll.

Returns this selector’s selected-key set.

Keys may be removed from, but not directly added to, the selected-key set. Any attempt to add an object to the key set will cause an UnsupportedOperationException to be thrown.

The selected-key set is not thread-safe.
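The need to remove processed keys can be demonstrated without any network code. The sketch below assumes a `java.nio.channels.Pipe` as a stand-in for a client connection; the class name `SelectedKeysSketch` and its helper are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectedKeysSketch {

    /**
     * Returns {sizeWithoutRemoval, sizeAfterRemoval}: the size of the selected-key set on the
     * poll after an event was handled, first without and then with removing the handled key.
     */
    static int[] staleKeyDemo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
            selector.select(1000);                        // the source end becomes readable
            pipe.source().read(ByteBuffer.allocate(16));  // "handle" the event by draining the data

            selector.selectNow();                         // nothing new is ready now...
            int withoutRemoval = selector.selectedKeys().size(); // ...but the handled key is still in the set

            selector.selectedKeys().clear();              // what Iterator.remove() does key by key
            selector.selectNow();
            int afterRemoval = selector.selectedKeys().size();

            return new int[] {withoutRemoval, afterRemoval};
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }

    public static void main(String[] args) throws IOException {
        int[] sizes = staleKeyDemo();
        System.out.println("selected keys without removal: " + sizes[0]);
        System.out.println("selected keys after removal:   " + sizes[1]);
    }
}
```

A server loop that iterates over the set without removing keys would therefore call its read handler again for a channel that has nothing new to read.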

5-6. JAVA instance improvement

In the code above, we simplified the use of the buffer in order to focus on how to use the selector. In real applications, to save memory, we usually do not allocate that much buffer space to each channel. The following code improves the buffer handling:

package testNSocket;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;

import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer2 {

    static {
        BasicConfigurator.configure();
    }

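    /**
     * Logger
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);

    /**
     * In this improved java nio server, the buffer is deliberately small.
     * The information a client sends to the server over several transmissions on a socket channel
     * is therefore no longer kept in the buffer (it simply would not fit).<br>
     * Instead, the hashcode of the socket channel is used as the key (you can of course define your own id)
     * and a StringBuffer holding the message as the value, stored in a server-side memory area, MESSAGEHASHCONTEXT.
     *
     * If you are not familiar with what ConcurrentHashMap does and how it works, please look it up.
     */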
    private static final ConcurrentMap<Integer, StringBuffer> MESSAGEHASHCONTEXT = new ConcurrentHashMap<Integer , StringBuffer>();

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open();
        // Note: a server channel can only register the SelectionKey.OP_ACCEPT event
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        try {
            while(true) {
                // If this condition holds, this query of the selector returned no ready events of interest.
                // Java's multiplexing IO support also comes in blocking and non-blocking forms:
                // select() blocks, select(timeout) blocks for at most the timeout, selectNow() returns at once.
                if(selector.select(100) == 0) {
                    //================================================
                    //      Depending on the business, other work can be done here
                    //================================================
                    continue;
                }
                // The keys of the "events of interest" obtained from the operating system by this query (each channel is independent)
                Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();

                while(selectionKeys.hasNext()) {
                    SelectionKey readyKey = selectionKeys.next();
                    // A readyKey that has been handled must be removed. Otherwise it stays in the selector.selectedKeys set
                    // and is handled again the next time selector.select() > 0
                    selectionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel();
                    if(readyKey.isValid() && readyKey.isAcceptable()) {
                        SocketServer2.LOGGER.info("======server channel is ready to accept=======");
                        /*
                         * Once the server socket channel is ready, a socket channel can be obtained from it.
                         * The first thing to do with the new socket channel is to register its events of interest with the selector;
                         * otherwise data arriving on this socket channel cannot be detected.
                         * */
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel , selector);

                    } else if(readyKey.isValid() && readyKey.isConnectable()) {
                        SocketServer2.LOGGER.info("======socket channel connected=======");
                    } else if(readyKey.isValid() && readyKey.isReadable()) {
                        SocketServer2.LOGGER.info("======socket channel data is ready, reading=======");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage() , e);
        } finally {
            serverSocket.close();
        }
    }

    /**
     * After the server socket channel has received/prepared a new TCP connection,
     * it returns a new socketChannel to the program.<br>
     * This new socket channel is not yet registered with the selector ("selector/agent"),
     * so the program cannot yet be notified of its events through the selector.
     * The first thing to do with a new socket channel is therefore to register
     * its events of interest with the selector.
     * @param socketChannel the new socket channel
     * @param selector the selector ("selector/agent")
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
        socketChannel.configureBlocking(false);
        // A socket channel can register three, and only three, kinds of events: SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
        // The last argument is the buffer allocated to this socket channel
        socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(50));
    }

    /**
     * Reads the information sent by the client and checks whether the message,
     * which may arrive over several transmissions, is complete.
     * The message counts as complete once the "over" keyword has been received.
     * @param readyKey the selection key whose socket channel is ready to be read
     * @throws Exception
     */
    private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel();
        // Get the port used by the client
        InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
        Integer sourcePort = sourceSocketAddress.getPort();
        // Every channel object is independent, so the object's hash value is used as its identifier
        // (hash values are not guaranteed to be unique; a real system should assign its own id)
        Integer channelUUID = clientSocketChannel.hashCode();

        // Get the buffer attached to this socket channel and prepare to read data.
        // What matters about a buffer are its three properties: capacity, position and limit.
        ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment();
        // Write the channel's data into the buffer (note: INTO the buffer).
        // This time, to demonstrate how the buffer is used, its capacity was deliberately reduced to 50 bytes,
        // so that the channel has to fill and drain the buffer several times
        int realLen = 0;
        StringBuffer message = new StringBuffer();
        // Each read() writes the data currently in the channel into the buffer;
        // at most the buffer's capacity can be written per call.
        // read() returns 0 when no more data is available and -1 when the client has closed the connection
        while((realLen = clientSocketChannel.read(contextBytes)) > 0) {

            // The buffer must be switched to "read" mode. Otherwise, because limit = capacity,
            // more would be read than was written whenever read() did not fill the buffer
            contextBytes.flip();
            byte[] messageBytes = new byte[realLen];
            contextBytes.get(messageBytes, 0, realLen);

            // Data can also be read this way, without caring about position,
            // because array() exposes the buffer's whole backing array.
            // When using it, you must control the end position yourself (realLen matters)
            //byte[] messageBytes = contextBytes.array();

            // Beware of garbled Chinese characters; my personal preference is to encode/decode with URLDecoder/URLEncoder.
            // The java nio framework also provides its own charset encoders/decoders, so this is a matter of taste.
            // URL-encoded text is plain ASCII, so converting it chunk by chunk is safe
            String messageEncode = new String(messageBytes , 0 , realLen , "UTF-8");
            message.append(messageEncode);

            // Switch back to "write" mode; simply clearing the buffer is the quickest way
            contextBytes.clear();
        }

        // Combine what was just received with what this channel delivered earlier
        StringBuffer historyMessage = MESSAGEHASHCONTEXT.get(channelUUID);
        StringBuffer completeMessage = (historyMessage == null) ? message : historyMessage.append(message);

        // If the "over" keyword has arrived, the message is complete. It is searched for in the
        // encoded text, because decoding a partial message could fail on a truncated %XX escape
        if(completeMessage.indexOf("over") != -1) {
            // Clear the history kept in MESSAGEHASHCONTEXT
            MESSAGEHASHCONTEXT.remove(channelUUID);
            SocketServer2.LOGGER.info("port:" + sourcePort + " message from client======message : " + message);
            SocketServer2.LOGGER.info("port:" + sourcePort + " complete message from client======completeMessage : " + URLDecoder.decode(completeMessage.toString(), "UTF-8"));

            //======================================================
            //   With the message complete, the business processing can start here
            //======================================================

            // Send the reply, then close the channel
            ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("processing result", "UTF-8").getBytes("UTF-8"));
            while(sendBuffer.hasRemaining()) {
                clientSocketChannel.write(sendBuffer);
            }
            clientSocketChannel.close();
        } else if(realLen == -1) {
            // The client closed the connection before sending "over": discard the partial message
            MESSAGEHASHCONTEXT.remove(channelUUID);
            clientSocketChannel.close();
        } else {
            // No "over" keyword yet, so the message is incomplete: keep everything received so far in MESSAGEHASHCONTEXT
            SocketServer2.LOGGER.info("port:" + sourcePort + " client message not complete yet, still receiving======message : " + message);
            MESSAGEHASHCONTEXT.put(channelUUID, completeMessage);
        }
    }
}

The code above should need little further explanation. You can of course still add a thread pool for the actual business processing. Note that it should be a thread pool, because a pool keeps the number of threads under control.
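One way to hand completed messages to a thread pool is sketched below. Everything in it is an assumption made for illustration: the class name `BusinessPoolSketch`, the placeholder business step `handle` (which only upper-cases the text) and the pool size of 4. The point is only that the selector thread submits the work and returns to its select loop at once.

```java
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BusinessPoolSketch {

    /** Hypothetical business step; a real server would do its actual processing here. */
    static String handle(String completeMessage) {
        return completeMessage.toUpperCase(Locale.ROOT);
    }

    /** Called by the selector thread once a message is complete; returns without waiting for the result. */
    static Future<String> submit(ExecutorService pool, final String completeMessage) {
        Callable<String> task = () -> handle(completeMessage);
        return pool.submit(task);
    }

    public static void main(String[] args) throws Exception {
        // A fixed-size pool bounds the number of business threads; 4 is an arbitrary choice
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Future<String> result = submit(pool, "hello over");
            System.out.println(result.get(1, TimeUnit.SECONDS));
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real server the task would also have to write the reply, and channel writes from pool threads need care because the selected-key set and the buffers attached to keys are not thread-safe.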

6. Advantages and disadvantages of multiplexing IO

  • IO processing no longer needs multiple threads (this covers both the operating system kernel's IO management module and the application process). The application process can of course still introduce a thread pool for the actual business processing.

  • One selector can serve multiple protocols. A ServerSocketChannel handles TCP only, but a DatagramChannel (UDP) can be registered with the same selector, and since TCP and UDP port numbers are independent the two can even be bound to the same port number.

  • Operating system level optimization: multiplexing IO is an operating system level technology that lets a single port accept IO events from many clients at the same time. It also retains all the characteristics of the blocking and non-blocking synchronous IO discussed earlier; part of the Selector's role is essentially that of a polling agent.

  • Still synchronous IO: blocking IO, non-blocking IO and even multiplexing IO are all synchronous IO at the operating system level. We have talked about “synchronous IO” for a long time without ever explaining what it means. In fact it can be stated in one sentence: the operating system will not actively tell the upper layer that an event has occurred; it answers only when the upper layer (including any agent mechanism acting for it) asks.

This key concept can be seen in the earlier schematic diagrams of this series. To summarize synchronous IO, asynchronous IO, blocking IO and non-blocking IO clearly, I will put together a comparison table after explaining asynchronous IO in the next article.
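The one-sentence definition of synchronous IO can be made concrete with a non-blocking channel: data arrives, nothing notifies the application, and the application only finds out by asking again. The sketch below assumes a `java.nio.channels.Pipe` in place of a network connection; the class name `MustAskSketch`, the retry count and the 10 ms pause are illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

final class MustAskSketch {

    /** Returns {answerBeforeDataArrived, bytesReadOnceWeAskedAgain}. */
    static int[] askUntilReady() throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();
        try {
            pipe.source().configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(16);

            // Nothing has been sent yet, so a non-blocking read answers "0 bytes" immediately
            int firstAnswer = pipe.source().read(buffer);

            // Data arrives, but no callback or notification reaches the reading side
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

            // The reader learns about the data only because it asks again
            int read = 0;
            for (int attempt = 0; attempt < 100 && read == 0; attempt++) {
                read = pipe.source().read(buffer);
                if (read == 0) {
                    Thread.sleep(10);
                }
            }
            return new int[] {firstAnswer, read};
        } finally {
            pipe.sink().close();
            pipe.source().close();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] result = askUntilReady();
        System.out.println("first answer: " + result[0] + ", bytes after asking again: " + result[1]);
    }
}
```

A Selector does the same asking on the application's behalf for many channels at once, which is why multiplexing IO still counts as synchronous IO.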

7. Asynchronous IO (real NIO)

Sorry: once again I underestimated the amount of material for this article, so the introduction to JAVA asynchronous IO will have to wait a little longer. In the next article I will cover the asynchronous IO supported by operating systems in more detail and introduce the asynchronous IO framework NIO 2.0 (AIO), which was added in JAVA 1.7. As mentioned earlier, Linux has no equivalent of Windows IOCP, so on Linux asynchronous IO is simulated with epoll multiplexing.

About the author: Lu Java source code, a person with a unique pursuit of Coding, Alibaba technical expert, “Programmer’s three courses” co-author, “Java engineers into god” series of articles author.

If you have any comments or suggestions, or want to get in touch with the author, you can follow the official account [lu Java source] and leave me a message there.