DevUI is a team with both design and engineering perspectives, serving huawei DevCloud platform and huawei internal background systems, as well as designers and front-end engineers. Add devui Helper (Devui-Official) DevUIHelper plugin: Devuihelper-lsp (welcome Star)
[a] (https://zhuanlan.zhihu.com/p/157897829), we analyzed the Oberservable and concrete implementation method of the Subscription. In this article, you will learn about a series of different Muticobservables. These Observables are mainly named after Subject in RxJS. They have the following different implementations:
-
Subject
-
AnonymousSubject
-
BehaviorSubject
-
ReplaySubject
-
AsyncSubject
The Muticasted Observable is an Observable that continuously sends data to its subscribers.
Note: the source version of RxJS used in this article is 6.6.0
Subject
The Subject is the basic Muticobservable. The subscriber subscribs to the Subject and gets the data sent by the Subject. However, if a subscriber subscribes after the data is sent, it will never get that data. Here’s a quick example:
const subject = new Subject<number>(); // Call before subscribing will not print subject.next(1); // subscription data const subscription = subject.subscribe((value) => {console.log(' subscription data A: '+ value); }); // After subscribing, the call prints data. subject.next(2); // Print the result // subscribe data A: 2Copy the code
Subject is implemented by putting observers into an array and notifying all currently present observers of an upcoming event.
class Subject<T> extends Observable<T> { observers: Observer<T>[] = []; Next (value? : T) { if (! this.isStopped) { ... const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); }}} // Next error(err: any) {... this.hasError = true; this.thrownError = err; this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].error(err); } this.observers.length = 0; } // complete similar to next complete() {... this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].complete(); } this.observers.length = 0; }}Copy the code
This overrides _subscribe to save observers in observers array when they subscribe.
_subscribe(subscriber: Subscriber<T>): Subscription { if (this.hasError) { subscriber.error(this.thrownError); return Subscription.EMPTY; } else if (this.isStopped) { subscriber.complete(); return Subscription.EMPTY; } else {// If all is well, save observers here to the observers array. this.observers.push(subscriber); // Provide a subscription object that points to the current observer. return new SubjectSubscription(this, subscriber) } }Copy the code
The Subject converts to and from observables by creating a new observable pointing to it.
asObservable(): Observable<T> {
const observable = new Observable<T>();
(<any>observable).source = this;
return observable;
}
Copy the code
AnonymousSubject
AnonymousSubject is a wrapper to Subject that has an Observer member named Destination. The Observer provides three method interfaces, next, Error, and Complete.
export interface Observer<T> { closed? : boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }Copy the code
AnonymousSubject forwards the call to destination by overloading Subject’s Next, Error, complete. Because it overloads these three important methods, it does not have the functionality provided by Subject itself. AnonymousSubject overrides these methods primarily to forward calls to destination, which provides a
export class AnonymousSubject<T> extends Subject<T> {
constructor(protected destination?: Observer<T>, source?: Observable<T>) {
super();
this.source = source;
}
next(value: T) {
const { destination } = this;
if (destination && destination.next) {
destination.next(value);
}
}
error(err: any) {
const { destination } = this;
if (destination && destination.error) {
this.destination.error(err);
}
}
complete() {
const { destination } = this;
if (destination && destination.complete) {
this.destination.complete();
}
}
}
Copy the code
It also overloads _subscribe, so it doesn’t have the ability of Subject to save subscribers.
_subscribe(subscriber: Subscriber<T>): Subscription { const { source } = this; if (source) { return this.source.subscribe(subscriber); } else { return Subscription.EMPTY; }}Copy the code
Reading the source code to use AnonymousSubject, I think the main function of AnonymousSubject is to provide a wrapper around the Subject’s lift method, which needs to return an isomorphic object that matches the current class.
export class Subject<T> extends Observable<T> { lift<R>(operator: Operator<T, R>): Observable<R> { const subject = new AnonymousSubject(this, this); subject.operator = <any>operator; return <any>subject; }}Copy the code
If a Subject is restructured directly, it is isomorphic, but stores too much redundant data, such as repeating the addition of subscribers to observers. If an Observable is directly used, it does not conform to isomorphism, because an Observable does not have the functions of Next, error and complete. Therefore, it is a relatively safe way to overwrite some methods of Subject to make it have isomorphism. Redundant data is not kept repeatedly.
BehaviorSubject
The BehaviorSubject provides data persistence (as opposed to the Subject itself) for the Subject, which stores incoming data. See the following example.
const subject = new BehaviorSubject<number>(0); // Subscribe to const subscriptionA = subject.subscribe((value) => {console.log(' subscribe data A: '+ value); }); // Call before subscribing will not print subject.next(1); Const subscriptionB = subject.subscribe((value) => {console.log(' subscribe data B: '+ value); }); // After subscribing, the call prints data. subject.next(2); // Prints the result // Subscription data A: 0 // Subscription data A: 1 // Subscription data B: 1 // Subscription data A: 2 //Copy the code
The BehaviorSubject has a _value member. Every time the BehaviorSubject calls Next to send data, the BehaviorSubject stores the data in the _value member.
export class BehaviorSubject<T> extends Subject<T> { constructor(private _value: T) { super(); } get value(): T { return this.getValue(); } getValue(): T { if (this.hasError) { throw this.thrownError; } else if (this.closed) { throw new ObjectUnsubscribedError(); } else { return this._value; }}}Copy the code
When next is called, the value passed in is saved and passed to Subject’s Next for processing.
next(value: T): void {
super.next(this._value = value);
}
Copy the code
When a BehaviorSubject is subscribed, the BehaviorSubject also sends the current stored data to the subscriber. This is done by overriding _SUBSCRIBE.
_subscribe(subscriber: Subscriber<T>): Subscription { const subscription = super._subscribe(subscriber); // As long as the subscriber is not closed, the currently stored data is sent to the subscriber. if (subscription && ! (<SubscriptionLike>subscription).closed) { subscriber.next(this._value); } return subscription; }Copy the code
AsyncSubject
The AsyncSubject does not provide the corresponding asynchronous operation, but gives the caller the power to control the arrival of the final data, and the subscriber will only receive the final data of the AsyncSubject. As the official example shows, when it calls Next alone, the subscriber does not receive the data, but only when it calls Complete does the subscriber receive the message that finally arrives. The following example illustrates how AsyncSubject works.
const subject = new AsyncSubject<number>(); Const subscriptionA = subject.subscribe((value) => {console.log(' subscribe data A: '+ value); }); // Subscribing to subject.next(1); subject.next(2); subject.next(3); subject.next(4); Const subscriptionB = subject.subscribe((value) => {console.log(' subscribe data B: '+ value); }); // Again, there is no trigger to subscribe to subject.next(5); // But the completion method triggers a subject.plete () subscription; // Print the result // Subscribe data A: 5 // Subscribe data B: 5Copy the code
AsyncSubject achieves this by preserving the send and finish states.
export class AsyncSubject<T> extends Subject<T> {
private value: T = null;
private hasNext: boolean = false;
private hasCompleted: boolean = false;
}
Copy the code
AsyncSubject’s Next does not call Subject’s Next, but saves the most recent arrival of data in the incomplete state.
next(value: T): void {
if (!this.hasCompleted) {
this.value = value;
this.hasNext = true;
}
}
Copy the code
Subject’s next is called in AsyncSubject’s complete method.
complete(): void {
this.hasCompleted = true;
if (this.hasNext) {
super.next(this.value);
}
super.complete();
}
Copy the code
ReplaySubject
The purpose of ReplaySubject is to send all received buffer data within a given period of time. When the time expires, the received data will be destroyed and the upcoming data will be collected again. BufferSize and windowTime for a given buffer. Note that the buffer policy used by ReplaySubject is FIFO.
Here are two examples to get a feel for ReplaySubject behavior. The first is as follows:
const subject = new ReplaySubject<string>(3); Const subscriptionA = subject.subscribe((value) => {console.log(' subscribe data A: '+ value); }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); Const subscriptionB = subject.subscribe((value) => {console.log(' subscribe data B: '+ value); }); // Print the result: // Subscribe data A: 1 // Subscribe data A: 2 // Subscribe data A: 3 // Subscribe data A: 4 // Subscribe data B: 2 // Subscribe data B: 3 // Subscribe data B: 4Copy the code
Here is a second example of a ReplaySubject with a window time.
const subject = new ReplaySubject<string>(10, 1000); Const subscriptionA = subject.subscribe((value) => {console.log(' subscribe data A: '+ value); }); subject.next('number'); subject.next('string'); subject.next('object'); subject.next('boolean'); setTimeout(() => { subject.next('undefined'); Const subscriptionB = subject.subscribe((value) => {console.log(' subscribe data B: '+ value); }); }, 2000); // Print the result // Subscribe data A: number // Subscribe data A: string // Subscribe data A: object // Subscribe data A: Boolean // Subscribe data A: undefined // Subscribe data B: undefinedCopy the code
ReplaySubject is similar to BehaviorSubject, but the difference is that ReplaySubject has more buffers and window time, which extends the BehaviorSubject’s usage scenarios.
In the source code, there is a third parameter called the scheduler. In general, most requirements are covered by using the default scheduler, which will be covered later.
export class ReplaySubject<T> extends Subject<T> { private _events: (ReplayEvent<T> | T)[] = []; private _bufferSize: number; private _windowTime: number; private _infiniteTimeWindow: boolean = false; constructor(bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, private scheduler? : SchedulerLike) { super(); this._bufferSize = bufferSize < 1 ? 1 : bufferSize; this._windowTime = windowTime < 1 ? 1 : windowTime; if (windowTime === Number.POSITIVE_INFINITY) { this._infiniteTimeWindow = true; this.next = this.nextInfiniteTimeWindow; } else { this.next = this.nextTimeWindow; }}}Copy the code
In the above source code, ReplaySubject will set the specific running content of next according to different window times during construction, mainly in the following two ways.
-
nextInfiniteTimeWindow
-
nextTimeWindow
nextInfiniteTimeWindow
If the window time is infinite, it means that the constraint on buffer data is only future data.
private nextInfiniteTimeWindow(value: T): void { const _events = this._events; _events.push(value); // Decide which data to keep in the buffer according to the data length and buffer size. if (_events.length > this._bufferSize) { _events.shift(); } super.next(value); }Copy the code
nextTimeWindow
If the window time is finite, the constraints for the buffer are made up of two things: window time and future data. At this point, the buffer data consists of replayEvents. ReplayEvent stores the contents of incoming data and its current timestamp.
class ReplayEvent<T> {
constructor(
readonly public time: number,
readonly public value: T
) {}
}
Copy the code
After the buffer data is checked by _trimBufferThenGetEvents, the complete data is sent to Subject’s next.
private nextTimeWindow(value: T): void {
this._events.push(new ReplayEvent(this._getNow(), value));
this._trimBufferThenGetEvents();
super.next(value);
}
Copy the code
Trimbufferthengetevents The trimbufferthengetevents method is used to determine whether the data in the event object can be retained based on the current timestamp and the size of the buffer.
private _trimBufferThenGetEvents(): ReplayEvent<T>[] { const now = this._getNow(); const _bufferSize = this._bufferSize; const _windowTime = this._windowTime; const _events = <ReplayEvent<T>[]>this._events; const eventsCount = _events.length; let spliceCount = 0; // Since the buffer is FIFO, so the time order must be from small to large, so just need to find the partition // cut point, can determine the minimum buffer data length // degree. while (spliceCount < eventsCount) { if ((now - _events[spliceCount].time) < _windowTime) { break; } spliceCount++; } // Buffer length takes precedence over cutting, so if the buffer length is exceeded, the cut point is determined by the larger side. if (eventsCount > _bufferSize) { spliceCount = Math.max(spliceCount, eventsCount - _bufferSize); } if (spliceCount > 0) { _events.splice(0, spliceCount); } return _events; }Copy the code
Subscribe to the process
The subscription process of ReplaySubject is special, because the buffer data needs to be sent when the subscription is made, and the data in the buffer changes when the subscription is made at different times, so the subscription is more important to consider. Then it’s easy to grab the _infiniteTimeWindow variable and look at the code.
Subscribe (subscriber: subscriber <T>); Subscription { const _infiniteTimeWindow = this._infiniteTimeWindow; // Update buffer const _events = _infiniteTimeWindow? this._events : this._trimBufferThenGetEvents(); const len = _events.length; // create subscription let subscription: if (this.isStopped || this.hasError) { subscription = Subscription.EMPTY; } else { this.observers.push(subscriber); subscription = new SubjectSubscription(this, subscriber); If (_infiniteTimeWindow) {if (_infiniteTimeWindow) {for (let I = 0; i < len && ! subscriber.closed; i++) { subscriber.next(<T>_events[i]); }} else {// Window time is not infinite, buffer stores ReplayEvent for (let I = 0; i < len && ! subscriber.closed; i++) { subscriber.next((<ReplayEvent<T>>_events[i]).value); } } if (this.hasError) { subscriber.error(this.thrownError); } else if (this.isStopped) { subscriber.complete(); } return subscription; }Copy the code
The last
In this chapter, I briefly analyzed 5 main subjects. These subjects realized different types of Muticasted Observables and extended observables.
I am limited in my ability level, if there is any mistake, welcome to point out.
Join us
We are DevUI team, welcome to come here and build elegant and efficient human-computer design/research and development system with us. Email: [email protected].
ZCX (官 码 : Coder)
The original links: [RxJS source code parsing (2) : Multicast observables] (https://mp.weixin.qq.com/s/i14brW_Ok8JYGoBIcfhs5Q)
Previous articles are recommended
Observable & Subscription (RxJS)
Dark Mode and Thematic Development of Web Interfaces
“How to build a grayscale Publishing environment”