Introduction
Dashboards in mid-office and back-office systems are often complex, and when they run as full-screen applications they place heavy demands on real-time data. WebSocket is simple to use in any environment: modern browsers implement the standard consistently, and the interface itself is small.
Even in Angular, WebSocket can be used with just a few lines of code.
const ws = new WebSocket('wss://echo.websocket.org');
// Log every message pushed by the server
ws.onmessage = (e) => {
  console.log('message', e);
};
To send a message to the server:
ws.send(`content`);
Most Angular developers take this code further by adding unified message parsing, error handling, and multiplexing, and end up encapsulating it all in a service class.
In fact, RxJS already ships a wrapper for this, WebSocketSubject, located in rxjs/webSocket.
How to use
If the above example were written using RxJS, then:
import { webSocket } from 'rxjs/webSocket';

const ws = webSocket('wss://echo.websocket.org');
// Subscribing opens the connection and delivers the deserialized messages
ws.subscribe(res => {
  console.log('message', res);
});
// Send a message to the server
ws.next('content');
webSocket is a factory function that produces a WebSocketSubject, which can be subscribed to multiple times. The connection is opened only once there is a subscriber. When the last subscription is cancelled the connection is closed, and it is re-established automatically on the next subscription.
WebSocketSubjectConfig
Besides a string (the remote address of the WebSocket service), webSocket also accepts a configuration object for more detailed settings.
By default, messages are serialized with JSON.stringify and deserialized with JSON.parse, so they are sent and received as JSON. This can be changed with the serializer and deserializer properties.
If you need to know when the connection opens (openObserver) or closes (closeObserver), then:
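As an illustration, the pair below wraps every outgoing payload in an envelope and unwraps incoming ones. It is only a sketch: the Envelope shape, the v version field, and the IncomingEvent stand-in type are assumptions made up for this example. Only the serializer and deserializer config keys come from RxJS.

```typescript
// Hypothetical wire format: every message carries a protocol version.
interface Envelope<T> {
  v: number;
  payload: T;
}

// Minimal structural stand-in for the browser's MessageEvent.
interface IncomingEvent {
  data: string;
}

// serializer: receives each value passed to next() and returns what goes on the wire.
function serializer<T>(value: T): string {
  const envelope: Envelope<T> = { v: 1, payload: value };
  return JSON.stringify(envelope);
}

// deserializer: receives each incoming event and returns what subscribers see.
function deserializer<T>(event: IncomingEvent): T {
  const envelope = JSON.parse(event.data) as Envelope<T>;
  return envelope.payload;
}
```

Because both are plain functions, they can be checked without a connection and then handed over as webSocket({ url, serializer, deserializer }).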
import { Subject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

const open$ = new Subject<Event>();
const ws = webSocket({
  url: 'wss://echo.websocket.org',
  openObserver: open$
});
// Subscribe to open events
open$.subscribe(() => {});
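The observers do not have to be Subjects; any object with a next method fits. The sketch below tracks the connection state with two such objects. The names createStatusObservers, ConnectionStatus, and getStatus are hypothetical and exist only for this example.

```typescript
type ConnectionStatus = 'idle' | 'open' | 'closed';

// Builds two observer objects plus a getter for the state they record.
function createStatusObservers() {
  let status: ConnectionStatus = 'idle';
  return {
    // Called when the underlying socket opens.
    openObserver: { next: (_event: unknown): void => { status = 'open'; } },
    // Called when the underlying socket closes.
    closeObserver: { next: (_event: unknown): void => { status = 'closed'; } },
    getStatus: (): ConnectionStatus => status,
  };
}
```

A possible use is to destructure the result and pass openObserver and closeObserver into the webSocket config next to url, then read getStatus() wherever the UI needs a connection indicator.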
Messages
WebSocketSubject is itself a kind of Subject, so subscribing to it means receiving messages, while next, complete, and error control what is sent to the server.
- Use next to send a message.
- Use complete to close the connection without a status code.
- Use error to close the connection with a status. It is equivalent to the native close method and must be given a { code: number, reason?: string } argument. Note that code must fall within the allowed value range.
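The value range in question is the one the native close method enforces: 1000, or any integer from 3000 to 4999. A small guard like the following can catch a bad code before the browser throws. The helper names isValidCloseCode and closeRequest are hypothetical and written only for this sketch.

```typescript
// Shape of the object that error() expects for a deliberate close.
interface CloseRequest {
  code: number;
  reason?: string;
}

// Codes a client may pass to the native close(): 1000, or an integer in 3000-4999.
function isValidCloseCode(code: number): boolean {
  return code === 1000 || (Number.isInteger(code) && code >= 3000 && code <= 4999);
}

// Returns a CloseRequest, or throws early when the code is out of range.
function closeRequest(code: number, reason?: string): CloseRequest {
  if (!isValidCloseCode(code)) {
    throw new RangeError(`close code ${code} is not 1000 or within 3000-4999`);
  }
  return reason === undefined ? { code } : { code, reason };
}
```

With this in place, a call might look like ws.error(closeRequest(4001, 'session expired')), where 4001 is an application-defined code chosen for the example.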
Replay
If next is called while the WebSocket connection is down (for example, because nobody has subscribed yet), the message is cached and sent in order once the connection is next established. This is very convenient in an asynchronous world: as long as the WebSocketSubject is created when the Angular application starts, you can send messages at any time without waiting for a subscription to open the connection.
This works because the WebSocketSubject produced by webSocket uses a ReplaySubject internally by default, so the behavior is essentially the replay capability of ReplaySubject. The webSocket factory takes only one argument, so this default applies to every subject it creates.
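To make the buffering concrete, here is a deliberately simplified model of it. This is not the RxJS source; the BufferedSender class and its connect and disconnect methods are invented for this sketch and only mimic the observable behavior described above.

```typescript
// Simplified stand-in for the buffering behavior, not the RxJS implementation.
class BufferedSender<T> {
  private queue: T[] = [];
  private send: ((message: T) => void) | null = null;

  // Like next(): deliver immediately when connected, otherwise keep for later.
  next(message: T): void {
    if (this.send) {
      this.send(message);
    } else {
      this.queue.push(message);
    }
  }

  // Like a (re)connection: flush queued messages in their original order.
  connect(send: (message: T) => void): void {
    this.send = send;
    const pending = this.queue;
    this.queue = [];
    pending.forEach((message) => send(message));
  }

  // Like losing the connection: later messages are queued again.
  disconnect(): void {
    this.send = null;
  }
}
```

Messages passed to next before connect are held back, delivered in order on connect, and held back again after disconnect, which mirrors what a caller sees when sending before the first subscription.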
Multiplexing
In general, a single WebSocket service is unlikely to do everything, but it is equally unlikely that the front end would open a separate connection for every business service. Often a gateway layer is added to aggregate the business WebSocket services so that the front end only ever needs one connection, which is why multiplexing exists.
The core requirement is telling the back end which messages to send, when, and for which service.
Use the multiplex method to create an Observable for one kind of message. It takes three parameters that distinguish the messages:
- subMsg: a function returning the message that tells the server which messages you are subscribing to
- unsubMsg: a function returning the message that tells the server which messages to unsubscribe from
- messageFilter: a predicate that filters incoming messages so the subscriber receives only the matching ones
const ws = webSocket<any>('wss://echo.websocket.org');
// Stream of messages whose type is 'user'
const user$ = ws.multiplex(
  () => ({ type: 'subscribe', tag: 'user' }),
  () => ({ type: 'unsubscribe', tag: 'user' }),
  message => message.type === 'user'
);
user$.subscribe(message => console.log(message));
// Stream of messages whose type is 'todo'
const todo$ = ws.multiplex(
  () => ({ type: 'subscribe', tag: 'todo' }),
  () => ({ type: 'unsubscribe', tag: 'todo' }),
  message => message.type === 'todo'
);
todo$.subscribe(message => console.log(message));
The user$ and todo$ streams share a single WebSocket connection; this is multiplexing.
Even though the subscriptions are created through multiplex, messages are still pushed with ws.next().
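Since each stream needs the same three arguments with only the tag changing, they can be generated by a small helper. The sketch below assumes the subscribe/unsubscribe protocol used in the example above. The channel function and the ControlMessage and ServerMessage types are hypothetical names introduced here.

```typescript
// Hypothetical message shapes for a gateway that routes by tag.
interface ControlMessage {
  type: 'subscribe' | 'unsubscribe';
  tag: string;
}

interface ServerMessage {
  type: string;
}

// Builds the three multiplex arguments for one tag.
function channel(tag: string) {
  return {
    subMsg: (): ControlMessage => ({ type: 'subscribe', tag }),
    unsubMsg: (): ControlMessage => ({ type: 'unsubscribe', tag }),
    messageFilter: (message: ServerMessage): boolean => message.type === tag,
  };
}
```

A stream could then be created with const user = channel('user') followed by ws.multiplex(user.subMsg, user.unsubMsg, user.messageFilter), which keeps the tag in one place.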
Conclusion
This was meant to be a simple internal training session, but I found that surprisingly few people discuss the WebSocket implementation in RxJS.
I had long planned to build a WebSocket wrapper into NG-Alain, but wrapping it would add no value, because the RxJS implementation is already elegant enough.