“This is the 28th day of my participation in the November Gwen Challenge. See details of the event: The Last Gwen Challenge 2021”.

Like again, form a habit 👏👏

Heartbeat Detection Mechanism

Heartbeat is a special data packet periodically sent between a client and a server in a TCP long connection to notify the other that the client is still online and ensure the validity of the TCP connection.

Heartbeat detection mechanism: The client sends PING messages to the server at regular intervals, and the server replies to the PONG message after receiving them. If the client does not receive a PONG response within a certain period of time, the connection is considered disconnected. If the server does not receive a PING request from the client within a certain period of time, the connection is considered disconnected. The connection activity is detected through this back-and-forth pinging mechanism.

In Netty, an IdleStateHandler is provided to detect that a connection is idle. This Handler can trigger an event that detects that no read or write event has occurred on the connection.

Take a look at its constructor:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
Copy the code

Explain the meanings of the following three parameters:

  • EaderIdleTimeSeconds read timeout: When there is no slave in the specified time intervalChannelOne is triggered when data is readREADER_IDLEIdleStateEventEvents.
  • RiterIdleTimeSeconds write timeout: Indicates that no data is written to within the specified intervalChannel, triggers aWRITER_IDLEIdleStateEventEvents.
  • LlIdleTimeSeconds read/write timeout: Is triggered when no read or write operation is performed within a specified period of timeALL_IDLEIdleStateEventEvents.

Note: The default time unit for these three parameters is seconds. If you need to specify other units of time, you can use another constructor:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
Copy the code

To implement the Netty server heartbeat detection mechanism, add the following codes to the ChannelInitializer for the server:

pipeline.addLast(new IdleStateHandler(3.0.0, TimeUnit.SECONDS));
Copy the code

IdleStateHandler (); IdleStateHandler ();

The red-boxed code actually means that the method is passed through without doing any business logic, leaving the channelRead method to the next handler in channelPipe

Let’s look at the channelActive method again:

Here’s an initialize method, the essence of IdleStateHandler, to explore:

This will trigger a Task, ReaderIdleTimeoutTask, this Task run method source:

The first red box subtracts the current time from the last channelRead call. If the result is 6s, it means that the last channelRead call was 6s ago. If you set 5s, the nextDelay is -1, which means that you have timed out. The second red box triggers the next handler’s userEventTriggered method:

If there is no timeout, the userEventTriggered method is not triggered.

Server code

public class HeartBeatServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast("decoder".new StringDecoder());
                            channelPipeline.addLast("encoder".new StringEncoder());
                            channelPipeline.addLast(new IdleStateHandler(3.0.0, TimeUnit.SECONDS));
                            channelPipeline.addLast(newHeartBeatServerHandler()); }}); System.out.println("Netty server start.");
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
            channelFuture.channel().closeFuture().sync();
        }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }}}Copy the code

Inheritance SimpleChannelInboundHandler rewrite channelRead method and userEventTriggered method

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)){
            ctx.channel().writeAndFlush("ok");
        }else{
            System.out.println("Other information processing..."); }}@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        String eventType = null;
        switch (event.state()){
            case READER_IDLE:
                eventType = "Read free";
                readIdleTimes++; // Read idle count increment by 1
                break;
            case WRITER_IDLE:
                eventType = "Write free";
                / / not processing
                break;
            case ALL_IDLE:
                eventType = "Read-write idle";
                / / not processing
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "Timeout event:" + eventType);
        if (readIdleTimes > 3){
            System.out.println("[server] read idle more than 3 times, close the connection, release more resources");
            ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); }}@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("= = =" + ctx.channel().remoteAddress() + " is active ==="); }}Copy the code

Client code

public class HeartBeatClient {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            channelPipeline.addLast("decoder".new StringDecoder());
                            channelPipeline.addLast("encoder".new StringEncoder());
                            channelPipeline.addLast(newHeartBeatClientHandler()); }}); System.out.println(Netty client start.);
            Channel channel = bootstrap.connect("127.0.0.1".8888).sync().channel();
            String text = "Heartbeat Packet";
            Random random = new Random();
            while(channel.isActive()){
                int num = random.nextInt(8);
                Thread.sleep(num*1000); channel.writeAndFlush(text); }}catch (Exception e){
            e.printStackTrace();
        }finally{ group.shutdownGracefully(); }}static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" + msg);
            if(msg ! =null && msg.equals("idle close")) {
                System.out.println("Server closes connection, client closes."); ctx.channel().closeFuture(); }}}}Copy the code