This article follows “Architecture Design: Inter-system Communication (4) — IO Communication Model and JAVA Practice, Part 2” and continues with asynchronous IO.

7. Asynchronous I/O

In the previous two articles we covered blocking synchronous IO, non-blocking synchronous IO, and multiplexed IO, along with JAVA’s support for these three models. IO models are provided by the operating system, and all three of these are synchronous: they follow the rule “if the application does not ask, the operating system never tells it”.

Asynchronous IO follows a subscription-notification model: applications register IO listeners with the operating system and continue to do their own work. When an IO event occurs in the operating system and data is ready, the operating system actively notifies the application program and triggers the corresponding function:

  • Like synchronous IO, asynchronous IO is provided by the operating system. Microsoft Windows offers an asynchronous IO technology called IOCP (I/O Completion Port);

  • Since there is no such asynchronous IO technology under Linux, epoll (an implementation of multiplexing IO technology described above) is used to simulate asynchronous IO.
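The subscription-notification idea can be sketched with the JDK classes used later in this article. This is only a minimal illustration, not the article's server: the class name `AsyncNotifyDemo`, the method `roundTrip`, the 64-byte buffer and the loopback address are all assumptions made for the example. The calling thread registers a listener for a connection, is not blocked, and then acts as the client itself; the handlers run on the JDK's default channel group threads.

```java
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: register listeners, carry on, get notified.
class AsyncNotifyDemo {

    /** Starts a one-shot server on a free loopback port, sends it one short message, returns what the handler saw. */
    static String roundTrip(String message) throws Exception {
        CompletableFuture<String> received = new CompletableFuture<>();
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port

            // "Subscribe": register interest in the next connection; accept() returns at once.
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel channel, Void attachment) {
                    ByteBuffer buffer = ByteBuffer.allocate(64);
                    // Subscribe again, this time for data arriving on the new channel.
                    channel.read(buffer, null, new CompletionHandler<Integer, Void>() {
                        @Override
                        public void completed(Integer count, Void a) {
                            buffer.flip();
                            received.complete(StandardCharsets.UTF_8.decode(buffer).toString());
                            closeQuietly(channel);
                        }

                        @Override
                        public void failed(Throwable exc, Void a) {
                            received.completeExceptionally(exc);
                            closeQuietly(channel);
                        }
                    });
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    received.completeExceptionally(exc);
                }
            });

            // The calling thread was never blocked by accept(), so it is free to play the client.
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (Socket client = new Socket("127.0.0.1", port)) {
                client.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                client.getOutputStream().flush();
                return received.get(5, TimeUnit.SECONDS);
            }
        }
    }

    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (Exception ignored) {
            // nothing useful to do in a demo
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello over"));
    }
}
```

A single read is enough here only because the message is short and travels over loopback; a real handler has to register again after every notification, as the full example in section 8-2 does.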

8. JAVA Support (JAVA AIO)

8-1. Brief Analysis of JAVA AIO Framework

  • As with the analysis of the JAVA NIO framework in “Architecture Design: Inter-system Communication (4) — IO Communication Model and JAVA Practice, Part 2”, we do not draw every implementation class of the JAVA AIO framework here. The structural analysis is only meant to show readers how the class design of JAVA AIO relates to the operating system.

  • As stated repeatedly in this article, the JAVA AIO framework uses Windows IOCP on Windows and simulates asynchronous IO with epoll multiplexing on Linux. This is visible in the design of some of the framework’s classes. For example, the concrete class that implements the socket channel on Windows is “sun.nio.ch.WindowsAsynchronousSocketChannelImpl”, and the Javadoc comment on the Iocp class it references reads:

/**
 * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
 * completion port.
 */

If you are interested, you can of course read the complete code (I suggest starting from the “java.nio.channels.spi.AsynchronousChannelProvider” class).

  • Pay special attention to the “java.nio.channels.NetworkChannel” interface. It is also implemented by the channels of the JAVA NIO framework, as shown in the figure below:
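Both observations in this subsection can also be checked at runtime. The sketch below is only an illustration: the class name `AioPlatformProbe` and its methods are made up for this example. It prints which provider implementation the running JDK selected (the concrete class is a JDK internal and differs by platform, so no output is shown) and checks that the NIO and AIO socket channel types all implement `NetworkChannel`.

```java
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AsynchronousChannelProvider;

// Hypothetical helper for inspecting the AIO implementation of the running JDK.
class AioPlatformProbe {

    /** Name of the platform-specific provider class the JDK picked for asynchronous channels. */
    static String providerClassName() {
        return AsynchronousChannelProvider.provider().getClass().getName();
    }

    /** True if the NIO and AIO socket channel types share the NetworkChannel interface. */
    static boolean allAreNetworkChannels() {
        return NetworkChannel.class.isAssignableFrom(AsynchronousServerSocketChannel.class)
            && NetworkChannel.class.isAssignableFrom(AsynchronousSocketChannel.class)
            && NetworkChannel.class.isAssignableFrom(ServerSocketChannel.class)
            && NetworkChannel.class.isAssignableFrom(SocketChannel.class);
    }

    public static void main(String[] args) {
        System.out.println("provider: " + providerClassName());
        System.out.println("all NetworkChannel: " + allAreNetworkChannels());
    }
}
```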

8-2. Code examples

Below we use a code example to show how the JAVA AIO framework is used in practice. The code comes first, followed by an explanation of the key points in writing and running it:

package testASocket;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

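/**
 * Test of the JAVA AIO framework. Please be sure to read
 * "Architecture Design: Inter-system Communication (4) — IO Communication Model and JAVA Practice, Part 2"
 * before reading this test code.
 * It will help you a great deal in understanding the key points of the code.
 * @author yinwenjie
 */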
public class SocketServer {

    static {
        BasicConfigurator.configure();
    }

    private static final Object waitObject = new Object();

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
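        /*
         * A few words about the thread pool used here:
         * 1. Executors is a factory for thread pools. It makes it easy to create fixed-size pools, scheduled pools, and pools whose thread count can grow and shrink. See the API docs for details.
         * 2. You can also create a pool directly with ThreadPoolExecutor.
         * 3. This pool is used to receive the operating system's "IO event notifications", not to run the business processing that follows once IO data has arrived. For the latter, use a second pool (preferably do not mix the two).
         * 4. You can also do without a thread pool (not recommended). In that case simply call AsynchronousServerSocketChannel.open().
         * */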
        ExecutorService threadPool = Executors.newFixedThreadPool(20);
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
        final AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group);

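        // Set the address to listen on; "0.0.0.0" stands for all local network interfaces
        serverSocket.bind(new InetSocketAddress("0.0.0.0", 83));
        // Register a listener on the AsynchronousServerSocketChannel. Note that this covers only the server channel itself,
        // not the socket channels established later between clients and the server
        serverSocket.accept(null, new ServerSocketChannelHandle(serverSocket));

        // Wait so that the behavior can be observed (unrelated to the principle being explained; it only keeps the process from exiting)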
        synchronized(waitObject) {
            waitObject.wait();
        }
    }
}

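/**
 * This handler responds to events on the server socket channel.
 * Recall from "Architecture Design: Inter-system Communication (4) — IO Communication Model and JAVA Practice, Part 2"
 * that a ServerSocketChannel has only one kind of event: accepting a client connection.
 * @author yinwenjie
 */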
class ServerSocketChannelHandle implements CompletionHandler<AsynchronousSocketChannel, Void> {
    /**
     * Logger
     */
    private static final Log LOGGER = LogFactory.getLog(ServerSocketChannelHandle.class);

    private AsynchronousServerSocketChannel serverSocketChannel;

    /**
     * @param serverSocketChannel
     */
    public ServerSocketChannelHandle(AsynchronousServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
    }

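    /**
     * Note: we observe the object ids of this, socketChannel and attachment
     * as connections from different clients arrive, to illustrate how ServerSocketChannelHandle listens.
     */
    @Override
    public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
        ServerSocketChannelHandle.LOGGER.info("completed(AsynchronousSocketChannel result, Void attachment)");
        // The listener must be registered again every time (one registration, one notification).
        // Because each channel has its own file descriptor, there is no need to worry about "missed" events
        this.serverSocketChannel.accept(attachment, this);

        // Register a "read" listener on the new socketChannel so that the operating system notifies the application once data has arrived and is ready.
        // Because we want to accumulate the data this client sends over several transfers and process it together,
        // we attach a StringBuffer object to the read as its "attachment"
        ByteBuffer readBuffer = ByteBuffer.allocate(50);
        socketChannel.read(readBuffer, new StringBuffer(), new SocketChannelReadHandle(socketChannel , readBuffer));
    }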

    /* (non-Javadoc)
     * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)
     */
    @Override
    public void failed(Throwable exc, Void attachment) {
        // Log the cause as well; the attachment type of this handler is Void, not ByteBuffer
        ServerSocketChannelHandle.LOGGER.error("failed(Throwable exc, Void attachment)", exc);
    }
}

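/**
 * Listens for data-arrival events on each socketChannel.<p>
 * 
 * Important: every socketchannel has its own independently working SocketChannelReadHandle object
 * (an implementation of the CompletionHandler interface),
 * its own "file descriptor" object FileDescriptor,
 * and its own programmer-defined buffer (a ByteBuffer here),
 * so there is no need to worry about objects getting mixed up between connections on the server; the JAVA AIO framework organizes this for you.<p>
 * 
 * Most important, though: the object that creates channels, AsynchronousChannelProvider, is a singleton. Whichever socketchannel you look at,
 * it is the same object reference (this does not matter, because you never operate on the AsynchronousChannelProvider object directly).
 * @author yinwenjie
 */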
class SocketChannelReadHandle implements CompletionHandler<Integer, StringBuffer> {
    /**
     * Logger
     */
    private static final Log LOGGER = LogFactory.getLog(SocketChannelReadHandle.class);

    private AsynchronousSocketChannel socketChannel;

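    /**
     * The ByteBuffer dedicated to buffering this channel's data.<br>
     * You could also pass it in as the attachment of the CompletionHandler.<br>
     * In this sample, however, the attachment is used for the StringBuffer that records everything received so far.
     */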
    private ByteBuffer byteBuffer;

    public SocketChannelReadHandle(AsynchronousSocketChannel socketChannel , ByteBuffer byteBuffer) {
        this.socketChannel = socketChannel;
        this.byteBuffer = byteBuffer;
    }

    /* (non-Javadoc)
     * @see java.nio.channels.CompletionHandler#completed(java.lang.Object, java.lang.Object)
     */
    @Override
    public void completed(Integer result, StringBuffer historyContext) {
        // If this condition holds, the client has closed the TCP socket, so the server simply closes its side
        if(result == -1) {
            try {
                this.socketChannel.close();
            } catch (IOException e) {
                SocketChannelReadHandle.LOGGER.error(e);
            }
            return;
        }

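        SocketChannelReadHandle.LOGGER.info("completed(Integer result, StringBuffer attachment) : now read the data that is ready in the channel");
        /*
         * Strictly speaking, Integer result already tells us how many bytes this read obtained from the operating system,
         * so we would not have to switch the buffer to "read mode". For the sake of clean code, switching is still recommended.
         * 
         * Also, in both the JAVA AIO and the JAVA NIO framework the total capacity of the buffer can be smaller than the amount of data currently available from the operating system.
         * The difference is that in JAVA AIO we need no special handling for this case, because the framework takes care of it (by delivering several notifications)
         * */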
        this.byteBuffer.flip();
        byte[] contexts = new byte[1024];
        this.byteBuffer.get(contexts, 0, result);
        this.byteBuffer.clear();
        try {
            String nowContent = new String(contexts , 0 , result , "UTF-8");
            historyContext.append(nowContent);
            SocketChannelReadHandle.LOGGER.info("================data received so far: " + historyContext);
        } catch (UnsupportedEncodingException e) {
            SocketChannelReadHandle.LOGGER.error(e);
        }

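        // If this condition holds, the "end marker" has not arrived yet.
        // A read registration yields exactly one notification, so register again with the same history before returning
        if(historyContext.indexOf("over") == -1) {
            this.socketChannel.read(this.byteBuffer, historyContext, this);
            return;
        }

        //=========================================================================
        //          As in the previous article's code, "over" marks the end of a complete client message
        //=========================================================================
        SocketChannelReadHandle.LOGGER.info("=======complete message received, start business processing=========");
        historyContext = new StringBuffer();

        // Keep listening (one registration, one notification)
        this.socketChannel.read(this.byteBuffer, historyContext, this);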
    }

    /* (non-Javadoc)
     * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)
     */
    @Override
    public void failed(Throwable exc, StringBuffer historyContext) {
        SocketChannelReadHandle.LOGGER.info("=====client closed abnormally, the server will close the TCP channel");
        try {
            this.socketChannel.close();
        } catch (IOException e) {
            SocketChannelReadHandle.LOGGER.error(e);
        }
    }
} 
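To try the server above you need a client. The following is a minimal blocking client sketch, not the client from the earlier article: the class name `SimpleTestClient`, the chunk texts and the 100 ms pause are assumptions chosen for illustration. It writes the message in several pieces and ends with “over”, so the server is likely to receive several read notifications for one logical message. When started without arguments it only runs a local loopback check against a plain `ServerSocket`, so it can be tried without the AIO server.

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical blocking test client for the SocketServer shown above.
class SimpleTestClient {

    /** Sends each chunk as a separate write so the server may see several read notifications. */
    static void send(String host, int port, String... chunks) throws Exception {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            for (String chunk : chunks) {
                out.write(chunk.getBytes(StandardCharsets.UTF_8));
                out.flush();
                Thread.sleep(100); // pause so the chunks are likely to arrive separately
            }
        }
    }

    /** Loopback check without the AIO server: a plain ServerSocket collects whatever the client sent. */
    static String sendToLoopback(String... chunks) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread client = new Thread(() -> {
                try {
                    send(server.getInetAddress().getHostAddress(), server.getLocalPort(), chunks);
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            });
            client.start();
            try (Socket peer = server.accept(); InputStream in = peer.getInputStream()) {
                ByteArrayOutputStream collected = new ByteArrayOutputStream();
                byte[] buf = new byte[256];
                for (int n; (n = in.read(buf)) != -1; ) {
                    collected.write(buf, 0, n);
                }
                client.join();
                return new String(collected.toByteArray(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 2) {
            // e.g. "127.0.0.1 83" when the SocketServer from this article is running locally
            send(args[0], Integer.parseInt(args[1]), "hello ", "from client 1 ", "over");
        } else {
            System.out.println(sendToLoopback("hello ", "from client 1 ", "over"));
        }
    }
}
```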

8-2-1. Key points

  • Recall that in the JAVA NIO framework we discussed an important concept, the “selector”. On behalf of all channels registered by the application, it polls the operating system for IO events, manages the set of currently registered channels, and locates the channel on which an event occurred. In the JAVA AIO framework the application does not poll; it subscribes and is notified. The “selector” is therefore no longer needed, and each channel registers its listener with the operating system directly.

  • The JAVA AIO framework has only two network IO channels: “AsynchronousServerSocketChannel” (the server listening channel) and “AsynchronousSocketChannel” (the socket channel). Each channel, however, has its own FileDescriptor and attachment, and in this example is referenced by its own SocketChannelReadHandle instance. Let’s look at their reference structure in a debugging session:

During the test we started two clients. A client can be written in any language, blocking or non-blocking, as long as it supports TCP sockets; if you want to see how one is written, refer to the client code examples in my article “Architecture Design: Inter-system Communication (3) — IO Communication Model and JAVA Practice, Part 1”. We can then observe how the server handles the two client channels:

As you can see, the server created two WindowsAsynchronousSocketChannelImpl objects, one for client 1 and one for client 2:

Client 1: WindowsAsynchronousSocketChannelImpl: 760 | FileDescriptor: 762

Client 2: WindowsAsynchronousSocketChannelImpl: 792 | FileDescriptor: 797

Next, we let the two clients send information to the server side and watch what happens on the server side. The following figure shows the processing of messages from client 1 and client 2 on the server side:

Client 1: WindowsAsynchronousSocketChannelImpl: 760 | FileDescriptor: 762 | SocketChannelReadHandle: 803 | HeapByteBuffer: 808

Client 2: WindowsAsynchronousSocketChannelImpl: 792 | FileDescriptor: 797 | SocketChannelReadHandle: 828 | HeapByteBuffer: 833

As you can see, the server uses a separate SocketChannelReadHandle (handler) object for each client channel, and each handler references its own socket channel object.

  • Apart from the removal of the Selector, which follows from the different operating system mechanism, the important concepts of JAVA NIO carry over to JAVA AIO: the Channel concept mentioned above, for example, and the Buffer used in the demo code. In fact, the JAVA NIO and JAVA AIO frameworks can be viewed together as one complete set of implementations for “high-concurrency IO processing”.
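The points above (no selector, one handler and one buffer per channel, one notification per registration) can be seen in a compact, self-contained sketch. It is not the article's server: the class `OverMarkerReader`, the deliberately tiny 8-byte buffer and the notification counter are assumptions made for illustration, and ASCII text is assumed so that a character is never split across two reads. Because the buffer is smaller than the message, the handler is notified several times and registers itself again after each notification until the “over” marker shows up.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-connection handler: one instance, one buffer, one history per channel.
class OverMarkerReader implements CompletionHandler<Integer, StringBuilder> {
    private final AsynchronousSocketChannel channel;
    private final ByteBuffer buffer = ByteBuffer.allocate(8); // deliberately smaller than the message
    private final CompletableFuture<String> done;
    private final AtomicInteger notifications;

    OverMarkerReader(AsynchronousSocketChannel channel, CompletableFuture<String> done, AtomicInteger notifications) {
        this.channel = channel;
        this.done = done;
        this.notifications = notifications;
    }

    @Override
    public void completed(Integer count, StringBuilder history) {
        notifications.incrementAndGet();
        if (count != -1) {
            buffer.flip();
            history.append(StandardCharsets.US_ASCII.decode(buffer));
            buffer.clear();
        }
        if (count == -1 || history.indexOf("over") != -1) {
            done.complete(history.toString());
            closeQuietly();
            return;
        }
        // One registration yields one notification, so register again for the next piece.
        channel.read(buffer, history, this);
    }

    @Override
    public void failed(Throwable exc, StringBuilder history) {
        done.completeExceptionally(exc);
        closeQuietly();
    }

    private void closeQuietly() {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful to do in a demo
        }
    }

    /** Sends 'message' over loopback and returns what the handler assembled; counts notifications. */
    static String receive(String message, AtomicInteger notifications) throws Exception {
        CompletableFuture<String> done = new CompletableFuture<>();
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void a) {
                    OverMarkerReader reader = new OverMarkerReader(ch, done, notifications);
                    ch.read(reader.buffer, new StringBuilder(), reader);
                }

                @Override
                public void failed(Throwable exc, Void a) {
                    done.completeExceptionally(exc);
                }
            });
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (Socket client = new Socket("127.0.0.1", port)) {
                client.getOutputStream().write(message.getBytes(StandardCharsets.US_ASCII));
                client.getOutputStream().flush();
                return done.get(5, TimeUnit.SECONDS);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger count = new AtomicInteger();
        System.out.println(receive("first part, second part, over", count) + " in " + count.get() + " notifications");
    }
}
```

The search for “over” runs on the accumulated history rather than on a single buffer, so a marker that is split across two reads is still found.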

8-2-2. Room for improvement

Of course, the code above is only sample code, meant to give you a basic understanding of how the JAVA AIO framework is used. It leaves plenty of room for improvement, for example:

  • In a production environment we need to record information such as the user’s login details on the channel. This requirement can be implemented with the “attachment” feature of JAVA AIO.

  • Both in this article and in the previous one (Architecture Design: Inter-system Communication (4) — IO Communication Model and JAVA Practice, Part 2), we transfer content in a “custom text” format and check for the “over” keyword. Would you do it this way in a real production environment?

  • Obviously not, because the format is not compact. We could use JSON, which gives a better information structure at a similar size. We could also use Protocol Buffers, which combine transmission efficiency with a good information structure. You could even use a TLV format, which is extremely efficient to transfer (it carries not a single byte of extra description). For a description of these formats, see Architecture Design: Inter-system Communication (1) — Overview, Starting from “Chat”.

  • Remember that with both JAVA AIO and JAVA NIO you will normally use thread pools (although you are not required to). The principle is that a thread pool is used only for business processing, and a thread ends its work as soon as that processing is done (it is returned to the pool or destroyed). The JAVA AIO framework additionally has a thread pool that runs the notification handlers. This follows from its subscription-notification model: the “subscribe” operation can be done by the main thread, but you cannot expect the concurrent “notification” operations of an application to be handled by the main thread as well.

  • The best way to improve, of course, is to use Netty or Mina.
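As one illustration of the format discussion above, here is a minimal TLV (type-length-value) framing sketch that could replace the “over” marker. The layout is an assumption made for this example, not a standard: a 1-byte type, a 4-byte big-endian length, then the value. `TlvFrames`, `TYPE_TEXT` and `drainText` are hypothetical names. The decoder keeps an incomplete frame in the buffer until the next read delivers the rest, which matches the way a read handler is notified piece by piece.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical TLV framing helper: 1-byte type, 4-byte big-endian length, value bytes.
class TlvFrames {
    static final byte TYPE_TEXT = 1; // assumed type code for UTF-8 text
    private static final int HEADER_BYTES = 1 + 4;

    /** Encodes one frame. */
    static byte[] encode(byte type, byte[] value) {
        return ByteBuffer.allocate(HEADER_BYTES + value.length)
                .put(type)
                .putInt(value.length)
                .put(value)
                .array();
    }

    /**
     * Extracts every complete text frame from 'pending' (a buffer in write mode that
     * received bytes are appended to) and leaves any partial frame for the next read.
     */
    static List<String> drainText(ByteBuffer pending) {
        List<String> messages = new ArrayList<>();
        pending.flip(); // switch to read mode
        while (pending.remaining() >= HEADER_BYTES) {
            pending.mark();
            byte type = pending.get();
            int length = pending.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (pending.remaining() < length) {
                pending.reset(); // frame not complete yet, wait for more bytes
                break;
            }
            byte[] value = new byte[length];
            pending.get(value);
            if (type == TYPE_TEXT) {
                messages.add(new String(value, StandardCharsets.UTF_8));
            }
        }
        pending.compact(); // keep the unread remainder, back to write mode
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer pending = ByteBuffer.allocate(64);
        pending.put(encode(TYPE_TEXT, "hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(drainText(pending));
    }
}
```

With framing like this the receiver knows exactly where a message ends, so the message body may itself contain the text “over” without confusing the server.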

8-3. Why is there still Netty?

Some readers may ask: now that JAVA NIO and JAVA AIO provide the underlying support on all major operating systems, why are Netty and MINA the mainstream JAVA NIO technologies? The answer is simple: they are easier to use. Here are a few examples:

  • The JAVA NIO and JAVA AIO frameworks support multiplexed IO and asynchronous IO, but they offer no good encapsulation of the upper-level “information format”. Neither provides encoding and decoding for formats such as Protocol Buffers or JSON, for example, whereas the Netty framework does (its encoders and decoders are built on the chain-of-responsibility pattern).

  • Writing a NIO/AIO server application that is reliable, maintainable, and high-performance (note the order) takes more than a framework that is compatible with the various operating system implementations. More importantly, the application must also handle many upper-level concerns, such as client permissions, the information format encapsulation mentioned above, and simple data reading. The Netty framework provides support for each of these.

  • The JAVA NIO framework has a poll/epoll bug: Selector.select(timeout) may fail to block. When it does not block, the select loop spins and CPU usage reaches 100%. This bug can only be reproduced on the Linux kernel.

  • The problem was still not fully resolved in JDK 1.7: bugs.java.com/bugdatabase… . Netty 4.0 is also built on the JAVA NIO framework (the NioServerSocketChannel class in Netty was introduced earlier), but Netty works around this bug.

  • There are other reasons as well; after using Netty you can make your own comparison.
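To make the select() bug more concrete, here is a rough sketch of the general workaround idea: count select calls that return early with nothing selected, and after too many of them move all registrations to a fresh Selector. Netty's NIO event loop uses a strategy of this kind, but the code below is not Netty's code. The class `RebuildingSelectLoop` and the threshold value are assumptions made for illustration, and a real implementation must also treat `wakeup()` calls and thread interrupts as legitimate early returns.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

// Hypothetical guard around Selector.select(timeout) for the spinning-select bug.
class RebuildingSelectLoop {
    static final int SPIN_THRESHOLD = 512; // assumed limit for suspicious early returns
    private Selector selector;
    private int suspiciousReturns;

    RebuildingSelectLoop(Selector selector) {
        this.selector = selector;
    }

    /** The selector currently in use; it changes after a rebuild. */
    Selector selector() {
        return selector;
    }

    /** One select call (timeoutMillis must be positive); rebuilds the selector if it keeps returning early. */
    int select(long timeoutMillis) throws IOException {
        long start = System.nanoTime();
        int ready = selector.select(timeoutMillis);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        if (ready > 0 || elapsedMillis >= timeoutMillis) {
            suspiciousReturns = 0; // normal behavior: events arrived or the timeout expired
        } else if (++suspiciousReturns >= SPIN_THRESHOLD) {
            selector = rebuild(selector);
            suspiciousReturns = 0;
        }
        return ready;
    }

    /** Moves every valid registration from 'old' to a new selector, then closes 'old'. */
    static Selector rebuild(Selector old) throws IOException {
        Selector fresh = Selector.open();
        for (SelectionKey key : old.keys()) {
            if (!key.isValid()) {
                continue;
            }
            int ops = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            key.channel().register(fresh, ops, attachment);
        }
        old.close();
        return fresh;
    }
}
```

An event loop would call `select(timeout)` on this wrapper and always fetch the selected keys from `selector()` afterwards, since the underlying Selector may have been replaced.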

9. Coming up next

Over three articles we introduced the four IO models of the operating system, explained JAVA’s support for them, and walked through code. Some readers have complained that the treatment was not deep enough: the details of epoll, for instance, were not explained, and the performance of the IO models was not compared. Don’t worry. I plan to spend the next three or four months on “inter-system communication technology”, and, as in the “load balancing” series, these gaps will be filled in later. My own technical level is limited and I write this blog mainly to share and summarize, so criticism from readers is welcome.

Starting with the next article, we’ll spend one or two articles on the Netty framework (using Netty 4.0 as the basis for discussion). After that we’ll introduce JAVA RMI and use it as a lead-in to RPC technology.