A background.

Recent project: Need to use Android App as server of Socket and a port that can listen on TCP/Web Socket.

Naturally, the project decided to use the Netty framework. After receiving the message from the client, the Netty server can process the service. In some scenarios, the server also needs to send messages to the client App/ web page.

Use of Netty

2.1 Netty Server

First, define NettyServer, which is a singleton declared with Object. Start, stop, and send messages on the Netty server.

object NettyServer {

    private val TAG = "NettyServer"

    private varchannel: Channel? =null
    private lateinit var listener: NettyServerListener<String>
    private lateinit var bossGroup: EventLoopGroup
    private lateinit var workerGroup: EventLoopGroup

    var port = 8888
        set(value)  {
            field = value
        }

    var webSocketPath = "/ws"
        set(value)  {
            field = value
        }

    var isServerStart: Boolean = false
        private set

    fun start(a) {
        object : Thread() {
            override fun run(a) {
                super.run()
                bossGroup = NioEventLoopGroup(1)
                workerGroup = NioEventLoopGroup()
                try {
                    val b = ServerBootstrap()
                    b.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel::class.java)
                            .localAddress(InetSocketAddress(port))
                            .childOption(ChannelOption.SO_KEEPALIVE, true)
                            .childOption(ChannelOption.SO_REUSEADDR, true)
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childHandler(NettyServerInitializer(listener,webSocketPath))

                    // Bind and start to accept incoming connections.
                    val f = b.bind().sync()
                    Log.i(TAG, NettyServer::class.java.name+"started and listen on"+f.channel().localAddress())

                    isServerStart = true
                    listener.onStartServer()
                    f.channel().closeFuture().sync()
                } catch (e: Exception) {
                    Log.e(TAG, e.localizedMessage)
                    e.printStackTrace()
                } finally {
                    isServerStart = false
                    listener.onStopServer()

                    disconnect()
                }
            }
        }.start()

    }

    fun disconnect(a) {
        workerGroup.shutdownGracefully()
        bossGroup.shutdownGracefully()
    }

    fun setListener(listener: NettyServerListener<String>) {
        this.listener = listener
    }

    // Send TCP messages asynchronously
    fun sendMsgToClient(data: String, listener: ChannelFutureListener)= channel? .run {val flag = this.isActive

        if (flag) {

            this.writeAndFlush(data + System.getProperty("line.separator")).addListener(listener) } flag } ? :false

    // Send TCP messages synchronously
    fun sendMsgToClient(data: String)= channel? .run {if (this.isActive) {

            return this.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess
        }

        false
    } ?: false

    // Send WebSocket messages asynchronously
    fun sendMsgToWS(data: String,listener: ChannelFutureListener)= channel? .run {val flag = this.isActive

        if (flag) {

            this.writeAndFlush(TextWebSocketFrame(data)).addListener(listener) } flag } ? :false

    // Send TCP messages synchronously
    fun sendMsgToWS(data: String)= channel? .run {if (this.isActive) {

            return this.writeAndFlush(TextWebSocketFrame(data)).awaitUninterruptibly().isSuccess
        }

        false
    } ?: false

    /** * Switch channel * Set the server, which client to communicate with *@param channel
     */
    fun selectorChannel(channel: Channel?). {
        this.channel = channel
    }
}
Copy the code

NettyServerInitializer is the childHandler used by the server after it connects to the client:

class NettyServerInitializer(private val mListener: NettyServerListener<String>,private val webSocketPath:String) : ChannelInitializer<SocketChannel>() {

    @Throws(Exception::class)
    public override fun initChannel(ch: SocketChannel) {

        val pipeline = ch.pipeline()

        pipeline.addLast("active",ChannelActiveHandler(mListener))
        pipeline.addLast("socketChoose", SocketChooseHandler(webSocketPath))

        pipeline.addLast("string_encoder",StringEncoder(CharsetUtil.UTF_8))
        pipeline.addLast("linebased",LineBasedFrameDecoder(1024))
        pipeline.addLast("string_decoder",StringDecoder(CharsetUtil.UTF_8))
        pipeline.addLast("commonhandler", CustomerServerHandler(mListener))
    }
}
Copy the code

