Reading this article takes Integer.MAX_VALUE minutes.

Background of the story

My company's main business is smart home, and I am responsible for developing its Android app. Probably 80 percent of readers have heard of smart homes by now, but those who truly understand or have used one are likely still a minority. This article is about the technology, not the industry's prospects.

To make the background easier to follow, I will start with a quick introduction to smart homes. If you want to go straight to the WebSocket topic, skip ahead to the section "Existing problems with WebSocket state management".

Smart home is a typical application scenario of the Internet of Things (IoT). What is the Internet of Things? Literally, it is a network of connected objects. It ranges from your mobile phone and Bluetooth headset to every corner of a city, and its ultimate form is "the Internet of Everything". (Why does that suddenly make me think of ten thousand Buddhas?)

IoT does not really care which network protocol is used or what is being connected; anything in this form counts as IoT. The concept has in fact been around for a long time. At the risk of revealing my age, I first heard the term "Internet of Things" in college, around 2009 to 2010. Remember the GPRS smart meter reading system from lab class? It may not have seemed impressive: a water meter with a SIM card inserted that regularly sends its reading to a client, so nobody needs to come and check the water meter anymore. Yes, that is also an embodiment of IoT.

For years, though, the Internet of Things stayed lukewarm. Smart home played a very important role in bringing it back into the spotlight in recent years. AI added fuel to the fire as well, since every product now wants the word "smart" attached to it. Hence AI + IoT, or AIoT. I won't go into details here; the Internet of Things deserves a separate article.

Back to smart home: it is the Internet of Things applied to the home. Put simply, you can control lights, sockets, surveillance cameras, appliances and so on through a home central control panel or an app.

Words alone get messy, and a picture is worth a thousand of them, so here is how the architecture is composed.

Briefly: the gateway is the device responsible for connecting all the devices in the house. The gateway and the devices generally do not talk over HTTP (with a few exceptions). Instead they use low-power wireless protocols such as Zigbee or LoRa, or a wired connection, which is more stable. The reason is low power consumption: in a home with dozens of switch panels, that can save ten or twenty dollars of electricity every month. The most important reason, of course, is that these schemes are relatively mature. Before using such devices, you therefore need to perform a pairing operation that joins each device to the gateway's network. If a device loses this connection, the gateway considers it offline.

The gateway reports device status to the cloud platform, and the cloud platform pushes the status to the client. Controlling a device from the client is the same process in reverse. If 5G becomes widespread, devices may be able to connect to the cloud platform directly over the 5G network, which promises low power consumption, low latency and high speed, and the gateway would no longer be needed.

The overall structure is fairly simple. There is of course plenty of complex logic in between, covering device status, members, permissions, house management and so on, but that is not the focus here.

Push comes into play when the server delivers device status. Smart home has the concept of scenes. A "coming home" scene, for example, issues a single request that turns on a dozen lights, so it is not realistic to determine each device's status from the return value of that request.

Because of the nature of the business, smart home push has to be highly real-time. For example, when the user turns on a light (whether at the physical switch or in the app), the light's status in the app must switch to "on" immediately. If the state takes three to five seconds to change, the user experience is poor.

At first we used a third-party push service and found the delay to be serious; sometimes a push took more than ten seconds to arrive. To be fair, such services target different scenarios. So I decided to build our own long-lived connection for push. After evaluating the options, I settled on the protagonist of this article: WebSocket.

Existing problems with WebSocket state management

For an introduction to the WebSocket protocol, see the article "WebSocket Protocol: 5 minutes from Getting Started to Mastering". There are many ready-made WebSocket client libraries, notably Java-WebSocket, and OkHttp also ships with WebSocket support.

Initially I used OkHttp directly because the project already depended on it. Usage is simple and should look familiar to anyone who knows OkHttp:

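    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;
    import okhttp3.WebSocketListener;

    OkHttpClient client = new OkHttpClient.Builder().build();
    // The URL is a placeholder; Request.Builder.build() fails if no URL is set.
    Request request = new Request.Builder().url("wss://example.com/ws").build();
    client.newWebSocket(request, new WebSocketListener() {
        @Override
        public void onOpen(okhttp3.WebSocket webSocket, Response response) {
            // Connection established
        }

        @Override
        public void onMessage(okhttp3.WebSocket webSocket, String text) {
            // Text message received
        }

        @Override
        public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {
            // Connection closed normally
        }

        @Override
        public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {
            // Connection failed or was broken abnormally
        }
    });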

The call is simple and the callbacks are clear. Java-WebSocket is similar. In practice, however, both generally run into the following problems:

1. Heartbeat

Both OkHttp and Java-WebSocket have a heartbeat mechanism. However, OkHttp's heartbeat interval, pingInterval, is fixed when the client is created and cannot be adjusted afterwards. (Jake also responded to this question on GitHub, saying in effect that callers should not need to care about it. But the need does come up often 😢😢😢)

For example, the heartbeat can be somewhat more frequent while the app is in the foreground, and the interval can be lengthened in the background to save power. To adjust the heartbeat, you have to create a new client, disconnect the old one, and connect again. There are two strategies:

  • Disconnect the old connection first, then open the new one. If the server does not cache messages, messages sent between the disconnect and the new connection may be lost.
  • Open the new connection first, then disconnect the old one. The server then briefly holds two connections from the same client, which may confuse its message state.

Both strategies require an agreement between client and server, and the server has to do extra processing, which increases development cost and the chance of errors.

2. Reconnecting after disconnection

A connection is usually broken in one of the following cases:

  • The client disconnects actively (no reconnection needed)
  • The server disconnects actively (no reconnection needed)
  • The client loses the connection for network reasons, such as the phone going offline or being disconnected passively (reconnection needed)
  • The carrier drops the connection because it has been idle (reconnection needed)

For the last case, see the article "Mobile IM Practice: Implementing WeChat's Smart Heartbeat Mechanism on Android".

Reconnection also has to take the retry interval into account. If the phone has no network, retrying back-to-back will just keep failing, so you need to define a relationship between the number of reconnection attempts and the interval between them.

In short, the reconnection logic and strategy are something we have to maintain ourselves.

3. Multithreading

OkHttp invokes the state callbacks on a worker thread that OkHttp creates, and we have to initiate reconnection based on those states. If the app also has to create new connections whenever it switches between foreground and background, things get messy quickly. Questions to consider include:

  • If several connection requests are issued at the same time, each must wait until the previous one completes. If the previous request succeeded, the waiting requests can be skipped.
  • Connection requests come from different threads, so the state has to be synchronized.

The solution

Since my project used OkHttp, the initial solutions were designed around OkHttp. The general ideas are as follows:

1. Heartbeat

1.1 OkHttp WebSocket Heartbeat Analysis

OkHttp does not support dynamic heartbeat adjustment.

To analyze the source code, we start from the entry point, client.newWebSocket:

    // OkHttpClient.java
    @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
      // Note the pingInterval: it is fixed when the RealWebSocket is created and cannot be changed.
      RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
      webSocket.connect(this);
      return webSocket;
    }

    // RealWebSocket.java
    public RealWebSocket(Request request, WebSocketListener listener, Random random,
        long pingIntervalMillis) {
      this.originalRequest = request;
      this.listener = listener;
      this.random = random;
      this.pingIntervalMillis = pingIntervalMillis;
      // ...
      // writerRunnable is used to send messages to the server
      this.writerRunnable = new Runnable() {
        @Override public void run() {
          try {
            while (writeOneFrame()) {
            }
          } catch (IOException e) {
            failWebSocket(e, null);
          }
        }
      };
    }

    public void connect(OkHttpClient client) {
      // Headers required by the WebSocket protocol
      client = client.newBuilder()
          .eventListener(EventListener.NONE)
          .protocols(ONLY_HTTP1)
          .build();
      final Request request = originalRequest.newBuilder()
          .header("Upgrade", "websocket")
          .header("Connection", "Upgrade")
          .header("Sec-WebSocket-Key", key)
          .header("Sec-WebSocket-Version", "13")
          .build();
      call = Internal.instance.newWebSocketCall(client, request);
      call.enqueue(new Callback() {
        @Override public void onResponse(Call call, Response response) {
          try {
            checkResponse(response);
          } catch (ProtocolException e) {
            failWebSocket(e, response);
            closeQuietly(response);
            return;
          }

          // Promote the HTTP streams into web socket streams.
          StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
          streamAllocation.noNewStreams(); // Prevent connection pooling!
          Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

          // Process all web socket messages.
          try {
            // Connected successfully: invoke the listener's onOpen
            listener.onOpen(RealWebSocket.this, response);
            String name = "OkHttp WebSocket " + request.url().redact();
            initReaderAndWriter(name, streams);
            streamAllocation.connection().socket().setSoTimeout(0);
            // Point 2: read frames in a loop
            loopReader();
          } catch (Exception e) {
            failWebSocket(e, null);
          }
        }

        @Override public void onFailure(Call call, IOException e) {
          failWebSocket(e, null);
        }
      });
    }

    public void initReaderAndWriter(String name, Streams streams) throws IOException {
      synchronized (this) {
        this.streams = streams;
        this.writer = new WebSocketWriter(streams.client, streams.sink, random);
        // Create a scheduled thread pool
        this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
        if (pingIntervalMillis != 0) {
          // Run PingRunnable periodically on the thread pool
          executor.scheduleAtFixedRate(
              new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
        }
        if (!messageAndCloseQueue.isEmpty()) {
          runWriter(); // Send messages that were enqueued before we were connected.
        }
      }

      reader = new WebSocketReader(streams.client, streams.source, this);
    }

What does PingRunnable do?

    private final class PingRunnable implements Runnable {
      PingRunnable() {
      }

      @Override public void run() {
        // Send a ping frame
        writePingFrame();
      }
    }

    void writePingFrame() {
      WebSocketWriter writer;
      int failedPing;
      synchronized (this) {
        if (failed) return;
        writer = this.writer;
        // If we are still awaiting a pong, failedPing is sentPingCount; otherwise it is -1
        failedPing = awaitingPong ? sentPingCount : -1;
        sentPingCount++;
        awaitingPong = true;
      }

      if (failedPing != -1) {
        // The previous pong never came back: fail the connection
        failWebSocket(new SocketTimeoutException("sent ping but didn't receive pong within "
            + pingIntervalMillis + "ms (after " + (failedPing - 1) + " successful ping/pongs)"),
            null);
        return;
      }

      try {
        // Send a ping message
        writer.writePing(ByteString.EMPTY);
      } catch (IOException e) {
        failWebSocket(e, null);
      }
    }

So how does OkHttp know whether a pong was received? That is the reader's job, so let's see what happens in the reader.

Remember loopReader, marked as point 2 above?

    // RealWebSocket.java
    /** Receive frames until there are no more. Invoked only by the reader thread. */
    public void loopReader() throws IOException {
      while (receivedCloseCode == -1) {
        // This method call results in one or more onRead* methods being called on this thread.
        reader.processNextFrame();
      }
    }

    // WebSocketReader.java
    void processNextFrame() throws IOException {
      readHeader();
      if (isControlFrame) {
        readControlFrame();
      } else {
        readMessageFrame();
      }
    }

    /** Dispatch on the opcode of the control frame. */
    private void readControlFrame() throws IOException {
      // ...
      switch (opcode) {
        case OPCODE_CONTROL_PING:
          frameCallback.onReadPing(controlFrameBuffer.readByteString());
          break;
        case OPCODE_CONTROL_PONG:
          // Note: a pong was received, which triggers onReadPong
          frameCallback.onReadPong(controlFrameBuffer.readByteString());
          break;
        case OPCODE_CONTROL_CLOSE:
          // ...
          frameCallback.onReadClose(code, reason);
          closed = true;
          break;
        default:
          throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
      }
    }

    // RealWebSocket.java
    /** Increment the pong counter and clear awaitingPong. */
    @Override public synchronized void onReadPong(ByteString buffer) {
      // This API doesn't expose pings.
      receivedPongCount++;
      awaitingPong = false;
    }

Following the code through, we find that after receiving a pong, OkHttp simply resets the awaitingPong flag, which indicates that the pong for the last ping has arrived.

To summarize OkHttp's heartbeat flow:

  1. When RealWebSocket connects, it creates a scheduled thread pool that periodically runs PingRunnable to send ping messages. (Other outgoing messages are sent in writerRunnable.)
  2. It creates a reader that receives messages in a loop.
  3. Before sending a ping, it checks the awaitingPong flag to see whether the previous pong has come back. If it has not, the connection is failed with a timeout.
  4. It parses each received message. If the message is a pong, the awaitingPong flag is reset.
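
The four steps above can be condensed into a small, library-independent sketch. The class name HeartbeatTracker and its methods are hypothetical and not part of OkHttp; the sketch only mirrors the awaitingPong bookkeeping seen in the source, and leaves out the actual writing and reading of frames.

```java
/** Hypothetical sketch of the awaitingPong bookkeeping; this is not OkHttp code. */
final class HeartbeatTracker {
    private boolean awaitingPong = false;
    private int sentPingCount = 0;
    private int receivedPongCount = 0;

    /**
     * Called by the periodic ping task before it writes a ping frame.
     * Returns false if the previous ping was never answered, in which case
     * the caller should fail the connection instead of sending another ping.
     */
    synchronized boolean onPingTick() {
        if (awaitingPong) {
            return false; // the previous pong never arrived
        }
        sentPingCount++;
        awaitingPong = true;
        return true;
    }

    /** Called by the reader thread when a pong control frame has been parsed. */
    synchronized void onPong() {
        receivedPongCount++;
        awaitingPong = false;
    }

    synchronized int sentPingCount() {
        return sentPingCount;
    }

    synchronized int receivedPongCount() {
        return receivedPongCount;
    }
}
```

A timer would call onPingTick() once per interval and the reader would call onPong(); a false return value corresponds to the timeout branch in writePingFrame.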

1.2 Dynamically adjusting the OkHttp heartbeat

So the ping rate can be changed dynamically without recreating the OkHttpClient: we only need to change the period at which the thread pool runs PingRunnable.

Using reflection, we can therefore create a new thread pool that runs a PingRunnable at the new ping interval, and shut down the old one.

    import java.lang.reflect.Constructor;
    import java.lang.reflect.Field;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import okhttp3.internal.Util;
    import okhttp3.internal.ws.RealWebSocket;

    // mWebSocket is the connected RealWebSocket; interval and unit are the new ping period.
    Class<?> clazz = Class.forName("okhttp3.internal.ws.RealWebSocket");
    Field field = clazz.getDeclaredField("executor");
    field.setAccessible(true);
    ScheduledExecutorService oldService = (ScheduledExecutorService) field.get(mWebSocket);

    for (Class<?> innerClass : clazz.getDeclaredClasses()) {
      if ("PingRunnable".equals(innerClass.getSimpleName())) {
        // Create a new PingRunnable instance (an inner class, so the
        // constructor takes the enclosing RealWebSocket as its argument)
        Constructor<?> constructor = innerClass.getDeclaredConstructor(RealWebSocket.class);
        constructor.setAccessible(true);
        Object pingRunnable = constructor.newInstance(mWebSocket);

        // Create a new thread pool and schedule the pings at the new interval
        ScheduledThreadPoolExecutor newService =
            new ScheduledThreadPoolExecutor(1, Util.threadFactory("ws-ping", false));
        newService.scheduleAtFixedRate((Runnable) pingRunnable, interval, interval, unit);
        field.set(mWebSocket, newService);

        // Shut down the old thread pool
        oldService.shutdown();
        break;
      }
    }

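This achieves dynamic adjustment of the heartbeat. Note that it relies on OkHttp internals (the executor field and the PingRunnable class), so it is tied to the OkHttp version in use.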

Java-WebSocket sends pings only when sendPing() is called from outside, and it keeps no ping/pong state internally, so we have to maintain that relationship ourselves. As with OkHttp, you can send pings on a timer and parse pongs to track the heartbeat state. I won't elaborate here.
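
When the library leaves pinging to the caller, one possible way to keep the interval adjustable is to cancel and reschedule a periodic task on a single ScheduledExecutorService. This is a different technique from the reflection approach above, and only a sketch: AdjustablePinger is a hypothetical name, and pingAction stands in for whatever "send ping" call the library offers.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs pingAction periodically and lets the interval change at runtime. */
final class AdjustablePinger {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable pingAction; // stands in for the library's "send ping" call
    private ScheduledFuture<?> current;
    private long intervalMillis;

    AdjustablePinger(Runnable pingAction) {
        this.pingAction = pingAction;
    }

    /** Starts pinging, or replaces the running schedule with one at the new interval. */
    synchronized void setInterval(long interval, TimeUnit unit) {
        long millis = unit.toMillis(interval);
        if (millis <= 0) {
            throw new IllegalArgumentException("interval must be at least 1 ms");
        }
        if (current != null) {
            current.cancel(false); // do not interrupt a ping that is already running
        }
        intervalMillis = millis;
        current = executor.scheduleAtFixedRate(
                pingAction, millis, millis, TimeUnit.MILLISECONDS);
    }

    synchronized long intervalMillis() {
        return intervalMillis;
    }

    /** Stops pinging and releases the worker thread; the pinger cannot be reused afterwards. */
    synchronized void stop() {
        executor.shutdownNow();
    }
}
```

An app could call setInterval with a short interval when it comes to the foreground and a longer one when it goes to the background, without touching the connection itself.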

2. Reconnecting after disconnection

As mentioned earlier, reconnection is just a matter of handling the different disconnection cases. Fortunately, most WebSocket libraries provide both close and error callbacks, which makes it possible to distinguish an abnormal disconnection from an intentional one.
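
As a rough illustration, the disconnection cases can be written down as a small decision table. Everything here is hypothetical naming (DisconnectCause, ReconnectPolicy), and mapping a SocketTimeoutException to the idle case is an assumption based on the ping timeout seen in the OkHttp source; a real client would likely need finer rules.

```java
import java.net.SocketTimeoutException;

/** Hypothetical classification of why a connection ended. */
enum DisconnectCause {
    CLIENT_CLOSED(false),  // the app asked to close
    SERVER_CLOSED(false),  // the server closed the connection
    NETWORK_ERROR(true),   // for example, the phone went offline
    IDLE_TIMEOUT(true);    // for example, the carrier dropped an idle connection

    private final boolean reconnect;

    DisconnectCause(boolean reconnect) {
        this.reconnect = reconnect;
    }

    boolean shouldReconnect() {
        return reconnect;
    }
}

/** Hypothetical mapping from library callbacks to a cause. */
final class ReconnectPolicy {
    private ReconnectPolicy() {
    }

    /** Use from the close callback; closedByClient is tracked by the caller. */
    static DisconnectCause fromClose(boolean closedByClient) {
        return closedByClient ? DisconnectCause.CLIENT_CLOSED : DisconnectCause.SERVER_CLOSED;
    }

    /** Use from the error callback. Assumption: a timeout means a missed pong. */
    static DisconnectCause fromError(Throwable error) {
        return (error instanceof SocketTimeoutException)
                ? DisconnectCause.IDLE_TIMEOUT
                : DisconnectCause.NETWORK_ERROR;
    }
}
```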

What we need to maintain is the relationship between the retry count and the reconnection interval, similar to an exponential backoff algorithm.
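
A minimal sketch of such a relationship, assuming capped exponential backoff: the delay doubles with each retry until it reaches an upper bound. The class name Backoff and its parameters are hypothetical, and the concrete values are up to the application.

```java
/** Hypothetical helper: capped exponential backoff for reconnection delays. */
final class Backoff {
    private Backoff() {
    }

    /**
     * @param retryCount 1 for the first retry, 2 for the second, and so on
     * @param baseMillis delay before the first retry
     * @param maxMillis  upper bound for any delay
     * @return baseMillis * 2^(retryCount - 1), capped at maxMillis
     */
    static long delayMillis(int retryCount, long baseMillis, long maxMillis) {
        if (retryCount < 1 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff arguments");
        }
        long delay = baseMillis;
        for (int i = 1; i < retryCount && delay < maxMillis; i++) {
            // Doubling would exceed the cap (or overflow), so jump straight to the cap.
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

With a base of 1 second and a cap of 30 seconds, the delays are 1 s, 2 s, 4 s, 8 s, 16 s, and then 30 s for every later retry.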

3. Multithreading

The core of the multithreading problem mentioned above is multiple connection requests arriving at the same time. Rather than adding all kinds of synchronization across threads, it is better to turn parallel into serial: as in a message queue, all messages are processed on one dedicated thread. This avoids thread synchronization and keeps state changes in order.

On Android, HandlerThread meets both requirements perfectly, and I did use HandlerThread at first. Later I realized the design could be platform-independent and run on plain Java, so I implemented the queue myself. Maintaining your own message queue also gives you better control over priorities, delays and so on.
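
The idea of turning parallel requests into serial ones can be sketched in plain Java with a BlockingQueue and one worker thread. This is an illustration under simplifying assumptions, not the WsGo implementation: SerialCommandQueue and its Command enum are hypothetical, and "connecting" only flips a flag where a real client would open the socket.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: commands from any thread are executed one by one on a single worker. */
final class SerialCommandQueue {
    enum Command { CONNECT, DISCONNECT, STOP }

    private final BlockingQueue<Command> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::loop, "ws-commands");
    // Written only by the worker thread; volatile so other threads can read them.
    private volatile boolean connected = false;
    private volatile int connectAttempts = 0;

    void start() {
        worker.start();
    }

    /** Safe to call from any thread. */
    void post(Command command) {
        queue.add(command);
    }

    private void loop() {
        try {
            while (true) {
                Command command = queue.take();
                switch (command) {
                    case CONNECT:
                        if (connected) {
                            break; // already connected: skip the redundant request
                        }
                        connectAttempts++;
                        connected = true; // a real client would open the socket here
                        break;
                    case DISCONNECT:
                        connected = false;
                        break;
                    case STOP:
                        return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Posts STOP and waits until everything queued before it has been processed. */
    void stopAndWait() {
        post(Command.STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isConnected() {
        return connected;
    }

    int connectAttempts() {
        return connectAttempts;
    }
}
```

Because only the worker thread changes the connection state, a second CONNECT that arrives while the first is still queued is simply skipped once the first one succeeds, and no locks are needed around the state.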

WebSocketGo

As mentioned earlier, my project is based on OkHttp, but I found that the state management part could be abstracted out on its own. Hence the subject of this article, WebSocketGo (WsGo).

Stars are welcome: github.com/Gnepux/WebS…

The data flow is as follows:

  1. The Dispatcher is the message queue mentioned above, following the producer-consumer pattern. It maintains two queues: one for outgoing commands and one for incoming events.

    • The commands include CONNECT, DISCONNECT, RECONNECT, CHANGE_PING, and SEND

    • Events include onConnect, onMessage, onSend, onRetry, onDisConnect, and onClose

  2. The Channel Manager is responsible for state management.

  3. The WebSocket Interface can be understood as an adaptation layer that calls the WebSocket library.

  4. The Event Listener is the callback that reports state to the application.

Integrating WsGo

    implementation 'com.gnepux:wsgo:1.0.1'

    // use OkHttp
    implementation 'com.gnepux:wsgo-okwebsocket:1.0.1'

    // use Java-WebSocket
    implementation 'com.gnepux:wsgo-jwebsocket:1.0.1'

Initialization

    WsConfig config = new WsConfig.Builder()
            .debugMode(true)                      // true to print logs
            .setUrl(pushUrl)                      // ws url
            .setHttpHeaders(headerMap)            // http headers
            .setConnectTimeout(10 * 1000L)        // connect timeout
            .setReadTimeout(10 * 1000L)           // read timeout
            .setWriteTimeout(10 * 1000L)          // write timeout
            .setPingInterval(10 * 1000L)          // initial ping interval
            .setWebSocket(OkWebSocket.create())   // websocket client
            .setRetryStrategy(retryStrategy)      // retry count and delay time strategy
            .setEventListener(eventListener)      // event listener
            .build();

    WsGo.init(config);

Go

    // Connect
    WsGo.getInstance().connect();

    // Send
    WsGo.getInstance().send("hello from WsGo");

    // Disconnect
    WsGo.getInstance().disconnect(1000, "close");
    WsGo.getInstance().disconnectNormal("close");

    // Change the heartbeat interval
    WsGo.getInstance().changePingInterval(10, TimeUnit.SECONDS);

    // Release
    WsGo.getInstance().destroyInstance();

More configuration for WsConfig

setWebSocket(WebSocket socket)

WsGo already supports OkHttp and Java-WebSocket:

    // for OkHttp (wsgo-okwebsocket)
    setWebSocket(OkWebSocket.create());

    // for Java-WebSocket (wsgo-jwebsocket)
    setWebSocket(JWebSocket.create());

To use another WebSocket library or a custom client, implement the WebSocket interface and report the results through ChannelCallback. WsGo takes care of the rest of the connection management.

    public interface WebSocket {

        void connect(WsConfig config, ChannelCallback callback);

        void reconnect(WsConfig config, ChannelCallback callback);

        boolean disconnect(int code, String reason);

        void changePingInterval(long interval, TimeUnit unit);

        boolean send(String msg);
    }

setRetryStrategy(RetryStrategy retryStrategy)

WsGo reconnects automatically after an abnormal disconnection. RetryStrategy defines the relationship between the retry count and the delay.

WsGo ships with a DefaultRetryStrategy. To adjust the behavior, implement the onRetry method of the RetryStrategy interface.

    public interface RetryStrategy {

        /**
         * Relationship between the retry count and the delay.
         *
         * @param retryCount number of retries
         * @return delay time
         */
        long onRetry(long retryCount);
    }
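
As an illustration, a custom strategy could look up the delay in a fixed table. SteppedRetryStrategy is a hypothetical name, and two things are assumptions rather than documented behavior: that the returned delay is in milliseconds, and that retryCount starts at 0 or 1 (both map to the first entry here). The interface is repeated locally only so the sketch compiles on its own.

```java
/** Local copy of the RetryStrategy interface, so the sketch compiles without the library. */
interface RetryStrategy {
    long onRetry(long retryCount);
}

/** Hypothetical strategy: a fixed table of delays; the last entry is reused for later retries. */
final class SteppedRetryStrategy implements RetryStrategy {
    // Assumption: delays are expressed in milliseconds.
    private static final long[] DELAYS_MILLIS = {1_000L, 2_000L, 5_000L, 10_000L, 30_000L};

    @Override
    public long onRetry(long retryCount) {
        // Treat 0 and 1 as the first retry, then clamp to the last table entry.
        long index = Math.max(retryCount, 1L) - 1L;
        int clamped = (int) Math.min(index, DELAYS_MILLIS.length - 1);
        return DELAYS_MILLIS[clamped];
    }
}
```

An instance would then be passed to setRetryStrategy when building the WsConfig.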

setEventListener(EventListener eventListener)

Adds event callbacks. Note that the callbacks run on a thread created by WsGo itself, not on the calling thread. If needed, switch threads manually inside the callback.

    public interface EventListener {

        void onConnect();

        void onDisConnect(Throwable throwable);

        void onClose(int code, String reason);

        void onMessage(String text);

        void onReconnect(long retryCount, long delayMillSec);

        void onSend(String text, boolean success);
    }
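
One way to do the thread switch in a single place is to wrap the listener so that every callback is re-dispatched onto an Executor chosen by the caller. ExecutorEventListener is a hypothetical wrapper, not part of WsGo, and the interface is repeated locally only so the sketch compiles on its own.

```java
import java.util.concurrent.Executor;

/** Local copy of the EventListener interface, so the sketch compiles without the library. */
interface EventListener {
    void onConnect();
    void onDisConnect(Throwable throwable);
    void onClose(int code, String reason);
    void onMessage(String text);
    void onReconnect(long retryCount, long delayMillSec);
    void onSend(String text, boolean success);
}

/** Hypothetical wrapper that forwards every callback to the delegate on the given Executor. */
final class ExecutorEventListener implements EventListener {
    private final Executor executor;
    private final EventListener delegate;

    ExecutorEventListener(Executor executor, EventListener delegate) {
        this.executor = executor;
        this.delegate = delegate;
    }

    @Override
    public void onConnect() {
        executor.execute(delegate::onConnect);
    }

    @Override
    public void onDisConnect(Throwable throwable) {
        executor.execute(() -> delegate.onDisConnect(throwable));
    }

    @Override
    public void onClose(int code, String reason) {
        executor.execute(() -> delegate.onClose(code, reason));
    }

    @Override
    public void onMessage(String text) {
        executor.execute(() -> delegate.onMessage(text));
    }

    @Override
    public void onReconnect(long retryCount, long delayMillSec) {
        executor.execute(() -> delegate.onReconnect(retryCount, delayMillSec));
    }

    @Override
    public void onSend(String text, boolean success) {
        executor.execute(() -> delegate.onSend(text, success));
    }
}
```

On Android, the Executor could post to the main thread, for example with a lambda such as command -> new Handler(Looper.getMainLooper()).post(command), so UI code in the delegate stays free of threading concerns.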

PS

WsGo currently does not listen for mobile network changes (that is, a disconnection triggers a callback, but network recovery does not trigger an automatic reconnection). I consider this outside the state management of WebSocket itself, and handling it would make the library platform-dependent. If you need it, initiate the connection from outside; a simple wrapper in your own project is enough.

Afterword

This article ran a bit long. It started from the smart home scenario, described the existing problems of WebSocket state management, presented my solutions, and ended with a general state management framework, WebSocketGo. It is a small summary of my thinking at work. In fact, not only WebSocket but every kind of long-lived connection faces the same problems, and the way of thinking about them and the starting point of the solution should be the same.