* This article has been published exclusively by guolin_blog, an official wechat account

Summary of a.

1. RxWebSocket is a WebSocket client based on OKHTTP and RxJava encapsulation. The core features of this library are In addition to manually closing WebSocket(i.e. RxJava unsubscribe),WebSocket automatically reconnects when it is closed abnormally (onFailure, WebSocketException, etc.). Second, the WebSocket cache processing, the same URL, sharing a WebSocket.

2. Because it is based on RxJava encapsulation, it brings unlimited possibilities and can be used with RxBinding and Rxlifecycle to facilitate the management of WebSockets.

Project address: Welcome star

reconnection

The project has been uploaded to Jcenter.

/ / in this project
compile 'com. DHH: websocket: 1.3.0'

// Since the project is written based on OKHTTP,RxJava,RxAndroid, the following dependencies need to be added.
//okhttp,RxJava,RxAndroid
compile 'com. Squareup. Okhttp3: okhttp: 3.9.0'
compile 'the IO. Reactivex: rxjava: 1.3.1'
compile 'the IO. Reactivex: rxandroid: 1.2.1'Copy the code

2. Use method

0. Initialize. You can use it directly or ignore it.

If you want to use your own okhttpClient:

        OkHttpClient yourClient = new OkHttpClient();
        RxWebSocketUtil.getInstance(a).setClient(yourClient);Copy the code

Whether to print logs:

RxWebSocketUtil.getInstance(a).setShowLog(BuildConfig.DEBUG);Copy the code

1. Obtain a WebSocket and receive messages in various ways:

RxWebSocketUtil.getInstance().getWebSocketInfo(url)
                        .subscribe(new Action1<WebSocketInfo>() {
                            @Override
                            public void call(WebSocketInfo webSocketInfo) {
                                mWebSocket = webSocketInfo.getWebSocket();
                                Log.d("MainActivity", webSocketInfo.getString());
                                Log.d("MainActivity"."ByteString:"+ webSocketInfo.getByteString()); }}); mWebSocket.send("hello word");

        //get StringMsg
        RxWebSocketUtil.getInstance().getWebSocketString(url)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                    }
                });
        // get ByteString
        RxWebSocketUtil.getInstance().getWebSocketByteString(url)
                .subscribe(new Action1<ByteString>() {
                    @Override
                    public void call(ByteString byteString) {

                    }
                });
        //get WebSocket
        RxWebSocketUtil.getInstance().getWebSocket(url)
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {

                    }
                });
       // The WebSocket with timeout is reconnected to the WebSocket if no message is received within the specified time. To fit the Mi tablet.
       // When testing xiaomi tablet, there was a network disconnection, do not send error, resulting in can not reconnect
        RxWebSocketUtil.getInstance().getWebSocketInfo(url,10, TimeUnit.SECONDS)
                .subscribe(new Action1<WebSocketInfo>() {
                    @Override
                    public void call(WebSocketInfo webSocketInfo) {

                    }
                });       Copy the code

2. Send a message:

// address mWebSocket directly with a reference to WebSocket.send("hello word");// RxWebSocketUtil is an error if the WebSocket corresponding to the url is already open.getInstance(a).send(url, "hello");
  RxWebSocketUtil.getInstance(a).send(url, ByteString.EMPTY);If WebSocket is open, send the data directly. If not, open a WebSocket and close the data directly. RxWebSocketUtil.getInstance(a).asyncSend(url, "hello");
  RxWebSocketUtil.getInstance(a).asyncSend(url, ByteString.EMPTY);Copy the code

3. Close the WebSocket

The project is implemented using RxJava, so the way to turn off WebSocket is to log out when appropriate Observable, in the demo of the project, writes a simple lifecycle that binds the Observable life to the Activity’s onDestroy and automatically logs out. See demo for code details. Internal WebSocket sharing mechanism of the same URL is implemented, so the WebSocket connection is automatically closed when all external Observables holding the URL are logged out. Please refer to the principle analysis section. There are two common logout methods:

       // Note that there are many ways to unsubscribe such as rxlifecycle
                        mSubscription = RxWebSocketUtil.getInstance().getWebSocketInfo(url)
                                .subscribe(new Action1<WebSocketInfo>() {
                                    @Override
                                    public void call(WebSocketInfo webSocketInfo) {
                                        mWebSocket = webSocketInfo.getWebSocket();
                                        if (webSocketInfo.isOnOpen()) {
                                            Log.d("MainActivity"." on WebSocket open");
                                        } else {

                                            String string = webSocketInfo.getString();
                                            if (string ! = null) {
                                                Log.d("MainActivity".string);
                                                textview.setText(Html.fromHtml(string));

                                            }

                                            ByteString byteString = webSocketInfo.getByteString();
                                            if (byteString ! = null) {
                                                Log.d("MainActivity"."webSocketInfo.getByteString():" +byteString); }}}});/ / logout
    if (mSubscription ! = null) {
            mSubscription.unsubscribe();
        }

Lifecycle log out. For details, see Demo
        RxWebSocketUtil.getInstance().getWebSocketString(url)
                .compose(this.<String>bindOnActivityEvent(ActivityEvent.onDestory))
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {

                    }
                });
