“This is the 28th day of my participation in the November Gwen Challenge. See details of the event: The Last Gwen Challenge 2021”.
Like again, form a habit 👏👏
Heartbeat Detection Mechanism
Heartbeat is a special data packet periodically sent between a client and a server in a TCP long connection to notify the other that the client is still online and ensure the validity of the TCP connection.
Heartbeat detection mechanism: The client sends PING messages to the server at regular intervals, and the server replies to the PONG message after receiving them. If the client does not receive a PONG response within a certain period of time, the connection is considered disconnected. If the server does not receive a PING request from the client within a certain period of time, the connection is considered disconnected. The connection activity is detected through this back-and-forth pinging mechanism.
In Netty, an IdleStateHandler is provided to detect that a connection is idle. This Handler can trigger an event that detects that no read or write event has occurred on the connection.
Take a look at its constructor:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
Copy the code
Explain the meanings of the following three parameters:
- EaderIdleTimeSeconds read timeout: When there is no slave in the specified time interval
Channel
One is triggered when data is readREADER_IDLE
的IdleStateEvent
Events. - RiterIdleTimeSeconds write timeout: Indicates that no data is written to within the specified interval
Channel
, triggers aWRITER_IDLE
的IdleStateEvent
Events. - LlIdleTimeSeconds read/write timeout: Is triggered when no read or write operation is performed within a specified period of time
ALL_IDLE
的IdleStateEvent
Events.
Note: The default time unit for these three parameters is seconds. If you need to specify other units of time, you can use another constructor:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
Copy the code
To implement the Netty server heartbeat detection mechanism, add the following codes to the ChannelInitializer for the server:
pipeline.addLast(new IdleStateHandler(3.0.0, TimeUnit.SECONDS));
Copy the code
IdleStateHandler (); IdleStateHandler ();
The red-boxed code actually means that the method is passed through without doing any business logic, leaving the channelRead method to the next handler in channelPipe
Let’s look at the channelActive method again:
Here’s an initialize method, the essence of IdleStateHandler, to explore:
This will trigger a Task, ReaderIdleTimeoutTask, this Task run method source:
The first red box subtracts the current time from the last channelRead call. If the result is 6s, it means that the last channelRead call was 6s ago. If you set 5s, the nextDelay is -1, which means that you have timed out. The second red box triggers the next handler’s userEventTriggered method:
If there is no timeout, the userEventTriggered method is not triggered.
Server code
public class HeartBeatServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder".new StringDecoder());
channelPipeline.addLast("encoder".new StringEncoder());
channelPipeline.addLast(new IdleStateHandler(3.0.0, TimeUnit.SECONDS));
channelPipeline.addLast(newHeartBeatServerHandler()); }}); System.out.println("Netty server start.");
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
}finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }}}Copy the code
Inheritance SimpleChannelInboundHandler rewrite channelRead method and userEventTriggered method
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if ("Heartbeat Packet".equals(s)){
ctx.channel().writeAndFlush("ok");
}else{
System.out.println("Other information processing..."); }}@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "Read free";
readIdleTimes++; // Read idle count increment by 1
break;
case WRITER_IDLE:
eventType = "Write free";
/ / not processing
break;
case ALL_IDLE:
eventType = "Read-write idle";
/ / not processing
break;
}
System.out.println(ctx.channel().remoteAddress() + "Timeout event:" + eventType);
if (readIdleTimes > 3){
System.out.println("[server] read idle more than 3 times, close the connection, release more resources");
ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); }}@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("= = =" + ctx.channel().remoteAddress() + " is active ==="); }}Copy the code
Client code
public class HeartBeatClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast("decoder".new StringDecoder());
channelPipeline.addLast("encoder".new StringEncoder());
channelPipeline.addLast(newHeartBeatClientHandler()); }}); System.out.println(Netty client start.);
Channel channel = bootstrap.connect("127.0.0.1".8888).sync().channel();
String text = "Heartbeat Packet";
Random random = new Random();
while(channel.isActive()){
int num = random.nextInt(8);
Thread.sleep(num*1000); channel.writeAndFlush(text); }}catch (Exception e){
e.printStackTrace();
}finally{ group.shutdownGracefully(); }}static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" + msg);
if(msg ! =null && msg.equals("idle close")) {
System.out.println("Server closes connection, client closes."); ctx.channel().closeFuture(); }}}}Copy the code