What is the Subject
As for the definition of Subject, the official documentation is quite clear and unambiguous:
What is a Subject? An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
In other words, you can think of the Subject as a multicast Observable, and the source code is clear about the inheritance relationship between the Subject and Observable:
class Subject<T> extends Observable<T> implements SubscriptionLike {}
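To make the unicast/multicast distinction from the official definition concrete, here is a minimal sketch that does not use RxJS at all. The helper names (subscribeUnicast, subscribeMulticast, pushMulticast) are hypothetical and exist only for illustration: a unicast source runs its producer once per subscriber, while a multicast source keeps one list of observers and pushes each value to all of them.

```typescript
type Observer<T> = { next: (value: T) => void };

// Unicast (hypothetical helper): every subscribe call runs the producer again.
let producerRuns = 0;
function subscribeUnicast(observer: Observer<number>): void {
  producerRuns++;
  observer.next(producerRuns);
}

// Multicast (hypothetical helpers): one shared observer list, one push reaches all.
const sharedObservers: Observer<number>[] = [];
function subscribeMulticast(observer: Observer<number>): void {
  sharedObservers.push(observer);
}
function pushMulticast(value: number): void {
  for (const observer of sharedObservers.slice()) observer.next(value);
}

const unicastLog: string[] = [];
subscribeUnicast({ next: (v) => unicastLog.push(`A: ${v}`) });
subscribeUnicast({ next: (v) => unicastLog.push(`B: ${v}`) });
// unicastLog: ["A: 1", "B: 2"], two independent executions

const multicastLog: string[] = [];
subscribeMulticast({ next: (v) => multicastLog.push(`A: ${v}`) });
subscribeMulticast({ next: (v) => multicastLog.push(`B: ${v}`) });
pushMulticast(1);
// multicastLog: ["A: 1", "B: 1"], one value shared by both observers
```

In the unicast case A and B see different values because each triggered its own run of the producer; in the multicast case they see the same value.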
Therefore, this study focuses on two questions: first, how Subject implements its multicast capability; second, how Subject's methods differ from the Observable methods of the same name. In some cases, these two questions turn out to be one and the same. As before, let's start with a simple example:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
const observerA = {
  next: (v) => console.log(`observerA: ${v}`),
};
const observerB = {
  next: (v) => console.log(`observerB: ${v}`),
};
subject.subscribe(observerA);
subject.subscribe(observerB);
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
First, we create a data stream object, subject, whose values are of type number. When subject calls subscribe, it actually executes Observable.subscribe, which it inherits. Because observerA is not a Subscriber instance, the code wraps it in a new SafeSubscriber instance, which ends up with:
this.destination = {
  next: (v) => console.log(`observerA: ${v}`),
  error: (err) => { throw err; }, // defaultErrorHandler
  complete: () => {}, // noop
};
In the example above, we did not assign values to the Subject's internal source and operator attributes, so subscribe ends up calling subject._trySubscribe, which, after checking that the Subject is not closed, delegates to Observable's _trySubscribe:
class Subject<T> extends Observable<T> implements SubscriptionLike {
  /** ... */
  protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
    this._throwIfClosed();
    return super._trySubscribe(subscriber);
  }
  protected _throwIfClosed() {
    if (this.closed) {
      throw new ObjectUnsubscribedError();
    }
  }
  /** ... */
}
Remember that this inside these methods refers to the Subject instance, so the this._subscribe call made by Observable's _trySubscribe resolves to Subject's own _subscribe. The subscription therefore ends up doing the following:
class Subject<T> extends Observable<T> implements SubscriptionLike {
  /** ... */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    this._throwIfClosed();
    this._checkFinalizedStatuses(subscriber);
    return this._innerSubscribe(subscriber);
  }
  protected _innerSubscribe(subscriber: Subscriber<any>) {
    const { hasError, isStopped, observers } = this;
    return hasError || isStopped
      ? EMPTY_SUBSCRIPTION
      : (observers.push(subscriber), new Subscription(() => arrRemove(observers, subscriber)));
  }
  protected _checkFinalizedStatuses(subscriber: Subscriber<any>) {
    const { hasError, thrownError, isStopped } = this;
    if (hasError) {
      subscriber.error(thrownError);
    } else if (isStopped) {
      subscriber.complete();
    }
  }
  /** ... */
}
In the example above, when the Subject's subscribe() method runs, the observer passed in is appended to the end of the observers array, and a Subscription is created with the following function as its initialTeardown. When the observer unsubscribes, this function is executed to remove it from observers:
initialTeardown: () => arrRemove(observers, subscriber)
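The bookkeeping described above can be sketched in a few lines. This is a simplified model rather than the RxJS implementation: MiniSubject, MiniObserver and observerCount are hypothetical names, and the returned teardown function plays the role of the Subscription's initialTeardown.

```typescript
type MiniObserver<T> = { next: (value: T) => void };

// Simplified stand-in for Subject: only subscriber bookkeeping and next().
class MiniSubject<T> {
  private observers: MiniObserver<T>[] = [];

  // Register the observer and return a teardown that removes it again.
  subscribe(observer: MiniObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      const index = this.observers.indexOf(observer);
      if (index >= 0) this.observers.splice(index, 1);
    };
  }

  next(value: T): void {
    for (const observer of this.observers.slice()) observer.next(value);
  }

  get observerCount(): number {
    return this.observers.length;
  }
}

const received: string[] = [];
const mini = new MiniSubject<number>();
const unsubscribeA = mini.subscribe({ next: (v) => received.push(`A: ${v}`) });
mini.subscribe({ next: (v) => received.push(`B: ${v}`) });

mini.next(1); // both A and B are in the list
unsubscribeA(); // the teardown removes A from the list
mini.next(2); // only B is left
// received: ["A: 1", "B: 1", "B: 2"]
```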
Therefore, the Subject's multicast capability rests on how it manages its observers attribute. Next, let's see how the Subject multicasts its data stream. As the name implies, multicast means that one data source pushes data to multiple Subscribers/Observers, so we only need to look at the Subject's next method, along with error and complete:
class Subject<T> extends Observable<T> implements SubscriptionLike {
  /** ... */
  next(value: T) {
    this._throwIfClosed();
    if (!this.isStopped) {
      const copy = this.observers.slice();
      for (const observer of copy) {
        observer.next(value);
      }
    }
  }
  error(err: any) {
    this._throwIfClosed();
    if (!this.isStopped) {
      this.hasError = this.isStopped = true;
      this.thrownError = err;
      const { observers } = this;
      while (observers.length) {
        observers.shift()!.error(err);
      }
    }
  }
  complete() {
    this._throwIfClosed();
    if (!this.isStopped) {
      this.isStopped = true;
      const { observers } = this;
      while (observers.length) {
        observers.shift()!.complete();
      }
    }
  }
  /** ... */
}
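One detail in next that is easy to overlook is the observers.slice() call. A plausible reason for it is that an observer may unsubscribe while a value is being delivered, which mutates the array being iterated. The sketch below, using hypothetical helper functions and no RxJS dependency, compares iterating the live array with iterating a snapshot when the first listener removes itself.

```typescript
type Listener = (value: number) => void;

// Iterates the live array: a removal during emission shifts later listeners.
function emitLive(listeners: Listener[], value: number): void {
  for (let i = 0; i < listeners.length; i++) listeners[i](value);
}

// Iterates a snapshot, as Subject.next does with observers.slice().
function emitSnapshot(listeners: Listener[], value: number): void {
  for (const listener of listeners.slice()) listener(value);
}

// Builds three listeners; the first one removes itself when it is called.
function buildListeners(calls: string[]): Listener[] {
  const listeners: Listener[] = [];
  const first: Listener = () => {
    calls.push("first");
    listeners.splice(listeners.indexOf(first), 1);
  };
  listeners.push(first, () => calls.push("second"), () => calls.push("third"));
  return listeners;
}

const liveCalls: string[] = [];
emitLive(buildListeners(liveCalls), 1);
// liveCalls: ["first", "third"], "second" was skipped

const snapshotCalls: string[] = [];
emitSnapshot(buildListeners(snapshotCalls), 1);
// snapshotCalls: ["first", "second", "third"]
```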
As the source shows, the Subject's multicast capability is nothing more than a loop that calls next on every registered observer; there is nothing mysterious about it. Next, let's look at the methods that set Subject apart from Observable, in particular the unsubscribe() method:
class Subject<T> extends Observable<T> implements SubscriptionLike {
  /** ... */
  unsubscribe(): void {
    this.isStopped = this.closed = true;
    this.observers = null!;
  }
  /** ... */
}
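The purpose of Subject.unsubscribe is to drop all of the Subject's observers and mark the Subject as closed, after which _throwIfClosed makes calls such as next and subscribe throw ObjectUnsubscribedError. It should not be confused with the similarly named Subscription.unsubscribe method we studied earlier, which cancels a single Subscriber/Observer's subscription to the Observable/Subject.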
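The difference between the two unsubscribe methods can be sketched without RxJS. In the following simplified model, the class ClosableSubject and its members are hypothetical names chosen for illustration, and a plain Error stands in for ObjectUnsubscribedError; it is not the library's implementation.

```typescript
type PlainObserver<T> = { next: (value: T) => void };

class ClosableSubject<T> {
  closed = false;
  private observers: PlainObserver<T>[] | null = [];

  // The returned handle mirrors Subscription.unsubscribe: it removes one observer.
  subscribe(observer: PlainObserver<T>): { unsubscribe: () => void } {
    this.throwIfClosed();
    this.observers!.push(observer);
    return {
      unsubscribe: () => {
        const list = this.observers;
        if (!list) return;
        const index = list.indexOf(observer);
        if (index >= 0) list.splice(index, 1);
      },
    };
  }

  next(value: T): void {
    this.throwIfClosed();
    for (const observer of this.observers!.slice()) observer.next(value);
  }

  // Mirrors Subject.unsubscribe: drops every observer and closes the subject.
  unsubscribe(): void {
    this.closed = true;
    this.observers = null;
  }

  private throwIfClosed(): void {
    if (this.closed) throw new Error("object unsubscribed");
  }
}

const closable = new ClosableSubject<number>();
const got: string[] = [];
const subA = closable.subscribe({ next: (v) => got.push(`A: ${v}`) });
closable.subscribe({ next: (v) => got.push(`B: ${v}`) });
closable.next(1);
subA.unsubscribe(); // only A leaves, B keeps receiving
closable.next(2);
closable.unsubscribe(); // the subject itself is now closed
let threw = false;
try {
  closable.next(3);
} catch (err) {
  threw = true;
}
// got: ["A: 1", "B: 1", "B: 2"], threw: true
```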
Finally, let's look at the frequently used Subject.asObservable method:
class Subject<T> extends Observable<T> implements SubscriptionLike {
  /** ... */
  asObservable(): Observable<T> {
    const observable: any = new Observable<T>();
    observable.source = this;
    return observable;
  }
  /** ... */
}
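The delegation performed by asObservable can be modeled roughly as follows. This is a sketch under simplifying assumptions: ReadOnlyStream and TinySubject are hypothetical stand-ins for Observable and Subject, and the wrapper does nothing except forward subscribe to its source.

```typescript
type Sink<T> = { next: (value: T) => void };
type Subscribable<T> = { subscribe: (sink: Sink<T>) => void };

// Simplified stand-in for Observable: subscribing just delegates to the source.
class ReadOnlyStream<T> implements Subscribable<T> {
  constructor(private source: Subscribable<T>) {}
  subscribe(sink: Sink<T>): void {
    this.source.subscribe(sink);
  }
}

// Simplified stand-in for Subject: keeps the sinks and can push values.
class TinySubject<T> implements Subscribable<T> {
  private sinks: Sink<T>[] = [];
  subscribe(sink: Sink<T>): void {
    this.sinks.push(sink);
  }
  next(value: T): void {
    for (const sink of this.sinks.slice()) sink.next(value);
  }
  asObservable(): ReadOnlyStream<T> {
    return new ReadOnlyStream<T>(this);
  }
}

const tiny = new TinySubject<number>();
const readOnly = tiny.asObservable();
const seen: number[] = [];
readOnly.subscribe({ next: (v) => seen.push(v) }); // lands in tiny's sink list
tiny.next(1);
tiny.next(2);
// seen: [1, 2]
const canPush = "next" in readOnly; // false: the wrapper exposes no next()
```

Consumers holding only the wrapper can subscribe but cannot push values, which is the usual reason for exposing asObservable() instead of the Subject itself.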
When studying Observable, we noticed that its source attribute always seemed to be undefined, which made its purpose hard to understand. It is essential to Subject.asObservable, however: the method creates a new Observable instance and sets its source attribute to the Subject itself. In other words, the Subject becomes the data source of that Observable, so the data pushed by the Subject can be received by any Observer that subscribes to the Observable. Calling subscribe on an ordinary Observable essentially adds a Subscriber; calling subscribe on an Observable created by Subject.asObservable essentially adds an Observer to the Subject. Next, let's look at the variants of Subject.
What is BehaviorSubject
Compared with a Subject, a BehaviorSubject always stores the latest value pushed through the data stream, and every new subscriber immediately receives that latest value. Let's go straight to an example:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
When creating a new BehaviorSubject instance, we must pass in an initial value, which serves as the latest value of the stream until next is called. The source code is concise, and it lets us read the latest value directly through value or getValue():
class BehaviorSubject<T> extends Subject<T> {
  constructor(private _value: T) {
    super();
  }
  get value(): T {
    return this.getValue();
  }
  getValue(): T {
    const { hasError, thrownError, _value } = this;
    if (hasError) {
      throw thrownError;
    }
    this._throwIfClosed();
    return _value;
  }
  /** ... */
}
Under TypeScript's rules, declaring a constructor parameter with the private modifier automatically declares and initializes an instance property of the same name. Because a BehaviorSubject must deliver the latest value on every subscription, including the very first one, it has to be created with an initial value:
class BehaviorSubject<T> extends Subject<T> {
  /** ... */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    const subscription = super._subscribe(subscriber);
    !subscription.closed && subscriber.next(this._value);
    return subscription;
  }
  next(value: T): void {
    super.next((this._value = value));
  }
}
The subscription behavior of BehaviorSubject is basically the same as that of Subject, except that the latest value is delivered immediately: if the returned subscription is not closed, that is, the data stream has not finished, subscriber.next(this._value) is called. Likewise, whenever new data is pushed, the stored latest value is updated.
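The two rules above (replay the stored value on subscribe, update it on next) can be reproduced in a small model. MiniBehaviorSubject is a hypothetical, simplified stand-in that ignores error and completion handling; it is not the RxJS class.

```typescript
type ValueObserver<T> = { next: (value: T) => void };

// Simplified stand-in for BehaviorSubject (no error/complete handling).
class MiniBehaviorSubject<T> {
  private observers: ValueObserver<T>[] = [];
  constructor(private current: T) {}

  subscribe(observer: ValueObserver<T>): void {
    this.observers.push(observer);
    observer.next(this.current); // deliver the latest value immediately
  }

  next(value: T): void {
    this.current = value; // remember it for future subscribers
    for (const observer of this.observers.slice()) observer.next(value);
  }

  get value(): T {
    return this.current;
  }
}

const behaviorLog: string[] = [];
const behavior = new MiniBehaviorSubject(0);
behavior.subscribe({ next: (v) => behaviorLog.push(`observerA: ${v}`) });
behavior.next(1);
behavior.next(2);
behavior.subscribe({ next: (v) => behaviorLog.push(`observerB: ${v}`) });
behavior.next(3);
// behaviorLog, in order:
// observerA: 0, observerA: 1, observerA: 2, observerB: 2, observerA: 3, observerB: 3
```

The resulting sequence is the same as the log of the BehaviorSubject example earlier in this section.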
What is AsyncSubject
Like BehaviorSubject, AsyncSubject delivers the latest value to its Observers. The difference is that Observers subscribed to an AsyncSubject receive the latest value only when the data stream completes. For example:
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
As you might expect, AsyncSubject does not push a new value when next is called; it only updates the stored value. When complete is called, the latest value is pushed, followed by the completion notification. The RxJS source for this logic is also easy to follow:
class AsyncSubject<T> extends Subject<T> {
  private _value: T | null = null;
  private _hasValue = false;
  private _isComplete = false;
  /** @internal */
  protected _checkFinalizedStatuses(subscriber: Subscriber<T>) {
    const { hasError, _hasValue, _value, thrownError, isStopped } = this;
    if (hasError) {
      subscriber.error(thrownError);
    } else if (isStopped) {
      _hasValue && subscriber.next(_value!);
      subscriber.complete();
    }
  }
  next(value: T): void {
    if (!this.isStopped) {
      this._value = value;
      this._hasValue = true;
    }
  }
  complete(): void {
    const { _hasValue, _value, _isComplete } = this;
    if (!_isComplete) {
      this._isComplete = true;
      _hasValue && super.next(_value!);
      super.complete();
    }
  }
}
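The same behavior can be reproduced with a small model. MiniAsyncSubject is a hypothetical, simplified stand-in that omits error handling: next only stores the value, complete delivers it once, and a subscriber that arrives after completion still receives the stored value.

```typescript
type FinalObserver<T> = { next: (value: T) => void; complete?: () => void };

// Simplified stand-in for AsyncSubject (no error handling).
class MiniAsyncSubject<T> {
  private observers: FinalObserver<T>[] = [];
  private hasValue = false;
  private latest?: T;
  private done = false;

  subscribe(observer: FinalObserver<T>): void {
    if (this.done) {
      // Late subscriber: deliver the stored value (if any), then complete.
      if (this.hasValue) observer.next(this.latest as T);
      observer.complete?.();
      return;
    }
    this.observers.push(observer);
  }

  next(value: T): void {
    if (this.done) return;
    this.latest = value; // only remember the value, do not emit yet
    this.hasValue = true;
  }

  complete(): void {
    if (this.done) return;
    this.done = true;
    for (const observer of this.observers.splice(0)) {
      if (this.hasValue) observer.next(this.latest as T);
      observer.complete?.();
    }
  }
}

const asyncLog: string[] = [];
const asyncLike = new MiniAsyncSubject<number>();
asyncLike.subscribe({ next: (v) => asyncLog.push(`observerA: ${v}`) });
asyncLike.next(1);
asyncLike.next(4);
asyncLike.subscribe({ next: (v) => asyncLog.push(`observerB: ${v}`) });
asyncLike.next(5);
asyncLike.complete();
asyncLike.subscribe({ next: (v) => asyncLog.push(`observerC: ${v}`) }); // after completion
// asyncLog: ["observerA: 5", "observerB: 5", "observerC: 5"]
```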
ReplaySubject
The key to ReplaySubject is replay, as described in the official documentation:
A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers.
That is, when a ReplaySubject gets a new subscriber, it replays multiple stored values to that subscriber; how many depends on the size of the count window or the time window. For example:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
In the example above, the ReplaySubject instance has a count window of size 3, so the new subscriber observerB receives the three most recent values as soon as it subscribes. In addition to the count window, ReplaySubject also lets you set a time window, for example:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
  });
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
As shown above, we set the count window of the ReplaySubject instance to 100 and the time window to 500 ms. By the time the new subscriber subscribes, 1000 ms have passed, so it is sent at most 100 values emitted within the last 500 ms, which here means three values. Next, let's look at the source code of ReplaySubject.
class ReplaySubject<T> extends Subject<T> {
  private _buffer: (T | number)[] = [];
  private _infiniteTimeWindow = true;
  constructor(
    private _bufferSize = Infinity,
    private _windowTime = Infinity,
    private _timestampProvider: TimestampProvider = dateTimestampProvider
  ) {
    super();
    this._infiniteTimeWindow = _windowTime === Infinity;
    this._bufferSize = Math.max(1, _bufferSize);
    this._windowTime = Math.max(1, _windowTime);
  }
  /** ... */
}
As the constructor shows, ReplaySubject accepts three parameters: the size of the count window, the length of the time window, and a timestamp provider. The first two default to Infinity, and the third defaults to dateTimestampProvider.
class ReplaySubject<T> extends Subject<T> {
  /** ... */
  next(value: T): void {
    const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
    if (!isStopped) {
      _buffer.push(value);
      !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
    }
    this._trimBuffer();
    super.next(value);
  }
  /** ... */
}
When next is called, and before super.next(value) pushes the value to the current subscribers, ReplaySubject appends the new value to _buffer (followed by its expiry timestamp when the time window is finite) and then prunes _buffer:
class ReplaySubject<T> extends Subject<T> {
  /** ... */
  private _trimBuffer() {
    const { _bufferSize, _timestampProvider, _buffer, _infiniteTimeWindow } = this;
    // If we don't have an infinite buffer size, and we're over the length,
    // use splice to truncate the old buffer values off. Note that we have to
    // double the size for instances where we're not using an infinite time window
    // because we're storing the values and the timestamps in the same array.
    const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
    _bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);
    // Now, if we're not in an infinite time window, remove all values where the time is
    // older than what is allowed.
    if (!_infiniteTimeWindow) {
      const now = _timestampProvider.now();
      let last = 0;
      // Search the array for the first timestamp that isn't expired and
      // truncate the buffer up to that point.
      for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
        last = i;
      }
      last && _buffer.splice(0, last + 1);
    }
  }
  /** ... */
}
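The pruning strategy is simple. First, if the count window is finite, compute the maximum allowed buffer length (adjustedBufferSize) and remove the oldest entries from the front of _buffer until its length no longer exceeds that limit. Next, if there is a finite time window, remove the values whose timestamps have expired relative to the current time. Note that with a finite time window, values and timestamps are interleaved in _buffer: counting from one, every odd-numbered element (indices 0, 2, 4, ...) is a value from the data stream, and every even-numbered element (indices 1, 3, 5, ...) is the expiry timestamp of the value before it, that is, the time it was pushed plus the time window.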
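The two pruning passes can be traced on a concrete buffer. The function below is a standalone sketch of the finite-time-window case only; trimInterleaved and its parameters are hypothetical names, and the current time is passed in explicitly instead of coming from a timestamp provider.

```typescript
// Interleaved layout used with a finite time window:
// [value0, expiry0, value1, expiry1, ...]
function trimInterleaved(
  buffer: (string | number)[],
  bufferSize: number,
  now: number
): void {
  // Count window: each entry occupies two slots (value + expiry timestamp).
  const maxLength = 2 * bufferSize;
  if (bufferSize < Infinity && buffer.length > maxLength) {
    buffer.splice(0, buffer.length - maxLength);
  }
  // Time window: find the last expired timestamp (odd indices) and cut there.
  let last = 0;
  for (let i = 1; i < buffer.length && (buffer[i] as number) <= now; i += 2) {
    last = i;
  }
  if (last) buffer.splice(0, last + 1);
}

// Values "a".."d" pushed at t = 200, 400, 600, 800 with a 500 ms time window,
// so each expiry timestamp is the push time plus 500.
const sample: (string | number)[] = ["a", 700, "b", 900, "c", 1100, "d", 1300];
trimInterleaved(sample, 3, 1000);
// Count window (3) drops "a"; the time window then drops "b" (900 <= 1000).
// sample: ["c", 1100, "d", 1300]
```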
class ReplaySubject<T> extends Subject<T> {
  /** ... */
  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    this._throwIfClosed();
    this._trimBuffer();
    const subscription = this._innerSubscribe(subscriber);
    const { _infiniteTimeWindow, _buffer } = this;
    // We use a copy here, so reentrant code does not mutate our array while we're
    // emitting it to a new subscriber.
    const copy = _buffer.slice();
    for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
      subscriber.next(copy[i] as T);
    }
    this._checkFinalizedStatuses(subscriber);
    return subscription;
  }
  /** ... */
}
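When a ReplaySubject gets a new subscriber, it registers the subscriber through the _innerSubscribe method inherited from Subject and then pushes a copy of the buffered values to the new subscriber, skipping the timestamp slots when a time window is in use.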
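Putting buffering and replay together, the count-window behavior can be modeled as follows. MiniReplaySubject is a hypothetical, simplified stand-in that supports only the count window (no time window, no error or completion handling), so its buffer holds values only.

```typescript
type ReplayObserver<T> = { next: (value: T) => void };

// Simplified stand-in for ReplaySubject: count window only.
class MiniReplaySubject<T> {
  private observers: ReplayObserver<T>[] = [];
  private buffer: T[] = [];
  constructor(private bufferSize: number = Infinity) {}

  subscribe(observer: ReplayObserver<T>): void {
    this.observers.push(observer);
    // Replay a copy so reentrant next() calls cannot mutate what we iterate.
    for (const value of this.buffer.slice()) observer.next(value);
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.splice(0, this.buffer.length - this.bufferSize);
    }
    for (const observer of this.observers.slice()) observer.next(value);
  }
}

const replayLog: string[] = [];
const replay = new MiniReplaySubject<number>(3);
replay.subscribe({ next: (v) => replayLog.push(`observerA: ${v}`) });
replay.next(1);
replay.next(2);
replay.next(3);
replay.next(4);
replay.subscribe({ next: (v) => replayLog.push(`observerB: ${v}`) });
replay.next(5);
// observerB first receives the buffered 2, 3, 4, then both observers receive 5.
```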
The next step
After studying the core RxJS trio of Observable, Subject, and Subscription, you will find that the most dazzling part of the library is the Operators module. The pipe() method is what lets Operators show their full power and enables RxJS to handle complex scenarios. Next, we will spend some time studying Operators.