Reprints are welcome; please credit the source: juejin.cn/post/686893…

Preface


The previous two articles, "Develop a Commercial-Grade IM with Me (1): Technology Selection and Protocol Definition" and "Develop a Commercial-Grade IM with Me (2): Interface Definition and Encapsulation", covered the technology selection for IMS and the definition and encapsulation of its interfaces. This article implements the connection and reconnection part.

For a social product, a stable long connection is a prerequisite: most business logic depends on it, so it is the top priority. This article explains how to establish and maintain a stable long connection and how to handle the various exceptions. By the end, you will have learned about connection, the reconnection mechanism, the heartbeat mechanism, and more. The code is also open source on GitHub (Android client and Java server, TCP and WebSocket), so without further ado, let's get started.

Initial Configuration

Initial configuration means passing in the required configuration parameters when the application starts and the IMS is initialized. You can customize the parameters to suit your business requirements. The code below is from NettyTCPIMS; NettyWebSocketIMS is largely the same, so the rest of this article only explains where the WebSocket and TCP implementations differ. See GitHub for the complete code:

/**
 * Initialize
 * @param context
 * @param options               IMS initialization options
 * @param connectStatusListener IMS connection status listener
 * @param msgReceivedListener   IMS message receiving listener
 * @return
 */
@Override
public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
    if (context == null) {
        Log.d(TAG, "Initialization failed: Context is null.");
        initialized = false;
        return false;
    }

    if (options == null) {
        Log.d(TAG, "Initialization failed: IMSOptions is null.");
        initialized = false;
        return false;
    }

    this.mContext = context;
    this.mIMSOptions = options;
    this.mIMSConnectStatusListener = connectStatusListener;
    this.mIMSMsgReceivedListener = msgReceivedListener;
    executors = new ExecutorServiceFactory();
    // Initialize the reconnection thread pool
    executors.initBossLoopGroup();
    // Register the network connection state listener
    NetworkManager.getInstance().registerObserver(context, this);
    // Mark the IMS as initialized
    initialized = true;
    // Mark the IMS as not closed
    isClosed = false;
    callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
    return true;
}

The initialization steps shown above are, briefly:

  1. Parameters
  • context: used by the IMS to obtain system resources and perform system operations.
  • options: the parameters required for IMS initialization, including the communication implementation, communication protocol, transport protocol, connection timeout, reconnection delay, reconnection count, heartbeat interval, and server addresses.
  • connectStatusListener: the IMS connection status callback, which reports the connection status to the application layer.
  • msgReceivedListener: the IMS message receiving callback, which passes received messages to the application layer. (This article focuses on connection and reconnection, so the message part is left for a later article.)
  2. Thread pool group. It is divided into a boss thread pool and a work thread pool. The boss pool handles connection and reconnection, and the work pool handles the heartbeat. Each is a single-threaded pool, because only one thread should be connecting or sending heartbeats at any time. Using a thread pool is a personal habit; a plain child thread would also work.
  3. Network state listener. It is registered so that the IMS reconnects when the network changes.
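The boss/work split described in step 2 could be sketched roughly as follows. This is only an illustration built on java.util.concurrent, not the actual ExecutorServiceFactory from ims_kula: the class name LoopGroupSketch and all of its method names are assumptions made for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the article's ExecutorServiceFactory (names are assumptions).
class LoopGroupSketch {
    private ExecutorService bossPool; // runs connection / reconnection tasks
    private ExecutorService workPool; // runs heartbeat tasks

    synchronized void initBossLoopGroup() {
        destroyBossLoopGroup(); // drop any previous pool first
        bossPool = Executors.newSingleThreadExecutor();
    }

    synchronized void initWorkLoopGroup() {
        destroyWorkLoopGroup();
        workPool = Executors.newSingleThreadExecutor();
    }

    synchronized Future<?> execBossTask(Runnable task) {
        if (bossPool == null) {
            throw new IllegalStateException("boss pool is not initialized");
        }
        return bossPool.submit(task);
    }

    synchronized Future<?> execWorkTask(Runnable task) {
        if (workPool == null) {
            throw new IllegalStateException("work pool is not initialized");
        }
        return workPool.submit(task);
    }

    synchronized void destroyBossLoopGroup() {
        if (bossPool != null) {
            bossPool.shutdownNow(); // interrupts a running task, e.g. a sleeping reconnect loop
            bossPool = null;
        }
    }

    synchronized void destroyWorkLoopGroup() {
        if (workPool != null) {
            workPool.shutdownNow();
            workPool = null;
        }
    }
}
```

Because each pool has exactly one thread, two tasks submitted to the same pool run one after the other, never concurrently, which is the property the single-threaded design relies on.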

Initializing Bootstrap

To initialize Bootstrap, refer to Netty's ChannelOption and customize the options for your actual business scenario. The configuration used here is posted below:

/**
 * Initialize bootstrap
 */
void initBootstrap() {
    closeBootstrap(); // Close the previous bootstrap before initializing
    NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
    bootstrap = new Bootstrap();
    bootstrap.group(loopGroup).channel(NioSocketChannel.class)
            // With this option set, TCP automatically sends a keep-alive probe if no data is exchanged within 2 hours.
            .option(ChannelOption.SO_KEEPALIVE, true)
            // Disable the Nagle algorithm. For high real-time requirements, set this to true so that data is sent
            // as soon as it is available. To reduce the number of sends and network interactions, set it to false.
            // The default value is false.
            .option(ChannelOption.TCP_NODELAY, true)
            // Set the size of the TCP send buffer, in bytes.
            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
            // Set the size of the TCP receive buffer, in bytes.
            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
            // Set the connection timeout, in milliseconds.
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
            // Set the ChannelHandler used for initialization.
            .handler(new NettyTCPChannelInitializerHandler(this));
}

For the meanings of parameters, please refer to the official documentation.

Connection

A first connection can be treated as a reconnection, so connect() simply runs the reconnection logic:

@Override
public void connect() {
    if (!initialized) {
        Log.w(TAG, "IMS initialization failed, please check the log");
        return;
    }
    isExecConnect = true; // Mark that connect() has been executed
    this.reconnect(true);
}

We can therefore focus directly on the reconnection part, which is also the core and most complex part of this article.
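The article does not show reconnect(true) itself. One plausible shape, sketched here under stated assumptions, is a flag that lets only one reconnection task run at a time and is cleared in a finally block (the real task also ends with setReconnecting(false) in finally). The class ReconnectGuardSketch, its fields, and its methods are hypothetical and are not taken from ims_kula.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: at most one reconnection task is in flight at any time.
class ReconnectGuardSketch {
    private final ExecutorService bossPool = Executors.newSingleThreadExecutor();
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private final Runnable connectAttempt; // stands in for the real connection logic

    ReconnectGuardSketch(Runnable connectAttempt) {
        this.connectAttempt = connectAttempt;
    }

    /** Returns true if a new task was scheduled, false if one is already running. */
    boolean reconnect() {
        if (!reconnecting.compareAndSet(false, true)) {
            return false; // a reconnection is already in progress
        }
        try {
            bossPool.execute(() -> {
                try {
                    connectAttempt.run();
                } finally {
                    reconnecting.set(false); // always clear the flag, even on failure
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            reconnecting.set(false); // pool already shut down
            return false;
        }
    }

    boolean isReconnecting() {
        return reconnecting.get();
    }

    void close() {
        bossPool.shutdownNow();
    }
}
```

With a guard like this, a burst of triggers (for example, several network change events in a row) results in a single reconnection task rather than several competing ones.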

Reconnection

To keep the NettyTCPIMS code as concise and clear as possible, the connection and reconnection code is extracted into NettyTCPReconnectTask:

public class NettyTCPReconnectTask implements Runnable {

    private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
    private NettyTCPIMS ims;
    private IMSOptions mIMSOptions;

    NettyTCPReconnectTask(NettyTCPIMS ims) {
        this.ims = ims;
        this.mIMSOptions = ims.getIMSOptions();
    }

    @Override
    public void run() {
        try {
            // Release the work thread group when reconnecting, i.e. stop the heartbeat
            ims.getExecutors().destroyWorkLoopGroup();

            // Keep connecting while the IMS is not closed and the network is available
            while (!ims.isClosed() && ims.isNetworkAvailable()) {
                IMSConnectStatus status;
                if ((status = connect()) == IMSConnectStatus.Connected) {
                    ims.callbackIMSConnectStatus(status);
                    break; // Connected successfully, leave the loop
                }

                if (status == IMSConnectStatus.ConnectFailed
                        || status == IMSConnectStatus.ConnectFailed_IMSClosed
                        || status == IMSConnectStatus.ConnectFailed_ServerListEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
                        || status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
                    ims.callbackIMSConnectStatus(status);

                    if (ims.isClosed() || !ims.isNetworkAvailable()) {
                        return;
                    }

                    Log.w(TAG, String.format("Failed to connect within one cycle, retrying after %1$dms", mIMSOptions.getReconnectInterval() * 2));
                    try {
                        Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            // Mark the reconnection task as finished
            ims.setReconnecting(false);
        }
    }

    /**
     * Connect to the server
     * @return
     */
    private IMSConnectStatus connect() {
        if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
        ims.initBootstrap();
        List<String> serverList = mIMSOptions.getServerList();
        if (serverList == null || serverList.isEmpty()) {
            return IMSConnectStatus.ConnectFailed_ServerListEmpty;
        }

        for (int i = 0; i < serverList.size(); i++) {
            String server = serverList.get(i);
            if (StringUtil.isNullOrEmpty(server)) {
                return IMSConnectStatus.ConnectFailed_ServerEmpty;
            }

            String[] params = null;
            try {
                params = server.split(" ");
            } catch (Exception e) {
                e.printStackTrace();
            }

            if (params == null || params.length < 2) {
                return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
            }

            if (i == 0) {
                ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
            }

            // +1 because the initial connection attempt is counted as well
            for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
                if (ims.isClosed()) {
                    return IMSConnectStatus.ConnectFailed_IMSClosed;
                }
                if (!ims.isNetworkAvailable()) {
                    return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
                }

                Log.d(TAG, String.format("Connecting to [%1$s], attempt %2$d", server, j + 1));
                try {
                    String host = params[0];
                    int port = Integer.parseInt(params[1]);
                    Channel channel = toServer(host, port);
                    if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
                        ims.setChannel(channel);
                        return IMSConnectStatus.Connected;
                    } else {
                        if (j == mIMSOptions.getReconnectCount()) {
                            // The maximum number of reconnections has been reached
                            if (i == serverList.size() - 1) {
                                // This was also the last server address, so the whole cycle has failed
                                Log.w(TAG, String.format("[%1$s] failed to connect", server));
                                return IMSConnectStatus.ConnectFailed;
                            } else {
                                // Wait for a while before trying the next server address
                                Log.w(TAG, String.format("[%1$s] failed to connect, waiting before connecting to the next server address, delay: %2$dms", server, mIMSOptions.getReconnectInterval()));
                                Log.w(TAG, "=========================================================================================");
                                Thread.sleep(mIMSOptions.getReconnectInterval());
                            }
                        } else {
                            // Connection failed: sleep for (reconnection interval + reconnection interval / 2 * n) ms
                            int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
                            Log.w(TAG, String.format("[%1$s] failed to connect, waiting to reconnect, current reconnection delay: %2$dms", server, delayTime));
                            Thread.sleep(delayTime);
                        }
                    }
                } catch (InterruptedException e) {
                    break; // The thread was interrupted, stop forcibly
                }
            }
        }

        return IMSConnectStatus.ConnectFailed;
    }

    /**
     * Actually connect to the server
     * @param host
     * @param port
     * @return
     */
    private Channel toServer(String host, int port) {
        Channel channel;
        try {
            channel = ims.getBootstrap().connect(host, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            channel = null;
        }

        return channel;
    }
}

As the code above shows, there are three main methods:

  • run()

The reconnection task is a Runnable, so run() executes when the task is started. It keeps connecting for as long as the IMS is not closed and the network is available. If a whole cycle fails (one cycle runs from the first attempt until every server address has reached the maximum number of reconnections), it waits for a while (twice the reconnection interval) and tries again, until the connection succeeds. Why wait instead of reconnecting immediately? When a connection fails, the cause is usually a poor network on the client or a problem on the server. The delay gives the network a chance to recover before the next attempt, and it avoids frequent connection attempts that waste resources.

  • toServer()

toServer() is the Netty code that actually establishes the TCP long connection. It is relatively simple.

  • connect()

All of the connection and reconnection logic lives in the connect() method. TCP and WebSocket differ slightly here. TCP is used as the example below, and the WebSocket differences are listed afterwards. Note: in the ims_kula SDK, the TCP server address has the fixed format "IP address, space, port number", for example 192.168.0.1 8808. You can define a different format to suit your own requirements.

The connect() method works as follows:
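As an illustration of the address format just mentioned, the sketch below parses a "host port" string into its two parts. It is not code from ims_kula: the class ServerAddressSketch and its parse method are hypothetical. Unlike the task shown earlier, it also rejects a non-numeric or out-of-range port up front, since Integer.parseInt throws a NumberFormatException for input such as "abc".

```java
// Hypothetical helper (not part of ims_kula): parses "host port", e.g. "192.168.0.1 8808".
final class ServerAddressSketch {
    final String host;
    final int port;

    private ServerAddressSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Returns null for an illegal address, similar in spirit to ConnectFailed_ServerIllegitimate. */
    static ServerAddressSketch parse(String server) {
        if (server == null) {
            return null;
        }
        String[] parts = server.trim().split("\\s+"); // tolerate repeated spaces
        if (parts.length != 2) {
            return null;
        }
        try {
            int port = Integer.parseInt(parts[1]);
            if (port < 1 || port > 65535) {
                return null; // outside the valid TCP port range
            }
            return new ServerAddressSketch(parts[0], port);
        } catch (NumberFormatException e) {
            return null; // port is not a number
        }
    }
}
```

For example, ServerAddressSketch.parse("192.168.0.1 8808") yields host 192.168.0.1 and port 8808, while an address without a port yields null.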

  • Check whether the IMS is closed. If it is, the connection fails. The same check, together with a network availability check, is repeated before every connection attempt.
  • If the IMS is not closed, initialize Bootstrap.
  • Check whether the server address list is set. If it is not, the connection fails.
  • Two nested for loops follow. The outer loop iterates over the server address list and takes each address in turn. The inner loop runs for the maximum number of reconnections set by the user (3 by default) plus the one initial connection, so if the maximum is left unset, the ims_kula SDK makes 4 connection attempts per server address. The delay before each retry is reconnectInterval + reconnectInterval / 2 * n. For example, with a reconnection interval of 8000ms, the first retry waits 8000ms, the second 12000ms, the third 16000ms, and so on.
  • Each server address is split into host and port.
  • Netty is then called to make the TCP connection (toServer(String host, int port)).
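The retry delay formula in the list above can be checked with a small standalone sketch. The class ReconnectDelaySketch and its method names are made up for this example; only the formula itself, reconnectInterval + reconnectInterval / 2 * n, comes from the reconnection task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the article's delay formula in isolation.
class ReconnectDelaySketch {

    /** Delay after the n-th failed attempt (n starts at 0), using integer arithmetic as in the task. */
    static int delayMillis(int reconnectInterval, int n) {
        return reconnectInterval + reconnectInterval / 2 * n;
    }

    /** Delays between the (reconnectCount + 1) attempts made against one server address. */
    static List<Integer> schedule(int reconnectInterval, int reconnectCount) {
        List<Integer> delays = new ArrayList<>();
        // reconnectCount + 1 attempts leave reconnectCount waits in between
        for (int n = 0; n < reconnectCount; n++) {
            delays.add(delayMillis(reconnectInterval, n));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(8000, 3)); // prints [8000, 12000, 16000]
    }
}
```

The delay grows linearly rather than exponentially, so even the last retry against a server address waits only 2.5 times the base interval with the default of 3 reconnections.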

Note: WebSocket connects in much the same way as TCP. The only difference is the server address format, which for WebSocket is ws://IP address:port number/websocket, for example ws://192.168.0.1:8808/websocket. The WebSocket code for obtaining host and port is therefore as follows (pseudocode; see NettyWebSocketReconnectTask for the actual code):

URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();

The rest of the connection and reconnection code is identical for WebSocket and TCP, because WebSocket is itself a layer built on top of TCP.
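One detail worth noting when parsing a WebSocket address with java.net.URI: getPort() returns -1 when the address contains no explicit port. The sketch below falls back to the conventional defaults (80 for ws, 443 for wss). The class WebSocketAddressSketch is hypothetical and is not taken from NettyWebSocketReconnectTask.

```java
import java.net.URI;

// Hypothetical helper (not part of ims_kula) for WebSocket server addresses.
class WebSocketAddressSketch {

    static String host(String server) {
        return URI.create(server).getHost();
    }

    static int port(String server) {
        URI uri = URI.create(server);
        int port = uri.getPort();
        if (port != -1) {
            return port; // an explicit port was given
        }
        // URI.getPort() returns -1 when the port is undefined, so use the scheme default
        return "wss".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }
}
```

For ws://192.168.0.1:8808/websocket this gives host 192.168.0.1 and port 8808, matching the format described above.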

When to reconnect and disconnect

To be clear, reconnection is a client-side concept: the server never initiates a connection. Disconnection is a server-side concept: strictly speaking, it means removing the corresponding Channel.

When the client reconnects:

  • Network switch
  • Network disconnection
  • A perceptible server exception
  • Heartbeat timeout, etc.

When the server disconnects a client:

  • A perceptible client exception
  • Heartbeat timeout
  • Duplicate connections from clients with the same IP address

You may have noticed that "perceptible exception" appears in both lists. What is a perceptible exception? It is one where the client or the server can sense that its peer has disconnected.

Testing shows that once the connection is established, if the service is stopped manually on the server, Netty calls the client's exceptionCaught() method.



When the server machine is powered off or its network cable is unplugged, the client cannot detect the disconnection and has to rely on the heartbeat timeout mechanism to reconnect.

Similarly, on the server side, if the client process is killed manually, Netty calls the server's channelInactive() method.



When the client device is powered off or loses its network, the server cannot detect it. The heartbeat mechanism is again needed to disconnect the client (remove its channel).

Note: Reconnection and disconnection driven by the heartbeat timeout mechanism will be covered in a later article. This article focuses on connection and reconnection, so they are not covered here.

Results

Because GIF images are large, the connection timeout and the reconnection interval were temporarily shortened for the recordings. The client connection changes in several situations are shown below:

  • Normal connection

  • The client loses its network and reconnects

  • The server stops the service and the client reconnects

Note: in the GIFs above, the word "message" is displayed once the client has established a connection with the server. Otherwise, the connection status is displayed.

The client logs are as follows:

The server logs are as follows:

Killing the client process or stopping the service on the server clears the log, so more detailed logs are not posted here. Interested readers can pull the code and verify the behavior themselves.

Closing remarks

With the code above, and leaving long connection stability aside (the heartbeat timeout reconnection logic has not been added yet), the client and server are already able to send and receive messages. This article concentrates on the connection and reconnection module, so the message sending and receiving functions are not included.

The next article will cover TCP/WebSocket packet splitting and sticky packet handling. Netty already ships with a variety of message codecs, so with the message format I defined, handling split and sticky packets is very simple. Since you may have different business protocol requirements, a codec implementation for a custom protocol will also be added. Stay tuned.

The relevant code has been pushed to GitHub. Help yourself:

  • KulaChat
  • kulachat-server
  • ims_kula

PS: The newly opened official account does not support comments yet. If you have different opinions or suggestions, you can comment on Juejin or join the QQ group: 1015178804. If the group is full, you can also send me a private message on the official account. Thank you.

Official account:

FreddyChen

See you in the next article. Goodbye.