1. Background
A recent project needed an Android app to act as a socket server, listening for both TCP and WebSocket connections on a single port.
Naturally, the project chose the Netty framework. After receiving a message from a client, the Netty server processes it. In some scenarios, the server also needs to push messages to the client app or web page.
2. Use of Netty
2.1 Netty Server
First, define NettyServer, a singleton declared with Kotlin's object keyword. It starts and stops the Netty server and sends messages to the currently selected client.
import android.util.Log
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import java.net.InetSocketAddress

object NettyServer {

    private val TAG = "NettyServer"
    private var channel: Channel? = null
    private lateinit var listener: NettyServerListener<String>
    private lateinit var bossGroup: EventLoopGroup
    private lateinit var workerGroup: EventLoopGroup

    var port = 8888
    var webSocketPath = "/ws"
    var isServerStart: Boolean = false
        private set

    fun start() {
        object : Thread() {
            override fun run() {
                super.run()
                bossGroup = NioEventLoopGroup(1)
                workerGroup = NioEventLoopGroup()
                try {
                    val b = ServerBootstrap()
                    b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel::class.java)
                        .localAddress(InetSocketAddress(port))
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childOption(ChannelOption.SO_REUSEADDR, true)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childHandler(NettyServerInitializer(listener, webSocketPath))
                    // Bind and start to accept incoming connections.
                    val f = b.bind().sync()
                    Log.i(TAG, "${NettyServer::class.java.name} started and listening on ${f.channel().localAddress()}")
                    isServerStart = true
                    listener.onStartServer()
                    // Block this thread until the server channel is closed.
                    f.channel().closeFuture().sync()
                } catch (e: Exception) {
                    Log.e(TAG, "Netty server error", e)
                } finally {
                    isServerStart = false
                    listener.onStopServer()
                    disconnect()
                }
            }
        }.start()
    }

    fun disconnect() {
        workerGroup.shutdownGracefully()
        bossGroup.shutdownGracefully()
    }

    fun setListener(listener: NettyServerListener<String>) {
        this.listener = listener
    }

    // Send a TCP message asynchronously
    fun sendMsgToClient(data: String, listener: ChannelFutureListener): Boolean = channel?.run {
        val flag = isActive
        if (flag) {
            writeAndFlush(data + System.getProperty("line.separator")).addListener(listener)
        }
        flag
    } ?: false

    // Send a TCP message synchronously (blocks; do not call from a Netty event-loop thread)
    fun sendMsgToClient(data: String): Boolean = channel?.run {
        isActive && writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess
    } ?: false

    // Send a WebSocket message asynchronously
    fun sendMsgToWS(data: String, listener: ChannelFutureListener): Boolean = channel?.run {
        val flag = isActive
        if (flag) {
            writeAndFlush(TextWebSocketFrame(data)).addListener(listener)
        }
        flag
    } ?: false

    // Send a WebSocket message synchronously (blocks; do not call from a Netty event-loop thread)
    fun sendMsgToWS(data: String): Boolean = channel?.run {
        isActive && writeAndFlush(TextWebSocketFrame(data)).awaitUninterruptibly().isSuccess
    } ?: false

    /**
     * Switch channel: select which client the server communicates with.
     * @param channel the client channel that subsequent sends go to
     */
    fun selectorChannel(channel: Channel?) {
        this.channel = channel
    }
}
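The article does not show NettyServerListener. Judging only from the calls made in this article's code (onStartServer, onStopServer, onChannelConnect, onChannelDisConnect, onMessageResponseServer), it could look roughly like the sketch below. The parameter names and the LastClientTracker class are assumptions for illustration, not the author's code.

```kotlin
import io.netty.channel.Channel

/**
 * Hypothetical reconstruction of the listener used by NettyServer.
 * Method names come from the calls in the article; parameter names are assumed.
 */
interface NettyServerListener<T> {
    /** Called once the server socket is bound. */
    fun onStartServer()

    /** Called when the server stops, normally or after an error. */
    fun onStopServer()

    /** Called when a client connects; keep the channel to reply later. */
    fun onChannelConnect(channel: Channel)

    /** Called when a client disconnects. */
    fun onChannelDisConnect(channel: Channel)

    /** Called for every decoded message; channelId is Channel.id().asShortText(). */
    fun onMessageResponseServer(msg: T, channelId: String)
}

/** Illustrative implementation that remembers the most recently connected client. */
class LastClientTracker : NettyServerListener<String> {
    var lastChannel: Channel? = null
        private set
    val received = mutableListOf<String>()

    override fun onStartServer() {}

    override fun onStopServer() {
        lastChannel = null
    }

    override fun onChannelConnect(channel: Channel) {
        lastChannel = channel
    }

    override fun onChannelDisConnect(channel: Channel) {
        if (lastChannel === channel) lastChannel = null
    }

    override fun onMessageResponseServer(msg: String, channelId: String) {
        received.add("$channelId: $msg")
    }
}
```

A listener like this would typically pass the channel it receives in onChannelConnect to NettyServer.selectorChannel(...), so that later sends go to that client.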
NettyServerInitializer is the childHandler that sets up the pipeline of each client connection the server accepts:
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
import io.netty.handler.codec.string.StringEncoder
import io.netty.util.CharsetUtil

class NettyServerInitializer(
    private val mListener: NettyServerListener<String>,
    private val webSocketPath: String
) : ChannelInitializer<SocketChannel>() {

    @Throws(Exception::class)
    public override fun initChannel(ch: SocketChannel) {
        val pipeline = ch.pipeline()
        pipeline.addLast("active", ChannelActiveHandler(mListener))
        pipeline.addLast("socketChoose", SocketChooseHandler(webSocketPath))
        pipeline.addLast("string_encoder", StringEncoder(CharsetUtil.UTF_8))
        pipeline.addLast("linebased", LineBasedFrameDecoder(1024))
        pipeline.addLast("string_decoder", StringDecoder(CharsetUtil.UTF_8))
        pipeline.addLast("commonhandler", CustomerServerHandler(mListener))
    }
}
NettyServerInitializer installs several handlers: ChannelActiveHandler, which reports new connections; SocketChooseHandler, which selects the protocol; StringEncoder, LineBasedFrameDecoder, and StringDecoder, which handle TCP messages; and CustomerServerHandler, which ultimately processes each message.
ChannelActiveHandler:
@ChannelHandler.Sharable
class ChannelActiveHandler(var mListener: NettyServerListener<String>) : ChannelInboundHandlerAdapter() {

    @Throws(Exception::class)
    override fun channelActive(ctx: ChannelHandlerContext) {
        val insocket = ctx.channel().remoteAddress() as InetSocketAddress
        val clientIP = insocket.address.hostAddress
        val clientPort = insocket.port
        Log.i("ChannelActiveHandler", "New connection: $clientIP : $clientPort")
        mListener.onChannelConnect(ctx.channel())
    }
}
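SocketChooseHandler inspects the first bytes of a connection to distinguish WebSocket traffic from plain TCP socket traffic. If the connection is a WebSocket, it adds the WebSocket handlers and removes the handlers used for plain TCP. In either case it then removes itself from the pipeline.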
class SocketChooseHandler(val webSocketPath: String) : ByteToMessageDecoder() {

    @Throws(Exception::class)
    override fun decode(ctx: ChannelHandlerContext, `in`: ByteBuf, out: MutableList<Any>) {
        val protocol = getBufStart(`in`)
        if (protocol.startsWith(WEBSOCKET_PREFIX)) {
            PipelineAdd.websocketAdd(ctx, webSocketPath)
            ctx.pipeline().remove("string_encoder")
            ctx.pipeline().remove("linebased")
            ctx.pipeline().remove("string_decoder")
        }
        // Rewind so the next handlers see the bytes that were just inspected
        `in`.resetReaderIndex()
        ctx.pipeline().remove(this.javaClass)
    }

    private fun getBufStart(`in`: ByteBuf): String {
        var length = `in`.readableBytes()
        if (length > MAX_LENGTH) {
            length = MAX_LENGTH
        }
        // Mark the read position
        `in`.markReaderIndex()
        val content = ByteArray(length)
        `in`.readBytes(content)
        return String(content)
    }

    companion object {
        /** Maximum number of leading bytes inspected (23 by default) */
        private val MAX_LENGTH = 23

        /** WebSocket handshake protocol prefix */
        private val WEBSOCKET_PREFIX = "GET /"
    }
}
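PipelineAdd.websocketAdd is called above but not shown in the article. A minimal sketch of what such a helper might do is given below, assuming it inserts Netty's standard HTTP and WebSocket handlers in front of the handler registered as "commonhandler". The function name addWebSocketHandlers, the handler names, and the 64 KiB aggregation limit are assumptions; unlike the original call, this sketch takes the ChannelPipeline directly rather than the ChannelHandlerContext.

```kotlin
import io.netty.channel.ChannelPipeline
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler

/**
 * Hypothetical stand-in for PipelineAdd.websocketAdd (not shown in the article).
 * Inserts the HTTP/WebSocket handlers in front of the business handler
 * registered under the name "commonhandler".
 */
fun addWebSocketHandlers(pipeline: ChannelPipeline, webSocketPath: String) {
    // Decodes the HTTP upgrade request and encodes the HTTP response.
    pipeline.addBefore("commonhandler", "http-codec", HttpServerCodec())
    // Merges HTTP message parts into one FullHttpRequest (limit is an assumed value).
    pipeline.addBefore("commonhandler", "aggregator", HttpObjectAggregator(65536))
    // Performs the handshake for webSocketPath and handles close/ping/pong frames.
    pipeline.addBefore("commonhandler", "ws-protocol", WebSocketServerProtocolHandler(webSocketPath))
}
```

With these handlers in place, text frames would reach the business handler as TextWebSocketFrame objects, which matches what CustomerServerHandler expects for WebSocket clients.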
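StringEncoder, LineBasedFrameDecoder, and StringDecoder are Netty's built-in encoders and decoders. LineBasedFrameDecoder solves TCP's sticky-packet and split-packet problem: a TCP stream has no message boundaries, so the decoder splits the incoming bytes into frames at line endings.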
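To make the framing idea concrete, here is a simplified, library-free sketch of line-based framing. It is not Netty's implementation: the LineFramer class is hypothetical, and it omits the maximum frame length that LineBasedFrameDecoder(1024) enforces.

```kotlin
import java.io.ByteArrayOutputStream

/**
 * Simplified illustration of line-based framing; not Netty's implementation.
 * Bytes are buffered until a '\n' arrives, and a trailing '\r' is dropped.
 */
class LineFramer {
    private val pending = ByteArrayOutputStream()

    /** Feeds the bytes of one TCP read and returns every line they completed. */
    fun feed(chunk: ByteArray): List<String> {
        val lines = mutableListOf<String>()
        for (b in chunk) {
            if (b == '\n'.code.toByte()) {
                var bytes = pending.toByteArray()
                pending.reset()
                if (bytes.isNotEmpty() && bytes.last() == '\r'.code.toByte()) {
                    bytes = bytes.copyOf(bytes.size - 1)
                }
                lines.add(String(bytes, Charsets.UTF_8))
            } else {
                pending.write(b.toInt())
            }
        }
        return lines
    }
}
```

A message split across two reads ("hel" then "lo\n") comes out as one line, and two messages arriving in one read ("a\nb\n") come out as two. This is why the sender appends a line separator to every message.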
CustomerServerHandler:
@ChannelHandler.Sharable
class CustomerServerHandler(private val mListener: NettyServerListener<String>) : SimpleChannelInboundHandler<Any>() {

    @Throws(Exception::class)
    override fun channelReadComplete(ctx: ChannelHandlerContext) {
    }

    override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
        cause.printStackTrace()
        ctx.close()
    }

    // SimpleChannelInboundHandler releases msg after channelRead0 returns.
    @Throws(Exception::class)
    override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
        if (msg is TextWebSocketFrame) { // Process WebSocket text messages
            val webSocketInfo = msg.text().trim { it <= ' ' }
            Log.d(TAG, "Received a WebSocket message: $webSocketInfo")
            mListener.onMessageResponseServer(webSocketInfo, ctx.channel().id().asShortText())
        } else if (msg is String) { // Process plain TCP socket messages
            Log.d(TAG, "Received a socket message: $msg")
            mListener.onMessageResponseServer(msg, ctx.channel().id().asShortText())
        }
    }

    // The connection was closed
    @Throws(Exception::class)
    override fun channelInactive(ctx: ChannelHandlerContext) {
        super.channelInactive(ctx)
        Log.d(TAG, "channelInactive")
        val reAddr = ctx.channel().remoteAddress() as InetSocketAddress
        val clientIP = reAddr.address.hostAddress
        val clientPort = reAddr.port
        Log.d(TAG, "Disconnection: $clientIP : $clientPort")
        mListener.onChannelDisConnect(ctx.channel())
    }

    companion object {
        private val TAG = "CustomerServerHandler"
    }
}
2.2 Netty Client
The client side likewise needs a NettyTcpClient that connects, disconnects, and sends messages. NettyTcpClient is created with the Builder pattern.
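import android.os.SystemClock
import android.util.Log
import io.netty.bootstrap.Bootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
import io.netty.handler.codec.string.StringEncoder
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.CharsetUtil
import java.util.concurrent.TimeUnit

class NettyTcpClient private constructor(val host: String, val tcp_port: Int, val index: Int) {

    private lateinit var group: EventLoopGroup
    private lateinit var listener: NettyClientListener<String>
    private var channel: Channel? = null

    /** TCP connection status */
    var connectStatus = false

    /** Maximum number of reconnections */
    var maxConnectTimes = Integer.MAX_VALUE
        private set
    private var reconnectNum = maxConnectTimes
    private var isNeedReconnect = true
    var isConnecting = false
        private set

    /** Reconnection interval, in milliseconds (passed to SystemClock.sleep) */
    var reconnectIntervalTime: Long = 5000
        private set

    /** Heartbeat interval, in seconds */
    var heartBeatInterval: Long = 5
        private set

    /** Whether to send heartbeat */
    var isSendheartBeat = false
        private set

    /** Heartbeat data. The value can be a String or a ByteArray. */
    private var heartBeatData: Any? = null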
    fun connect() {
        if (isConnecting) {
            return
        }
        val clientThread = object : Thread("Netty-Client") {
            override fun run() {
                super.run()
                isNeedReconnect = true
                reconnectNum = maxConnectTimes
                connectServer()
            }
        }
        clientThread.start()
    }

    private fun connectServer() {
        synchronized(this@NettyTcpClient) {
            var channelFuture: ChannelFuture? = null
            if (!connectStatus) {
                isConnecting = true
                group = NioEventLoopGroup()
                val bootstrap = Bootstrap().group(group)
                    .option(ChannelOption.TCP_NODELAY, true) // Disable Nagle's algorithm
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .channel(NioSocketChannel::class.java)
                    .handler(object : ChannelInitializer<SocketChannel>() {
                        @Throws(Exception::class)
                        public override fun initChannel(ch: SocketChannel) {
                            if (isSendheartBeat) {
                                // Fires userEventTriggered when nothing has been written for heartBeatInterval seconds
                                ch.pipeline().addLast("ping", IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS))
                            }
                            ch.pipeline().addLast(StringEncoder(CharsetUtil.UTF_8))
                            // Sticky-packet handling: frame inbound bytes by line before decoding them
                            // to String. The server must use the same framing.
                            ch.pipeline().addLast(LineBasedFrameDecoder(1024))
                            ch.pipeline().addLast(StringDecoder(CharsetUtil.UTF_8))
                            ch.pipeline().addLast(NettyClientHandler(listener, index, isSendheartBeat, heartBeatData))
                        }
                    })
                try {
                    channelFuture = bootstrap.connect(host, tcp_port).addListener(ChannelFutureListener { future ->
                        if (future.isSuccess) {
                            Log.d(TAG, "Connection successful")
                            reconnectNum = maxConnectTimes
                            connectStatus = true
                            channel = future.channel()
                        } else {
                            Log.d(TAG, "Connection failed")
                            connectStatus = false
                        }
                        isConnecting = false
                    }).sync()
                    // Wait until the connection is closed.
                    channelFuture.channel().closeFuture().sync()
                    Log.d(TAG, "Disconnect")
                } catch (e: Exception) {
                    e.printStackTrace()
                } finally {
                    connectStatus = false
                    listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, index)
                    channelFuture?.channel()?.takeIf { it.isOpen }?.close()
                    group.shutdownGracefully()
                    reconnect()
                }
            }
        }
    }

    fun disconnect() {
        Log.d(TAG, "disconnect")
        isNeedReconnect = false
        group.shutdownGracefully()
    }

    fun reconnect() {
        Log.d(TAG, "reconnect")
        if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
            reconnectNum--
            SystemClock.sleep(reconnectIntervalTime)
            if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
                Log.e(TAG, "Reconnect")
                connectServer()
            }
        }
    }

    /**
     * Send asynchronously.
     * @param data the data to be sent
     * @param listener callback that receives the send result
     * @return true if the message was handed to the channel
     */
    fun sendMsgToServer(data: String, listener: MessageStateListener): Boolean = channel?.run {
        val flag = connectStatus
        if (flag) {
            writeAndFlush(data + System.getProperty("line.separator"))
                .addListener(ChannelFutureListener { future -> listener.isSendSuccss(future.isSuccess) })
        }
        flag
    } ?: false

    /**
     * Send synchronously (blocks; do not call from a Netty event-loop thread).
     * @param data the data to be sent
     * @return true if the write succeeded
     */
    fun sendMsgToServer(data: String): Boolean = channel?.run {
        connectStatus && writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess
    } ?: false

    fun setListener(listener: NettyClientListener<String>) {
        this.listener = listener
    }
    /** Builder that creates a NettyTcpClient */
    class Builder {

        /** Maximum number of reconnections */
        private var MAX_CONNECT_TIMES = Integer.MAX_VALUE

        /** Reconnection interval, in milliseconds */
        private var reconnectIntervalTime: Long = 5000

        /** Server address */
        private var host: String? = null

        /** Server port */
        private var tcp_port: Int = 0

        /** Client id (because there may be multiple connections) */
        private var mIndex: Int = 0

        /** Whether to send heartbeat */
        private var isSendheartBeat: Boolean = false

        /** Heartbeat interval, in seconds */
        private var heartBeatInterval: Long = 5

        /** Heartbeat data. The value can be a String or a ByteArray. */
        private var heartBeatData: Any? = null

        fun setMaxReconnectTimes(reConnectTimes: Int): Builder {
            this.MAX_CONNECT_TIMES = reConnectTimes
            return this
        }

        fun setReconnectIntervalTime(reconnectIntervalTime: Long): Builder {
            this.reconnectIntervalTime = reconnectIntervalTime
            return this
        }

        fun setHost(host: String): Builder {
            this.host = host
            return this
        }

        fun setTcpPort(tcp_port: Int): Builder {
            this.tcp_port = tcp_port
            return this
        }

        fun setIndex(mIndex: Int): Builder {
            this.mIndex = mIndex
            return this
        }

        fun setHeartBeatInterval(intervalTime: Long): Builder {
            this.heartBeatInterval = intervalTime
            return this
        }

        fun setSendheartBeat(isSendheartBeat: Boolean): Builder {
            this.isSendheartBeat = isSendheartBeat
            return this
        }

        fun setHeartBeatData(heartBeatData: Any): Builder {
            this.heartBeatData = heartBeatData
            return this
        }

        fun build(): NettyTcpClient {
            val nettyTcpClient = NettyTcpClient(host!!, tcp_port, mIndex)
            nettyTcpClient.maxConnectTimes = this.MAX_CONNECT_TIMES
            nettyTcpClient.reconnectIntervalTime = this.reconnectIntervalTime
            nettyTcpClient.heartBeatInterval = this.heartBeatInterval
            nettyTcpClient.isSendheartBeat = this.isSendheartBeat
            nettyTcpClient.heartBeatData = this.heartBeatData
            return nettyTcpClient
        }
    }

    companion object {
        private val TAG = "NettyTcpClient"
        private val CONNECT_TIMEOUT_MILLIS = 5000
    }
}
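In the client above, reconnect() calls connectServer() again from inside its finally block, so each retry adds another level of recursion. The same policy (a maximum number of attempts with a fixed pause between them) can also be written as a plain loop. The sketch below is an alternative formulation, not the author's code; connectWithRetry and its attempt and sleep parameters are hypothetical hooks so the logic runs without Netty or Android.

```kotlin
/**
 * Sketch of a bounded reconnect loop. `attempt` and `sleep` are hypothetical
 * hooks standing in for the real connect call and for SystemClock.sleep.
 * Returns the number of attempts made, or -1 if every attempt failed.
 */
fun connectWithRetry(
    maxAttempts: Int,
    intervalMillis: Long,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    attempt: () -> Boolean
): Int {
    var made = 0
    while (made < maxAttempts) {
        made++
        if (attempt()) return made
        // Pause before the next try, but not after the final failure.
        if (made < maxAttempts) sleep(intervalMillis)
    }
    return -1
}
```

For example, connectWithRetry(5, 5000L) { tryConnect() } would try up to five times with five seconds between tries, where tryConnect is whatever function performs one connection attempt and reports success.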
The Android client is relatively simple. Its pipeline needs IdleStateHandler for heartbeats; the handlers for TCP messages (StringEncoder, StringDecoder, LineBasedFrameDecoder); and NettyClientHandler, which processes incoming TCP messages.
NettyClientHandler:
class NettyClientHandler(
    private val listener: NettyClientListener<String>,
    private val index: Int,
    private val isSendheartBeat: Boolean,
    private val heartBeatData: Any?
) : SimpleChannelInboundHandler<String>() {

    /**
     * Heartbeat handling. IdleStateHandler fires an IdleStateEvent when nothing
     * has been written for the configured interval; a heartbeat is then sent.
     *
     * @param ctx ChannelHandlerContext
     * @param evt IdleStateEvent
     */
    override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
        if (evt is IdleStateEvent) {
            if (evt.state() == IdleState.WRITER_IDLE) { // Send heartbeat
                if (isSendheartBeat) {
                    if (heartBeatData == null) {
                        ctx.channel().writeAndFlush("Heartbeat" + System.getProperty("line.separator"))
                    } else {
                        if (heartBeatData is String) {
                            Log.d(TAG, "userEventTriggered: String")
                            ctx.channel().writeAndFlush(heartBeatData + System.getProperty("line.separator"))
                        } else if (heartBeatData is ByteArray) {
                            Log.d(TAG, "userEventTriggered: byte")
                            val buf = Unpooled.copiedBuffer(heartBeatData)
                            ctx.channel().writeAndFlush(buf)
                        } else {
                            Log.d(TAG, "userEventTriggered: heartBeatData type error")
                        }
                    }
                } else {
                    Log.d(TAG, "No heartbeat.")
                }
            }
        }
    }

    /**
     * The client is online.
     *
     * @param ctx ChannelHandlerContext
     */
    override fun channelActive(ctx: ChannelHandlerContext) {
        Log.d(TAG, "channelActive")
        listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, index)
    }

    /**
     * The client is offline.
     *
     * @param ctx ChannelHandlerContext
     */
    override fun channelInactive(ctx: ChannelHandlerContext) {
        Log.d(TAG, "channelInactive")
    }

    /**
     * The client received a message.
     *
     * @param channelHandlerContext ChannelHandlerContext
     * @param msg the message
     */
    override fun channelRead0(channelHandlerContext: ChannelHandlerContext, msg: String) {
        Log.d(TAG, "channelRead0:")
        listener.onMessageResponseClient(msg, index)
    }

    /**
     * @param ctx ChannelHandlerContext
     * @param cause the exception
     */
    override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
        Log.e(TAG, "exceptionCaught")
        listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, index)
        cause.printStackTrace()
        ctx.close()
    }

    companion object {
        private val TAG = "NettyClientHandler"
    }
}
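The client code refers to NettyClientListener, MessageStateListener, and ConnectState without showing them. The sketch below infers their shape purely from how they are called in this article. The integer values of the status codes, the parameter names, and the RecordingClientListener class are assumptions made for illustration.

```kotlin
/** Hypothetical status codes: the names appear in the article, the values are assumed. */
object ConnectState {
    const val STATUS_CONNECT_SUCCESS = 1
    const val STATUS_CONNECT_CLOSED = 0
    const val STATUS_CONNECT_ERROR = -1
}

/** Callbacks invoked by NettyTcpClient and NettyClientHandler; `index` identifies the client. */
interface NettyClientListener<T> {
    fun onMessageResponseClient(msg: T, index: Int)
    fun onClientStatusConnectChanged(statusCode: Int, index: Int)
}

/** Result callback for asynchronous sends (method name spelled as in the article). */
interface MessageStateListener {
    fun isSendSuccss(isSuccess: Boolean)
}

/** Illustrative listener that records events as readable strings. */
class RecordingClientListener : NettyClientListener<String> {
    val events = mutableListOf<String>()

    override fun onMessageResponseClient(msg: String, index: Int) {
        events.add("client $index received: $msg")
    }

    override fun onClientStatusConnectChanged(statusCode: Int, index: Int) {
        val label = when (statusCode) {
            ConnectState.STATUS_CONNECT_SUCCESS -> "connected"
            ConnectState.STATUS_CONNECT_CLOSED -> "closed"
            ConnectState.STATUS_CONNECT_ERROR -> "error"
            else -> "unknown($statusCode)"
        }
        events.add("client $index $label")
    }
}
```

In an app, these callbacks arrive on Netty threads, so an Activity implementing the listener would usually hand the result to the main thread before touching any views.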
3. Demo Implementation
3.1 Socket Server
Start the NettyServer:
private fun startServer() {
    if (!NettyServer.isServerStart) {
        NettyServer.setListener(this@MainActivity)
        NettyServer.port = port
        NettyServer.webSocketPath = webSocketPath
        NettyServer.start()
    } else {
        NettyServer.disconnect()
    }
}
NettyServer sends TCP messages asynchronously:
NettyServer.sendMsgToClient(msg, ChannelFutureListener { channelFuture ->
    if (channelFuture.isSuccess) {
        msgSend(msg)
    }
})
NettyServer asynchronously sends WebSocket messages:
NettyServer.sendMsgToWS(msg, ChannelFutureListener { channelFuture ->
    if (channelFuture.isSuccess) {
        msgSend(msg)
    }
})
In the demo, tap startServer to start the socket server, or tap configServer first to change the server port and the WebSocket endpoint before starting.
3.2 Socket Client
NettyTcpClient is created with the Builder pattern:
mNettyTcpClient = NettyTcpClient.Builder()
    .setHost(ip)                                // Set the server address
    .setTcpPort(port)                           // Set the server port number
    .setMaxReconnectTimes(5)                    // Set the maximum number of reconnections
    .setReconnectIntervalTime(5000)             // Set the reconnection interval, in milliseconds (passed to SystemClock.sleep)
    .setSendheartBeat(false)                    // Set whether to send heartbeats
    .setHeartBeatInterval(5)                    // Set the heartbeat interval, in seconds
    .setHeartBeatData("I'm is HeartBeatData")   // Set heartbeat data: a String or a ByteArray; the last value set wins
    .setIndex(0)                                // Set the client id (because there may be multiple TCP connections)
    .build()
mNettyTcpClient.setListener(this@MainActivity)  // Set the TCP listener
Start and close client connection:
private fun connect() {
    Log.d(TAG, "connect")
    if (!mNettyTcpClient.connectStatus) {
        mNettyTcpClient.connect() // Connect to the server
    } else {
        mNettyTcpClient.disconnect()
    }
}
NettyTcpClient asynchronously sends TCP messages to the server:
mNettyTcpClient.sendMsgToServer(msg, object : MessageStateListener {
    override fun isSendSuccss(isSuccess: Boolean) {
        if (isSuccess) {
            msgSend(msg)
        }
    }
})
In the demo client app, you can also tap configClient to change the server IP address and port before connecting.
WebSocket tests are available at: www.websocket-test.com/
[Screenshot: the Netty server communicating with a web page]
[Screenshot: the WebSocket online test page]
4. Summary
With the help of Kotlin's features and the Netty framework, we implemented a socket server on Android.
Demo github address: github.com/fengzhizi71…
The example in this article is simple: it only sends plain text messages. In a real production environment, the message format would more likely be JSON, which is more flexible; the receiver parses the JSON to get the message content.
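If JSON is sent over the line-delimited TCP channel used here, each message has to stay on a single line, because the line separator marks the end of a frame. The sketch below shows one way to guarantee that by escaping control characters. The helper names and the "type" and "body" fields are hypothetical, and on Android a JSON library (for example org.json.JSONObject) would normally do the escaping instead of hand-written code.

```kotlin
/** Escapes a string for use inside a JSON string literal (minimal, illustrative). */
fun jsonEscape(s: String): String = buildString {
    for (c in s) {
        when (c) {
            '"' -> append("\\\"")
            '\\' -> append("\\\\")
            '\n' -> append("\\n")
            '\r' -> append("\\r")
            '\t' -> append("\\t")
            else -> if (c < ' ') append("\\u%04x".format(c.code)) else append(c)
        }
    }
}

/**
 * Builds a one-line JSON message terminated by '\n'.
 * The "type" and "body" field names are hypothetical.
 */
fun toJsonLine(type: String, body: String): String =
    "{\"type\":\"${jsonEscape(type)}\",\"body\":\"${jsonEscape(body)}\"}\n"
```

Because raw newlines inside the body are escaped, the only newline in the output is the terminator, so a line-based decoder on the receiving side sees exactly one frame per message.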