Preface
This article shows how to integrate Netty with Spring Boot. I am still learning Netty and have not yet used it in a production project, so what follows is a summary based on Netty examples I found online and on other people's approaches and experience. Please forgive any overlap with existing articles.
I will walk through the integration in the following steps:
- Build the Netty server
- Build the Netty client
- Define the message format with Protobuf
- Server idle detection
- Client heartbeat and reconnection after a disconnect
PS: For simplicity (mostly laziness), I put the Netty server and client in the same Spring Boot project, but they can just as well live in separate projects.
Build the Netty server
The Netty server code is actually quite simple. The code is as follows:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct; // jakarta.annotation on Spring Boot 3+
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NettyServer {
    /** The boss thread group accepts incoming connections. */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /** The work thread group handles I/O on accepted connections. */
    private EventLoopGroup work = new NioEventLoopGroup();
    @Value("${netty.port}")
    private Integer port;

    /**
     * Start the Netty server.
     *
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // Specify the Channel type
                .channel(NioServerSocketChannel.class)
                // Bind the socket address using the configured port
                .localAddress(new InetSocketAddress(port))
                // Maximum length of the queue of pending connections; corresponds to the backlog parameter of listen()
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Enable TCP keepalive. With typical OS defaults, TCP sends a probe after about two hours without traffic
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Disable Nagle's algorithm so small packets are sent immediately instead of being coalesced into larger ones
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new NettyServerHandlerInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("Start Netty Server");
        }
    }

    @PreDestroy
    public void destroy() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("Netty off");
    }
}
Because Netty runs inside our Spring Boot project, the server startup is wrapped in a start() method annotated with @PostConstruct, which tells Spring to call the method after the NettyServer bean has been initialized.
The ChannelHandler pipeline is explained later, together with the heartbeat mechanism.
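To make the three socket options in the server bootstrap more concrete, here is a minimal plain-JDK sketch (not Netty code) that applies the same options through java.net on a loopback connection. The names SocketOptionsDemo and connectWithOptions are made up for this illustration; the backlog value 1024 is taken from the server code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper: shows the java.net equivalents of the Netty ChannelOptions.
final class SocketOptionsDemo {
    // Opens a loopback connection, sets the options on the client socket,
    // and returns {keepAlive, tcpNoDelay} as read back from that socket.
    static boolean[] connectWithOptions() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for a free port; 1024 is the backlog hint (SO_BACKLOG).
        try (ServerSocket server = new ServerSocket(0, 1024, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.setKeepAlive(true);   // SO_KEEPALIVE
            client.setTcpNoDelay(true);  // TCP_NODELAY: disable Nagle's algorithm
            return new boolean[] { client.getKeepAlive(), client.getTcpNoDelay() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] options = connectWithOptions();
        System.out.println("keepAlive=" + options[0] + ", tcpNoDelay=" + options[1]);
    }
}
```

In Netty the same settings are expressed as ChannelOption values on the bootstrap, so application code never touches the socket directly.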
Build the Netty client
The Netty client code is similar to the server code.
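import com.pjmike.server.protocol.protobuf.MessageBase;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; // jakarta.annotation on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NettyClient {
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private int port;
    @Value("${netty.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        // Client reconnection logic: retry after 20 seconds if the connect fails
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("Netty server connected successfully");
            } else {
                log.info("Connection failed. Re-connect.");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}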
The listener also contains the client's reconnection logic, which is explained in more detail below.
Use Protobuf to build the communication protocol
While integrating Netty, we use Google's Protobuf to define the message format. Here is a brief introduction to Protobuf.
Introduction to Protobuf
Google officially defines Protobuf as follows:
Protocol Buffers is a lightweight and efficient structured data storage format that can be used for structured data serialization and is suitable for data storage or RPC data exchange formats. It can be used for language independent, platform independent, extensible serialized structure data format in communication protocol, data storage and other fields.
Protobuf is often used with Netty as a serialization scheme, but you can also use it to define the communication protocol between clients and servers.
Why use protobuf
We use Protobuf as our serialization tool here. Why choose it over other serialization schemes such as JDK serialization, Thrift, or FastJSON?
First of all, the JDK’s built-in serialization method has many disadvantages, such as:
- The serialized stream is too large
- The performance is too low
- Inability to cross languages
Google Protobuf is cross-language, with support for C++, Java, Python and others. Protobuf-encoded messages are smaller, which makes them easier to store and transfer, and its performance compares well with other serialization frameworks; a detailed comparison of Java serialization frameworks is outside the scope of this article. In short, Protobuf is widely used across many projects, and these advantages make it a sound choice.
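To get a feel for the size overhead of JDK serialization mentioned in the list of disadvantages, the following sketch serializes a small object with ObjectOutputStream and compares the result with the raw size of its two string fields. PlainMessage and JdkSerializationSize are hypothetical names used only here, and the sketch does not measure Protobuf itself.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical message type used only for this size comparison.
final class PlainMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String requestId;
    final String content;

    PlainMessage(String requestId, String content) {
        this.requestId = requestId;
        this.content = content;
    }
}

final class JdkSerializationSize {
    // Serializes the object with ObjectOutputStream and returns the byte count.
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Size of the two string fields alone, encoded as UTF-8.
    static int payloadSize(PlainMessage m) {
        return m.requestId.getBytes(StandardCharsets.UTF_8).length
                + m.content.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        PlainMessage m = new PlainMessage("req-1", "hello world");
        System.out.println("payload bytes:        " + payloadSize(m));
        System.out.println("JDK serialized bytes: " + serializedSize(m));
    }
}
```

The serialized form is larger than the payload because the stream also carries a header and a description of the class and its fields.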
How to use protobuf
For Java, using Protobuf consists of the following steps:
- Define the message format in a .proto file
- Compile the .proto file into Java classes using the Protobuf compiler
- Use the corresponding Protobuf Java API to write or read messages
Define the protobuf protocol format
For example, the message.proto file in my demo is as follows:
// There are two versions of the protobuf syntax: proto2 and proto3
syntax = "proto3";
// File options
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// Message model definition
message Message {
    string requestId = 1;
    CommandType cmd = 2;
    string content = 3;
    enum CommandType {
        NORMAL = 0;             // Regular business message
        HEARTBEAT_REQUEST = 1;  // Client heartbeat message
        HEARTBEAT_RESPONSE = 2; // Server heartbeat message
    }
}
Notes on the file:
- The first statement specifies that the file uses proto3 syntax; if it is omitted, the compiler defaults to proto2. New projects now commonly use proto3, which supports more languages than proto2 and is more concise. If you are using Protobuf for the first time, proto3 is a reasonable choice.
- A .proto file can carry a series of options, some of which are file-level, such as the second and third statements above. The java_package file option sets the package for the Java classes that the protocol compiler generates from the .proto file, and the java_outer_classname option sets the name of the generated Java class.
- Message defines three fields, each with a unique number that identifies the field in the message's binary format.
- Message also contains an enumeration type, CommandType. Every enum must map its first value to 0, which is the default value.
Message model definition
requestId is the message ID, cmd (of type CommandType) is the message type, here simply split into heartbeat messages and business messages, and content is the message body. This message format is deliberately simple; real projects usually place many more requirements on a custom message format and end up with something more complex.
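As a rough illustration of how the field numbers end up in the binary format: according to the Protocol Buffers encoding documentation, each field on the wire is preceded by a key computed as (field_number << 3) | wire_type. The sketch below computes that key for the three fields of Message. ProtoTagDemo is a hypothetical helper written for this article, not part of the protobuf-java API.

```java
// Hypothetical helper: computes the one-byte keys for the fields of Message.
final class ProtoTagDemo {
    // Wire types from the Protocol Buffers encoding documentation.
    static final int WIRETYPE_VARINT = 0;           // int32, enum, bool, ...
    static final int WIRETYPE_LENGTH_DELIMITED = 2; // string, bytes, nested messages

    // A field's key on the wire is (field_number << 3) | wire_type.
    static int tag(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    public static void main(String[] args) {
        // prints 0x0A, 0x10 and 0x1A respectively
        System.out.printf("requestId (1, string): 0x%02X%n", tag(1, WIRETYPE_LENGTH_DELIMITED));
        System.out.printf("cmd       (2, enum):   0x%02X%n", tag(2, WIRETYPE_VARINT));
        System.out.printf("content   (3, string): 0x%02X%n", tag(3, WIRETYPE_LENGTH_DELIMITED));
    }
}
```

Because the number is baked into every encoded field, changing a field's number after messages are in use makes old and new data incompatible, while renaming a field does not.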
That was a brief introduction to some protobuf syntax rules. For more on protobuf syntax, see the official documentation: developers.google.com/protocol-bu…
Compile the .proto file with the compiler
With the protobuf message format defined, the next step is to use the protoc compiler to compile it into the corresponding Java class so that the message class can be used in the project.
I will not go into installing the protobuf compiler here; see the official documentation: developers.google.com/protocol-bu…
Once the compiler is installed, compile the .proto file with the following command:
protoc -I=./ --java_out=./ ./Message.proto
- The -I option specifies the directory containing the .proto message definition file to compile; it can also be written as --proto_path
- The --java_out option specifies where to put the generated Java code; other languages use a different option, for example --cpp_out for C++
- After these two options, add the message definition file to compile
Use the Protobuf Java API to read and write messages
Compiling the .proto file generates the MessageBase class. To use it, add the protobuf-java dependency to the project:
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.5.1</version>
</dependency>
Every Java class generated by Protobuf contains two kinds of inner classes: Msg, and the Builder nested inside Msg (Msg here stands for the actual message class). Each message defined in the .proto file generates one Msg, and each Msg has a corresponding Builder:
- Builder provides APIs for building and querying the message
- Msg provides APIs for querying, serialization, and deserialization
For example, we use the Builder to build a Msg:
import com.pjmike.server.protocol.protobuf.MessageBase;
import java.util.UUID;

public class MessageBaseTest {
    public static void main(String[] args) {
        MessageBase.Message message = MessageBase.Message.newBuilder()
                .setRequestId(UUID.randomUUID().toString())
                .setContent("hello world").build();
        System.out.println("message: " + message.toString());
    }
}
The rest of the protobuf-java API is not covered here; refer to the official documentation for more details: developers.google.com/protocol-bu…
Protobuf codec
The protobuf format is now defined, but we still need to encode and decode the messages as they travel between client and server. We could write our own message codec, since the protobuf-java API provides the serialization and deserialization methods. The good news is that Netty supports Protobuf out of the box and provides codecs for it, as shown in the following table (from Netty in Action):
Name | Description |
---|---|
ProtobufDecoder | Decodes messages using Protobuf |
ProtobufEncoder | Encodes messages using Protobuf |
ProtobufVarint32FrameDecoder | Dynamically splits the received ByteBuf based on the value of the Google Protocol Buffers "Base 128 Varint" integer length field in the message |
ProtobufVarint32LengthFieldPrepender | Prepends a Google Protocol Buffers "Base 128 Varint" integer length field to the ByteBuf |
Add these codecs to the ChannelPipeline on both the client and the server to encode and decode messages, as follows:
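To show what the "Base 128 Varint" length field in the table means in practice, here is a plain-JDK sketch of the framing idea: the body length is written 7 bits at a time in front of the body, and the reader uses it to cut one complete message out of a byte stream. Varint32Framing and its method names are hypothetical; this illustrates the encoding and is not Netty's actual implementation.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical helper illustrating varint32 length-prefixed framing.
final class Varint32Framing {
    // Encodes a non-negative int as a Base 128 varint: 7 bits per byte,
    // least significant group first, high bit set on every byte but the last.
    static byte[] encodeVarint32(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Prepends the varint-encoded body length (the job of a length-field prepender).
    static byte[] frame(byte[] body) {
        byte[] prefix = encodeVarint32(body.length);
        byte[] framed = new byte[prefix.length + body.length];
        System.arraycopy(prefix, 0, framed, 0, prefix.length);
        System.arraycopy(body, 0, framed, prefix.length, body.length);
        return framed;
    }

    // Reads the varint prefix and returns the body, or null if the buffer does
    // not yet hold a complete frame (a frame decoder would wait for more bytes).
    static byte[] unframe(byte[] buffer) {
        int length = 0;
        int shift = 0;
        int pos = 0;
        while (true) {
            if (pos >= buffer.length) return null; // length prefix incomplete
            if (shift > 28) throw new IllegalArgumentException("varint32 too long");
            byte b = buffer[pos++];
            length |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
        }
        if (length < 0) throw new IllegalArgumentException("negative length");
        if (buffer.length - pos < length) return null; // body incomplete
        byte[] body = new byte[length];
        System.arraycopy(buffer, pos, body, 0, length);
        return body;
    }
}
```

For example, a length of 300 is encoded as the two bytes 0xAC 0x02, while any body shorter than 128 bytes needs only a single prefix byte.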
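import com.pjmike.server.protocol.protobuf.MessageBase;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                // Idle detection
                .addLast(new ServerIdleStateHandler())
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance()))
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(new NettyServerHandler());
    }
}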
Heartbeat mechanism of the client
Introduction to Heartbeat Mechanism
A heartbeat is a special packet that the client and server exchange periodically over a long-lived TCP connection to tell each other that they are still online, so that both sides know the connection is still valid.
How to implement the heartbeat mechanism
There are two ways to implement the heartbeat mechanism:
- Use the keepalive mechanism at the TCP layer
- Implement a custom heartbeat mechanism at the application layer
The TCP-level keepalive mechanism was already configured in the Netty server and client bootstrap code above. It has to be enabled explicitly, for example:
.childOption(ChannelOption.SO_KEEPALIVE, true)
Besides enabling TCP keepalive, while studying some open source demos on GitHub I found that people often implement their own heartbeat mechanism with their own heartbeat packets. Netty also provides IdleStateHandler to support a heartbeat mechanism.
Netty implementation of the heartbeat mechanism
Here is how the client implements the heartbeat mechanism:
import com.pjmike.server.protocol.protobuf.MessageBase;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("No message has been sent to the server for 10 seconds.");
                // Build the heartbeat packet using the message format defined by Protobuf
                MessageBase.Message heartbeat = MessageBase.Message.newBuilder()
                        .setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heartbeat").build();
                // Send the heartbeat message and close the connection if the send fails
                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
We created a ChannelHandler class and overrode the userEventTriggered method, which implements the logic for sending heartbeat packets. An IdleStateHandler must also be added to the pipeline, because it is what fires the IdleStateEvent.
When the connection has been idle for too long, an IdleStateEvent is fired, and userEventTriggered is then called to handle it.
After starting the client and server, the console prints the following heartbeat messages:
2018-10-28 16:30:46.825 INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler : No message has been sent to the server for 10 seconds
2018-10-28 16:30:47.176 INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler : Received heartbeat message from the client: requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"
We have only discussed the client sending heartbeat messages to the server. Does the server also need to send heartbeat messages to the client?
In general, for a long-lived connection, one option is for both sides to send heartbeat messages, and the other is for the server to act as a passive receiver: if the server does not receive a heartbeat packet within a certain period, it closes the connection.
This article takes the second approach: the client sends heartbeat messages and the server passively receives them. The server sets a time window, and if it receives no message during that window it actively closes the connection. This is also called idle detection.
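The division of work described here (the client reacts to being write-idle, the server reacts to being read-idle) can be sketched without Netty. The following IdleTracker is a hypothetical plain-JDK model of the idea, with timestamps passed in explicitly so the behavior is easy to follow; it is not how Netty's IdleStateHandler is implemented. The 10 and 30 second values in the usage note come from the article's logs and server handler.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of idle detection; names are made up for this sketch.
final class IdleTracker {
    enum State { ACTIVE, READER_IDLE, WRITER_IDLE }

    private final long readerIdleNanos;
    private final long writerIdleNanos;
    private long lastReadNanos;
    private long lastWriteNanos;

    // A timeout of 0 disables that check.
    IdleTracker(long readerIdle, long writerIdle, TimeUnit unit, long nowNanos) {
        this.readerIdleNanos = unit.toNanos(readerIdle);
        this.writerIdleNanos = unit.toNanos(writerIdle);
        this.lastReadNanos = nowNanos;
        this.lastWriteNanos = nowNanos;
    }

    // Called whenever data is received from the peer.
    void onRead(long nowNanos) { lastReadNanos = nowNanos; }

    // Called whenever data is sent to the peer.
    void onWrite(long nowNanos) { lastWriteNanos = nowNanos; }

    // Reports which timeout, if any, has elapsed at the given time.
    State check(long nowNanos) {
        if (readerIdleNanos > 0 && nowNanos - lastReadNanos >= readerIdleNanos) {
            return State.READER_IDLE;
        }
        if (writerIdleNanos > 0 && nowNanos - lastWriteNanos >= writerIdleNanos) {
            return State.WRITER_IDLE;
        }
        return State.ACTIVE;
    }
}
```

A client built as new IdleTracker(0, 10, TimeUnit.SECONDS, now) would send a heartbeat whenever check returns WRITER_IDLE, and a server built as new IdleTracker(30, 0, TimeUnit.SECONDS, now) would close the connection on READER_IDLE. Keeping the client interval well below the server timeout means a healthy connection never looks idle to the server.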
Netty client reconnection after a disconnect
The Netty client needs to reconnect to the server in the following two cases:
- The server is not reachable when the Netty client starts
- The server suddenly crashes while the program is running
In the first case, a ChannelFutureListener is used to check whether the connection attempt succeeded; if it did not, the retry mechanism kicks in. The code is as follows:
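import com.pjmike.server.protocol.protobuf.MessageBase;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; // jakarta.annotation on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class NettyClient {
    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private int port;
    @Value("${netty.host}")
    private String host;
    private SocketChannel socketChannel;

    public void sendMsg(MessageBase.Message message) {
        socketChannel.writeAndFlush(message);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .handler(new ClientHandlerInitilizer());
        ChannelFuture future = bootstrap.connect();
        // Client reconnection logic: retry after 20 seconds if the connect fails
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("Netty server connected successfully");
            } else {
                log.info("Connection failed. Re-connect.");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}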
The listener added to the ChannelFuture calls channel().eventLoop().schedule() to run the retry logic when the client fails to connect to the server.
In the second case, the server crashes while the program is running. We handle this in the Handler that processes reads and writes:
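The retry pattern used here (try once, and on failure schedule another attempt instead of blocking or looping) can be shown in isolation with a ScheduledExecutorService standing in for Netty's event loop. RetryingConnector and the BooleanSupplier that represents a connection attempt are hypothetical names for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of schedule-based reconnection, independent of Netty.
final class RetryingConnector {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier connectAttempt; // returns true once connected
    private final long retryDelayMillis;
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch connected = new CountDownLatch(1);

    RetryingConnector(ScheduledExecutorService scheduler,
                      BooleanSupplier connectAttempt,
                      long retryDelayMillis) {
        this.scheduler = scheduler;
        this.connectAttempt = connectAttempt;
        this.retryDelayMillis = retryDelayMillis;
    }

    // Tries once; on failure schedules the next try after the fixed delay.
    void start() {
        attempts.incrementAndGet();
        if (connectAttempt.getAsBoolean()) {
            connected.countDown();
        } else {
            scheduler.schedule(this::start, retryDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Number of connection attempts made so far.
    int attempts() {
        return attempts.get();
    }

    // Waits until a connection attempt has succeeded or the timeout elapses.
    boolean awaitConnected(long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Scheduling the retry rather than sleeping keeps the calling thread free, which matters in Netty because blocking an event loop thread stalls every channel assigned to it.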
import com.pjmike.server.protocol.protobuf.MessageBase;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                log.info("No message has been sent to the server for 10 seconds.");
                // Send a heartbeat packet; close the connection if the send fails
                MessageBase.Message heartbeat = MessageBase.Message.newBuilder()
                        .setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
                        .setRequestId(UUID.randomUUID().toString())
                        .setContent("heartbeat").build();
                ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // If the server goes down while running, start the reconnection mechanism
        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(() -> nettyClient.start(), 10L, TimeUnit.SECONDS);
        super.channelInactive(ctx);
    }
}
Here we override the channelInactive method directly in the Handler that implements the heartbeat mechanism and run the retry logic in that method. NettyClient is injected so that its start() method can be called to reconnect to the server. Note that @Autowired only takes effect if the handler instance itself is managed by Spring.
The channelInactive() method is called when the Channel is no longer connected to its remote peer.
Server idle detection
What is idle detection? It means checking whether any data has been read or written within a period of time. For example, the server checks whether it has received data from the client within that period; if not, it closes the connection and releases the resources promptly.
Netty provides IdleStateHandler specifically for idle detection. The following code is based on the idle detection part of "Netty Introduction and Practice: Imitating the WeChat IM Instant Messaging System":
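import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
    /** Reader idle timeout: 30 seconds. */
    private static final int READER_IDLE_TIME = 30;

    public ServerIdleStateHandler() {
        // Only reader idle is checked; 0 disables the writer and all-idle timeouts
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        log.info("No data read in {} seconds, close connection", READER_IDLE_TIME);
        ctx.channel().close();
    }
}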
Controller method test
Since this is a Demo of SpringBoot integration with Netty, we create a Controller method to test the communication between Netty server and client. The Controller code is as follows:
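import com.pjmike.server.protocol.protobuf.MessageBase;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
// Also import the project's own NettyClient class

@RestController
public class ConsumerController {
    @Autowired
    private NettyClient nettyClient;

    @GetMapping("/send")
    public String send() {
        MessageBase.Message message = MessageBase.Message.newBuilder()
                .setCmd(MessageBase.Message.CommandType.NORMAL)
                .setContent("hello server")
                .setRequestId(UUID.randomUUID().toString()).build();
        nettyClient.sendMsg(message);
        return "send ok";
    }
}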
Inject NettyClient and call its sendMsg method to send the message. The result is:
c.p.server.server.NettyServerHandler : Received business message from the client: requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"
Summary
This article described how to integrate Netty with Spring Boot. I drew on many examples and articles from more experienced developers to get a first look at how to use Netty. If you find any mistakes above, please point them out. GitHub: github.com/pjmike/spri…
References & thanks
- Netty Introduction and Practice: Imitating the WeChat IM Instant Messaging System
- Netty Client reconnection implementation
- Netty(1) SpringBoot integrates the long-connection heartbeat mechanism
- A brief analysis of Netty to realize heartbeat mechanism and disconnection
- Protobuf3 Grammar guide