NIO

What is the NIO

NIO is a Non Blocking IO framework. The server implementation pattern is that a thread can process multiple requests (connections). All connection requests sent by the client are registered to the multiplexer selector, and the multiplexer surveys the connection and processes the IO request

Why NIO

In a common network service, if each client maintains a connection to the login server. Then the server will maintain multiple connections to the client to communicate with contnect, read, and write of the client, especially for long link services. How many C ends need to maintain the same IO connection on the S end. This is a big overhead for the server (famous C10K problem)

NIO how to play

NIO server for Reactor mode

public static void main(String[] args) throws IOException {
    ServerSocketChannel socketChannel = ServerSocketChannel.open();
    // Bind the port
    socketChannel.bind(new InetSocketAddress(8080));
    // Declare async
    socketChannel.configureBlocking(false);
    // Declare the multiplexer and create the Epoll file description object under Linux
    Selector selector = Selector.open();
    // Register connection events
    socketChannel.register(selector,SelectionKey.OP_ACCEPT);
    while (true) {// Add the selectionKey that triggers the event to the set using the epoll of the operating system
        selector.select();
        final Set<SelectionKey> selectionKeys = selector.selectedKeys();
        final Iterator<SelectionKey> iterator = selectionKeys.iterator();
        // Traverse the selectionKey that triggers the event. The selectionKey contains the socketChannel
        while (iterator.hasNext()){
            final SelectionKey next = iterator.next();
            // Determine the event type
            if(next.isAcceptable()){
                // If it is a connection event, register the corresponding SocketChannel to read the event
                final ServerSocketChannel channel = (ServerSocketChannel) next.channel();
                final SocketChannel accept = channel.accept();
                accept.configureBlocking(false);
                accept.register(selector,SelectionKey.OP_READ);
            }else if (next.isReadable()){
                // If it is a read event, read it directly
                final SocketChannel channel = (SocketChannel) next.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                final int read = channel.read(byteBuffer);
                System.out.println(new String(byteBuffer.array(),0,read)); }}// Remove the socketChannel from the set to avoid secondary processingiterator.remove(); }}Copy the code

How do you run and test this NIO server?

In Windows, you can run the Telnet command on the cli tool to connect to the server and send data. Telnet localhost 8080 Connect to the server. CTRL +] Opens the send page and sends the message to the server by using the send command

How does NIO accept socket requests and read data?

When the NIO server is started, a Linux epoll/ Windows SELECT instance object is created with Selector Selector = Selector.open(), and then when the Selector.select(), The event and channel are registered in ePoll and wait for the event to occur. If the event occurs, it will be added to SelectionKey from the rdList of ready events inside the operating system. And then I’m going to process it.

What pain points does NIO address in BIO, and what other drawbacks

  • NIO has solved the
    • BIO thread blocking, read/write blocking, thread waiting time is too long
    • Excessive server CPU and thread waste due to too many client connections (C10K)
  • NIO also has the following drawbacks
    • Does not intrinsically solve the C10K problem [if there are 100,000 connections, reading and writing data at the same time, single selector, NIO loop takes a long time]
    • Selectors. Select (), empty polling bug, when the select() method does not get an event, it may not block, directly execute down, and once appear will continue to appear.
    • The code is complex and requires a lot of exception IO handling

Netty

Netty is a high performance asynchronous event driven network application framework based on NIO and designed by Reactor model.

What pain points did Netty address in NIO

  • Fixed the empty polling bug of selselector. Select ()
  • Simplified development, Netty internal has been for disconnection, network intermittent disconnection, heartbeat processing, half packet read and write, network congestion and abnormal flow processing.
  • Netty internally initializes multiple selectors to avoid C10K problems

Netty how to play

Netty server

public class NettyService {
    public static int port = 8080;
    public static void main(String[] args) {
        // Create a selector thread group that accepts the Accept event
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Create a selector thread group that handles business processing events such as read/write
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Service processing handler
                        socketChannel.pipeline().addLast(new DefaultChannelInboundHandlerAdapter());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }}}Copy the code

Service processing handler

