Preface

This article covers integrating Netty with Spring Boot and using Protobuf for data transfer. It starts with a brief introduction to Protobuf and how to use it.

Protobuf

Introduction

Protocol Buffers (protobuf, abbreviated PB) is a language- and platform-independent data exchange format from Google. Google provides implementations for several languages, including Java, C#, C++, Go, and Python, each with a compiler and runtime library for that language. Because it is a binary format, it is generally much faster than XML for data exchange. It can be used for communication between distributed applications or for data exchange in heterogeneous environments. As a binary transmission format with good efficiency and compatibility, it is used in many areas, such as network transmission, configuration files, and data storage.

Official repository: https://github.com/google/protobuf

Usage

Only the Java usage is covered here. First, create a proto file that defines the message we need to transfer. For example, we define a user's information, whose main fields are number, name, and age. The format of the protobuf file is as follows: Note: proto3 is used here, and the relevant comments are written in the file, so it is not explained in more detail. Note that the proto file name and the generated Java file name must not be the same!

Once the file is created, put it together with protoc.exe (the program that generates the Java files) in the protobuf folder on drive E, open a command prompt in that directory, and run protoc.exe with --java_out set to the absolute output path, followed by the proto file name. For example:

protoc.exe --java_out=E:\protobuf User.proto

After pressing Enter, the generated Java file appears in the same directory. Copy it into the project under the package path specified in the proto file.

Note: the protoc program used to generate the Java files and the proto file used for testing are also included in the project and can be taken from there directly.

Once the Java file is generated, let's see how to use it. The code is pasted here directly, with explanatory comments in the code itself, which should make it easier to follow. Code example:

Note: Because protobuf transmits data in binary form, pay attention to the encoding. Also pay attention to the maximum number of bytes that can be transmitted at one time.

Output result:

id:1
name:xuwujing
age:18

Spring Boot integrates Netty

Note: If you want to get the project directly, you can jump to the bottom and download the project code through the link.

Development preparation

Environment requirements: JDK 1.8; Netty 4.0 or later (excluding 5); Protobuf 3.0 or later.

First, the Maven dependencies:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <netty.version>4.1.22.Final</netty.version>
    <protobuf.version>3.5.1</protobuf.version>
    <springboot>1.5.9.RELEASE</springboot>
    <fastjson>1.2.41</fastjson>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>${springboot}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <version>${springboot}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <version>${springboot}</version>
      <optional>true</optional>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>${netty.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>${protobuf.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
</dependencies>

After adding the Maven dependencies, nothing needs to be added to the configuration file, because for now the application only listens on a port, and that port is set in the server class.

Code

The code is divided into two modules, server and client. The business logic is as follows. Once the server and the client have both started and the connection is established, the server sends a protobuf message to the client, and the client responds accordingly. After connecting, the client sends a heartbeat to the server at regular intervals to tell the server that it is still alive. If the server receives nothing from the client within the specified time, it closes the connection to that client. When the client fails to connect to the server, it retries at regular intervals until the connection succeeds.

Server

The first step is to write the server startup class. The code is commented in detail, so it is not explained further here. Note, however, that in my earlier Netty article the server was started directly from the main method, so the object was simply created with new. After integrating with Spring Boot, Netty has to be handed over to Spring Boot to manage, so annotations are used here. The code is as follows:

@Service("nettyServer")
public class NettyServer {
    private static final int port = 9876; // listening port
    private static EventLoopGroup boss = new NioEventLoopGroup(); // accepts client connections
    private static EventLoopGroup work = new NioEventLoopGroup(); // handles channel I/O
    private static ServerBootstrap b = new ServerBootstrap();

    @Autowired
    private NettyServerFilter nettyServerFilter;

    public void run() {
        try {
            b.group(boss, work);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerFilter); // set the filter
            // Bind the port and start the server
            ChannelFuture f = b.bind(port).sync();
            System.out.println("Server started successfully, port is :" + port);
            // Block until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Shut down the EventLoopGroups and release all resources, including the created threads
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

After the server main class is written, set up the corresponding filter. It extends Netty's ChannelInitializer class and overrides the initChannel method to add the required settings, such as the heartbeat timeout, the transport protocol, and the business handler class. The code is as follows:

@Component
public class NettyServerFilter extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline ph = ch.pipeline();
        // Parameters: read timeout, write timeout, all-types timeout, and time unit
        ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
        // Decoding and encoding must match the client
        // Transport protocol: Protobuf
        ph.addLast(new ProtobufVarint32FrameDecoder());
        ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
        ph.addLast(new ProtobufVarint32LengthFieldPrepender());
        ph.addLast(new ProtobufEncoder());
        // Business logic implementation class
        ph.addLast("nettyServerHandler", nettyServerHandler);
    }
}
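The two Varint32 handlers in the pipeline exist because TCP is a byte stream: each protobuf message is sent with a length prefix so the receiver knows where it ends. The following is a rough, pure-JDK sketch of that idea, not Netty's implementation. The class name Varint32Framing and its methods are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Toy model of varint32 length-prefixed framing; illustrative only, not Netty's code. */
class Varint32Framing {

    /** Prepends the payload length as a base-128 varint (7 data bits per byte, high bit means "more bytes follow"). */
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int length = payload.length;
        while ((length & ~0x7F) != 0) {
            out.write((length & 0x7F) | 0x80);
            length >>>= 7;
        }
        out.write(length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /**
     * Tries to read one frame. Returns the payload, or null when the buffer does not
     * yet hold a complete frame (the buffer position is then left unchanged).
     */
    static byte[] tryReadFrame(ByteBuffer in) {
        int start = in.position();
        int length = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!in.hasRemaining()) {
                in.position(start);
                return null; // length prefix is incomplete
            }
            byte b = in.get();
            length |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (length < 0) {
                    throw new IllegalArgumentException("negative frame length");
                }
                if (in.remaining() < length) {
                    in.position(start);
                    return null; // payload is incomplete, wait for more bytes
                }
                byte[] payload = new byte[length];
                in.get(payload);
                return payload;
            }
        }
        throw new IllegalArgumentException("malformed varint32 length prefix");
    }
}
```

Returning null on an incomplete frame mirrors how a frame decoder waits for more bytes instead of handing a partial message to the protobuf decoder.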

After writing the server setup code, we write the main business code. With Netty, the business-layer handler extends either ChannelInboundHandlerAdapter or SimpleChannelInboundHandler, so here is a brief note on the difference between the two. A handler that extends SimpleChannelInboundHandler automatically releases the ByteBuf resources held by a message once it has been received and handled, and it must specify the message type it handles. A handler that extends ChannelInboundHandlerAdapter gets no automatic release and must call ReferenceCountUtil.release() itself, but it does not need to specify a message type. My personal recommendation is therefore that the server extend ChannelInboundHandlerAdapter and release manually, so that data is not released automatically before processing has finished. In addition, a server may have many clients connected, and their requests may not all use the same data format, so the untyped handler can treat each one accordingly. The client can extend SimpleChannelInboundHandler depending on the situation. The advantage is that the message type is specified directly, so no type conversion is needed.
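To make the manual-release rule concrete without depending on Netty, here is a toy model. ToyRefCounted and ManualReleaseHandler are hypothetical names. They only imitate the shape of a reference-counted message and of a handler that releases in a finally block.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy reference-counted message; a stand-in for illustration, not Netty's ReferenceCounted. */
class ToyRefCounted {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    /** Decrements the count; returns true once the count reaches zero (resource freed). */
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        return remaining == 0;
    }
}

/** Mirrors the manual style: whoever consumes the message releases it, even on failure. */
class ManualReleaseHandler {
    static String handle(ToyRefCounted msg, boolean failDuringProcessing) {
        try {
            if (failDuringProcessing) {
                throw new IllegalStateException("processing failed");
            }
            return "processed";
        } catch (IllegalStateException e) {
            return "failed";
        } finally {
            // Runs on both paths, so the message is never leaked
            msg.release();
        }
    }
}
```

Putting the release in finally is what keeps a failed message from leaking, which is the same reason the handlers in this article release inside finally.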

The server-side handler code is as follows:

@Service("nettyServerHandler")
// This singleton bean is added to every channel's pipeline, so Netty requires it to be marked Sharable.
// Note that the counters below are then shared by all connections.
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of idle events */
    private int idle_count = 1;
    /** Number of messages received */
    private int count = 1;

    /**
     * Called when the connection is established; sends a message to the client
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connected client address :" + ctx.channel().remoteAddress());
        UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
                .build();
        ctx.writeAndFlush(userMsg);
        super.channelActive(ctx);
    }

    /**
     * Timeout handling: triggered if no heartbeat is received from the client within 5 seconds;
     * the second time it happens, the channel is closed
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.READER_IDLE.equals(event.state())) { // read channel idle: no heartbeat received
                System.out.println("I have not received any message from the client for 5 seconds.");
                if (idle_count > 1) {
                    System.out.println("Close this inactive channel.");
                    ctx.channel().close();
                }
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    /**
     * Business logic processing
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Message #" + count + " received by the server: " + msg);
        try {
            // If the data is protobuf
            if (msg instanceof UserMsg) {
                UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
                if (userState.getState() == 1) {
                    System.out.println("Client business processing successful!");
                } else if (userState.getState() == 2) {
                    System.out.println("Received heartbeat from the client!");
                } else {
                    System.out.println("Unknown command!");
                }
            } else {
                System.out.println("Unknown data!" + msg);
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
        count++;
    }

    /**
     * Exception handling
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
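The handler above keeps a running total of idle events in a field and never resets it. One possible variation, shown here only as a sketch, is to count consecutive idle periods per connection and reset the count whenever any message arrives. IdleTracker is a hypothetical helper, not part of the project. In a Netty application, one instance per channel would be needed, for example by creating a new handler per channel.

```java
/** Hypothetical per-connection idle counter; names are illustrative, not from the project. */
class IdleTracker {
    private final int maxIdleEvents;
    private int idleEvents = 0;

    IdleTracker(int maxIdleEvents) {
        this.maxIdleEvents = maxIdleEvents;
    }

    /** Call when any message (including a heartbeat) arrives: the peer is alive again. */
    void onMessage() {
        idleEvents = 0;
    }

    /** Call on each read-idle event; returns true when the connection should be closed. */
    boolean onReadIdle() {
        idleEvents++;
        return idleEvents >= maxIdleEvents;
    }
}
```

With a limit of 2 and the 5-second read timeout used above, a silent client would be dropped after roughly 10 seconds, while a client whose heartbeats keep arriving is never closed.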

There is also the server-side startup class. The server used to be started directly from a main method; it is now started through Spring Boot, with little difference.

The code is as follows:

@SpringBootApplication
public class NettyServerApp {
    public static void main(String[] args) {
        // Start Spring Boot and initialize the Spring context and its components
        ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
        NettyServer nettyServer = context.getBean(NettyServer.class);
        nettyServer.run();
    }
}

At this point, the server-side code is complete.

Client

The client code is similar to the server code in many ways, so only the differences are described briefly. The first is the client's main class, which is basically the same as the server's, except that it also sets the server host and port to connect to and adds a listener (which detects a failed connection to the server so that the client can reconnect). The main logic is as follows:

public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
    ChannelFuture f = null;
    try {
        if (bootstrap != null) {
            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(nettyClientFilter);
            bootstrap.remoteAddress(host, port);
            f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                final EventLoop eventLoop = futureListener.channel().eventLoop();
                if (!futureListener.isSuccess()) {
                    System.out.println("Connection to the server failed! Will attempt to reconnect in 10 seconds!");
                    eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                }
            });
            if (initFalg) {
                System.out.println("Netty client started successfully!");
                initFalg = false;
            }
            // Block until the channel is closed
            f.channel().closeFuture().sync();
        }
    } catch (Exception e) {
        System.out.println("Client connection failed!" + e.getMessage());
    }
}

Note: the listener is written with JDK 1.8 lambda syntax.
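The reconnection pattern in doConnect (try, and on failure schedule another try after a fixed delay) can be sketched with the JDK alone. In this illustration, FixedDelayReconnector is a made-up class, and the BooleanSupplier named connect merely stands in for a real connection attempt such as bootstrap.connect().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Pure-JDK sketch of fixed-delay reconnection; "connect" is a placeholder for a real attempt. */
class FixedDelayReconnector {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch connected = new CountDownLatch(1);
    private final BooleanSupplier connect;
    private final long delayMillis;

    FixedDelayReconnector(BooleanSupplier connect, long delayMillis) {
        this.connect = connect;
        this.delayMillis = delayMillis;
    }

    /** Kicks off the first attempt on the scheduler thread. */
    void start() {
        scheduler.execute(this::attempt);
    }

    private void attempt() {
        attempts.incrementAndGet();
        if (connect.getAsBoolean()) {
            connected.countDown();
        } else {
            // Failed: schedule the next attempt after the fixed delay
            scheduler.schedule(this::attempt, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Waits for a successful attempt, stops the scheduler, and returns how many attempts were made. */
    int awaitConnected(long timeoutMillis) {
        try {
            if (!connected.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("not connected within timeout");
            }
            return attempts.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Scheduling each retry instead of sleeping in a loop keeps the calling thread free, which is the same reason the article's listener schedules the retry on the event loop.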

The client-side filter is basically the same as the server-side one. Note, however, that the transport protocol, encoding, and decoding must match the server's, and the client's heartbeat idle time should be shorter than the timeout set on the server. The changed code is as follows:

ChannelPipeline ph = ch.pipeline();
// Parameters: read timeout, write timeout, all-types timeout, time unit
ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

Client business code. The main logic is to send heartbeats on schedule and to parse the protobuf data sent by the server. The @ChannelHandler.Sharable annotation marks the handler instance as shareable across multiple channels. It is only a declaration, so the handler itself still has to be written so that sharing it is safe. The code is as follows:

@Service("nettyClientHandler")
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    private NettyClient nettyClient;

    /** Number of heartbeat loops */
    private int fcount = 1;

    /**
     * Called when the connection is established
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connection established at: " + new Date());
        ctx.fireChannelActive();
    }

    /**
     * Called when the connection is closed
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connection closed at: " + new Date());
        final EventLoop eventLoop = ctx.channel().eventLoop();
        nettyClient.doConnect(new Bootstrap(), eventLoop);
        super.channelInactive(ctx);
    }

    /**
     * Heartbeat handling: sends a heartbeat request every 4 seconds
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("Loop request time:" + new Date() + ", times" + fcount);
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // write channel idle: send the heartbeat command
                UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
                ctx.channel().writeAndFlush(userState);
                fcount++;
            }
        }
    }

    /**
     * Business logic processing
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // If the data is not of the protobuf type
        if (!(msg instanceof UserMsg)) {
            System.out.println("Unknown data!" + msg);
            return;
        }
        try {
            // Get the protobuf data
            UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
            // Do the corresponding business processing...
            System.out.println("User information received by the client. Number:" + userMsg.getId()
                    + ", Name:" + userMsg.getName() + ", Age:" + userMsg.getAge());
            // Report successful processing (state 1) to the server
            UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
            ctx.writeAndFlush(userState);
            System.out.println("Successfully sent to server!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

That completes the client code.

Functional test

Start the server first and then the client, and check whether the behavior matches the description above.

Server output:

Client output:

The printed output matches the description above.

Now let's check whether the client can reconnect. This time, start the client first and then the server.

Client output:

Server output:

The result is as expected!

Finally:

That concludes this article on integrating Netty with Spring Boot and using Protobuf for data transfer. The project for this article (Spring Boot integrating Netty with Protobuf data transfer):

https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

There is also a Netty project that does not use Spring Boot integration:

https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

Original author: The Void

The original link: http://www.panchengming.com