Netty – based custom protocol communication protocol
1. Definition of communication protocol
field | Number of bytes occupied | describe |
---|---|---|
The frame header | 2 bytes | The value is fixed at 0x55 0xAA |
Long degrees | 2 bytes | Length = command word + argument + checksum, excluding frame header and length bytes |
Life has | 1 bytes | 0 Heartbeat, 1 authentication, 2 Obtaining information |
And the number | 0~65535 bytes | Business data |
The checksum | 2 bytes | Checksum = Frame header + length + command word + sum of bytes of arguments |
Framework functions
-
heartbeat
-
TCP half packet, sticky packet processing
-
IP filtering
-
Log print
-
User-defined protocol resolution
Business description
(1) The Netty protocol stack client sends a handshake request message carrying authentication information.
(2) The Netty protocol stack server verifies the validity of the handshake request message. After the verification succeeds, the Netty protocol stack server returns a handshake reply message indicating that the login succeeds.
(3) After the link is successfully established, the client sends heartbeat messages and the client sends service messages.
(6) The server responds to heartbeat and business messages;
(7) When the server exits, the server closes the connection, and the client senses that the other party closes the connection and passively closes the client connection.
Complete code download address
Code screenshots
Client startup code
package com.king.netty.core.client;
import com.king.netty.core.DataFrameDecoder;
import com.king.netty.core.DataFrameEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/ * * *@author King
* @date2021/7/14 * /
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
startServer();
}
static void startServer(a) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// Set the thread group
.group(group)
// Set to NIO mode
.channel(NioSocketChannel.class)
// Set all channelHandlers in the pipeline
// The inbound channelHandler needs to be ordered
// The outbound channelHandler needs to be in order
.handler(new ClientHandlerInit());
bootstrap.connect("127.0.0.1".8888).sync();
}
static class ClientHandlerInit extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Logs are printed
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
/ / LengthFieldBasedFrameDecoder used to solve the problem of TCP stick pack half a pack
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535.// maxFrameLength Maximum length of a message
2.// lengthFieldOffset specifies the offset of the length field, indicating that the length field is skipped after a specified number of bytes
2.// lengthFieldLength Specifies the field that records the length of the frame data, i.e. the length of the length field itself
0.// lengthAdjustment a modified value of length, which can be positive or negative. After reading the length value N of the packet, Netty considers that the following N bytes need to be read, but depending on the actual situation, it may need to increase or decrease the N value. How much you increase, how much you decrease, you write it in this parameter
2 / / initialBytesToStrip from skip the number of bytes in the data frame, after getting a complete packet said, throw away how many bytes in the packet, is the subsequent actual need of business data.
));
// Custom protocol decoder
pipeline.addLast(new DataFrameDecoder());
// Custom protocol encoder
pipeline.addLast(new DataFrameEncoder());
// The handler that handles authentication requests
pipeline.addLast(new AuthorizationRequestHandler());
// The heartbeat handler
pipeline.addLast(new HeartBeatRequestHandler());
// Client business handler
pipeline.addLast(newClientBusinessHandler()); }}}Copy the code
The client requests authentication Handler code
package com.king.netty.core.client;
import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/ * * *@author King
* @date2021/7/14 * /
public class AuthorizationRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Initiate an authentication request after the connection is successful
ctx.writeAndFlush(DataFrame.getAuthorizationDataFrame());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
// Process the authentication response
if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
byte[] params = dataFrame.getParams();
if (! "success".equals(new String(params))){
// Failed to authenticate and closed the connectionReferenceCountUtil.release(msg); ctx.close(); }}// The authentication succeeds, and the message continues
// Non-authenticated responses are forwarded to subsequent servicesctx.fireChannelRead(msg); }}Copy the code
Client heartbeat Handler code
package com.king.netty.core.client;
import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.TimeUnit;
/ * * *@author King
* @date2021/7/14 * /
public class HeartBeatRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
switch (dataFrame.getCmd()){
// If it is a heartbeat response, release it because subsequent business handlers care
case DataFrame.CMD_HEART_BEAT:
ReferenceCountUtil.release(msg);
break;
// If the response is successfully authenticated, the heartbeat is periodically sent
case DataFrame.CMD_AUTHORIZATION:
// Use the netty built-in task processor to send a heartbeat every 10 seconds
ctx.executor().scheduleAtFixedRate(() -> {
ctx.writeAndFlush(DataFrame.getHeartBeatDataFrame());
}, 0.10, TimeUnit.SECONDS);
ctx.fireChannelRead(msg);
break;
default:
// Pass the message backwards and let the business handler handle it
ctx.fireChannelRead(msg);
break; }}}Copy the code
Client business Handler code
package com.king.netty.core.client;
import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/ * * *@author King
* @date2021/7/14 * /
public class ClientBusinessHandler extends ChannelInboundHandlerAdapter {
public static final Logger logger = LoggerFactory.getLogger(ClientBusinessHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
// Send a service request
ctx.writeAndFlush(new DataFrame(DataFrame.CMD_GET_INFO, "which language is the best ?".getBytes()));
}else {
// Prints messages sent by the server
logger.debug("receive message: "+ dataFrame); } ReferenceCountUtil.release(msg); }}Copy the code
Server startup code
package com.king.netty.core.server;
import com.king.netty.core.DataFrameDecoder;
import com.king.netty.core.DataFrameEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ipfilter.IpFilterRule;
import io.netty.handler.ipfilter.IpFilterRuleType;
import io.netty.handler.ipfilter.RuleBasedIpFilter;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/ * * *@author King
* @date2021/7/14 * /
@Component
public class NettyServer implements InitializingBean.DisposableBean {
private boolean started;
private Channel channel;
private NioEventLoopGroup parentGroup;
private NioEventLoopGroup childGroup;
@Override
public void destroy(a) throws Exception {
// Spring calls stop to release the server when the object is destroyed
if(started){ stopServer(); }}@Override
public void afterPropertiesSet(a) throws Exception {
// After spring initializes the object, it invokes the start method to start the service
if (started){
return;
}
startServer();
}
void startServer(a) throws InterruptedException {
this.parentGroup = new NioEventLoopGroup();
this.childGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
// Set the thread group
.group(parentGroup, childGroup)
// Set to NIO mode
.channel(NioServerSocketChannel.class)
// Set the TCP sync queue size to prevent flood attacks
.childOption(ChannelOption.SO_BACKLOG, 1024)
// Set all channelHandlers in the pipeline
// The inbound channelHandler needs to be ordered
// The outbound channelHandler needs to be in order
.childHandler(new ServerHandlerInit());
this.channel = serverBootstrap.bind(8888).sync().channel();
started = true;
}
void stopServer(a){
try{
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
channel.closeFuture().syncUninterruptibly();
}finally {
this.parentGroup = null;
this.childGroup = null;
this.channel = null;
started = false; }}static class ServerHandlerInit extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Logs are printed
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
/ / IP filter
pipeline.addLast(new RuleBasedIpFilter(new IpFilterRule() {
@Override
public boolean matches(InetSocketAddress remoteAddress) {
// Custom IP address interceptor. IP addresses that do not start with 127 are not allowed to connect
return ! remoteAddress.getHostName().startsWith("127");
}
@Override
public IpFilterRuleType ruleType(a) {
returnIpFilterRuleType.REJECT; }}));/ / LengthFieldBasedFrameDecoder used to solve the problem of TCP stick pack half a pack
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535.// maxFrameLength Maximum length of a message
2.// lengthFieldOffset specifies the offset of the length field, indicating that the length field is skipped after a specified number of bytes
2.// lengthFieldLength Specifies the field that records the length of the frame data, i.e. the length of the length field itself
0.// lengthAdjustment a modified value of length, which can be positive or negative. After reading the length value N of the packet, Netty considers that the following N bytes need to be read, but depending on the actual situation, it may need to increase or decrease the N value. How much you increase, how much you decrease, you write it in this parameter
2 / / initialBytesToStrip from skip the number of bytes in the data frame, after getting a complete packet said, throw away how many bytes in the packet, is the subsequent actual need of business data.
));
// Set the heartbeat timeout period to 30 seconds. If no heartbeat is received within 30 seconds, a ReadTimeoutException will be thrown
pipeline.addLast(new ReadTimeoutHandler(30));
// Custom protocol decoder
pipeline.addLast(new DataFrameDecoder());
// Custom protocol encoder
pipeline.addLast(new DataFrameEncoder());
// Authentication processing
pipeline.addLast(new AuthorizationResponseHandler());
// Heartbeat processing
pipeline.addLast(new HeartBeatResponseHandler());
// Business processing handler
pipeline.addLast(newServerBusinessHandler()); }}}Copy the code
Server authentication processing Handler
package com.king.netty.core.server;
import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/ * * *@author King
* @date2021/7/14 * /
public class AuthorizationResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
if (dataFrame.getCmd() == DataFrame.CMD_AUTHORIZATION) {
String auth = "{\"username\":\"test\", \"password\":\"abcdef\"}";
byte[] params = dataFrame.getParams();
if (auth.equals(new String(params))){
// The authentication succeeded
ctx.writeAndFlush(new DataFrame(dataFrame.getCmd(), "success".getBytes()));
}else {
// Authentication failed
ctx.writeAndFlush(new DataFrame(dataFrame.getCmd(), "fail".getBytes()));
}
// Release the message
ReferenceCountUtil.release(msg);
}else {
// Non-authenticated requests are forwarded to subsequent servicesctx.fireChannelRead(msg); }}}Copy the code
Server heartbeat processing Handler
package com.king.netty.core.server;
import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
/ * * *@author King
* @date2021/7/14 * /
public class HeartBeatResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
// If it is a heartbeat request, release it because subsequent business handlers care
if (dataFrame.getCmd() == DataFrame.CMD_HEART_BEAT) {
ctx.writeAndFlush(DataFrame.getHeartBeatDataFrame());
ReferenceCountUtil.release(msg);
} else {// Pass the message backwards and let the business handler handle itctx.fireChannelRead(msg); }}@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException){
// Disconnect the client
ctx.close();
return;
}
super.exceptionCaught(ctx, cause); }}Copy the code
Server service processing Handler
package com.king.netty.core.server;
import com.king.netty.core.DataFrame;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/ * * *@author King
* @date2021/7/14 * /
public class ServerBusinessHandler extends ChannelInboundHandlerAdapter {
public static final Logger logger = LoggerFactory.getLogger(ServerBusinessHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DataFrame dataFrame = (DataFrame) msg;
logger.debug("receive message: " + dataFrame);
// Return client data
DataFrame response = doBusiness(dataFrame);
ctx.writeAndFlush(response);
}
private DataFrame doBusiness(DataFrame dataFrame){
// Take care of your business
// todo
// Respond to the client
return new DataFrame(dataFrame.getCmd(), "java is the best language".getBytes()); }}Copy the code
Protocol definition code
package com.king.netty.core;
import lombok.Data;
/ * * *@author King
* @date2021/7/14 * /
@Data
public class DataFrame {
public static final byte CMD_HEART_BEAT = 0;
public static final byte CMD_AUTHORIZATION = 1;
public static final byte CMD_GET_INFO = 2;
/** * Frame header length Life parameter checksum * 0x55 0xAA 2byte 1byte 0~1476bytes 2bytes ** Length = command word + parameter + checksum, excluding frame header and length bytes; * Checksum = Frame header + length + command word + sum of bytes of arguments. * * /
public static final byte[] HEADER = new byte[] {0b01010101, (byte) 0b10101010};
private byte cmd;
private byte[] params;
private int crc;
public DataFrame(byte cmd, byte[] params, int crc) {
this.cmd = cmd;
this.params = params;
this.crc = crc;
}
public DataFrame(byte cmd, byte[] params) {
this.cmd = cmd;
this.params = params;
this.crc = getCrc();
}
public boolean checkCrc(a){
return getCrc() == this.crc;
}
public int getLength(a) {
// Length = command word + argument + checksum, excluding frame header and length bytes;
return 1 + params.length + 2;
}
public int getCrc(a){
// Checksum = frame header + length + command word + sum of bytes of arguments.
int crc = 0;
/ / frame head
crc += 0b01010101;
crc += 0b10101010;
/ / the length
crc += getLength();
/ / parameters and
for (byte b: params){
crc += (b & 0xFF);
}
return crc;
}
public static DataFrame getHeartBeatDataFrame(a){
return new DataFrame(DataFrame.CMD_HEART_BEAT, new byte[] {}); }public static DataFrame getAuthorizationDataFrame(a){
String msg = "{\"username\":\"test\", \"password\":\"abcdef\"}";
return new DataFrame(DataFrame.CMD_AUTHORIZATION, msg.getBytes());
}
@Override
public String toString(a) {
return "DataFrame{" +
"cmd=" + cmd +
", params=" + new String(params) +
'} '; }}Copy the code
Protocol decoder
package com.king.netty.core;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/ * * *@author King
* @date2021/7/14 * /
public class DataFrameDecoder extends ByteToMessageDecoder {
/** * Frame header length Life parameter checksum * 0x55 0xAA 2byte 1byte 0~1476bytes 2bytes ** Length = command word + parameter + checksum, excluding frame header and length bytes; * Checksum = Frame header + length + command word + sum of bytes of arguments. * * /
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
/ / length
int length = in.readShort();
/ / command
byte cmd = in.readByte();
/ / parameters
byte[] params = new byte[length-3];
in.readBytes(params);
/ / the checksum
int crc = in.readShort();
DataFrame dataFrame = new DataFrame(cmd, params, crc);
// Compute the checksum
if (dataFrame.checkCrc()){
// Add parsed data to the list and pass it to the subsequent channelHandlerout.add(dataFrame); }; }}Copy the code
Protocol encoder
package com.king.netty.core;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/ * * *@author King
* @date2021/7/14 * /
public class DataFrameEncoder extends MessageToByteEncoder<DataFrame> {
@Override
protected void encode(ChannelHandlerContext ctx, DataFrame msg, ByteBuf out) throws Exception {
// Write the frame header
out.writeBytes(DataFrame.HEADER);
// Write the length
out.writeShort(msg.getLength());
// Write the command
out.writeByte(msg.getCmd());
/ / number of refs
out.writeBytes(msg.getParams());
/ / the checksumout.writeShort(msg.getCrc()); }}Copy the code