Copy the code

Three. Principle analysis

1. You need to first okhttp WebSocket packaging into observables, because of the need to WebSocket, Stringmsg, ByteString information sent to the observer so together to build a WebSocketInfo class encapsulates the information:

public class WebSocketInfo {
    private WebSocket mWebSocket;
    private String mString;
    private ByteString mByteString;
    private boolean onOpen;
    // Other omissions
}Copy the code

The onOpen field is used to determine whether the current WebSocketInfo is a message sent when the WebSocket is open (onOpen). In this case,Stringmsg and ByteString are null.

2. Wrap WebSocketInfo as an Observable

    private final class WebSocketOnSubscribe implements Observable.OnSubscribe<WebSocketInfo> {
        private String url;

        private WebSocket webSocket;

        private WebSocketInfo startInfo, stringInfo, byteStringInfo;

        public WebSocketOnSubscribe(String url) {
            this.url = url;
            startInfo = new WebSocketInfo(true);
            stringInfo = new WebSocketInfo();
            byteStringInfo = new WebSocketInfo();
        }

        @Override
        public void call(final Subscriber<? super WebSocketInfo> subscriber) {
            if(webSocket ! =null) {
                // Reduce the reconnection frequency
                if (!"main".equals(Thread.currentThread().getName())) {
                    SystemClock.sleep(2000);
                }
            }
            initWebSocket(subscriber);
        }

        private void initWebSocket(final Subscriber<? super WebSocketInfo> subscriber) {
            webSocket = client.newWebSocket(getRequest(url), new WebSocketListener() {
                @Override
                public void onOpen(final WebSocket webSocket, Response response) {
                    if (showLog) {
                        Log.d("RxWebSocketUtil", url + " --> onOpen");
                    }
                    webSocketMap.put(url, webSocket);
                    AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
                        @Override
                        public void call() {
                            if(! subscriber.isUnsubscribed()) { subscriber.onStart(); startInfo.setWebSocket(webSocket); subscriber.onNext(startInfo); }}}); } @Overridepublic void onMessage(WebSocket webSocket, String text) {
                    if(! subscriber.isUnsubscribed()) { stringInfo.setWebSocket(webSocket); stringInfo.setString(text); subscriber.onNext(stringInfo); } } @Overridepublic void onMessage(WebSocket webSocket, ByteString bytes) {
                    if(! subscriber.isUnsubscribed()) { byteStringInfo.setWebSocket(webSocket); byteStringInfo.setByteString(bytes); subscriber.onNext(byteStringInfo); } } @Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {
                    if (showLog) {
                        Log.e("RxWebSocketUtil", t.toString() + webSocket.request().url().uri().getPath());
                    }
                    if(! subscriber.isUnsubscribed()) { subscriber.onError(t); } } @Overridepublic void onClosing(WebSocket webSocket, int code, String reason) {
                    webSocket.close(1000.null);
                }

                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    if (showLog) {
                        Log.d("RxWebSocketUtil", url + " --> onClosed:code= "+ code); }}}); subscriber.add(new MainThreadSubscription() {
                @Override
                protected void onUnsubscribe() {
                    webSocket.close(3000."Manual shutdown"); }}); }}Copy the code

Implement a WebSocketOnSubscribe to convert the WebSocket callback to a subscriber call. Sends to the downstream Observable. Call subscriber.onstart () on onOpen and send an onOpen WebSocketInfo. WebSocket is closed at subscriber cancellation. Systemclock. sleep(2000) is at the top of the call method, which is mainly to reduce the reconnection frequency in case of disconnection, which will be discussed below.

Packaged into observables:

Observable.create(new WebSocketOnSubscribe(url))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());Copy the code

3. Automatic reconnection:

Observable.create(new WebSocketOnSubscribe(URL)) // Automatic reconnection.timeout(timeout, timeUnit)
                    .retry(a).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());Copy the code

The RxJava Retry operator, which perfectly implements this function, eats up the error and reinvokes it when a Throwable is issued upstream The onSubscribe call method, also known as the WebSocket Subscribe call, reinitializes a WebSocket connection and reconnects. If there is no network, retry calls are made very frequently, so in the call method, when the connection is reconnected Systemclock. sleep(2000), sleep for 2 seconds, so that the reconnection frequency is 2 seconds. Of course there is a timeout operator on top of retry. When subscriber.onnext () is not called within the specified time interval, a timeoutException is issued to retry the WebSocket. This is mainly to adapt to some domestic models, when WebSocket connection abnormal, will not timely error, such as Xiaomi tablet. The original WebSocket is closed on each reconnection.

4. Implement WebSocket sharing of the same URL

Observable.create(new WebSocketOnSubscribe(URL)) // Automatic reconnection.timeout(timeout, timeUnit)
                    .retry(a) / / share.doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            observableMap.remove(url);
                            webSocketMap.remove(url);
                            if (showLog) {
                                Log.d("RxWebSocketUtil"."Cancel");}}}).doOnNext(new Action1<WebSocketInfo>() {
                        @Override
                        public void call(WebSocketInfo webSocketInfo) {
                            if (webSocketInfo.isOnOpen()) {
                                webSocketMap.put(url, webSocketInfo.getWebSocket());}}}).share(a).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());Copy the code