public class DefaultChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
    protected static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client online:"+ ctx.channel().remoteAddress());
        channels.add(ctx.channel());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client offline:"+ ctx.channel().remoteAddress());
        channels.remove(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        String rtn = "Client:"+ ctx.channel().remoteAddress()+":"+ byteBuf.toString(CharsetUtil.UTF_8); channels.forEach(channel -> channel.writeAndFlush(Unpooled.wrappedBuffer(rtn.getBytes(StandardCharsets.UTF_8)))); }}Copy the code

Netty client

public class NettyClient {
    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()).addLast(new StringDecoder())
                                .addLast(newDefaultClientHandlerAdapter()); }});try {
            ChannelFuture connect = bootstrap.connect("127.0.0.1".8080).sync();
            Channel channel = connect.channel();
            System.out.println("= = = ="+channel.localAddress()+"= = = =");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String next = scanner.next();
                channel.writeAndFlush(next);
            }
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{ workerGroup.shutdownGracefully(); }}}Copy the code

Client service processing handler

public class DefaultClientHandlerAdapter extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Client receives message");
        System.out.println(msg.toString());
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client write message"); }}Copy the code

How to run and test the above Netty code

Run the service segment main method and then the client main method to send the input to the server via the client console

Netty’s threading model

See a picture on the Internet, feel very in place! (Server)

  • According to the source code of new NioEventLoopGroup(), it is not difficult to see that NioEventLoopGroup is actually a thread group object, with an internal thread pool object containing selectors and taskQueues.
  • The bossGroup is responsible for accept events, and the workerGroup is responsible for read and write events
  • NioEventLoop is actually a thread
  • A pipeline is a business pipeline that processes events
  • ChannelHandler is a specific handler class in the business pipeline. For example, in a client application, events are called outbound if they are moving from client to server. That is, the data sent by the client to the server will pass through a series of channelOutboundHandlers in pipeline (ChannelOutboundHandler calls logic from tail to head direction one by one). The opposite is called inbound, and the inbound only calls ChannelInboundHandler logic in the pipeline. (ChannelInboundHandler calls the logic that calls each Handler one by one from head to tail.)

Netty Adhesive package unpacking

As a transport layer protocol, TCP does not understand the specific meaning of upper-layer service data. It divides packets according to the size of the TCP buffer. For example, if the cache size is 10K, and you send 12K data, TCP will split the 12K data into 10K+2K [unpack], send 10K first, wait for the next data, take 8K and 2K of the previous data to form 10K [stick packet] and then send.

How to solve Netty

  1. The length of the message is fixed, and the size of the transmitted message is fixed. For example, 100 bytes are sent each time, and insufficient blanks are filled
  2. Special characters are added at the end of each message, and then split on the receiving end
  3. When sending a message, send the length at the same time. Send the length first and then send the message. The message receiver receives the message length, waits for the next packet to arrive, and obtains the specified length.

Netty provides multiple decoders for subcontracting operations, such as:

  • LineBasedFrameDecoder
  • DelimiterBasedFrameDecoder special separator (subcontracting)
  • FixedLengthFrameDecoder

Netty disconnection automatic reconnection

When the client realizes the connection between the client and the server, add a listener to listen to the connection status. If the connection fails, reconnect the code again:

ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if(! future.isSuccess()) {// Reconnect to the backend thread for execution
            future.channel().eventLoop().schedule(() -> {
                try {
                    connect();
                } catch(Exception e) { e.printStackTrace(); }},3000, TimeUnit.MILLISECONDS); }}});// Listen for channel closures
cf.channel().closeFuture().sync();
Copy the code

How does Netty solve the selselector. Select () empty polling bug

Select (int selectCnt = 0); select (int selectCnt = 0); select (int selectCnt = 0); When selectCnt is greater than the setting threshold, 512 】 【 io.net ty. SelectorAutoRebuildThreshold can set themselves up, Netty will create a selector, Re-register the registered event on the buggy selector to the new selector and close the buggy selector.

Source:

    protected void run(a) {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        // Omit some fetching logic with part of select
                    default:}}catch (IOException e) {
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
				// Omit some irrelevant code
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // This line of code is key!
                    selectCnt = 0; }}catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); }}catch (Error e) {
				// Omit the exception handling code}}}Copy the code

UnexpectedSelectorWakeup method:

private static final int SELECTOR_AUTO_REBUILD_THRESHOLD 
        = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold".512);

private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        return true;
    }
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        rebuildSelector();
        return true;
    }
    return false;
}
Copy the code