NettyServerInitializer contains multiple handlers: ChannelActiveHandler used for connections, SocketChooseHandler used for protocol selection, StringEncoder, LineBasedFrameDecoder, StringDecoder used for TCP messages, And CustomerServerHandler, which ultimately processes the message.

ChannelActiveHandler:

@ChannelHandler.Sharable
class ChannelActiveHandler(var mListener: NettyServerListener<String>) : ChannelInboundHandlerAdapter() {

    @Throws(Exception::class)
    override fun channelActive(ctx: ChannelHandlerContext) {

        val insocket = ctx.channel().remoteAddress() as InetSocketAddress
        val clientIP = insocket.address.hostAddress
        val clientPort = insocket.port

        Log.i("ChannelActiveHandler"."New connection:$clientIP : $clientPort")
        mListener.onChannelConnect(ctx.channel())
    }

}
Copy the code

SocketChooseHandler reads messages to distinguish between websockets and sockets. If the Socket is a WebSocket, remove the Handler used by the Socket.

class SocketChooseHandler(val webSocketPath:String) : ByteToMessageDecoder() {

    @Throws(Exception::class)
    override fun decode(ctx: ChannelHandlerContext, `in` :ByteBuf.out: List<Any>) {
        val protocol = getBufStart(`in`)
        if (protocol.startsWith(WEBSOCKET_PREFIX)) {
            PipelineAdd.websocketAdd(ctx,webSocketPath)

            ctx.pipeline().remove("string_encoder")
            ctx.pipeline().remove("linebased")
            ctx.pipeline().remove("string_decoder")} `in`.resetReaderIndex()
        ctx.pipeline().remove(this.javaClass)
    }

    private fun getBufStart(`in` :ByteBuf): String {
        var length = `in`.readableBytes()
        if (length > MAX_LENGTH) {
            length = MAX_LENGTH
        }

        // mark the read position
        `in`.markReaderIndex()
        val content = ByteArray(length)
        `in`.readBytes(content)
        return String(content)
    }

    companion object {
        /** The default password is 23 */
        private val MAX_LENGTH = 23
        /** WebSocket handshake protocol prefix */
        private val WEBSOCKET_PREFIX = "GET /"}}Copy the code

StringEncoder, LineBasedFrameDecoder, and StringDecoder are Netty’s built-in codecs and decoders. LineBasedFrameDecoder is used to solve the problem of TCP sticky/unpack.

CustomerServerHandler:

@ChannelHandler.Sharable
class CustomerServerHandler(private val mListener: NettyServerListener<String>) : SimpleChannelInboundHandler<Any>() {

    @Throws(Exception::class)
    override fun channelReadComplete(ctx: ChannelHandlerContext){}override fun exceptionCaught(ctx: ChannelHandlerContext,
                                 cause: Throwable) {
        cause.printStackTrace()
        ctx.close()
    }

    @Throws(Exception::class)
    override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {

        val buff = msg as ByteBuf
        val info = buff.toString(CharsetUtil.UTF_8)
        Log.d(TAG,"Received message:$info")}@Throws(Exception::class)
    override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {

        if (msg is WebSocketFrame) {  // Process WebSocket messages

            val webSocketInfo = (msg as TextWebSocketFrame).text().trim { it <= ' ' }

            Log.d(TAG, "Received a WebSocketSocket message:$webSocketInfo")

            mListener.onMessageResponseServer(webSocketInfo , ctx.channel().id().asShortText())
        } else if (msg is String){   // Process Socket messages

            Log.d(TAG, "Received a socket message:$msg")

            mListener.onMessageResponseServer(msg, ctx.channel().id().asShortText())
        }
    }

    // Disconnect the connection
    @Throws(Exception::class)
    override fun channelInactive(ctx: ChannelHandlerContext) {
        super.channelInactive(ctx)
        Log.d(TAG, "channelInactive")

        val reAddr = ctx.channel().remoteAddress() as InetSocketAddress
        val clientIP = reAddr.address.hostAddress
        val clientPort = reAddr.port

        Log.d(TAG,"Disconnection:$clientIP : $clientPort")

        mListener.onChannelDisConnect(ctx.channel())
    }

    companion object {

        private val TAG = "CustomerServerHandler"}}Copy the code

2.2 Netty Clients

The client also needs a NettyTcpClient to start, close, and send messages, and NettyTcpClient is created using Builder mode.

