Friends, I want to go out and play too!! But my company won't allow it; travel outside the province is forbidden.

So… I'll answer the call and review some Netty-related knowledge points instead.

IO model

Before we learn about Netty, let’s look at the IO model:

An IO model describes what kind of channel is used to send and receive data. Java supports three types of network IO programming: BIO, NIO, and AIO.

PS: AIO won't be covered here; its Linux implementation is not mature, so there is little to gain from learning it!!

BIO (Blocking IO)

Synchronous blocking model, where each client connection corresponds to one processing thread

Server-side code example:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketServer {

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);
        while (true) {
            System.out.println("Waiting for connection...");
            // accept() blocks until a client connects
            Socket accept = serverSocket.accept();
            System.out.println("A client is connected... Ready to read...");
            byte[] bytes = new byte[1024];
            // read() blocks until the client sends data
            int read = accept.getInputStream().read(bytes);
            System.out.println("At the end of the read...");
            if (read != -1) {
                System.out.println("The data received from the client is: " + new String(bytes, 0, read));
            }
            accept.getOutputStream().write("The treasure! I got your message.".getBytes());
            accept.getOutputStream().flush();
        }
    }
}

Client-side code example:

import java.io.IOException;
import java.net.Socket;

public class SocketClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1", 9000);
        System.out.println("Prepare to send data to server....");
        socket.getOutputStream().write("hello server".getBytes());
        socket.getOutputStream().flush();
        System.out.println("Data sent to the server....");

        byte[] bytes = new byte[1024];
        int read = socket.getInputStream().read(bytes);
        if (read != -1) {
            System.out.println("Data received from the server is: " + new String(bytes, 0, read));
        }
        socket.close();
    }
}

Disadvantages: accept(), read(), and write() are all blocking operations, which tie up threads and waste resources.

Application scenario: BIO is suitable for architectures with a small, fixed number of connections. It demands a lot of server resources, but the code is simple and easy to understand.

NIO (Non Blocking IO)

A synchronous non-blocking model. On the server side, one thread can handle multiple connection requests: connections from clients are registered with a multiplexer (Selector), and the multiplexer polls the connections for IO requests to process.

Application scenario: NIO is suitable for architectures with a large number of connections, each relatively short-lived (light operations), such as chat servers, bullet-screen (danmaku) systems, and communication between servers. The programming model, however, is more complicated.

Code example for the first (naive) version of NIO:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NioServer {

    private static List<SocketChannel> channelList = new ArrayList<>();

    public static void main(String[] args) throws IOException {
        ServerSocketChannel open = ServerSocketChannel.open();
        open.socket().bind(new InetSocketAddress(9000));
        // Non-blocking mode: accept() returns null immediately if there is no pending connection
        open.configureBlocking(false);

        System.out.println("Server startup complete...");

        while (true) {
            SocketChannel accept = open.accept();
            if (accept != null) {
                System.out.println("New connection...");
                accept.configureBlocking(false);
                channelList.add(accept);
            }

            Iterator<SocketChannel> iterator = channelList.iterator();
            while (iterator.hasNext()) {
                SocketChannel socketChannel = iterator.next();
                // Allocate buffer capacity
                ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                int read = socketChannel.read(byteBuffer);
                if (read > 0) {
                    System.out.println("Received a message: " + new String(byteBuffer.array(), 0, read));
                } else if (read == -1) {
                    iterator.remove();
                    System.out.println("Client disconnected");
                }
            }
        }
    }
}

Conclusion: this version is non-blocking, but if there are too many connections there will be a lot of useless traversal. If there are 10,000 connections and only 1,000 of them have data to read, every loop still iterates over all 10,000 because the other 9,000 have not disconnected; 90% of the traversal is wasted, so this is clearly not the final solution.

Improved NIO code example, introducing the multiplexer (Selector):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioSelectorServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.socket().bind(new InetSocketAddress(9000));
        socketChannel.configureBlocking(false);

        // Open a Selector (on Linux this creates an epoll instance)
        Selector selector = Selector.open();
        // Register the ServerSocketChannel with the Selector, interested in client ACCEPT events
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Service startup completed....");

        while (true) {
            // Block until at least one event occurs
            selector.select();

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
                    // Register the new connection for READ events
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("Client connection successful....");
                } else if (key.isReadable()) {
                    SocketChannel socketC = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                    int read = socketC.read(byteBuffer);
                    if (read > 0) {
                        System.out.println("Received message: " + new String(byteBuffer.array(), 0, read));
                    } else if (read == -1) {
                        System.out.println("Client disconnected");
                        socketC.close();
                    }
                }
                iterator.remove();
            }
        }
    }
}

NIO has three core components: channels, buffers, and selectors.

  1. A channel is similar to a stream. Each channel corresponds to a buffer, and the buffer is backed by an array underneath
  2. A channel registers with a selector, and the selector hands the channel to an idle thread for processing according to the channel's read or write events
  3. NIO's buffers and channels are both readable and writable (see the sketch below)
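
To make the buffer in point 3 concrete, here is a minimal sketch (plain JDK NIO; the capacity of 16 and the sample bytes are arbitrary choices for illustration) of writing to a ByteBuffer, flipping it, and reading back:

import java.nio.ByteBuffer;

public class ByteBufferDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);   // backed by a byte[16]
        buffer.put("abc".getBytes());                  // write mode: position advances to 3
        buffer.flip();                                 // switch to read mode: limit = 3, position = 0
        while (buffer.hasRemaining()) {
            System.out.print((char) buffer.get());     // reads 'a', 'b', 'c'
        }
        buffer.clear();                                // reset the indexes for the next write
    }
}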

The flow chart is as follows:

In JDK 1.4, NIO is implemented with the Linux kernel functions select() or poll(). As in the NioServer code, the selector polls all SocketChannels on every pass to see which channel has a read or write event, and keeps traversing if there is none. Starting with JDK 1.5, an epoll-based event notification mechanism was introduced to optimize NIO.

Conclusion: the whole NIO call flow is that Java calls operating-system kernel functions to create a Socket and obtain the Socket's file descriptor, then creates a Selector object corresponding to an epoll descriptor in the operating system. Binding the Socket connection's file-descriptor events to the Selector's epoll descriptor provides asynchronous event notification. All of this is achieved with a single thread and without many useless traversals: event handling is delegated to the operating-system kernel (its interrupt mechanism), which greatly improves efficiency.

On Linux, the kernel functions (select, poll, epoll) implement the I/O multiplexing layer; how Windows implements it is unknown, because Windows is not open source.

Why does Netty use NIO instead of AIO?

On Linux, the underlying implementation of AIO still uses epoll, and the AIO implementation is not good, so it has no obvious performance advantage; being encapsulated by the JDK, it is also hard to optimize deeply. In short, AIO on Linux is not mature enough. Netty is an asynchronous, non-blocking framework: it does a lot of asynchronous encapsulation on top of NIO.

Evolution of Reactor model

Reactor model: event response model.

Single-reactor single-thread model

This is the simplest Reactor model, where the entire process takes place in a single thread. The process is as follows:

  1. The Reactor object listens for client request events through select, and after receiving an event, distributes it through dispatch.
  2. If it is a connection request, it is dispatched to an Acceptor object, which handles the connection and creates a Handler object to continue the processing.
  3. If it is not a connection request, dispatch calls the Handler corresponding to that connection; the Handler is responsible for all subsequent processing after the connection is established (read, write, business processing, etc.).

This model is simple and easy to understand, but it has problems. For example, the single-threaded model cannot exploit the performance of a multi-core CPU, and if business processing in a Handler is slow, the whole program cannot handle other connection events, causing performance issues.

This mode is suitable for fast service processing and few client connections.

Single-reactor multithreaded model

Compared with the previous model, the business processing module is handled asynchronously. The process is as follows:

  1. The Reactor object listens for client request events through select, and after receiving an event, distributes it through dispatch.
  2. If it is a connection request, it is dispatched to an Acceptor object, which handles the connection and creates a Handler object to continue the processing.
  3. If it is not a connection request, dispatch calls the Handler corresponding to that connection. The Handler completes the read after the connection is established, and hands the business processing of the read data to a thread pool, which processes it asynchronously. When the business processing is complete, the result is sent back to the Handler, which then sends the response to the corresponding Client.

This model makes full use of multi-core CPUs and reduces the performance problems caused by business processing: the Reactor thread is only responsible for accepting connections and performing read/write operations. However, since a single Reactor still handles all reads and writes in addition to connection processing, performance issues may remain under a large number of requests.

Master-slave Reactor multithreaded model

This model splits off a separate Reactor object to handle everything other than connection establishment, called the slave Reactor (SubReactor). The process is as follows:

  1. The primary Reactor (MainReactor) listens for client connection events through select, and hands connection events to the Acceptor for processing.
  2. After the Acceptor finishes, the MainReactor assigns the connection to a SubReactor. The SubReactor adds the connection to its queue for event monitoring and creates a Handler for subsequent operations. As in the previous model, the SubReactor monitors for new events and calls the Handler for the corresponding processing when they occur.
  3. After the Handler reads the data, the business processing is handled asynchronously by a thread pool. When the business processing is complete, the result is sent back to the Handler, which then sends the response to the corresponding Client.

In this model, two kinds of threads process Reactor events separately: the main thread only handles connection events, while the child threads only handle read and write events. The main thread can thus accept more connections without worrying about whether read/write processing in the child threads will affect it. This model is now widely used in all kinds of projects, such as Netty.

Netty’s core functionality and threading model

Why was Netty born?

  1. The NIO class library and APIs are complex and troublesome to use: you need to master Selector, ServerSocketChannel, SocketChannel, ByteBuffer, and so on.
  2. The workload and difficulty of NIO development are very large: for example, clients face disconnection and reconnection, intermittent network outages, heartbeat handling, half-packet reads and writes, network congestion, and abnormal traffic.

Netty has a good encapsulation of the NIO API of the JDK to solve the above problems. Netty has the advantages of high performance, higher throughput, lower latency, and reduced resource consumption.

Netty communication example

Maven dependencies for Netty:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.35.Final</version>
</dependency>

Server code:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyServer {

    public static void main(String[] args) {
        // Create two thread groups, bossGroup and workerGroup; by default the thread count is twice the number of CPU cores
        // bossGroup handles connection requests, workerGroup handles the other business requests
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Create the server startup object
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)    // set the parent and child thread groups
                    .channel(NioServerSocketChannel.class)   // use NioServerSocketChannel as the server channel implementation
                    // Initialize the server connection queue size. The server processes client connection requests sequentially,
                    // so only one can be handled at a time; connections arriving simultaneously are queued
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Create the channel initialization object and set initialization parameters
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Set the handler for the workerGroup's SocketChannel
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start....");

            // Start the server and bind the port; bind() is asynchronous and returns a ChannelFuture
            // (isDone() can check its state), while sync() waits for the asynchronous operation to complete
            ChannelFuture cf = serverBootstrap.bind(9000).sync();

            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        System.out.println("Listening on port 9000 succeeded");
                    }
                }
            });
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * Custom handlers need to inherit a specified HandlerAdapter from Netty
 */
class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Read the data sent by the client
     * @param ctx context
     * @param msg message sent by the client
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("Received message from client: " + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * Called after the data has been read
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("hello client", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    /**
     * Exception handling
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

Client code:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyClient {

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("netty client start");

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Triggered when the client's connection to the server is complete
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    /**
     * Triggered when the channel has a read event, i.e. the server sent data to the client
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("Received message from server: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("The server address is: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Looking at the code, we find that the goal of the Netty framework is to separate your business logic from the underlying network code, so that you can focus on business development without writing a pile of NIO-level network-handling operations.

Netty module components

【Bootstrap, ServerBootstrap】 :

Bootstrap means "bootstrapping". A Netty application usually starts from a Bootstrap: in Netty, the Bootstrap class is the startup bootstrap class for client programs, and ServerBootstrap is the bootstrap class for the server.

【Future, ChannelFuture】 :

In Netty, all I/O operations are asynchronous: you cannot immediately know whether a message was handled correctly. Instead, you can wait for the operation to complete, or register a listener that is automatically triggered when the operation succeeds or fails. This is what Future and ChannelFuture are for.

【Channel】

In Netty, a Channel is a conduit for network communication that can be used to perform network I/O operations. Different protocols and blocking types have different Channel types:

  • NioSocketChannel, asynchronous client TCP Socket connection.

  • NioServerSocketChannel, an asynchronous server TCP Socket connection.

  • NioDatagramChannel, asynchronous UDP connection.

And so on…

【Selector】

Netty implements I/O multiplexing based on a Selector object, through which a thread can listen for Channel events of multiple connections. When a Channel is registered with a Selector, the mechanism inside the Selector automatically and continuously checks whether those registered channels have I/O events ready (such as read, write, network connection completed, etc.). This makes it easy to use a single thread to efficiently manage multiple channels.

【 NioEventLoop 】

NioEventLoop maintains a thread and a task queue, and supports asynchronous submission of tasks. When the thread starts, NioEventLoop's run method is called to execute both I/O tasks and non-I/O tasks:

I/O tasks: the ready events from SelectionKeys, such as accept, connect, read, and write, triggered by the processSelectedKeys method.

Non-I/O tasks: tasks added to the taskQueue, such as register0 and bind0, executed by the runAllTasks method.
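
As a hedged sketch of submitting a non-I/O task from inside a handler (using the Netty imports from the examples above; the 5-second sleep and the reply text are made up for illustration): the task is queued on the channel's NioEventLoop and executed later by runAllTasks, instead of blocking the current channelRead call.

class SlowTaskHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Queue a time-consuming job on this channel's NioEventLoop taskQueue;
        // runAllTasks will execute it later on the same event-loop thread
        ctx.channel().eventLoop().execute(() -> {
            try {
                Thread.sleep(5000); // stand-in for slow business logic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ctx.writeAndFlush(Unpooled.copiedBuffer("done", CharsetUtil.UTF_8));
        });
    }
}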

【 NioEventLoopGroup 】

It manages the lifecycle of its EventLoops and can be understood as a thread pool: a group of threads is maintained internally, each thread (NioEventLoop) is responsible for handling events on multiple Channels, and a Channel corresponds to only one thread.

【 ChannelHandler 】

ChannelHandler is an interface that processes I/O events or intercepts I/O operations and forwards them to the next handler in its ChannelPipeline (the business processing chain). ChannelHandler itself provides few methods; in practice you usually implement one of its subinterfaces (or extend the corresponding adapter class):

  • ChannelInboundHandler is used to handle inbound I/O events
  • ChannelOutboundHandler is used to handle outbound I/O operations

【 ChannelHandlerContext 】

Saves all context information associated with a Channel, along with a ChannelHandler object.

【 ChannelPipeline 】

Saves a List of ChannelHandlers that handle or intercept inbound events and outbound operations for a Channel.

In Netty, each Channel has only one Channel pipeline corresponding to it, and their composition relationship is as follows:

Netty’s threading model

Model explanation:

  1. Netty abstracts two groups of thread pools, BossGroup and WorkerGroup: the BossGroup is responsible for accepting client connections, and the WorkerGroup is responsible for network reads and writes. Both BossGroup and WorkerGroup are of type NioEventLoopGroup
  2. A NioEventLoopGroup is equivalent to a group of event-loop threads; each thread in the group is a NioEventLoop
  3. Each NioEventLoop has a Selector for listening to network traffic on the SocketChannels registered with it
  4. Each Boss NioEventLoop thread loops over three steps
    • Process accept events, establish a connection with the client, and generate a NioSocketChannel
    • Register the NioSocketChannel with the Selector of some Worker NioEventLoop
    • Process tasks in the task queue, i.e. runAllTasks
  5. Each Worker NioEventLoop thread loops over similar steps
    • Poll read and write events for all NioSocketChannels registered with its Selector
    • Process I/O events, i.e. read and write events, handling the business on the corresponding NioSocketChannel
    • runAllTasks processes the tasks in the taskQueue; time-consuming business logic can be put into the taskQueue for deferred processing, without holding up the flow of data through the pipeline
  6. When a Worker NioEventLoop processes a NioSocketChannel, it uses the channel's pipeline, which maintains a number of handlers to process the data in the channel

ByteBuf

A ByteBuf is backed by an array of bytes, each of which holds information. It maintains two internal indexes, one for reading and one for writing; these indexes move through the byte array to locate where information needs to be read or written.

When read from ByteBuf, its readerIndex is incremented by the number of bytes read. Similarly, when writing to ByteBuf, its writerIndex is incremented by the number of bytes written.

Note that the limit is reached when readerIndex has read exactly up to where writerIndex has written. If readerIndex were to exceed writerIndex, Netty would throw an IndexOutOfBoundsException.
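
A minimal sketch of the two indexes in motion (Unpooled and CharsetUtil as in the Netty examples above; the sample bytes are arbitrary):

ByteBuf buf = Unpooled.buffer(16);                  // readerIndex = 0, writerIndex = 0
buf.writeBytes("abc".getBytes(CharsetUtil.UTF_8));  // writerIndex -> 3
System.out.println((char) buf.readByte());          // 'a', readerIndex -> 1
System.out.println(buf.readableBytes());            // 2, i.e. writerIndex - readerIndex
// buf.readBytes(new byte[3]);                      // only 2 bytes readable -> IndexOutOfBoundsException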

Netty codec

When you send or receive a message through Netty, a data conversion occurs: inbound messages are decoded (bytes -> another format) and outbound messages are encoded (any format -> bytes).

ChannelPipeline provides a container for the ChannelHandler chain. When the client sends data to the server, the client invokes the logic of each handler from tail to head through the ChannelOutboundHandler chain, while the server invokes the logic of each handler from head to tail through the ChannelInboundHandler chain.
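
For instance, a hedged sketch of wiring Netty's built-in string codecs into the pipeline inside a ChannelInitializer; MyBusinessHandler is a hypothetical handler that now receives String messages instead of raw ByteBuf:

socketChannel.pipeline()
        .addLast(new StringDecoder(CharsetUtil.UTF_8))   // inbound: ByteBuf -> String
        .addLast(new StringEncoder(CharsetUtil.UTF_8))   // outbound: String -> ByteBuf
        .addLast(new MyBusinessHandler());               // hypothetical business handler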

TCP sticky packets and unpacking

TCP is a stream-oriented protocol: the data is just a long string of bytes with no boundaries. As a transport-layer protocol, TCP does not understand the meaning of the upper-layer business data; it splits and merges packets according to the actual state of its buffers. So what is one complete packet at the business level may be split by TCP into multiple packets to send, or several small packets may be merged into one large packet to send. This is the TCP sticky-packet and unpacking problem: stream-oriented communication has no message boundaries.

As shown in the following figure, the client sends two packets, D1 and D2, but the server may receive data in the following cases.

Solutions:

  1. Fixed message length: for example, every packet is fixed at 100 bytes; if the data is shorter, pad it with spaces
  2. Delimiter at the end of the packet: append a special delimiter such as a newline. This method is simple to use, but the delimiter must be chosen carefully: it must never appear inside the data itself
  3. Length prefix: send each piece of data together with its length, for example in the first four bytes, so the receiver can determine where each message starts and ends (see the sketch after this list)
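
Netty ships decoders for all three strategies (FixedLengthFrameDecoder, DelimiterBasedFrameDecoder / LineBasedFrameDecoder, and LengthFieldBasedFrameDecoder). A hedged sketch of strategy 3 with a 4-byte length prefix, again inside a ChannelInitializer; the parameter values are illustrative and MyBusinessHandler is hypothetical:

socketChannel.pipeline()
        // maxFrameLength 1024, length field at offset 0 and 4 bytes long,
        // no length adjustment, strip the 4 length bytes from the decoded frame
        .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
        .addLast(new LengthFieldPrepender(4))    // outbound: prepend the 4-byte length
        .addLast(new MyBusinessHandler());       // sees only complete frames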

Zero copy of Netty

Zero copy here refers to eliminating the copy between JVM heap memory and direct memory.

Netty's ByteBuf for receiving and sending uses direct buffers: Socket reads and writes operate on out-of-heap memory directly, with no second copy of the byte buffer. If traditional JVM heap memory were used for Socket reads and writes, the JVM would first copy the heap buffer into direct memory before writing to the Socket, because JVM heap data cannot be written to a Socket directly. Sending a message therefore incurs one extra memory copy compared with using direct out-of-heap memory.
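
A minimal sketch of allocating an off-heap ByteBuf explicitly (in practice Netty's pooled allocator makes this choice for you; the 256-byte size is arbitrary):

ByteBuf direct = ByteBufAllocator.DEFAULT.directBuffer(256); // off-heap: no heap -> direct copy on Socket write
try {
    direct.writeBytes("payload".getBytes(CharsetUtil.UTF_8));
    System.out.println(direct.isDirect());  // true
} finally {
    direct.release();  // direct buffers are reference-counted and must be released
}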

Advantages and disadvantages of using direct memory

Advantages:

  1. Does not occupy heap memory space, reducing the likelihood of GC
  2. On the JVM, local I/O operates directly on direct memory (direct memory => system call => disk/NIC), whereas non-direct memory requires an extra copy (heap memory => direct memory => system call => disk/NIC)

Disadvantages:

  1. Initial allocation of direct memory is slow
  2. Without the JVM managing it, direct-memory leaks can occur: if a full GC never happens, direct memory can eventually exhaust physical memory. We can cap direct memory with -XX:MaxDirectMemorySize; when the threshold is reached, System.gc() is invoked to trigger a full GC and indirectly reclaim unused direct memory
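
For illustration, a tiny program showing the cap in action; the 64m limit and the 128 MB allocation are made-up numbers:

// Run with: java -XX:MaxDirectMemorySize=64m DirectLimitDemo
import java.nio.ByteBuffer;

public class DirectLimitDemo {
    public static void main(String[] args) {
        // Allocating past the cap fails with OutOfMemoryError: Direct buffer memory
        ByteBuffer.allocateDirect(128 * 1024 * 1024);
    }
}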