Integer.MAX_VALUE minutes are required to read this article.
The story background
My company's main business is smart home, and I am responsible for developing its Android app. I'd guess about 80 percent of you have heard of smart homes by now, but not many have really understood or used one. This article is not about industry prospects, but about technology.
To help you understand the background of the story, I'll start with a quick introduction to smart homes. If you want to go straight to WebSocket, skip to chapter 2.
Smart home is a typical application scenario of the Internet of Things (IoT). What is IoT? Literally, it is a network of connected things. It reaches from your mobile phone and Bluetooth headset to every corner of a city, and its ultimate form is "the Internet of everything". (Why does that suddenly remind me of "ten thousand Buddhas"?)
IoT doesn't really care which network protocol is used or what a thing is connected to; as long as things are connected this way, it is IoT. The concept has actually been around for a long time. I first heard the term "Internet of Things" in college, around 2009 to 2010 (yes, that reveals my age). I remember a GPRS-based smart meter reading system from a lab class. It may not sound impressive: it is just a water meter with a SIM card that regularly sends its reading to a client, so nobody has to come by to read the meter anymore. But yes, this is also a form of IoT.
For years, however, IoT stayed lukewarm. Smart home is one of the main reasons it has come back into focus in recent years. AI has also fueled the trend, with every product wanting the word "smart" attached. Hence AI + IoT, or AIoT. I won't go into details here; IoT deserves a separate article.
Back to smart home: a smart home is the Internet of Things within a home. Simply put, you can control lights, sockets, cameras, appliances, and so on through a central control panel or an app.
[Figure: overall smart home architecture (devices, gateway, cloud platform, client app)]
To explain briefly: the gateway is the device that connects all the devices in the home. Between the gateway and the devices, HTTP is generally not used (with a few exceptions). Instead, low-power wireless protocols such as Zigbee or LoRa are used (or a direct wired connection, which is more stable). Low power consumption matters: if your home has dozens of switch panels, it can save ten or twenty dollars of electricity every month. Of course, the most important reason is that these solutions are relatively mature. Because of this, each device must first go through a networking (pairing) step that joins it to the gateway before it can be used. If a device disconnects, the gateway considers it offline.
The gateway reports device status to the cloud platform, and the cloud platform pushes it to the client. Controlling a device from the client is the reverse process. Of course, if 5G becomes widespread in the future, devices could connect directly to the cloud platform over 5G, which offers low power consumption, low latency, and high speed, and the gateway would no longer be needed.
The overall architecture is relatively simple, although there is plenty of complex logic in between, involving device status, users, permissions, house management, and so on, which we won't cover here.
Push comes into play when the server delivers device status. Smart homes have the concept of scenes: for example, a single request for the "coming home" scene may turn on a dozen lights, so it is not realistic to determine each device's status from the request's return value.
Because of the nature of the business, smart home push must be highly real-time. For example, when a user turns on a light (whether at the wall switch or in the app), the light's status in the app must update immediately. If the state takes three to five seconds to change, the user experience is poor.
At first we used a lightweight push service and found the latency severe: sometimes a push took more than ten seconds to arrive. After all, it was designed for different scenarios. So I decided to build our own long-lived connection for push, and after evaluating the options, I chose the protagonist of this article: WebSocket.
Existing problems with WebSocket state management
For an introduction to the WebSocket protocol, see "WebSocket Protocol: 5 minutes from Getting Started to Mastering". There are many ready-made WebSocket client libraries, notably Java-WebSocket, and OkHttp also comes with WebSocket support.
Initially, I used OkHttp directly because it was already in the project. Usage is very simple and should look familiar to anyone who knows OkHttp:
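import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocketListener;

OkHttpClient client = new OkHttpClient.Builder().build();
// Placeholder URL: use your own ws:// or wss:// endpoint (Request requires a URL)
Request request = new Request.Builder().url("wss://example.com/push").build();
client.newWebSocket(request, new WebSocketListener() {
    @Override // connection established
    public void onOpen(okhttp3.WebSocket webSocket, Response response) {}

    @Override // text message received
    public void onMessage(okhttp3.WebSocket webSocket, String text) {}

    @Override // close handshake completed
    public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {}

    @Override // network or protocol error
    public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {}
});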
It is easy to call and the callback states are clear. Java-WebSocket is similar. However, both share the following problems:
1. Heartbeat
Both OkHttp and Java-WebSocket support heartbeats. However, OkHttp's heartbeat interval, pingInterval, is fixed when the client is created and cannot be changed afterward. (Jake also answered this question on GitHub, essentially saying callers don't need to care about it. But in practice the need comes up often 😢😢😢)
For example, when the app is in the foreground, the heartbeat can be more frequent; when it is in the background, the interval can be longer to save power. To adjust the heartbeat, you have to create a new client, disconnect the old connection, and reconnect. There are two strategies:
- Disconnect the old connection first, then connect the new one. If the server does not cache messages, messages sent between the disconnect and the new connection may be lost.
- Connect the new one first, then disconnect the old one. The server then briefly holds two connections for the same client, which may leave its message state inconsistent.
Both strategies require an agreement between client and server plus extra server-side handling, which increases development cost and the chance of errors.
2. Reconnection
A connection usually drops in one of the following cases:
- The client disconnects intentionally (no reconnect needed)
- The server disconnects intentionally (no reconnect needed)
- The client loses its network, for example the phone goes offline or the signal is poor (reconnect needed)
- The carrier drops the connection because it has been idle (reconnect needed)
For the last case, see the article "Mobile IM Practice: Implementing WeChat's Smart Heartbeat Mechanism on Android".
We also need to consider the interval between reconnection attempts. If the phone has no network, reconnection will keep failing, so we need a rule that relates the retry count to the retry interval.
In short, the logic and strategy of reconnection is something we need to maintain ourselves.
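The four cases above boil down to a small decision rule. The sketch below encodes it in plain Java; `DisconnectCause`, `ReconnectPolicy`, and `classify` are hypothetical names, not part of OkHttp or Java-WebSocket, and mapping a missed pong to "carrier idle kill" is a heuristic assumption (a lost network can look the same, and both lead to a reconnect anyway).

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical classification of why a WebSocket connection ended. */
enum DisconnectCause {
    CLIENT_CLOSED,     // we called disconnect()/close() ourselves
    SERVER_CLOSED,     // the server completed a close handshake on purpose
    NETWORK_LOST,      // the phone lost connectivity and socket I/O failed
    CARRIER_IDLE_KILL  // a carrier/NAT dropped an idle connection
}

final class ReconnectPolicy {
    // Only involuntary disconnects should trigger an automatic reconnect.
    private static final Set<DisconnectCause> RECONNECTABLE =
            EnumSet.of(DisconnectCause.NETWORK_LOST, DisconnectCause.CARRIER_IDLE_KILL);

    private ReconnectPolicy() {}

    /**
     * Hypothetical mapping from what the library callbacks tell us.
     *
     * @param closedByUs   true if we initiated the close
     * @param closeFrame   true if a close handshake completed (e.g. OkHttp's onClosed)
     *                     rather than a failure callback (e.g. OkHttp's onFailure)
     * @param pingTimedOut true if the failure was a missed pong (heuristic: idle kill)
     */
    static DisconnectCause classify(boolean closedByUs, boolean closeFrame, boolean pingTimedOut) {
        if (closedByUs) return DisconnectCause.CLIENT_CLOSED;
        if (closeFrame) return DisconnectCause.SERVER_CLOSED;
        return pingTimedOut ? DisconnectCause.CARRIER_IDLE_KILL : DisconnectCause.NETWORK_LOST;
    }

    static boolean shouldReconnect(DisconnectCause cause) {
        return RECONNECTABLE.contains(cause);
    }

    public static void main(String[] args) {
        // Server closed the connection on purpose: stay disconnected.
        System.out.println(shouldReconnect(classify(false, true, false))); // false
        // Socket failed after a missed pong: reconnect.
        System.out.println(shouldReconnect(classify(false, false, true))); // true
    }
}
```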
3. Multithreading
OkHttp delivers its state callbacks on threads it creates itself, and we need to trigger reconnections based on those states. If the app also switches between foreground and background and we have to create new connections, it quickly becomes a real mess. Questions to consider include:
- If multiple connection requests arrive at the same time, each must wait for the previous one to finish before it starts. If the previous request has already connected, the waiting requests can be skipped.
- Connection requests come from different threads, so the connection state must be synchronized.
The solution
Since my project used OkHttp, the initial solutions were designed around OkHttp. The general idea is as follows:
1. Heartbeat
1.1 OkHttp WebSocket heartbeat analysis
OkHttp does not support adjusting the heartbeat dynamically. Let's look at why.
To analyze the source code, we start from the entry point, client.newWebSocket:
// OkHttpClient.java
/** Internally uses RealWebSocket to initiate the connection */
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
  // Note pingInterval: it is fixed when RealWebSocket is created and cannot be changed afterward.
  RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
  webSocket.connect(this);
  return webSocket;
}
// RealWebSocket.java
public RealWebSocket(Request request, WebSocketListener listener, Random random, long pingIntervalMillis) {
  this.originalRequest = request;
  this.listener = listener;
  this.random = random;
  this.pingIntervalMillis = pingIntervalMillis;
  // ...
  // writerRunnable sends queued messages to the server
  this.writerRunnable = new Runnable() {
    @Override public void run() {
      try {
        while (writeOneFrame()) {
        }
      } catch (IOException e) {
        failWebSocket(e, null);
      }
    }
  };
}

public void connect(OkHttpClient client) {
  client = client.newBuilder()
      .eventListener(EventListener.NONE)
      .protocols(ONLY_HTTP1)
      .build();
  // WebSocket protocol-related headers
  final Request request = originalRequest.newBuilder()
      .header("Upgrade", "websocket")
      .header("Connection", "Upgrade")
      .header("Sec-WebSocket-Key", key)
      .header("Sec-WebSocket-Version", "13")
      .build();
  call = Internal.instance.newWebSocketCall(client, request);
  call.enqueue(new Callback() {
    @Override public void onResponse(Call call, Response response) {
      try {
        checkResponse(response);
      } catch (ProtocolException e) {
        failWebSocket(e, response);
        closeQuietly(response);
        return;
      }

      // Promote the HTTP streams into web socket streams.
      StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
      streamAllocation.noNewStreams(); // Prevent connection pooling!
      Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

      // Process all web socket messages.
      try {
        // Connected successfully: call listener.onOpen
        listener.onOpen(RealWebSocket.this, response);
        String name = "OkHttp WebSocket " + request.url().redact();
        // Key 1: create the reader and writer, see below
        initReaderAndWriter(name, streams);
        streamAllocation.connection().socket().setSoTimeout(0);
        // Key 2: read frames in a loop
        loopReader();
      } catch (Exception e) {
        failWebSocket(e, null);
      }
    }

    @Override public void onFailure(Call call, IOException e) {
      failWebSocket(e, null);
    }
  });
}

public void initReaderAndWriter(String name, Streams streams) throws IOException {
  synchronized (this) {
    this.streams = streams;
    this.writer = new WebSocketWriter(streams.client, streams.sink, random);
    // Create a single-thread scheduled thread pool
    this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
    if (pingIntervalMillis != 0) {
      // The pool runs PingRunnable periodically: this is where the heartbeat lives
      executor.scheduleAtFixedRate(
          new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
    }
    if (!messageAndCloseQueue.isEmpty()) {
      runWriter(); // Send messages that were enqueued before we were connected.
    }
  }
  // Create the reader that reads incoming frames
  reader = new WebSocketReader(streams.client, streams.source, this);
}
What does PingRunnable do?
private final class PingRunnable implements Runnable {
  PingRunnable() {
  }

  @Override public void run() {
    // Send a ping frame
    writePingFrame();
  }
}

void writePingFrame() {
  WebSocketWriter writer;
  int failedPing;
  synchronized (this) {
    if (failed) return;
    writer = this.writer;
    // If we are still waiting for the previous PONG, failedPing = sentPingCount; otherwise -1
    failedPing = awaitingPong ? sentPingCount : -1;
    sentPingCount++;
    awaitingPong = true;
  }

  if (failedPing != -1) {
    // Only reached when the previous pong has not come back
    failWebSocket(new SocketTimeoutException("sent ping but didn't receive pong within "
        + pingIntervalMillis + "ms (after " + (failedPing - 1) + " successful ping/pongs)"),
        null);
    return;
  }

  try {
    // Send the ping
    writer.writePing(ByteString.EMPTY);
  } catch (IOException e) {
    failWebSocket(e, null);
  }
}
So how do we know whether a pong was received? That is the reader's job, so let's see what happens in the reader.
Remember the loopReader() call marked as Key 2 above?
// RealWebSocket.java
/** Receive frames until there are no more. Invoked only by the reader thread. */
public void loopReader() throws IOException {
  while (receivedCloseCode == -1) {
    // This method call results in one or more onRead* methods being called on this thread.
    // Process the next received frame
    reader.processNextFrame();
  }
}

// WebSocketReader.java
/** Checks whether the frame is a message frame or a control frame; the heartbeat pong is a control frame */
void processNextFrame() throws IOException {
  readHeader();
  if (isControlFrame) {
    readControlFrame();
  } else {
    readMessageFrame();
  }
}

/** Dispatches on the opcode */
private void readControlFrame() throws IOException {
  // ...
  switch (opcode) {
    case OPCODE_CONTROL_PING:
      frameCallback.onReadPing(controlFrameBuffer.readByteString());
      break;
    case OPCODE_CONTROL_PONG:
      // Note: onReadPong is called when a pong is received
      frameCallback.onReadPong(controlFrameBuffer.readByteString());
      break;
    case OPCODE_CONTROL_CLOSE:
      // ...
      frameCallback.onReadClose(code, reason);
      closed = true;
      break;
    default:
      throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
  }
}

// RealWebSocket.java
/** Sets awaitingPong back to false */
@Override public synchronized void onReadPong(ByteString buffer) {
  // This API doesn't expose pings.
  receivedPongCount++;
  awaitingPong = false;
}
Following the code, we find that when a pong arrives, OkHttp simply resets the awaitingPong flag, meaning the last ping has been answered.
To summarize OkHttp's heartbeat flow:
- When RealWebSocket connects, it creates a scheduled thread pool that periodically runs PingRunnable to send pings (other outgoing messages are sent by writerRunnable);
- It creates a reader that continuously receives frames;
- Before sending a ping, it checks the awaitingPong flag to see whether the previous pong has come back;
- It parses received frames, and when a pong arrives, it resets awaitingPong.
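The bookkeeping in this summary is small enough to restate in plain Java, which is handy when a library (or your own client) does not track it for you. This is a sketch modeled on the awaitingPong logic above, not OkHttp code: `PingPongTracker`, `onPingDue`, and `onPong` are hypothetical names, and unlike OkHttp it does not count a failed ping as sent.

```java
/**
 * Transport-agnostic ping/pong bookkeeping, modeled on OkHttp's
 * awaitingPong / sentPingCount / receivedPongCount fields.
 */
final class PingPongTracker {
    private boolean awaitingPong = false;
    private int sentPingCount = 0;
    private int receivedPongCount = 0;

    /**
     * Called each time the heartbeat timer fires.
     *
     * @return true if a ping should be written now; false if the previous ping
     *         never got a pong, so the caller should treat the socket as failed
     */
    synchronized boolean onPingDue() {
        if (awaitingPong) {
            return false; // previous pong missing
        }
        sentPingCount++;
        awaitingPong = true;
        return true;
    }

    /** Called by the reader when a PONG control frame arrives. */
    synchronized void onPong() {
        receivedPongCount++;
        awaitingPong = false;
    }

    synchronized int sentPings() { return sentPingCount; }

    synchronized int receivedPongs() { return receivedPongCount; }

    public static void main(String[] args) {
        PingPongTracker tracker = new PingPongTracker();
        System.out.println(tracker.onPingDue()); // true: first ping goes out
        tracker.onPong();                        // server answered
        System.out.println(tracker.onPingDue()); // true: second ping goes out
        System.out.println(tracker.onPingDue()); // false: no pong for the second ping
    }
}
```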
1.2 Dynamically adjusting the OkHttp heartbeat
It turns out the ping interval can be changed without recreating the OkHttpClient: we only need to change the period at which the thread pool runs PingRunnable.
So we can use reflection to shut down the old thread pool, create a new one, and schedule PingRunnable with the new pingInterval.
// Needs: java.lang.reflect.Constructor, java.lang.reflect.Field, java.util.concurrent.*,
//        okhttp3.internal.Util, okhttp3.internal.ws.RealWebSocket
// Relies on OkHttp 3.x internals (field "executor", inner class "PingRunnable"); may break on upgrade.
Class<?> clazz = Class.forName("okhttp3.internal.ws.RealWebSocket");
Field field = clazz.getDeclaredField("executor");
field.setAccessible(true);
synchronized (mWebSocket) { // RealWebSocket guards its executor with its own monitor
    ScheduledExecutorService oldService = (ScheduledExecutorService) field.get(mWebSocket);
    if (oldService != null) { // null until the connection is open
        for (Class<?> innerClass : clazz.getDeclaredClasses()) {
            if ("PingRunnable".equals(innerClass.getSimpleName())) {
                // PingRunnable is a non-static inner class: its constructor takes the outer RealWebSocket
                Constructor<?> constructor = innerClass.getDeclaredConstructor(RealWebSocket.class);
                constructor.setAccessible(true);
                Object pingRunnable = constructor.newInstance(mWebSocket);
                // Create a new thread pool and schedule pings with the new interval
                ScheduledThreadPoolExecutor newService =
                        new ScheduledThreadPoolExecutor(1, Util.threadFactory("ws-ping", false));
                newService.scheduleAtFixedRate((Runnable) pingRunnable, interval, interval, unit);
                field.set(mWebSocket, newService);
                // Shut down the old thread pool (this also cancels its periodic PingRunnable)
                oldService.shutdown();
                break;
            }
        }
    }
}
In this way, we achieve the purpose of dynamic adjustment of the heartbeat.
In Java-WebSocket, a ping is sent only when sendPing() is called from outside, and there is no internal ping/pong state tracking, so we have to maintain it ourselves. As with OkHttp, you can send pings on a timer and parse pongs to track the heartbeat state. I won't elaborate on that here.
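When we drive the heartbeat ourselves, as with Java-WebSocket, changing the interval needs no reflection, because we own the scheduler: cancel the periodic task and schedule it again. A minimal sketch, assuming a caller-supplied ping task (with Java-WebSocket that task would call `sendPing()` and could keep a ping/pong flag like OkHttp's awaitingPong); `HeartbeatScheduler` and `changeInterval` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical heartbeat scheduler whose interval can change at runtime. */
final class HeartbeatScheduler {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "ws-heartbeat");
                t.setDaemon(true); // pings alone should not keep the JVM alive
                return t;
            });
    private final Runnable pingTask;
    private ScheduledFuture<?> future;

    HeartbeatScheduler(Runnable pingTask) {
        this.pingTask = pingTask;
    }

    /** Starts the heartbeat, or restarts it with a new interval if already running. */
    synchronized void changeInterval(long interval, TimeUnit unit) {
        if (future != null) {
            future.cancel(false); // do not interrupt a ping that is currently being sent
        }
        future = executor.scheduleAtFixedRate(pingTask, interval, interval, unit);
    }

    synchronized void stop() {
        if (future != null) {
            future.cancel(false);
        }
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch pings = new CountDownLatch(3);
        HeartbeatScheduler heartbeat = new HeartbeatScheduler(() -> {
            // With Java-WebSocket, client.sendPing() would go here.
            System.out.println("ping on " + Thread.currentThread().getName());
            pings.countDown();
        });
        heartbeat.changeInterval(30, TimeUnit.SECONDS);      // e.g. a long background interval
        heartbeat.changeInterval(50, TimeUnit.MILLISECONDS); // switch without reconnecting
        System.out.println("three pings arrived: " + pings.await(2, TimeUnit.SECONDS));
        heartbeat.stop();
    }
}
```

Since the connection itself is never touched, neither of the server-side strategies from the heartbeat section is needed.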
2. Reconnection
As mentioned earlier, reconnection simply means handling the logic for the different disconnection cases. Fortunately, most WebSocket libraries provide close and failure callbacks, which let us distinguish abnormal disconnections from intentional ones.
What we need to maintain is the relationship between the retry count and the reconnection interval, similar to an exponential backoff algorithm.
3. Multithreading
As for the multithreading problem mentioned above, the main issue is multiple connection requests arriving at the same time. Rather than adding all kinds of synchronization across threads, it is better to turn parallel requests into serial ones: like a message queue, all messages are processed on one dedicated thread. This avoids thread synchronization and keeps state changes in order.
On Android, HandlerThread satisfies both requirements perfectly, and I did use HandlerThread at first. Later I realized the logic could be platform-independent and run on plain Java, so I implemented the queue myself. Maintaining your own message queue also gives you better control over priorities, delays, and so on.
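To illustrate the idea, here is a minimal platform-independent stand-in for HandlerThread built on `java.util.concurrent.DelayQueue`: commands posted from any thread run one at a time, in deadline order, on a single worker thread. This is a sketch under my own assumptions, not WsGo's implementation; `CommandLoop`, `post`, and `postDelayed` are hypothetical names modeled on Android's Handler.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Single-threaded command loop: connection state is only ever touched by one thread. */
final class CommandLoop {
    private static final AtomicLong SEQUENCE = new AtomicLong();

    private static final class Task implements Delayed {
        final Runnable action;
        final long runAtNanos;
        final long seq = SEQUENCE.getAndIncrement(); // FIFO tie-break for equal deadlines

        Task(Runnable action, long delayMillis) {
            this.action = action;
            this.runAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            Task o = (Task) other;
            long diff = runAtNanos - o.runAtNanos; // subtraction is the safe way to compare nanoTime values
            if (diff != 0) return diff < 0 ? -1 : 1;
            return Long.compare(seq, o.seq);
        }
    }

    private final DelayQueue<Task> queue = new DelayQueue<>();
    private final Thread worker;
    private volatile boolean running = true;

    CommandLoop(String name) {
        worker = new Thread(() -> {
            while (running) {
                try {
                    queue.take().action.run(); // blocks until the earliest task is due
                } catch (InterruptedException e) {
                    return; // quit() interrupts the worker
                }
            }
        }, name);
        worker.setDaemon(true);
        worker.start();
    }

    void post(Runnable action) {
        postDelayed(action, 0);
    }

    void postDelayed(Runnable action, long delayMillis) {
        queue.put(new Task(action, delayMillis));
    }

    void quit() {
        running = false;
        worker.interrupt();
    }

    public static void main(String[] args) throws InterruptedException {
        CommandLoop loop = new CommandLoop("ws-dispatcher");
        loop.postDelayed(() -> System.out.println("RECONNECT (after 100 ms)"), 100);
        loop.post(() -> System.out.println("CONNECT"));
        loop.post(() -> System.out.println("SEND hello"));
        Thread.sleep(300); // prints CONNECT, SEND hello, then RECONNECT
        loop.quit();
    }
}
```

Delayed posts cover the reconnect case naturally: a retry is just a RECONNECT command posted with the backoff delay.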
WebSocketGo
As mentioned earlier, my project is based on OkHttp, but I found that the state management part could be abstracted into a separate library, hence the subject of this article: WebSocketGo (WsGo).
Welcome to Star, github.com/Gnepux/WebS…
The data flow is as follows:
- The Dispatcher is the message queue mentioned above and follows the producer-consumer pattern. It maintains two queues: one for outgoing commands and one for incoming events.
  - Commands: CONNECT, DISCONNECT, RECONNECT, CHANGE_PING, and SEND
  - Events: onConnect, onMessage, onSend, onRetry, onDisConnect, and onClose
- The Channel Manager is responsible for state management.
- The WebSocket Interface is an adaptation layer that calls the underlying WebSocket library.
- The Event Listener delivers state callbacks to the caller.
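Because the Dispatcher executes every command on one thread, the Channel Manager's state can live in a plain field, and the duplicate-connect rule from the multithreading section becomes a simple state check. A hypothetical sketch: the state names and methods below are my assumptions, not WsGo's API.

```java
/** Hypothetical Channel Manager state handling; only ever called from the dispatcher thread. */
final class ChannelStateMachine {
    enum State { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING }

    private State state = State.DISCONNECTED;

    State state() {
        return state;
    }

    /** @return true if a real connect attempt should start */
    boolean onConnectCommand() {
        if (state == State.CONNECTING || state == State.CONNECTED) {
            return false; // already connected or in progress: skip the duplicate request
        }
        state = State.CONNECTING;
        return true;
    }

    /** The underlying library reported that the socket is open. */
    void onOpen() {
        state = State.CONNECTED;
    }

    /** @return true if a close should actually be sent */
    boolean onDisconnectCommand() {
        if (state == State.DISCONNECTED || state == State.DISCONNECTING) {
            return false;
        }
        state = State.DISCONNECTING;
        return true;
    }

    /** The underlying library reported close or failure. */
    void onClosedOrFailed() {
        state = State.DISCONNECTED;
    }

    public static void main(String[] args) {
        ChannelStateMachine sm = new ChannelStateMachine();
        System.out.println(sm.onConnectCommand());    // true: start connecting
        System.out.println(sm.onConnectCommand());    // false: duplicate while CONNECTING
        sm.onOpen();
        System.out.println(sm.onConnectCommand());    // false: already CONNECTED
        System.out.println(sm.onDisconnectCommand()); // true
        sm.onClosedOrFailed();
        System.out.println(sm.state());               // DISCONNECTED
    }
}
```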
Integrate WsGo
implementation 'com.gnepux:wsgo:1.0.2'
// use okhttp
implementation 'com.gnepux:wsgo-okwebsocket:1.0.1'
// use java websocket
implementation 'com.gnepux:wsgo-jwebsocket:1.0.1'
Initialize
WsConfig config = new WsConfig.Builder()
.debugMode(true) // true to print log
.setUrl(pushUrl) // ws url
.setHttpHeaders(headerMap) // http headers
.setConnectTimeout(10 * 1000L) // connect timeout
.setReadTimeout(10 * 1000L) // read timeout
.setWriteTimeout(10 * 1000L) // write timeout
.setPingInterval(10 * 1000L) // initial ping interval
.setWebSocket(OkWebSocket.create()) // websocket client
.setRetryStrategy(retryStrategy) // retry count and delay time strategy
.setEventListener(eventListener) // event listener
.build();
WsGo.init(config);
Go
// Connect
WsGo.getInstance().connect();
// Send a message
WsGo.getInstance().send("hello from WsGo");
// Disconnect
WsGo.getInstance().disconnect(1000, "close");
WsGo.getInstance().disconnectNormal("close");
// Change the heartbeat interval
WsGo.getInstance().changePingInterval(10, TimeUnit.SECONDS);
// Release
WsGo.getInstance().destroyInstance();
More configuration for WsConfig
setWebSocket(WebSocket socket)
WsGo currently supports OkHttp and Java-WebSocket:
// for OkHttp (wsgo-okwebsocket)
setWebSocket(OkWebSocket.create());
// for Java WebSocket (wsgo-jwebsocket)
setWebSocket(JWebSocket.create());
If you need another WebSocket library or a custom client, just implement the WebSocket interface and report the corresponding results through ChannelCallback. WsGo will handle the rest of the connection management for you.
public interface WebSocket {
void connect(WsConfig config, ChannelCallback callback);
void reconnect(WsConfig config, ChannelCallback callback);
boolean disconnect(int code, String reason);
void changePingInterval(long interval, TimeUnit unit);
boolean send(String msg);
}
setRetryStrategy(RetryStrategy retryStrategy)
For abnormal disconnections, WsGo reconnects automatically. RetryStrategy defines the relationship between the retry count and the reconnection delay.
WsGo uses a DefaultRetryStrategy by default. To customize it, implement the onRetry method of the RetryStrategy interface.
public interface RetryStrategy {
    /**
     * Maps the retry count to a delay.
     *
     * @param retryCount the number of retries so far
     * @return the delay before the next retry
     */
long onRetry(long retryCount);
}
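As an example of a custom strategy, here is a capped exponential backoff. The interface is copied from above so the sketch compiles on its own; the class name, the 1 s base and 60 s cap, the millisecond unit, and the assumption that `retryCount` starts at 1 are illustrative choices, not WsGo's DefaultRetryStrategy.

```java
/** The interface as shown above, copied so this sketch compiles on its own. */
interface RetryStrategy {
    long onRetry(long retryCount);
}

/** Hypothetical capped exponential backoff: base, 2*base, 4*base, ... up to max. */
final class ExponentialBackoffRetryStrategy implements RetryStrategy {
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    ExponentialBackoffRetryStrategy(long baseDelayMillis, long maxDelayMillis) {
        // max should stay well below Long.MAX_VALUE / 2 so doubling cannot overflow
        if (baseDelayMillis <= 0 || maxDelayMillis < baseDelayMillis) {
            throw new IllegalArgumentException("need 0 < base <= max");
        }
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    @Override
    public long onRetry(long retryCount) {
        long delay = baseDelayMillis;
        // Double once per retry after the first; stop as soon as the cap is reached.
        for (long i = 1; i < retryCount && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }

    public static void main(String[] args) {
        RetryStrategy strategy = new ExponentialBackoffRetryStrategy(1000, 60_000);
        for (long retry = 1; retry <= 8; retry++) {
            // 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
            System.out.println("retry " + retry + " -> wait " + strategy.onRetry(retry) + " ms");
        }
    }
}
```

It would be passed in via `.setRetryStrategy(new ExponentialBackoffRetryStrategy(1000, 60_000))`. A common refinement is adding random jitter so that many clients do not all reconnect at the same moment after a server restart.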
setEventListener(EventListener eventListener)
Adds event callbacks. Note that the callbacks run on a thread created by WsGo itself, not on the calling thread. If necessary, switch threads manually inside the callbacks.
public interface EventListener {
void onConnect();
void onDisConnect(Throwable throwable);
void onClose(int code, String reason);
void onMessage(String text);
void onReconnect(long retryCount, long delayMillSec);
void onSend(String text, boolean success);
}
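One way to do that thread switch once, instead of in every callback, is a decorator that re-posts each callback onto an Executor of your choice. The interface is copied from above so the sketch stands alone; `ExecutorEventListener` is a hypothetical name. On Android the executor could be `ContextCompat.getMainExecutor(context)` from AndroidX, or `new Handler(Looper.getMainLooper())::post`.

```java
import java.util.concurrent.Executor;

/** The callback interface as listed above, copied so this sketch compiles on its own. */
interface EventListener {
    void onConnect();
    void onDisConnect(Throwable throwable);
    void onClose(int code, String reason);
    void onMessage(String text);
    void onReconnect(long retryCount, long delayMillSec);
    void onSend(String text, boolean success);
}

/** Hypothetical decorator: forwards every callback to delegate, but runs it on executor. */
final class ExecutorEventListener implements EventListener {
    private final EventListener delegate;
    private final Executor executor;

    ExecutorEventListener(EventListener delegate, Executor executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    @Override public void onConnect() {
        executor.execute(delegate::onConnect);
    }

    @Override public void onDisConnect(Throwable throwable) {
        executor.execute(() -> delegate.onDisConnect(throwable));
    }

    @Override public void onClose(int code, String reason) {
        executor.execute(() -> delegate.onClose(code, reason));
    }

    @Override public void onMessage(String text) {
        executor.execute(() -> delegate.onMessage(text));
    }

    @Override public void onReconnect(long retryCount, long delayMillSec) {
        executor.execute(() -> delegate.onReconnect(retryCount, delayMillSec));
    }

    @Override public void onSend(String text, boolean success) {
        executor.execute(() -> delegate.onSend(text, success));
    }
}
```

It would be registered with `.setEventListener(new ExecutorEventListener(myListener, mainExecutor))`. With a serial executor such as the main thread, callbacks also keep their original order.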
PS
WsGo currently does not listen for mobile network changes (a disconnection is reported through the callback, but WsGo will not reconnect automatically when the network recovers). I think this is not part of WebSocket state management itself, and handling it would make the library platform-dependent. If you need this convenience, trigger the connection from outside; a simple wrapper in your project is enough.
Afterword
This article is a bit long. Starting from the smart home scenario, it introduced the problems with WebSocket state management, then my solutions, and finally the general-purpose state management framework WebSocketGo. It is a small piece of thinking and summary from my own work. In fact, not only WebSocket but every long-lived connection faces the same problems, and the way of thinking about them and the starting point for a solution should be the same.