class NettyTcpClient private constructor(val host: String, val tcp_port: Int.val index: Int) {

    private lateinit var group: EventLoopGroup

    private lateinit var listener: NettyClientListener<String>

    private var channel: Channel? = null

    /** * Get TCP connection status **@returnGet TCP connection status */
    var connectStatus = false

    /** * Maximum number of reconnections */
    var maxConnectTimes = Integer.MAX_VALUE
        private set

    private var reconnectNum = maxConnectTimes

    private var isNeedReconnect = true

    var isConnecting = false
        private set

    var reconnectIntervalTime: Long = 5000
        private set

    /** * Heartbeat interval */
    var heartBeatInterval: Long = 5
        private set/ / unit of seconds

    /** * Whether to send heartbeat */
    var isSendheartBeat = false
        private set

    /** * Heartbeat data. The value can be String or byte[]. */
    private var heartBeatData: Any? = null

    fun connect(a) {
        if (isConnecting) {
            return
        }

        val clientThread = object : Thread("Netty-Client") {
            override fun run(a) {
                super.run()
                isNeedReconnect = true
                reconnectNum = maxConnectTimes
                connectServer()
            }
        }
        clientThread.start()
    }


    private fun connectServer(a) {

        synchronized(this@NettyTcpClient) {

            varchannelFuture: ChannelFuture? =null

            if(! connectStatus) { isConnecting =true
                group = NioEventLoopGroup()
                val bootstrap = Bootstrap().group(group)
                        .option(ChannelOption.TCP_NODELAY, true)// Mask the Nagle algorithm attempt
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                        .channel(NioSocketChannel::class.java as Class<out Channel>?
                        .handler(object : ChannelInitializer<SocketChannel>() {

                            @Throws(Exception::class)
                            public override fun initChannel(ch: SocketChannel) {

                                if (isSendheartBeat) {
                                    ch.pipeline().addLast("ping", IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS)) // call userEventTriggered when no data is sent
                                }

                                ch.pipeline().addLast(StringEncoder(CharsetUtil.UTF_8))
                                ch.pipeline().addLast(StringDecoder(CharsetUtil.UTF_8))
                                ch.pipeline().addLast(LineBasedFrameDecoder(1024))// Sticky packet processing requires client and server cooperation
                                ch.pipeline().addLast(NettyClientHandler(listener, index, isSendheartBeat, heartBeatData))
                            }
                        })

                try {
                    channelFuture = bootstrap.connect(host, tcp_port).addListener {
                        if (it.isSuccess) {
                            Log.d(TAG, "Connection successful")
                            reconnectNum = maxConnectTimes
                            connectStatus = truechannel = channelFuture? .channel() }else {
                            Log.d(TAG, "Connection failed")
                            connectStatus = false
                        }
                        isConnecting = false
                    }.sync()

                    // Wait until the connection is closed.
                    channelFuture.channel().closeFuture().sync()
                    Log.d(TAG, "Disconnect")}catch (e: Exception) {
                    e.printStackTrace()
                } finally {
                    connectStatus = false
                    listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, index)

                    if(channelFuture ! =null) {
                        if(channelFuture.channel() ! =null && channelFuture.channel().isOpen) {
                            channelFuture.channel().close()
                        }
                    }
                    group.shutdownGracefully()
                    reconnect()
                }
            }
        }
    }


    fun disconnect(a) {
        Log.d(TAG, "disconnect")
        isNeedReconnect = false
        group.shutdownGracefully()
    }

    fun reconnect(a) {
        Log.d(TAG, "reconnect")
        if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
            reconnectNum--
            SystemClock.sleep(reconnectIntervalTime)
            if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
                Log.e(TAG, "Reconnect")
                connectServer()
            }
        }
    }

    /** * send ** asynchronously@paramData Indicates the data to be sent@paramListener Sends the result callback *@returnMethod result */
    fun sendMsgToServer(data: String, listener: MessageStateListener)= channel? .run {val flag = this! =null && connectStatus

        if (flag) {

            this.writeAndFlush(data + System.getProperty("line.separator")).addListener { channelFuture -> listener.isSendSuccss(channelFuture.isSuccess) } } flag } ? :false

    /** * synchronously sends **@paramData Indicates the data to be sent@returnMethod result */
    fun sendMsgToServer(data: String)= channel? .run {val flag = this! =null && connectStatus

        if (flag) {

            val channelFuture = this.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly()
            return channelFuture.isSuccess
        }

        false}? :false

    fun setListener(listener: NettyClientListener<String>) {
        this.listener = listener
    }

    /** * Create NettyTcpClient */
    class Builder {

        /** * Maximum number of reconnections */
        private var MAX_CONNECT_TIMES = Integer.MAX_VALUE

        /**
         * 重连间隔
         */
        private var reconnectIntervalTime: Long = 5000
        /** * Server address */
        private var host: String? = null
        /** * Server port */
        private var tcp_port: Int = 0
        /** * Client id, (because there may be multiple connections) */
        private var mIndex: Int = 0

        /** * Whether to send heartbeat */
        private var isSendheartBeat: Boolean = false
        /** * Heartbeat interval */
        private var heartBeatInterval: Long = 5

        /** * Heartbeat data. The value can be String or byte[]. */
        private var heartBeatData: Any? = null


        fun setMaxReconnectTimes(reConnectTimes: Int): Builder {
            this.MAX_CONNECT_TIMES = reConnectTimes
            return this
        }


        fun setReconnectIntervalTime(reconnectIntervalTime: Long): Builder {
            this.reconnectIntervalTime = reconnectIntervalTime
            return this
        }


        fun setHost(host: String): Builder {
            this.host = host
            return this
        }

        fun setTcpPort(tcp_port: Int): Builder {
            this.tcp_port = tcp_port
            return this
        }

        fun setIndex(mIndex: Int): Builder {
            this.mIndex = mIndex
            return this
        }

        fun setHeartBeatInterval(intervalTime: Long): Builder {
            this.heartBeatInterval = intervalTime
            return this
        }

        fun setSendheartBeat(isSendheartBeat: Boolean): Builder {
            this.isSendheartBeat = isSendheartBeat
            return this
        }

        fun setHeartBeatData(heartBeatData: Any): Builder {
            this.heartBeatData = heartBeatData
            return this
        }

        fun build(a): NettyTcpClient {
            valnettyTcpClient = NettyTcpClient(host!! , tcp_port, mIndex) nettyTcpClient.maxConnectTimes =this.MAX_CONNECT_TIMES
            nettyTcpClient.reconnectIntervalTime = this.reconnectIntervalTime
            nettyTcpClient.heartBeatInterval = this.heartBeatInterval
            nettyTcpClient.isSendheartBeat = this.isSendheartBeat
            nettyTcpClient.heartBeatData = this.heartBeatData
            return nettyTcpClient
        }
    }

    companion object {
        private val TAG = "NettyTcpClient"
        private val CONNECT_TIMEOUT_MILLIS = 5000}}Copy the code

