preface
This article mainly introduces SpringBoot integration of Netty and the use of Protobuf data transfer related content. Protobuf has a brief introduction to usage, but Netty has been briefly introduced in previous articles, so we won’t go into details here.
Protobuf
introduce
The ProtocolBuffer (PB) is a Google data exchange format that is language – and platform-independent. Google provides implementations in several languages: Java, c#, c++, go, and python, each of which contains a compiler and library files for the corresponding language. Because it is a binary format, it is much faster than using XML for data exchange. It can be used for data communication between distributed applications or data exchange in heterogeneous environments. As a binary data transmission format with excellent efficiency and compatibility, it can be used in many fields such as network transmission, configuration files, data storage and so on.
The official address: https://github.com/google/protobuf
use
The use here is only java-related. First we need to create a proto file where we define the files we need to transfer. For example, we need to define a user’s information, including the main fields of number, name, age. The format of the protobuf file is as follows: Note: proto3 is used here, I have written the relevant comments, so I won’t go into more details here. Note that the proto file and the generated Java file name are not the same!
syntax = "proto3"; // The generated package name option javA_package ="com.pancm.protobuf"; // Generated Java name option JAVA_outer_className ="UserInfo"; message UserMsg { // ID int32 id = 1; // Name string name = 2; // age = 1; Int32 state = 4; }Copy the code
Once the file is created, we put the file and protoc. Exe (the software that generates Java files) in the protobuf folder on drive E, then go to the DOS interface of that directory and type :protoc. Exe –java_out= Absolute path name of the file. Such as:
protoc.exe --java_out=E:\protobuf User.proto
Copy the code
After typing, press Enter to see the generated Java file in the same directory, and then put the file in the path specified by the file in the project.
Note: the file software for generating protobuf and the protobuf file for testing have also been integrated into the project and can be obtained directly.
Once the Java files are generated, we’ll see how to use them. Here I will paste the code directly, and write the comments in the code, should be easier to understand… Code examples:
/ / in accordance with the definition of data structure, to create an object. The UserInfo UserMsg. Builder the UserInfo = the UserInfo. UserMsg. NewBuilder (); userInfo.setId(1); userInfo.setName("xuwujing"); userInfo.setAge(18); UserInfo.UserMsg userMsg = userInfo.build(); ByteArrayOutputStream output = new ByteArrayOutputStream(); userMsg.writeTo(output); Byte [] byteArray = output.tobytearray (); ByteArrayInputStream Input = new ByteArrayInputStream(byteArray); / / deserialize the UserInfo. UserMsg userInfo2. = the UserInfo UserMsg. ParseFrom (input); System.out.println("id:" + userInfo2.getId());
System.out.println("name:" + userInfo2.getName());
System.out.println("age:" + userInfo2.getAge());
Copy the code
Note: Because protobuf is transmitted through binary, you need to pay attention to the encoding. Also with protobuf you need to pay attention to the maximum byte length that can be transmitted at a time.
Output result:
id:1
name:xuwujing
age:18
Copy the code
Netty SpringBoot integration
Note: If you want to get the project directly, you can jump to the bottom and download the project code through the link.
The development of preparation
Environment requirements JDK: : 1.8 Netty: : 4.0 or later (excluding 5) Protobuf: 3.0 or later
If you’re not familiar with Netty, check out some of my previous articles. God please ignore ~. To address: https://blog.csdn.net/column/details/17640.html
First, there are Maven dependencies:
< properties > < project. Build. SourceEncoding > utf-8 < / project. Build. SourceEncoding > < Java version > 1.8 < / Java version > < netty version > 4.1.22. Final < / netty version > < protobuf. Version > 3.5.1 track of < / protobuf version > < springboot > 1.5.9. RELEASE < / springboot > < fastjson > 1.2.41 < / fastjson > < maven.com piler. Source > 1.8 < / maven.com piler source > < maven.com piler target > 1.8 < / maven.com piler. Target > < / properties > < dependencies > < the dependency > <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>${springboot}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<version>${springboot}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson}</version> </dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope>
</dependency>
</dependencies>
Copy the code
After adding maven dependencies, there is nothing to add to the configuration file because it is just a listening port for now.
The code
The code module is mainly divided into server and client. The main implementation of the business logic: after the server is successfully started, the client is also successfully started. At this time, the server will send a protobuf message to the client, and then the client responds accordingly. After the connection between the client and the server is successful, the client sends heartbeat instructions to the server every period of time to inform the server that the client is still in existence. If the client does not send messages within the specified time, the server closes the connection with the client. When the client fails to connect to the server, it tries to reconnect at regular intervals until the connection succeeds!
The service side
The first step is to write the server startup class, which is annotated in more detail in the code and won’t be covered here. Note, however, that in the Netty article I wrote earlier, the server is started directly by using the main method, so it is directly new an object. After integrating with SpringBoot, we need to hand Over Netty to SpringBoot to manage, so we use annotations here. The code is as follows:
@Service("nettyServer") public class NettyServer { private static final int port = 9876; Private static EventLoopGroup boss = new NioEventLoopGroup(); Private static EventLoopGroup work = new NioEventLoopGroup(); Private static ServerBootstrap b = new ServerBootstrap(); @Autowired private NettyServerFilter nettyServerFilter; public voidrun() { try { b.group(boss, work); b.channel(NioServerSocketChannel.class); b.childHandler(nettyServerFilter); ChannelFuture f = b.bind(port).sync(); System.out.println("Server started successfully, port is :"+ port); F.channel ().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } Override {override the EventLoopGroup where all resources are released, including the created thread work.shutdownGracefully(); boss.shutdownGracefully(); }}}Copy the code
After the server main class is written, we will set the corresponding filtering conditions. You need to inherit the ChannelInitializer class from Netty and override the initChannel method to add corresponding Settings, such as heartbeat timeout Settings, transport protocol Settings, and corresponding service implementation classes. The code is as follows:
@Component public class NettyServerFilter extends ChannelInitializer<SocketChannel> { @Autowired private NettyServerHandler nettyServerHandler; @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline ph = ch.pipeline(); // Add parameters: read timeout, write timeout, all types of timeout, and time format ph.addLast(new IdleStateHandler(5, 0, 0, timeUnit.seconds)); / / decoding and encoding, should work in harmony with the client / / transport protocol Protobuf ph. AddLast (new ProtobufVarint32FrameDecoder ()); ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance())); ph.addLast(new ProtobufVarint32LengthFieldPrepender()); ph.addLast(new ProtobufEncoder()); // Business logic implementation class ph.addlast ("nettyServerHandler", nettyServerHandler); }}Copy the code
After writing the code for the service-related setup, we will write the main business code. Use Netty writing business layer code, we need to inherit ChannelInboundHandlerAdapter or SimpleChannelInboundHandler class, here by the way the difference between them two. Inheritance SimpleChannelInboundHandler after class, after receiving the data will be automatically release data takes up the Bytebuffer resources. And inheriting the class requires specifying the data format. While inheriting ChannelInboundHandlerAdapter will not automatically release, need to manually call ReferenceCountUtil. Release to release () method. Inheriting the class does not require specifying a data format. So in this case, personal recommendations server-side inherit ChannelInboundHandlerAdapter, manually release, prevent data processed is automatically released. In addition, the server may have multiple clients connected, and the data format requested by each client is not consistent, so it can be processed accordingly. The client can inherit SimpleChannelInboundHandler classes depending on the situation. The advantage is that the format of the data to be transmitted is directly specified, so there is no need for format conversion.
The code is as follows:
@Service("nettyServerHandler") public class NettyServerHandler extends ChannelInboundHandlerAdapter {/ * * free * times/private int idle_count = 1; /** private int count = 1; /** * When the connection is established, */ @override Public void channelActive(ChannelHandlerContext CTX) throws Exception {system.out.println ("Connected client address :" + ctx.channel().remoteAddress());
UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0) .build(); ctx.writeAndFlush(userMsg); super.channelActive(ctx); } /** * Timeout processing Is triggered if the heartbeat of the client is not received within 5 seconds; If more than two times, it is directly closed; */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if(idlestate.reader_idle. Equals (event.state())) {// If the read channel is idle, the heartbeat command system.out.println () is not received."I have not received any message from the client for 5 seconds.");
if (idle_count > 1) {
System.out.println("Close this inactive channel."); ctx.channel().close(); } idle_count++; }}else{ super.userEventTriggered(ctx, obj); Override public void channelRead(ChannelHandlerContext CTX, Object msg) throws Exception { System.out.println("The first" + count + "Time" + ", the message received by the server :"+ msg); Try {// If the data is protobufif (msg instanceof UserMsg) {
UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
if (userState.getState() == 1) {
System.out.println("Client business processing successful!");
} else if(userState.getState() == 2){
System.out.println("Receive heartbeat sent to client!");
}else{
System.out.println("Unknown order!"); }}else {
System.out.println("Unknown data!" + msg);
return; } } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } count++; } @override public void exceptionCaught(ChannelHandlerContext CTX, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Copy the code
There is also a server side startup class, which used to boot directly from the main method, but has been changed to boot via springBoot, with little difference. The code is as follows:
@springBootApplication public class NettyServerApp {public static void main(String[] args) {// Start embedded Tomcat and initialize Context = SpringApplication.run(nettyServerApp. class, args); NettyServer nettyServer = context.getBean(NettyServer.class); nettyServer.run(); }}Copy the code
At this point, the corresponding code on the server side is written.
The client
The code on the client side is similar in many ways to that on the server side, so I won’t go into too much detail, but I’ll briefly describe some of the different code. The first is the client’s main class, which is basically the same as the server, with more listening ports and a listener (to listen for disconnection from the server, for reconnection). The main code logic is as follows:
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
ChannelFuture f = null;
try {
if(bootstrap ! = null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.handler(nettyClientFilter);
bootstrap.remoteAddress(host, port);
f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if(! futureListener.isSuccess()) { System.out.println("Disconnect from the server! Prepare to attempt reconnection after 10 seconds!");
eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); }});if(initFalg){
System.out.println("Netty client started successfully!");
initFalg=false; } // block f.channel().closeFuture().sync(); } } catch (Exception e) { System.out.println("Client connection failed!"+e.getMessage()); }}Copy the code
Note: the implementation of the listener piece is written in JDK1.8.
The client filters its base and server all the time. However, it is important to note that the transport protocol, encoding and decoding should be the same, and the heartbeat read and write time should be less than the server set time. The changed code is as follows:
ChannelPipeline ph = ch.pipeline(); Read timeout, write timeout, all types of timeout, time format ph.addLast(new IdleStateHandler(0, 4, 0, timeUnit.seconds));Copy the code
Client business code logic. The main logic is that the heartbeat sends on time and parses the protobuf data sent by the service. The Sharable annotation is used to ensure that multiple handlers can be safely shared by multiple channels, i.e. to ensure thread safety. Without further ado, the code is as follows:
@Service("nettyClientHandler") @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Autowired private NettyClient nettyClient; Private int fcount = 1; private int fcount = 1; @override public void channelActive(ChannelHandlerContext CTX) throws Exception {system.out.println ("When establishing connection:"+ new Date()); ctx.fireChannelActive(); } /** * Override public void channelInactive(ChannelHandlerContext CTX) throws Exception {system.out.println ("When closing connection:"+ new Date()); final EventLoop eventLoop = ctx.channel().eventLoop(); nettyClient.doConnect(new Bootstrap(), eventLoop); super.channelInactive(ctx); } /** * Heartbeat request processing sends heartbeat requests every 4 seconds; * */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { System.out.println("Loop request time:" + new Date() + ", times" + fcount);
if (obj instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) obj;
if(idlestate.writer_idle.equals (event.state())) {// If the write channel is idle, send the heartbeat command UserMsg.Builder userState = UserMsg.newBuilder().setState(2); ctx.channel().writeAndFlush(userState); fcount++; Override public void channelRead(ChannelHandlerContext CTX, Object MSG) throws Exception {// If data is not of the protobuf typeif(! (msg instanceof UserMsg)) { System.out.println("Unknown data!" + msg);
return; } try {// Get protobuf data userinfo.usermsg UserMsg = (userinfo.usermsg) MSG; // Do the corresponding business processing... System.out.println(system.out.println ("User information received by the client. Number:" + userMsg.getId() + "Name:" + userMsg.getName() + ",年龄:"+ userMsg.getAge()); Usermsg.builder userState = usermsg.newBuilder ().setState(1); ctx.writeAndFlush(userState); System.out.println("Successfully sent to server!"); } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); }}}Copy the code
So we’re done with the client code up here.
A functional test
Start the server first, and then the client. Let’s see if that’s true.
Server output:
The server is successfully started and the port number is 9876. IP address of the connected client :/127.0.0.1:53319 For the first time, the server receives the message state: 1. The client service is successfully processed. The second time, the server received the message :state: 2 received the heartbeat sent by the client! The third time, the server received the message :state: 2 Received the heartbeat sent by the client! The fourth time, the server received the message :state: 2 Received the heartbeat sent by the client!Copy the code
Client input result:
Netty client started successfully! Setting up a connection: Mon Jul 16 23:31:58 CST 2018 User information received by the client. Number :1, name: Xuwujing, age :18 Successfully sent to the server! Mon Jul 16 23:32:02 CST 2018, count 1 Mon Jul 16 23:32:10 CST 2018, count 3 Time of loop request: Mon Jul 16 23:32:14 CST 2018, count 4Copy the code
The printed information can be seen as described above.
Now let’s see if the client can reconnect. Start the client and then the server.
Client input result:
Netty client started successfully! Disconnect from the server! Prepare to attempt reconnection after 10 seconds! Client connection failed! AbstractChannel$CloseFuture@1fbaa3AC (Incomplete) When establishing a connection: Mon Jul 16 23:41:33 CST 2018 User information received by the client. Number :1, name: Xuwujing, age :18 Successfully sent to the server! Mon Jul 16 23:41:42 CST 2018, count 1 Mon Jul 16 23:41:46 CST 2018, count 3Copy the code
Server output:
The server is successfully started and the port number is 9876. IP address of the connected client :/127.0.0.1:53492 For the first time, the server receives the message state: 1. The client service is successfully processed. The second time, the server received the message :state: 2 received the heartbeat sent by the client! The third time, the server received the message :state: 2 Received the heartbeat sent by the client! The fourth time, the server receives the message :state: 2Copy the code
And so it turned out!
other
SpringBoot integration Netty using Protobuf for data transfer ends here. Protobuf data transfer using SpringBoot integration Netty https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
By the way, don’t use springBoot integration Netty project address: https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
Original is not easy, if you feel good, I hope to give a recommendation! Your support is the biggest motivation for my writing! Copyright Notice: Author: nothing Blog garden reference: http://www.cnblogs.com/xuwujing CSDN reference: http://blog.csdn.net/qazwsxpcm personal blog reference: http://www.panchengming.com