
Preface

As we all know, establishing socket connections is expensive in network programming, and in a distributed setting, keeping one thread per client connection in a thread pool consumes a great many threads. What technology can solve these problems? This is where a powerful networking tool comes in: Netty.

Netty

Netty is an NIO client-server framework:

  • It makes it quick and easy to develop network applications, such as protocol servers and clients.
  • It greatly simplifies and streamlines network programming, such as TCP and UDP socket servers.
NIO is non-blocking IO and has the following characteristics (a minimal sketch follows the list):

  • A single thread can serve connections from multiple clients.
  • A selector can manage multiple channels in a single thread, and every new channel must be registered with the selector.
  • A SelectionKey represents the registration relationship between a particular channel object and a particular selector object.
  • A selector performing a select() operation may block, but the blocking time can be limited and the selector can be woken up with wakeup(), which is why NIO counts as non-blocking IO.
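
To make these points concrete, here is a minimal single-threaded NIO server sketch. The port and class name are made up for illustration, and this is plain JDK NIO, not Netty and not the article's code:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioSelectorSketch {

    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9000));
        server.configureBlocking(false);
        // Registration returns a SelectionKey: the channel <-> selector relationship
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // Block for at most one second; wakeup() from another thread also returns early
            if (selector.select(1000) == 0) {
                continue;
            }
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    // New client: register it with the same selector, on the same thread
                    SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(256);
                    int read = client.read(buffer);
                    if (read == -1) {
                        client.close();   // client closed the connection
                    }
                    // ...handle whatever bytes arrived (possibly only part of a message)
                }
            }
        }
    }
}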
Netty model: Selector mode

Compared with plain NIO, Netty improves performance in the following ways (a rough sketch of this threading model follows the list):

  • Using multiple threads, each running its own selector, at the same time
  • Binding multiple ports, so that a single selector can have multiple server sockets registered with it at the same time
  • Each thread has exactly one selector, which it uses to multiplex the channels registered with it
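
A rough sketch of that threading model in plain NIO terms is below. The class and method names are invented for illustration; Netty's own boss/worker equivalent appears in the server code later in this article:

import java.nio.channels.Selector;

public class MultiSelectorSketch {

    public static void main(String[] args) throws Exception {
        int workerCount = Runtime.getRuntime().availableProcessors();
        Selector[] workerSelectors = new Selector[workerCount];
        for (int i = 0; i < workerCount; i++) {
            Selector selector = Selector.open();
            workerSelectors[i] = selector;
            // Each worker thread owns exactly one selector and multiplexes
            // every channel that gets registered with it
            new Thread(() -> runEventLoop(selector), "worker-" + i).start();
        }
        // An acceptor loop (see the previous sketch) would then hand each accepted
        // SocketChannel to the next worker selector in round-robin order instead of
        // registering it with its own selector. Registering from another thread
        // requires waking the worker's blocked select() call with selector.wakeup().
    }

    private static void runEventLoop(Selector selector) {
        // select(), iterate selectedKeys(), read/write -- same loop shape as above
    }
}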

The half-packet problem

When messages are sent over TCP/IP, packets may be split or merged, so the receiver cannot tell from a single read whether the data it has received is complete. Unlike traditional BIO, NIO does not block when the data cannot yet be read in full. To solve NIO's half-packet problem, Netty introduced a Reactor model on top of the Selector model, so that the server can cope with incomplete client requests.

Netty model: Reactor mode

  • The half-packet problem is solved on top of the Selector model.

The model above can be summed up as "the boss takes the job and the workers do the work": the mainReactor receives requests (it handshakes with clients), while the subReactors process them (they never connect to clients directly).
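
In a Netty pipeline, the usual way to handle half packets in practice is to place a frame decoder in front of the string decoder, so the business handler only ever sees complete messages. The sketch below is not part of the article's example (which uses a plain StringDecoder); LineBasedFrameDecoder, StringDecoder and StringEncoder are standard Netty handlers:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class FramedChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new LineBasedFrameDecoder(1024))       // split the byte stream on '\n'
          .addLast(new StringDecoder(CharsetUtil.UTF_8))  // bytes -> String, one complete line
          .addLast(new StringEncoder(CharsetUtil.UTF_8)); // String -> bytes for responses
        // the business handler (e.g. a server handler like the one below) would be added last
    }
}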

SpringBoot uses Netty for remote invocation

Maven dependencies

<!-- Lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.2</version>
    <optional>true</optional>
</dependency>
<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.17.Final</version>
</dependency>
Server side

NettyServer.java: service startup listener

@Slf4j
public class NettyServer {

    public void start() {
        InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
        // Boss thread group: accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker thread group: handles I/O for the accepted connections
        EventLoopGroup workGroup = new NioEventLoopGroup(200);
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                // Size of the pending-connection queue
                .option(ChannelOption.SO_BACKLOG, 1024)
                // TCP keep-alive: probe the peer automatically when the connection has been idle
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        // Bind the port and start receiving incoming connections
        try {
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            log.info("Server started, listening on port: {}", socketAddress.getPort());
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Server startup failed", e);
        } finally {
            // Shut down the boss thread group
            bossGroup.shutdownGracefully();
            // Shut down the worker thread group
            workGroup.shutdownGracefully();
        }
    }
}
ServerChannelInitializer.java: Netty service initializer

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Add the string decoder/encoder, then the business handler
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}
NettyServerHandler.java: Netty server handler

/**
 * Netty server handler
 */
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Triggered when a client connects
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active......");
    }

    /**
     * Triggered when a message arrives from the client
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Server received message: {}", msg.toString());
        ctx.write("Hello to you");
        ctx.flush();
    }

    /**
     * Triggered when an exception is raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
RpcServerApp.java: SpringBoot startup class

/**
 * Startup class
 */
@Slf4j
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class RpcServerApp extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(RpcServerApp.class);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        SpringApplication.run(RpcServerApp.class, args);
        // Start the Netty server
        NettyServer nettyServer = new NettyServer();
        nettyServer.start();
        log.info("====== service started ========");
    }
}
Client side

NettyClientUtil.java: Netty client utility class

/**
 * Netty client utility class
 */
@Slf4j
public class NettyClientUtil {

    public static ResponseResult helloNetty(String msg) {
        NettyClientHandler nettyClientHandler = new NettyClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                // Disable Nagle's algorithm so small messages are sent immediately
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("decoder", new StringDecoder());
                        socketChannel.pipeline().addLast("encoder", new StringEncoder());
                        socketChannel.pipeline().addLast(nettyClientHandler);
                    }
                });
        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
            log.info("Client sent successfully....");
            // Send the message
            future.channel().writeAndFlush(msg);
            // Wait until the handler closes the channel after receiving the response
            future.channel().closeFuture().sync();
            return nettyClientHandler.getResponseResult();
        } catch (Exception e) {
            log.error("Netty call failed", e);
            throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
        } finally {
            // Release the event loop group gracefully
            group.shutdownGracefully();
        }
    }
}
NettyClientHandler.java: Netty client handler

/**
 * Client handler
 */
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private ResponseResult responseResult;

    /**
     * Triggered when the connection becomes active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Client active.....");
    }

    /**
     * Triggered when the server's response arrives
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client received message: {}", msg.toString());
        this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
        // Closing the channel lets NettyClientUtil's closeFuture().sync() return
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Validation

The test interface

@RestController
@Slf4j
public class UserController {

    @PostMapping("/helloNetty")
    @MethodLogPrint
    public ResponseResult helloNetty(@RequestParam String msg) {
        return NettyClientUtil.helloNetty(msg);
    }
}
Access test interface

The server prints information

The client prints information