The Android client is relatively simple, requiring handlers that include: Support IdleStateHandler for heartbeat, the Handler used for TCP messages (StringEncoder, StringDecoder, LineBasedFrameDecoder), And NettyClientHandler for processing incoming TCP messages.

NettyClientHandler:

class NettyClientHandler(private val listener: NettyClientListener<String>, private val index: Int.private val isSendheartBeat: Boolean.private val heartBeatData: Any?) : SimpleChannelInboundHandler<String>() {

    /** ** sets IdleStateHandler heartbeat detection to perform a read detection every x seconds. * The userEventTrigger() method is triggered if the ChannelRead() method is not called within x seconds@param ctx ChannelHandlerContext
     * @param evt IdleStateEvent
     */
    override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {

        if (evt is IdleStateEvent) {
            if (evt.state() == IdleState.WRITER_IDLE) {   // Send heartbeat

                if (isSendheartBeat) {
                    if (heartBeatData == null) {

                        ctx.channel().writeAndFlush("Heartbeat" + System.getProperty("line.separator")!! }else {

                        if (heartBeatData is String) {
                            Log.d(TAG, "userEventTriggered: String")
                            ctx.channel().writeAndFlush(heartBeatData + System.getProperty("line.separator")!! }else if (heartBeatData is ByteArray) {
                            Log.d(TAG, "userEventTriggered: byte")
                            val buf = Unpooled.copiedBuffer((heartBeatData asByteArray?) !!) ctx.channel().writeAndFlush(buf) }else {

                            Log.d(TAG, "userEventTriggered: heartBeatData type error")}}}else {
                    Log.d(TAG, "No heartbeat.")}}}}/** ** Client online **@param ctx ChannelHandlerContext
     */
    override fun channelActive(ctx: ChannelHandlerContext) {

        Log.d(TAG, "channelActive")
        listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, index)
    }

    /** ** Client offline **@param ctx ChannelHandlerContext
     */
    override fun channelInactive(ctx: ChannelHandlerContext) {

        Log.d(TAG, "channelInactive")}/** * The client received the message **@param channelHandlerContext ChannelHandlerContext
     * @paramMSG news * /
    override fun channelRead0(channelHandlerContext: ChannelHandlerContext, msg: String) {

        Log.d(TAG, "channelRead0:")
        listener.onMessageResponseClient(msg, index)
    }

    / * * *@param ctx   ChannelHandlerContext
     * @paramCause abnormal * /
    override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {

        Log.e(TAG, "exceptionCaught")
        listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, index)
        cause.printStackTrace()
        ctx.close()
    }

    companion object {

        private val TAG = "NettyClientHandler"}}Copy the code

Three. Implementation of Demo

3.1 Socket Server

Start the NettyServer:

    private fun startServer(a) {

        if(! NettyServer.isServerStart) { NettyServer.setListener(this@MainActivity)
            NettyServer.port = port
            NettyServer.webSocketPath = webSocketPath
            NettyServer.start()
        } else {
            NettyServer.disconnect()
        }
    }
Copy the code

NettyServer sends TCP messages asynchronously:

NettyServer.sendMsgToClient(msg, ChannelFutureListener { channelFuture ->

        if (channelFuture.isSuccess) {

            msgSend(msg)
      } 
})
Copy the code

NettyServer asynchronously sends WebSocket messages:

NettyServer.sendMsgToWS(msg, ChannelFutureListener { channelFuture ->

        if (channelFuture.isSuccess) {

              msgSend(msg)
      } 
 })
Copy the code

Demo can use startServer to start the Socket server, or click configServer to change the port of the server and the Endpoint of WebSocket before starting.

3.2 Socket Client

NettyTcpClient is created in Builder mode:

            mNettyTcpClient = NettyTcpClient.Builder()
                    .setHost(ip)                    // Set the server address
                    .setTcpPort(port)               // Set the server port number
                    .setMaxReconnectTimes(5)        // Set the maximum number of reconnections
                    .setReconnectIntervalTime(5)    // Set the reconnection interval. Unit: second
                    .setSendheartBeat(false)        // Set to send heartbeat
                    .setHeartBeatInterval(5)        // Set the heartbeat interval. Unit: second
                    .setHeartBeatData("I'm is HeartBeatData") // Set heartbeat data. The value can be String or byte[], whichever is later
                    .setIndex(0)                    // Set the client identity (because there may be multiple TCP connections)
                    .build()

            mNettyTcpClient.setListener(this@MainActivity) // Set TCP listening
Copy the code

Start and close client connection:

    private fun connect(a) {
        Log.d(TAG, "connect")
        if(! mNettyTcpClient.connectStatus) { mNettyTcpClient.connect()// Connect to the server
        } else {
            mNettyTcpClient.disconnect()
        }
    }
Copy the code

NettyTcpClient asynchronously sends TCP messages to the server:

mNettyTcpClient.sendMsgToServer(msg, object : MessageStateListener {
         override fun isSendSuccss(isSuccess: Boolean) {
                if (isSuccess) {
                           
                   msgSend(msg)
               } 
         }
})
Copy the code

The Demo client App can also click configClient to change the server IP address and port to be connected before startup.

WebSocket tests are available at: www.websocket-test.com/

Netty Server communicates with web pages:

WebSocket Online test:

4. To summarize

With the help of Kotlin’s features and Netty framework, we also implemented a Socket server on Android.

Demo github address: github.com/fengzhizi71…

The example in this article is simple, just sending a simple message. In a real production environment, the message format would probably be JSON, because JSON is more flexible, parsing JSON to get the content of the message.

References:

  1. Github.com/aLittleGree…
  2. Netty implements a port that receives both socket and webSocket connections

Java and Android technology stack: update and push original technical articles every week, welcome to scan the qr code of the public account below and pay attention to, looking forward to growing and progress with you together.