The sharing function is implemented to prevent the WebSocket of one URL from establishing multiple connections. This is mainly implemented by the SHARE operator of RxJava. The share operator enables an Observable to have multiple subscribers Observable unsubscribes only if subscriber is unsubscribed.

    public Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
        Observable<WebSocketInfo> observable = observableMap.get(url);
        if (observable == null) {
            observable = Observable.create(new WebSocketOnSubscribe(url))
                    // Automatic reconnection
                    .timeout(timeout, timeUnit)
                    .retry()
                    / / Shared
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            observableMap.remove(url);
                            webSocketMap.remove(url);
                            if (showLog) {
                                Log.d("RxWebSocketUtil"."Cancel");
                            }
                        }
                    })
                    .doOnNext(new Action1<WebSocketInfo>() {
                        @Override
                        public void call(WebSocketInfo webSocketInfo) {
                            if (webSocketInfo.isOnOpen()) {
                                webSocketMap.put(url, webSocketInfo.getWebSocket());
                            }
                        }
                    })
                    .share()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            observableMap.put(url, observable);
        } else {
            observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);
        }
        return observable;
    }Copy the code

DoOnUnsubscribe: Removes the cached Observable and WebSocket from the map when an Observable is unsubscribed (that is, WebSocket is closed).

DoOnNext determines whether the received WebSocketInfo was sent by the WebSocket during onOpen and caches it. An Observable subscribes to the same URL and fetches it from the cache. We should send a WebSocket onOpen event to the Observable as well:

/ / usemergeOperator to send the onOpen event to the subscriber Observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);Copy the code

This way, a WebSocket with the same URL, no matter where or when it subscribed, will receive an onOpen event, acting externally like a new WebSocket.

Several variations of the getWebSocketInfo method:

    /** * default timeout: 30 days * 

* Call this method *

*/
public Observable<WebSocketInfo> getWebSocketInfo(String url) { return getWebSocketInfo(url, 30, TimeUnit.DAYS); } public Observable<String> getWebSocketString(String url) { return getWebSocketInfo(url) .map(new Func1<WebSocketInfo, String>() { @Override public String call(WebSocketInfo webSocketInfo) { return webSocketInfo.getString(); } }) .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { returns ! =null; }}); }public Observable<ByteString> getWebSocketByteString(String url) { return getWebSocketInfo(url) .map(new Func1<WebSocketInfo, ByteString>() { @Override public ByteString call(WebSocketInfo webSocketInfo) { return webSocketInfo.getByteString(); } }) .filter(new Func1<ByteString, Boolean>() { @Override public Boolean call(ByteString byteString) { returnbyteString ! =null; }}); }public Observable<WebSocket> getWebSocket(String url) { return getWebSocketInfo(url) .map(new Func1<WebSocketInfo, WebSocket>() { @Override public WebSocket call(WebSocketInfo webSocketInfo) { returnwebSocketInfo.getWebSocket(); }}); }Copy the code

5. Send the message to the server

WebSocketInfo contains webSockets, so after subscribing, you can get the WebSocket reference and you can send a message to the server. Of course our RxWebSocketUtil has opened WebSocket already cached. So we can also send messages like this:

    /** * If the url's WebSocket is already open, you can call this message directly @param url
     * @param msg
     */
    public void send(String url, String msg) {
        WebSocket webSocket = webSocketMap.get(url);
        if(webSocket ! =null) {
            webSocket.send(msg);
        } else {
            throw new IllegalStateException("The WebSokcet not open"); }}/** * If the url's WebSocket is already open, you can call this message directly @param url
     * @param byteString
     */
    public void send(String url, ByteString byteString) {
        WebSocket webSocket = webSocketMap.get(url);
        if(webSocket ! =null) {
            webSocket.send(byteString);
        } else {
            throw new IllegalStateException("The WebSokcet not open"); }}Copy the code

An error is reported when the WebSocket of the specified URL is not open.

Asynchronously sends messages to the server

    /** ** ** ** * /** ** * /** ** /** * @param url
     * @param msg
     */
    public void asyncSend(String url, final String msg) {
        getWebSocket(url)
                .first()
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) { webSocket.send(msg); }}); }/** ** ** ** * /** ** * /** ** /** * @param url
     * @param byteString
     */
    public void asyncSend(String url, final ByteString byteString) {
        getWebSocket(url)
                .first()
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) { webSocket.send(byteString); }}); }Copy the code

In these two ways, you don’t need to care whether the WebSocket of the URL is open, you can send directly. GetWebSocket (URL) gets an Observable, either from the cache, or restarts a WebSocket, but you don’t need to care. After the first operator, if it’s an Observable cached, the Obse is unregistered Rvable, when it is a new WebSocket, no other subscriber will be available after canceling the current subscriber, and the new WebSocket will be closed (share operator).

Finally, if you have any good suggestions, please feel free to contact me.

Project Address:Github.com/dhhAndroid/…

If it helps, thank you Star!

Respect the original, prohibit reprint!