Netty

Netty is a client/server framework that leverages Java’s advanced networking capabilities to provide an easy-to-use API while hiding the complexity behind it. It is a widely used Java network programming framework (it won the Duke’s Choice Award in 2011, see www.java.net/dukeschoice… ). Companies such as Facebook and Instagram, as well as popular open source projects such as Infinispan, HornetQ, Vert.x, Apache Cassandra and Elasticsearch, rely on its core code for their network abstractions.

Adding the dependency

When I used the latest version, a class could not be found. It turned out to be a JDK version issue: the newest releases target a JDK level that Android does not support, so I fell back to a 2019 release, which tested fine.

implementation 'io.netty:netty-all:4.1.42.Final'
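If the module is built with the Kotlin Gradle DSL instead, the same coordinates can be declared like this (just a sketch of the equivalent declaration):

dependencies {
    // Same Netty version as above, known to work with the Android JDK level
    implementation("io.netty:netty-all:4.1.42.Final")
}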

Server-side code implementation

NettyServer

@Slf4j
public class NettyServer {

    public void start(InetSocketAddress socketAddress) {
        // Create the boss thread group that accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Create the worker thread group that handles I/O
        EventLoopGroup workGroup = new NioEventLoopGroup(200);
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                // Set the queue size
                .option(ChannelOption.SO_BACKLOG, 1024)
                // If there is no data communication within two hours,TCP automatically sends an active probe data packet
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        // Bind the port to start receiving incoming connections
        try {
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            log.info("Server starts listening on port: {}", socketAddress.getPort());
// future.channel().writeAndFlush(" Hello ");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Close the boss thread group
            bossGroup.shutdownGracefully();
            // Close the worker thread group
            workGroup.shutdownGracefully();
        }
    }
}

ServerChannelInitializer

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Add codec
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}

NettyServerHandler

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /** * Triggered when a client connects */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active......");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel Inactive......");
    }

    /** * Triggered when the client sends a message */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //----------- only changed here -----------
        log.info("Server received message 1111: {}", msg.toString());

        ctx.write("{\" data \ ": {\" taskData \ ": {\" collectionRule \ ": {\ \" id ": 1, \" name \ ": \" IP host "interactive user \", \ "\" rule: \ "[{\ \ \" label \ \ \ ": \ \ \" trill, \ \ \ ", \ \ \ "key \ \ \" : \ \ \ "dyId \ \ \", \ \ \ "type \ \ \" : \ \ \ "string \ \ \"}, {\ \ \ "key \ \ \" : \ \ \ "the count \ \ \", \ \ \ "label \ \ \" : \ \ \ "\ \ \", \ \ \ "type \ \ \" : \ \ \ "string \ \ \"}] \ "and \" ruleType \ ": \" collect \ "and \" source \ ", \ "collectLiveAudience \}, \" description "\", \ "fans list - FuPeng financial world 3 \" and \ "ruleId \":\"1\",\"ruleParam\":\"{\\\"dyId\\\":\\\"ghsys\\\",\\\"count\\\":\\\"140000\\\"}\"},\"taskId\":\"64\",\"taskType\":\"c ollection\"},\"devicesId\":\"5011bbdcd5006a93\",\"type\":\"task\"}");
        ctx.flush();
    }

    /** * Triggered when an exception occurs */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

Start the service

@SpringBootApplication
public class ServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServerApplication.class, args);
        // Start the server
        NettyServer nettyServer = new NettyServer();
        nettyServer.start(new InetSocketAddress("192.18.52.95", 8091));
    }
}

Client code implementation

The Netty client is used in much the same way as the server, so only the core code is listed here.

Initialization operation

abstract class McnNettyTask : Runnable {

    private var socketChannel: SocketChannel? = null
    private var isConnected = false

    override fun run() {
        createConnection()
    }

    private fun createConnection() {
        val nioEventLoopGroup = NioEventLoopGroup()
        val bootstrap = Bootstrap()
        bootstrap
            .group(nioEventLoopGroup)
            .option(ChannelOption.TCP_NODELAY, true)  // no delay (disable Nagle's algorithm)
            .channel(NioSocketChannel::class.java)
            .option(ChannelOption.SO_KEEPALIVE, true) // keep the connection alive
            .option(ChannelOption.SO_TIMEOUT, 30_000) // send/receive timeout
            .handler(McnClientInitializer(object : McnClientListener {
                override fun disConnected() {
                    isConnected = false
                }

                override fun connected() {
                    isConnected = true
                }
            }, object : McnEventListener {
                override fun onReceiverMessage(messageRequest: MessageRequest) {
                    dispatchMessage(messageRequest)
                }
            }))
        try {
            val channelFuture = bootstrap.connect(McnNettyConfig.ip, McnNettyConfig.port)
                .addListener(object : ChannelFutureListener {
                    override fun operationComplete(future: ChannelFuture) {
                        if (future.isSuccess) {
                            socketChannel = future.channel() as SocketChannel
                            isConnected = true
                            CommonConsole.log("netty connect success (ip: ${McnNettyConfig.ip}, port: ${McnNettyConfig.port})")

                            sendMsg(MessageRequest.createDevicesState(0))
                        } else {
                            CommonConsole.log("netty connect failure (ip: ${McnNettyConfig.ip}, port: ${McnNettyConfig.port})")
                            isConnected = false
                            future.channel().close()
                            nioEventLoopGroup.shutdownGracefully()
                        }
                    }
                }).sync() // block until the connection attempt completes
            channelFuture.channel().closeFuture().sync()
        } catch (ex: Exception) {
            ex.printStackTrace()
        } finally {
            // Release all resources and created threads
            nioEventLoopGroup.shutdownGracefully()
        }
    }

    fun isConnected(): Boolean {
        return isConnected
    }

    fun disConnected() {
        socketChannel?.close()
    }

    abstract fun dispatchMessage(messageRequest: MessageRequest)

    fun sendMsg(msg: String, nettyMessageListener: McnMessageListener? = null) {
        if (!isConnected()) {
            nettyMessageListener?.sendFailure()
            return
        }
        socketChannel?.run {
            writeAndFlush(msg + "###").addListener { future ->
                if (future.isSuccess) {
                    // Message sent successfully
                    CommonConsole.log("netty send message success (message: $msg)")
                    nettyMessageListener?.sendSuccess()
                } else {
                    // Message sending failed
                    CommonConsole.log("netty send message failure (message: $msg)")
                    nettyMessageListener?.sendFailure()
                }
            }
        }
    }
}

Registering the Initializer and handler

class McnClientInitializer(
    private val nettyClientListener: McnClientListener,
    private val nettyEventListener: McnEventListener
) : ChannelInitializer<SocketChannel>() {

    override fun initChannel(socketChannel: SocketChannel) {
        val pipeline = socketChannel.pipeline()
//        pipeline.addLast("decoder", McnStringDecoder())
//        pipeline.addLast("encoder", McnStringEncoder())
//        pipeline.addLast(LineBasedFrameDecoder(1024))
        pipeline.addLast("decoder", StringDecoder())
//        pipeline.addLast("encoder", StringEncoder())
        pipeline.addLast(DelimiterBasedFrameEncoder("###"))
        pipeline.addLast(McnClientHandler(nettyClientListener, nettyEventListener))
    }
}

Core data receiving and processing handler

class McnClientHandler(
    private val nettyClientListener: McnClientListener,
    private val nettyEventListener: McnEventListener
) : SimpleChannelInboundHandler<String>() {

    override fun channelActive(ctx: ChannelHandlerContext?) {
        super.channelActive(ctx)
        CommonConsole.log("Netty channelActive.........")
        nettyClientListener.connected()
    }

    override fun channelInactive(ctx: ChannelHandlerContext?) {
        super.channelInactive(ctx)
        nettyClientListener.disConnected()
    }

    override fun channelReadComplete(ctx: ChannelHandlerContext?) {
        super.channelReadComplete(ctx)
        CommonConsole.log("Netty channelReadComplete.........")
    }

    override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
        super.exceptionCaught(ctx, cause)
        CommonConsole.log("Netty exceptionCaught......... ${cause?.message}")
        cause?.printStackTrace()
        ctx?.close()
    }

    override fun channelRead0(ctx: ChannelHandlerContext?, msg: String?) {
        CommonConsole.log("Netty channelRead......... $msg")
        msg?.run {
            try {
                val messageRequest =
                    Gson().fromJson<MessageRequest>(msg, MessageRequest::class.java)
                nettyEventListener.onReceiverMessage(messageRequest)
            } catch (ex: Exception) {
                ex.printStackTrace()
            }
            ReferenceCountUtil.release(msg)
        }
    }
}

Dealing with sticky packets & packet splitting

If you use Netty, you are bound to run into sticky-packet and packet-splitting problems. A sticky packet occurs when payloads are small: several messages sent within a short time are received and parsed as a single packet. Packet splitting is the opposite: a large packet is broken up into several smaller ones. In either case the data cannot simply be used as it arrives, so both situations have to be handled so that every message ends up independent and complete.

For encoding and decoding the data, we use a custom codec, which effectively joins outgoing data with a specific delimiter string and splits incoming data on that same delimiter.
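To see the framing behaviour concretely, here is a small sketch (not part of the original project) that uses Netty's EmbeddedChannel: two messages arrive stuck together, the second one is split across two reads, and the delimiter decoder still delivers two complete frames.

import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.DelimiterBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
import io.netty.util.CharsetUtil

fun main() {
    val delimiter = "###"
    val channel = EmbeddedChannel(
        DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(delimiter.toByteArray())),
        StringDecoder(CharsetUtil.UTF_8)
    )

    // Sticky packet: two logical messages arrive in a single read, the second one incomplete
    channel.writeInbound(Unpooled.copiedBuffer("hello###wor", CharsetUtil.UTF_8))
    // Split packet: the rest of the second message arrives in a later read
    channel.writeInbound(Unpooled.copiedBuffer("ld###", CharsetUtil.UTF_8))

    // The pipeline still sees two complete, independent frames
    println(channel.readInbound<String>()) // hello
    println(channel.readInbound<String>()) // world
}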

Server:

ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
        Unpooled.wrappedBuffer(delimiter.getBytes())));
// Convert delimited byte data to string data
ch.pipeline().addLast(new StringDecoder());
// This is a custom encoder that adds a delimiter at the end of the response data
ch.pipeline().addLast(new DelimiterBasedFrameEncoder("###"));
// The handler that finally processes the data and returns the response
ch.pipeline().addLast(new EchoServerHandler());

Client:

// Messages returned by the server are split on the delimiter; each frame is at most 1024 bytes
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
        Unpooled.wrappedBuffer(delimiter.getBytes())));
// Convert the delimited byte data to a string
ch.pipeline().addLast(new StringDecoder());
// Encode the data sent by the client; append the delimiter to every outbound message
ch.pipeline().addLast(new DelimiterBasedFrameEncoder("###"));
// The client handler that sends data to the server and processes the server's responses
ch.pipeline().addLast(new EchoClientHandler());

DelimiterBasedFrameEncoder

public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {

    private String delimiter;

    public DelimiterBasedFrameEncoder(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
            throws Exception {
        // Append the delimiter to the response data
        ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
    }
}

Adding a [secret code] to client-server communication

To ensure data integrity and security, a secret code is added to the data exchanged between the client and the server. The actual structure of every message on both ends then becomes:

Complete data = secret-code bytes + real data content

After either side receives the data, it strips the secret-code bytes according to the agreed convention, and what remains is the real payload.

The secret code is handled with MessageToMessageEncoder and MessageToMessageDecoder: by encoding and decoding the raw bytes of the communication we can add and strip it in one place.
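The encoder and decoder below also rely on a McnNettyConfig object (the secret code itself, its byte length, and the server address) that is never shown in this article. A minimal sketch of what it might look like, with placeholder values:

object McnNettyConfig {
    // Server endpoint used by the client bootstrap (placeholder values)
    const val ip = "192.18.52.95"
    const val port = 8091

    // The agreed secret code prepended to every message, and its length in bytes
    const val private_key = "MCN_KEY_"
    val keyLength: Int = private_key.toByteArray().size
}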

McnStringEncoder

/** * Copied from the StringEncoder source and modified to add the business (secret-code) handling */
class McnStringEncoder : MessageToMessageEncoder<CharSequence> {

    var charset: Charset? = null

    constructor(charset: Charset?) : super() {
        if (charset == null) {
            throw NullPointerException("charset")
        } else {
            this.charset = charset
        }
    }

    constructor() : this(Charset.defaultCharset())

    override fun encode(ctx: ChannelHandlerContext?, msg: CharSequence?, out: MutableList<Any>?) {
        if (msg?.isNotEmpty() == true) {
            out?.add(
                ByteBufUtil.encodeString(
                    ctx!!.alloc(),
                    CharBuffer.wrap(McnNettyConfig.private_key + msg),
                    charset
                )
            )
        }
    }
}

McnStringDecoder

/** * Copied from the StringDecoder source and modified to add the business (secret-code) handling */
class McnStringDecoder : MessageToMessageDecoder<ByteBuf> {

    var charset: Charset? = null

    constructor(charset: Charset?) : super() {
        if (charset == null) {
            throw NullPointerException("charset")
        } else {
            this.charset = charset
        }
    }

    constructor() : this(Charset.defaultCharset())

    override fun decode(ctx: ChannelHandlerContext?, msg: ByteBuf?, out: MutableList<Any>?) {
        msg?.run {
            Log.e("info", "Decoder result ====> ${msg.toString(charset)}")
            // Check whether the length of the packet is valid
            if (msg.readableBytes() <= McnNettyConfig.keyLength) {
                out?.add(ErrorData.creator(ErrorData.LENGTH_ERROR, "Packet length verification failed"))
                return
            }
            val privateKey = this.readBytes(McnNettyConfig.keyLength)
            // Check whether the secret code in the packet matches
            if (privateKey.toString(charset) != McnNettyConfig.private_key) {
                out?.add(ErrorData.creator(ErrorData.PRIVATE_KEY_ERROR, "Message secret-code verification failed"))
                return
            }
            // Obtain the actual packet content
            out?.add(this.toString(charset))
        }
    }

    data class ErrorData(
        var errorCode: Int,
        var errorMsg: String
    ) {

        companion object {

            // The length is abnormal
            const val LENGTH_ERROR = -10001

            // Packet verification failed
            const val PRIVATE_KEY_ERROR = -10002

            @JvmStatic
            fun creator(errorCode: Int, message: String): String {
                val errorData = ErrorData(errorCode, message)
                return JSON.toJSONString(errorData)
            }
        }
    }
}

The final usage is simple: we just add our codec to the pipeline.

val pipeline = socketChannel.pipeline()
pipeline.addLast("decoder", McnStringDecoder())
pipeline.addLast("encoder", McnStringEncoder())
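One thing to keep in mind: when verification fails, McnStringDecoder does not throw, it passes an ErrorData JSON string down the pipeline. A downstream handler therefore has to tell error frames apart from real payloads; a hedged sketch of such a handler (my own, not from the project):

class McnInboundHandler : SimpleChannelInboundHandler<String>() {
    override fun channelRead0(ctx: ChannelHandlerContext?, msg: String?) {
        msg ?: return
        if (msg.contains("\"errorCode\"")) {
            // Verification failed upstream; the decoder replaced the payload with ErrorData JSON
            CommonConsole.log("drop invalid frame: $msg")
            return
        }
        // Normal payload: hand it over to the business layer
        CommonConsole.log("payload: $msg")
    }
}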

Reference

www.cnblogs.com/AIPAOJIAO/p…