The introduction

The dashboard in the middle and background is very complicated, especially when it needs full screen application, the real-time demand of data is very high. WebSocket in any environment is actually very simple to use, the modern browser implementation standards are very uniform, and the interface is simple enough.

Even in Angular, WebSocket can be used with just a few lines of code.

const ws = new WebSocket('wss://echo.websocket.org');
ws.onmessage = (e) = > {
  console.log('message', e);
}
Copy the code

To send a message to the server:

ws.send(`content`);
Copy the code

Most people in Angular take this code further, such as unified message parsing, error handling, multiplexing, and finally encapsulating it as a service class.

In fact, RxJS also wraps a WebSocket Subject, located in RxJS/WebSocket.

How to use

If the above example were written using RxJS, then:

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

const ws = webSocket('wss://echo.websocket.org');

ws.subscribe(res= > {
  console.log('message', res);
});

ws.next(`content`);
Copy the code

WebSocket is a factory function that produces a WebSocketSubject object that can be subscribed to multiple times. Failure to subscribe or cancel the last subscription will result in the webSocket connection being disconnected, and the connection will be automatically reconnected when the next subscription is made.

WebSocketSubjectConfig

In addition to accepting strings (webSocket service remote addresses), webSocket allows you to specify more complex configuration items.

By default, messages are serialized and deserialized using json. parse and json. stringify. Therefore, messages are sent and received using JSON, which can be changed using serializer or Deserializer properties.

If you need to care when the WebSocket starts or ends (closeObserver), then:

const open$ = new Subject();
const ws = webSocket({
  url: 'wss://echo.websocket.org',
  openObserver: open$
});
// Subscribe to open events
open$.subscribe((a)= > {});
Copy the code

The message

WebSocketSubject is also a variation of Subject, so subscribing to it means receiving messages, whereas next, Complete, and Error are used to maintain the push of messages.

  • usenextTo send a message
  • usecompleteAttempts are made to check if the subscription is the last, and if so, the connection is closed
  • useerrorEquivalent to primitivecloseMethod and must be provided{ code: number, reason? : string}Parameter, notecodeBe sure to comply withValue range

Can be replay

When next is called to send a message if the WebSocket connection breaks (for example, if no one subscribes), the message is cached and sent sequentially after the next reconnection. This is very convenient in the asynchronous world. We just need to make sure that Angular starts up with WebSocket initialization so that whenever you subscribe to receive messages, you can send them without waiting.

The fact that RxJS WebSocket is a WebSocketSubject produced by WebSocket by default is essentially the “replay” capability of ReplaySubject. Of course you can change this behavior with the second parameter to webSocket.

multiplexing

In general, it is unlikely that a Single Web Socket service will do everything, but it is also unlikely that a webSocket will be created for every business instance. Often we add a layer of gateways and aggregate these business Websockets, and always only need one connection for the front end, which is why multiplexing exists.

The core is that you have to let the back end know when to send what messages to what services.

The multiplex method must be used to create an Observable that subscribes to a message. It takes three parameters to distinguish messages:

  • subMsgTell me which messages you are subscribing to
  • unsubMsgTell me which message to unsubscribe from
  • messageFilterFilter messages so that subscribers receive only which messages
const ws = webSocket('wss://echo.websocket.org');
const user$ = this.ws.multiplex(
    (a)= > ({ type: 'subscribe', tag: 'user' }),
    (a)= > ({ type: 'unsubscribe', tag: 'user' }),
    message= > message.type === 'user'
);
user$.subscribe(message= > console.log(message));

const todo$ = this.ws.multiplex(
    (a)= > ({ type: 'subscribe', tag: 'todo' }),
    (a)= > ({ type: 'unsubscribe', tag: 'todo' }),
    message= > message.type === 'todo'
);
todo$.subscribe(message= > console.log(message));
Copy the code

The user$stream and the Todo $stream share a WebSocket connection, which is multiplexed.

Even though the subscription is created through multiplex, the push of the message still needs to use ws-next ().

conclusion

This was supposed to be a simple internal training exercise, but I found that surprisingly few people discussed the implementation of Web sockets in RxJS.

There was always a plan to build a WebSocket into NG-Alain, but it was completely worthless from a packaging point of view because it was elegant enough.