Author: sprinkle_liz www.jianshu.com/p/1a28e48ed…

Heartbeat

What is a heartbeat?

A heartbeat is a special packet sent periodically between a client and a server over a long-lived TCP connection, to tell the other side that the sender is still online and to verify that the connection is still valid.

Note: heartbeat packets serve another, often overlooked purpose: if a connection carries no traffic for a long time, a firewall or router along the path may silently drop it, and periodic heartbeats prevent that.

How to implement

Core Handler — IdleStateHandler

In Netty, the key to implementing the heartbeat mechanism is IdleStateHandler. How is this handler used? Take a look at one of its constructors:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long) readerIdleTimeSeconds, (long) writerIdleTimeSeconds, (long) allIdleTimeSeconds, TimeUnit.SECONDS);
}

Its three parameters are explained below:

  • readerIdleTimeSeconds: read timeout. If no data is read from the Channel within this interval, an IdleStateEvent with state READER_IDLE is fired.

  • writerIdleTimeSeconds: write timeout. If no data is written to the Channel within this interval, an IdleStateEvent with state WRITER_IDLE is fired.

  • allIdleTimeSeconds: read/write timeout. If neither a read nor a write happens within this interval, an IdleStateEvent with state ALL_IDLE is fired.

Note: these three parameters default to seconds. To use another time unit, call a different constructor, e.g. IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit).
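For instance, a minimal sketch (assuming Netty 4.x; pipeline is a ChannelPipeline from an initializer like the ones below) of configuring the same read timeout in milliseconds via the TimeUnit-taking constructor:

// Fire READER_IDLE if nothing is read for 5000 ms; write/all idle checks disabled (0).
pipeline.addLast(new IdleStateHandler(5000, 0, 0, TimeUnit.MILLISECONDS));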

Before reading the implementation below, it helps to understand how IdleStateHandler itself works: it schedules timeout tasks on the channel's EventLoop; each task compares the time since the last read/write against the configured threshold, fires an IdleStateEvent down the pipeline if the threshold was exceeded, and then reschedules itself.

Let’s go directly to the code. The points that need attention will be explained in the code through comments.

Use IdleStateHandler to implement the heartbeat

The following uses IdleStateHandler to implement the heartbeat. After connecting to the server, the client runs a cyclic task: wait a random number of seconds, then ping the server (send a heartbeat packet). When the random wait exceeds the server's idle timeout, the send fails, because the server has already disconnected by then. The code is as follows:

The Client side

ClientIdleStateTrigger – Heartbeat trigger

ClientIdleStateTrigger is also a handler: it overrides userEventTriggered to catch IdleState.WRITER_IDLE events (no data sent to the server within the specified time) and then sends a heartbeat packet to the server. (Note that such events are only delivered if an IdleStateHandler sits earlier in the same pipeline; in the client code below the heartbeat is actually driven by the scheduled Pinger task, and only the HEART_BEAT constant of this class is used.)

/**
 * <p>
 * Catches {@link IdleState#WRITER_IDLE} events (no data sent to the server
 * within the specified time) and then sends a heartbeat packet to the
 * <code>Server</code>.
 * </p>
 */
public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter {

    public static final String HEART_BEAT = "heart beat!";

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEART_BEAT);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Pinger — Heartbeat transmitter

/**
 * <p>After the client connects to the server, it runs a cyclic task:
 * wait a random number of seconds, then ping the server (send a heartbeat packet).</p>
 */
public class Pinger extends ChannelInboundHandlerAdapter {

    private Random random = new Random();
    private int baseRandom = 8;
    private Channel channel;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel = ctx.channel();
        ping(ctx.channel());
    }

    private void ping(Channel channel) {
        int second = Math.max(1, random.nextInt(baseRandom));
        System.out.println("next heart beat will send after " + second + "s.");
        ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                if (channel.isActive()) {
                    System.out.println("sending heart beat to the server...");
                    channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
                } else {
                    System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                    channel.closeFuture();
                    throw new RuntimeException();
                }
            }
        }, second, TimeUnit.SECONDS);

        future.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (future.isSuccess()) {
                    // Schedule the next heartbeat only if this one was sent successfully.
                    ping(channel);
                }
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Triggered when data is written to a Channel that has already been disconnected.
        ctx.close();
    }
}

ClientHandlersInitializer — initializes the client's handler pipeline

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}

Note: apart from Pinger, all of the handlers above are codecs that handle framing (sticky/half-packet issues) and can be skimmed over.
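If the framing is unfamiliar, here is a minimal, self-contained sketch (assuming Netty 4.x; the FramingDemo class name is mine) of what the outbound codec chain puts on the wire for one heartbeat message:

import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class FramingDemo {
    public static void main(String[] args) {
        // Outbound data flows tail -> head: StringEncoder runs first,
        // then LengthFieldPrepender prepends the 4-byte length field.
        EmbeddedChannel ch = new EmbeddedChannel(
                new LengthFieldPrepender(4),
                new StringEncoder(CharsetUtil.UTF_8));
        ch.writeOutbound("heart beat!");

        // LengthFieldPrepender emits the length field and the payload as
        // separate buffers; together they form one frame on the wire.
        ByteBuf lengthField = ch.readOutbound();
        ByteBuf payload = ch.readOutbound();
        System.out.println("length field: " + lengthField.readInt());          // 11
        System.out.println("payload: " + payload.toString(CharsetUtil.UTF_8)); // heart beat!
    }
}

On the receiving side, LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) reads the 4-byte length at offset 0, strips it, and forwards exactly that many bytes to StringDecoder, which is how message boundaries survive TCP's byte-stream semantics.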

TcpClient – The client for TCP connections

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** Saves the <code>Channel</code> so data can be sent from places other than a handler. */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        init();
    }

    /**
     * Connect to the remote TCP server.
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            this.channel = future.channel();
        }
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The Bootstrap is reusable; it only needs to be initialized once, when the TcpClient is instantiated.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }
}

The Server side

ServerIdleStateTrigger – Disconnection trigger

/**
 * <p>Actively disconnects when no upstream data is received from the client
 * within the specified time.</p>
 */
public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // No upstream data received from the client within the specified time; disconnect.
                ctx.disconnect();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

ServerBizHandler – The server-side business handler

/**
 * <p>Prints data received from the client directly to the console.</p>
 */
@ChannelHandler.Sharable
public class ServerBizHandler extends SimpleChannelInboundHandler<String> {

    private final String REC_HEART_BEAT = "I had received the heart beat!";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String data) throws Exception {
        try {
            System.out.println("receive data: " + data);
            // ctx.writeAndFlush(REC_HEART_BEAT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Established connection with the remote client.");
        // do something
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Disconnected with the remote client.");
        // do something
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ServerHandlerInitializer – The initialization class for the collection of server-side processors

/**
 * <p>Initializes the server-side <code>Handler</code> pipeline.</p>
 */
public class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
        ch.pipeline().addLast("idleStateTrigger", new ServerIdleStateTrigger());
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
        ch.pipeline().addLast("decoder", new StringDecoder());
        ch.pipeline().addLast("encoder", new StringEncoder());
        ch.pipeline().addLast("bizHandler", new ServerBizHandler());
    }
}

Note: new IdleStateHandler(5, 0, 0) means that if the server receives no packet from a client (heartbeat or otherwise) within 5 seconds, it actively disconnects that client.

TcpServer – server side

public class TcpServer {

    private int port;
    private ServerHandlerInitializer serverHandlerInitializer;

    public TcpServer(int port) {
        this.port = port;
        this.serverHandlerInitializer = new ServerHandlerInitializer();
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.serverHandlerInitializer);
            // Bind the port and start accepting incoming connections.
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 2222;
        new TcpServer(port).start();
    }
}

At this point, all the code has been written.

Test

Start the server first and then the client (this first version of the client has no reconnection logic, so the server must already be listening). Once both are up, the client console prints logs similar to the following:

Logs generated by the client console

On the server side, you can see that the console outputs logs similar to the following:

Logs generated by the server console

As you can see, the client sent four heartbeat packets; on the fifth, the random wait was too long and it found the connection already closed. Correspondingly, after receiving four heartbeat packets the server timed out waiting for the next one and actively disconnected.

Abnormal situation

During testing, the following situation may occur:

Abnormal situation

The reason is that a heartbeat packet was written to a connection that had already been closed. Although channel.isActive() is checked before sending, the connection can be alive at the moment of the check and broken an instant later, before the heartbeat is actually written.

I have not found a way to handle this situation completely gracefully; if you have a good solution, please share it. Thanks!
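One common partial mitigation (a standard Netty pattern, not the author's solution) is to stop trusting the isActive() check alone and instead inspect the write's own future, treating a failed write as a broken connection:

// Inside Pinger's scheduled task: check the outcome of the write itself.
// A failed write means the connection broke between the isActive() check
// and the actual send; closing the channel hands control to channelInactive.
channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT).addListener(f -> {
    if (!f.isSuccess()) {
        System.err.println("heart beat failed to send: " + f.cause());
        channel.close();
    }
});

This does not eliminate the race, but it converts the failure into a handled event rather than an uncaught exception.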

Disconnection and reconnection

I won't spend much time introducing disconnection and reconnection itself; I trust everyone knows what it is about. I'll just give the general idea and then go straight to the code.

Implementation approach

When the client detects that the connection to the server has broken (or fails to connect in the first place), it reconnects according to the specified retry policy until the connection is re-established or the retries are exhausted.

To detect a broken connection, override ChannelInboundHandler#channelInactive: this method is triggered when the connection becomes unavailable, so the reconnection work can be done there.

Code implementation

Note: The following code is modified/added to the heartbeat mechanism above.

Because reconnection is the client’s job, you only need to make changes to the client code.

Retry strategy

RetryPolicy — RetryPolicy interface

public interface RetryPolicy {

    /**
     * Called when an operation has failed for some reason. This method
     * should return true to make another attempt.
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @return true/false
     */
    boolean allowRetry(int retryCount);

    /**
     * Get the sleep time in ms for the current retry count.
     *
     * @param retryCount current retry count
     * @return the time to sleep
     */
    long getSleepTimeMs(int retryCount);
}

ExponentialBackOffRetry – default implementation of the reconnection policy

/**
 * <p>Retry policy that retries a set number of times with an increasing
 * (randomized exponential) sleep time between retries.</p>
 */
public class ExponentialBackOffRetry implements RetryPolicy {

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final long baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
        this.maxRetries = maxRetries;
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retry count must not be negative.");
        }
        if (retryCount > MAX_RETRIES_LIMIT) {
            // Keep 1 << retryCount a positive int bound for Random.nextInt.
            System.out.println(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            retryCount = MAX_RETRIES_LIMIT;
        }
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
        if (sleepMs > maxSleepMs) {
            System.out.println(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }
}
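To get a feel for how the backoff grows, a quick illustrative loop (my addition; the policy variable is mine, and output varies per run because of the random factor):

RetryPolicy policy = new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000);
for (int retry = 1; retry <= 10; retry++) {
    // Sleep for retry N is baseSleepTimeMs * rand[1, 2^N), capped at maxSleepMs.
    System.out.printf("retry %d -> sleep %d ms%n", retry, policy.getSleepTimeMs(retry));
}
// With these settings, any value above 60s prints "Sleep extension too large"
// and is pinned to 60000 ms -- exactly the behavior seen in the test below.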

ReconnectHandler — the reconnection handler

@ChannelHandler.Sharable
public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    private int retries = 0;
    private RetryPolicy retryPolicy;

    private TcpClient tcpClient;

    public ReconnectHandler(TcpClient tcpClient) {
        this.tcpClient = tcpClient;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Successfully established a connection to the server.");
        retries = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (retries == 0) {
            System.err.println("Lost the TCP connection with the server.");
            ctx.close();
        }

        boolean allowRetry = getRetryPolicy().allowRetry(retries);
        if (allowRetry) {

            long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);

            System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));

            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                System.out.println("Reconnecting ...");
                tcpClient.connect();
            }, sleepTimeMs, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

    private RetryPolicy getRetryPolicy() {
        if (this.retryPolicy == null) {
            this.retryPolicy = tcpClient.getRetryPolicy();
        }
        return this.retryPolicy;
    }
}

ClientHandlersInitializer

Compared with the earlier version, the reconnection handler ReconnectHandler has been added to the front of the pipeline.

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(this.reconnectHandler);
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}

TcpClient

Compared with the earlier version, support for a reconnection (retry) policy has been added.

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** The reconnection (retry) policy. */
    private RetryPolicy retryPolicy;
    /** Saves the <code>Channel</code> so data can be sent from places other than a handler. */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        this.retryPolicy = retryPolicy;
        init();
    }

    /**
     * Connect to the remote TCP server.
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener(getConnectionListener());
            this.channel = future.channel();
        }
    }

    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The Bootstrap is reusable; it only needs to be initialized once, when the TcpClient is instantiated.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    private ChannelFutureListener getConnectionListener() {
        return new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // A failed connect never becomes active, so fire channelInactive
                    // manually to let ReconnectHandler schedule a retry.
                    future.channel().pipeline().fireChannelInactive();
                }
            }
        };
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }
}

Test

Before testing, to avoid a Connection reset by peer exception (a wait of exactly 5 s races with the server's 5-second read timeout), you can slightly modify Pinger's ping() method by adding an if (second == 5) check, as follows:

private void ping(Channel channel) {
    int second = Math.max(1, random.nextInt(baseRandom));
    if (second == 5) {
        // Avoid racing with the server's 5-second read timeout.
        second = 6;
    }
    System.out.println("next heart beat will send after " + second + "s.");
    ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            if (channel.isActive()) {
                System.out.println("sending heart beat to the server...");
                channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
            } else {
                System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                channel.closeFuture();
                throw new RuntimeException();
            }
        }
    }, second, TimeUnit.SECONDS);

    future.addListener(new GenericFutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (future.isSuccess()) {
                ping(channel);
            }
        }
    });
}

Start the client

Start the client first and observe the console output. You can see logs similar to the following:

Disconnection reconnection test – client console output

As you can see, when the client cannot connect to the server it keeps retrying, and the retry interval grows with the retry count. The interval should not grow without bound, though, so a threshold is needed, 60 s here. In the figure above, whenever the next computed interval exceeds 60 seconds, Sleep extension too large (*). Pinning to 60000 is printed, meaning the computed sleep time exceeded the threshold and was reset to it (60 s).

Start the server

Start the server side and continue to observe the client console output.

Disconnection reconnection test – client console output after server startup

As you can see, once the server is up, the client prints Successfully established a connection to the server, i.e. the reconnection succeeds. After that, because the random ping interval still sometimes exceeds the server's 5-second timeout, the disconnect-and-reconnect cycle repeats.

Extension

Different environments may have different reconnection requirements. If the default does not fit yours, simply implement the RetryPolicy interface yourself and pass your implementation in to override the default policy when creating the TcpClient.
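For example, a minimal sketch of a custom policy (the FixedIntervalRetry name and its values are mine) that retries at a fixed interval a bounded number of times:

/** Retries at a fixed interval, at most maxRetries times. */
public class FixedIntervalRetry implements RetryPolicy {

    private final int maxRetries;
    private final long intervalMs;

    public FixedIntervalRetry(int maxRetries, long intervalMs) {
        this.maxRetries = maxRetries;
        this.intervalMs = intervalMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        return intervalMs;
    }
}

Wiring it in: new TcpClient("localhost", 2222, new FixedIntervalRetry(10, 3000)) retries every 3 seconds, up to 10 times.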

Done!
