The definition of Subject is clearly explained in the comments of the official source code:
/**
* A Subject is a special type of Observable that allows values to be
* multicasted to many Observers. Subjects are like EventEmitters.
*
* Every Subject is an Observable and an Observer. You can subscribe to a
* Subject, and you can call next to feed values as well as error and complete.
*/
A Subject is a multicast Observable, much like an EventEmitter, and every Subject is both an Observable and an Observer. So a Subject has a subscribe method (inherited from Observable) as well as next, error, and complete methods (the Observer side, which Subject implements itself).
Starting from new Subject
import { Subject } from 'rxjs';

const subject = new Subject<number>();
subject.subscribe(data => console.log('observerA: ', data));
subject.subscribe(data => console.log('observerB: ', data));
subject.next(1);
subject.next(2);
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
What is a Subject
// node_modules/rxjs/src/internal/Subject.ts
export class Subject<T> extends Observable<T> implements SubscriptionLike {
  constructor() {
    super();
  }
}
Subject extends Observable and implements SubscriptionLike, which confirms the statement at the beginning of the article. Therefore, the subscribe method called on a Subject is actually Observable's subscribe. As mentioned in the previous article, it ends up calling _trySubscribe.
_trySubscribe originally lives on Observable, but Subject overrides it. The override only adds a check that throws if the Subject is closed, and then still calls Observable's _trySubscribe.
// node_modules/rxjs/src/internal/Subject.ts
protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
  this._throwIfClosed();
  return super._trySubscribe(subscriber);
}
protected _throwIfClosed() {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
}
Observable's _trySubscribe calls this._subscribe. That method exists on both Observable and Subject, and Subject overrides it, so the one that runs is Subject's _subscribe.
// node_modules/rxjs/src/internal/Subject.ts
protected _subscribe(subscriber: Subscriber<T>): Subscription {
  this._throwIfClosed();
  this._checkFinalizedStatuses(subscriber);
  return this._innerSubscribe(subscriber);
}
protected _innerSubscribe(subscriber: Subscriber<any>) {
  const { hasError, isStopped, observers } = this;
  return hasError || isStopped
    ? EMPTY_SUBSCRIPTION
    : (observers.push(subscriber), new Subscription(() => arrRemove(observers, subscriber)));
}
The first two calls in the body of _subscribe are checks for the closed and finished states, and they can be ignored here. Focus on the last one, _innerSubscribe. In the normal case it executes (observers.push(subscriber), new Subscription(() => arrRemove(observers, subscriber))).
A Subject is a multicast Observable: when something happens, it sends it to all observers. How does it know who they are? Each time subscribe is called, the subscriber is stored in the observers array.
_innerSubscribe returns new Subscription(() => arrRemove(observers, subscriber)). The function passed to the constructor, () => arrRemove(observers, subscriber), is stored on the initialTeardown property of the Subscription instance. When that subscription is unsubscribed, initialTeardown runs as well, which here removes the subscriber from observers, so the list of observers can be managed flexibly.
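The bookkeeping described above can be sketched without RxJS. This is a simplified model, not the library's implementation: MiniSubject and MiniListener are hypothetical names invented for illustration, the teardown is returned as a plain function instead of a Subscription, and error, completion, and the closed check are left out.

```typescript
// Simplified model for illustration only; not the RxJS implementation.
type MiniListener<T> = (value: T) => void;

class MiniSubject<T> {
  // Every subscriber is stored here so that next() can reach all of them.
  private observers: MiniListener<T>[] = [];

  // Store the listener and return a teardown that removes it again.
  subscribe(listener: MiniListener<T>): () => void {
    this.observers.push(listener);
    return () => {
      const index = this.observers.indexOf(listener);
      if (index >= 0) this.observers.splice(index, 1);
    };
  }

  // Deliver the value to every listener that is currently stored.
  next(value: T): void {
    for (const listener of this.observers.slice()) listener(value);
  }
}

const received: string[] = [];
const mini = new MiniSubject<number>();
const unsubscribeA = mini.subscribe((v) => received.push(`A:${v}`));
mini.subscribe((v) => received.push(`B:${v}`));

mini.next(1); // A and B are both in the array
unsubscribeA(); // the teardown removes A
mini.next(2); // only B is left

console.log(received); // [ 'A:1', 'B:1', 'B:2' ]
```

The returned function plays the role that initialTeardown plays in the real Subscription: it is the only thing a subscriber needs in order to leave the list.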
// node_modules/rxjs/src/internal/Subject.ts
next(value: T) {
  errorContext(() => {
    this._throwIfClosed();
    if (!this.isStopped) {
      const copy = this.observers.slice();
      for (const observer of copy) {
        observer.next(value);
      }
    }
  });
}
Subject also defines its own next method. As you can see, it takes a copy of this.observers and calls next on each item in turn.
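The source shown here does not say why next iterates over a copy instead of this.observers itself. One plausible reason is that an observer may unsubscribe while values are being delivered, which mutates the array during iteration. The sketch below only illustrates that effect with plain arrays; emitTo, runEmission, and SnapshotHandler are hypothetical names and none of this is RxJS code.

```typescript
type SnapshotHandler = (value: number) => void;

// Call every handler, iterating either over the live array or over a copy.
function emitTo(handlers: SnapshotHandler[], value: number, useCopy: boolean): void {
  const targets = useCopy ? handlers.slice() : handlers;
  for (const handler of targets) handler(value);
}

function runEmission(useCopy: boolean): string[] {
  const calls: string[] = [];
  const handlers: SnapshotHandler[] = [];
  const first: SnapshotHandler = (v) => {
    calls.push(`first:${v}`);
    // The handler removes itself while the emission is still running.
    handlers.splice(handlers.indexOf(first), 1);
  };
  const second: SnapshotHandler = (v) => {
    calls.push(`second:${v}`);
  };
  handlers.push(first, second);
  emitTo(handlers, 1, useCopy);
  return calls;
}

const withoutCopy = runEmission(false);
const withCopy = runEmission(true);

console.log(withoutCopy); // [ 'first:1' ]  (second was skipped)
console.log(withCopy); // [ 'first:1', 'second:1' ]
```

Iterating over the live array skips the second handler because removing the first one shifts the remaining element into an index that has already been visited.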
An Observable notifies its observer as soon as subscribe is called, whereas a Subject only stores the incoming observer and hands back a Subscription. Nothing is sent until the outside world actively calls next, at which point all observers are notified. This is the middleman (mediator) design pattern.
Besides next, Subject also defines error, complete, and unsubscribe, all in order to manage the observers list. error and complete end up calling the corresponding method on each item in observers.
error(err: any) {
  // ...
  const { observers } = this;
  while (observers.length) {
    observers.shift()!.error(err);
  }
  // ...
}
complete() {
  // ...
  const { observers } = this;
  while (observers.length) {
    observers.shift()!.complete();
  }
  // ...
}
unsubscribe() {
  this.isStopped = this.closed = true;
  this.observers = null!;
}
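The effect of draining the list in complete can be shown with a small stand-alone model. It is a sketch under assumptions, not the RxJS source: StoppableSubject and MiniObserver are hypothetical names, error handling is omitted, and the early return in subscribe is a simplified stand-in for the finished-state check that the real _subscribe delegates to _checkFinalizedStatuses.

```typescript
interface MiniObserver<T> {
  next(value: T): void;
  complete(): void;
}

class StoppableSubject<T> {
  private observers: MiniObserver<T>[] = [];
  private isStopped = false;

  subscribe(observer: MiniObserver<T>): void {
    if (this.isStopped) {
      // Already finished: notify the late subscriber right away.
      observer.complete();
      return;
    }
    this.observers.push(observer);
  }

  next(value: T): void {
    if (this.isStopped) return; // values after completion are dropped
    for (const observer of this.observers.slice()) observer.next(value);
  }

  complete(): void {
    if (this.isStopped) return;
    this.isStopped = true;
    // Drain the list so that every observer is notified exactly once.
    while (this.observers.length) this.observers.shift()!.complete();
  }
}

const timeline: string[] = [];
const makeObserver = (label: string): MiniObserver<number> => ({
  next: (v) => timeline.push(`${label} next ${v}`),
  complete: () => timeline.push(`${label} complete`),
});

const stoppable = new StoppableSubject<number>();
stoppable.subscribe(makeObserver("A"));
stoppable.next(1);
stoppable.complete();
stoppable.next(2); // ignored, the subject has stopped
stoppable.subscribe(makeObserver("B")); // completes immediately

console.log(timeline); // [ 'A next 1', 'A complete', 'B complete' ]
```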
There is another important method on Subject: asObservable. The name is self-explanatory: without reading the logic, you can guess that it lets the subject be used as a plain Observable. But Subject went to great lengths to add multicast logic on top of Observable, so why go back?
/**
* Creates a new Observable with this Subject as the source. You can do this
* to create customize Observer-side logic of the Subject and conceal it from
* code that uses the Observable.
* @return {Observable} Observable that the Subject casts to
*/
asObservable(): Observable<T> {
  const observable: any = new Observable<T>();
  observable.source = this;
  return observable;
}
The comment on asObservable explains that a new Observable is created with the Subject instance as its data source, which lets us keep custom Observer-side logic on the Subject and hide it from code that uses the Observable.
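The idea of "an Observable whose source is the Subject" can be modelled as a thin read-only wrapper. The sketch below is an illustration under assumptions, not how RxJS implements it: MiniEmitter, ReadOnlyView, and MiniSubscribable are hypothetical names, and the wrapper simply forwards subscribe to its source.

```typescript
type ViewListener<T> = (value: T) => void;

interface MiniSubscribable<T> {
  subscribe(listener: ViewListener<T>): void;
}

// Stand-in for a Subject: it can be subscribed to and it can emit.
class MiniEmitter<T> implements MiniSubscribable<T> {
  private listeners: ViewListener<T>[] = [];

  subscribe(listener: ViewListener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    for (const listener of this.listeners.slice()) listener(value);
  }
}

// Stand-in for the result of asObservable(): subscribe only, no next().
class ReadOnlyView<T> implements MiniSubscribable<T> {
  private readonly source: MiniSubscribable<T>;

  constructor(source: MiniSubscribable<T>) {
    this.source = source;
  }

  subscribe(listener: ViewListener<T>): void {
    this.source.subscribe(listener); // delegate to the wrapped source
  }
}

const emitter = new MiniEmitter<string>();
const view = new ReadOnlyView(emitter);

const seen: string[] = [];
view.subscribe((v) => seen.push(v));
emitter.next("hello"); // only the owner of the emitter can push values

const viewHasNext = "next" in view;
console.log(seen, viewHasNext); // [ 'hello' ] false
```

Code that receives only the view can listen, but it has no way to push values into the stream.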
What is hidden? A Subject is both an Observable and an Observer. After asObservable it is exposed purely as an Observable, which amounts to giving up its Observer role. The three key Observer methods, next, complete, and error, are the ones that drive the data stream, so those are what is hidden from the outside. Why?
At the beginning of the article, it is mentioned that Subject is both an Observable and an Observer, which seems to be more capable, but it also means greater risks
Essentially, you only want to create a multicast Observable, and an Observable should only be subscribed to and unsubscribed from. The outside world should only read data; how the data is produced and how it flows should stay hidden. If the outside world can call next, error, complete, and other methods that drive the stream, it can easily disrupt the data flow you intended.
import { Subject } from 'rxjs'

function intervalOut() {
  const source$ = new Subject<number>()
  let i = 0
  setInterval(() => {
    source$.next(i++)
  }, 1000)
  return source$
}
const instance = intervalOut()
instance.subscribe(data => console.log('Subscribe A:', data))
instance.subscribe(data => console.log('Subscribe B:', data))
// Outside code can push its own values into the stream
setInterval(() => {
  instance.next(9999999)
}, 900)
In the example above, you only want to notify all subscribers every 1000 ms, but because instance exposes next, outside code can call it and disrupt the data flow. And because instance is multicast, calling next, error, or complete anywhere affects every observer.
import { Subject } from 'rxjs'

function intervalOut() {
  const source$ = new Subject<number>()
  let i = 0
  setInterval(() => {
    source$.next(i++)
  }, 1000)
  return source$.asObservable()
}
const instance = intervalOut()
instance.subscribe(data => console.log('Subscribe:', data))
setInterval(() => {
  instance.next(9999999) // Error: Property 'next' does not exist on type 'Observable<number>'.
}, 900)
Change the intervalOut to return source$.asObservable(), so you can still subscribe, but can’t call next, error, or complete
However, sometimes I do want the outside world to take part in calling these three methods, because I may need to change the internal data according to outside state. In that case the outside world can call them indirectly through a wrapper layer, that is, the customized Observer-side logic. Only the wrapper methods are exposed, and the custom logic runs before the Subject's own method is called.
import { Subject } from 'rxjs'

function buildTeam() {
  const source$ = new Subject<string>()
  return {
    observable: source$.asObservable(),
    broadcast: (level: number) => {
      if (level > 90) {
        source$.next('Welcome to the club.')
      } else if (level < 60) {
        source$.next('Here comes a rookie.')
      } else {
        source$.next('There you are.')
      }
    }
  }
}
const instance = buildTeam()
instance.observable.subscribe(data => console.log(data))
instance.broadcast(30)
instance.broadcast(70)
instance.broadcast(100)
// Here comes a rookie.
// There you are.
// Welcome to the club.
The outside world can call next indirectly through broadcast, but how to call next is controlled by broadcast itself, not by the outside world
Cold Observable & Hot Observable
In RxJS, a basic Observable is called a Cold Observable, while a Subject, which inherits from Observable, is called a Hot Observable.
import { Observable } from 'rxjs'

const sourceA = new Observable<number>(subscribe => {
  subscribe.next(Math.random())
})
sourceA.subscribe(data => console.log('A:', data))
sourceA.subscribe(data => console.log('B:', data))
// A: <a random number>
// B: <a different random number>
sourceA is subscribed to twice, and each subscriber receives different data; that is, every subscribe executes the logic in the callback function again. With a Cold Observable, each subscriber has its own data source: Observable and observer have a one-to-one relationship.
import { Subject } from 'rxjs'

let index = 0
const sourceB = new Subject()
sourceB.subscribe(data => console.log('A:', data))
sourceB.subscribe(data => console.log('B:', data))
sourceB.next(index++)
// A: 0
// B: 0
sourceB is subscribed to twice, but both subscribers receive the same data. The logic is executed once and the result is sent to all subscribers. With a Hot Observable, all subscribers share the same data source: Observable and observer have a one-to-many relationship.
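Because Math.random makes the difference hard to read, here is a deterministic sketch of the same contrast without RxJS. The function names subscribeCold, subscribeHot, and emitHot are hypothetical and exist only for this illustration: the cold version runs its producer logic once per subscriber, while the hot version stores listeners and delivers one value to all of them.

```typescript
type TemperatureListener = (value: number) => void;

// Cold: the producer logic runs again for every subscriber.
let coldRuns = 0;
function subscribeCold(listener: TemperatureListener): void {
  coldRuns += 1;
  listener(coldRuns); // each subscriber gets the result of its own run
}

// Hot: subscribing only stores the listener; one emit reaches all of them.
const hotListeners: TemperatureListener[] = [];
function subscribeHot(listener: TemperatureListener): void {
  hotListeners.push(listener);
}
function emitHot(value: number): void {
  for (const listener of hotListeners.slice()) listener(value);
}

const coldSeen: number[] = [];
subscribeCold((v) => coldSeen.push(v));
subscribeCold((v) => coldSeen.push(v));

const hotSeen: number[] = [];
subscribeHot((v) => hotSeen.push(v));
subscribeHot((v) => hotSeen.push(v));
emitHot(7);

console.log(coldSeen); // [ 1, 2 ]  two runs, two different values
console.log(hotSeen); // [ 7, 7 ]  one emission, shared by both
```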
ConnectableObservable
The previous article covered Observable but did not finish it, because some parts involve Subject, so they are covered here. There is a variant of Observable called ConnectableObservable, but that class has been deprecated; the official recommendation is to use connectable instead.
// node_modules/rxjs/src/internal/observable/connectable.ts
// Creates an observable that multicasts once `connect()` is called on it.
connectable also creates an Observable. With a plain Observable, calling subscribe immediately runs the callback and sends the data. With an Observable created by connectable, subscribing alone produces nothing; events are broadcast only after connect() is called on the instance.
This separates subscribing from broadcasting: you can subscribe at any time, but events are broadcast only when connect is called, which adds flexibility.
import { connectable, Observable } from 'rxjs'

const conn1 = connectable(new Observable<number>(subscribe => subscribe.next(Math.random())))
conn1.subscribe(data => console.log('A:', data))
conn1.subscribe(data => console.log('B:', data))
// connect() must be called before anything is emitted
conn1.connect()
// A: 0.5575458558084838
// B: 0.5575458558084838
connectable is a function that takes a required parameter source and an optional parameter config.
// node_modules/rxjs/src/internal/observable/connectable.ts
export function connectable<T>(source: ObservableInput<T>, config: ConnectableConfig<T> = DEFAULT_CONFIG): Connectable<T> {
  // ...
  const { connector, resetOnDisconnect = true } = config;
  let subject = connector();
  const result: any = new Observable<T>((subscriber) => {
    return subject.subscribe(subscriber);
  });
  // ...
}
First, a subject variable is defined. It is produced by connector, a property of the function's second argument, config. The default connector is () => new Subject<unknown>(), so by default subject is a Subject instance.
export interface ConnectableConfig<T> {
  connector: () => SubjectLike<T>;
  resetOnDisconnect?: boolean;
}
const DEFAULT_CONFIG: ConnectableConfig<unknown> = {
  connector: () => new Subject<unknown>(),
  resetOnDisconnect: true,
};
Then a result variable is defined: an Observable instance whose subscribe callback calls subject.subscribe.
From the earlier analysis, calling subscribe on an Observable runs its initial callback immediately. So as soon as result.subscribe is called, subject.subscribe is executed.
// node_modules/rxjs/src/internal/observable/connectable.ts
export function connectable<T>(source: ObservableInput<T>, config: ConnectableConfig<T> = DEFAULT_CONFIG): Connectable<T> {
  // ...
  result.connect = () => {
    if (!connection || connection.closed) {
      connection = defer(() => source).subscribe(subject);
      if (resetOnDisconnect) {
        connection.add(() => (subject = connector()));
      }
    }
    return connection;
  };
  return result;
}
Then a connect method is attached to the result Observable instance. The key statement in connect is connection = defer(() => source).subscribe(subject);
defer is a creation function; here the statement is effectively source.subscribe(subject), where source is the first argument passed to connectable, in this example new Observable<number>(subscribe => subscribe.next(Math.random())). The Subject therefore acts as the observer of source, so subscribe.next(Math.random()) is effectively subject.next(Math.random()).
Since the observers were already registered on the subject through result.subscribe, they are all notified in turn. In this example, data => console.log('A:', data) and data => console.log('B:', data) are executed in sequence.
Therefore, Subject once again plays the role of middleman and completes its multicast task
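The two halves of this mechanism, registering listeners on subscribe and starting the source on connect, can be sketched in a few lines without RxJS. This is a simplified model under assumptions: miniConnectable, ConnListener, and ConnProducer are hypothetical names, a plain array stands in for the Subject, and the connection guard, teardown, and reset logic are all omitted.

```typescript
type ConnListener<T> = (value: T) => void;
type ConnProducer<T> = (emit: ConnListener<T>) => void;

function miniConnectable<T>(producer: ConnProducer<T>) {
  // This array plays the role of the Subject in the middle.
  const listeners: ConnListener<T>[] = [];
  return {
    // Subscribing only registers the listener; the producer does not run.
    subscribe(listener: ConnListener<T>): void {
      listeners.push(listener);
    },
    // Connecting runs the producer once and fans its values out.
    connect(): void {
      producer((value) => {
        for (const listener of listeners.slice()) listener(value);
      });
    },
  };
}

const delivered: string[] = [];
let producerRuns = 0;
const conn = miniConnectable<number>((emit) => {
  producerRuns += 1;
  emit(producerRuns * 10);
});

conn.subscribe((v) => delivered.push(`A:${v}`));
conn.subscribe((v) => delivered.push(`B:${v}`));
const deliveredBeforeConnect = delivered.length; // still 0 at this point

conn.connect();

console.log(deliveredBeforeConnect, producerRuns, delivered);
// 0 1 [ 'A:10', 'B:10' ]
```

The producer runs once, and both listeners receive the same value, which is the multicast behaviour the Subject provides in the real connectable.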
Additionally, the statement defer(() => source).subscribe(subject); is actually wrapped in an if condition:
if (!connection || connection.closed) {
  // ...
}
The first time connect is called, !connection is clearly true, so the body runs. What matters is the value of connection.closed when connect is called again from outside.
Here connection is the SafeSubscriber that wraps subject. As soon as the source completes or errors (which is forwarded to subject.complete or subject.error), or the connection is unsubscribed, closed is set to true.
import { connectable, Observable } from 'rxjs'

const conn1 = connectable(new Observable<number>(subscribe => {
  subscribe.next(1)
}))
conn1.subscribe(data => console.log('A:', data))
conn1.connect()
conn1.subscribe(data => console.log('B:', data))
conn1.connect()
// A: 1
In this case the source never completes, so connection.closed === false and the second call to conn1.connect() does not broadcast anything. It works if you change the code to the following:
import { connectable, Observable } from 'rxjs'

const conn1 = connectable(new Observable<number>(subscribe => {
  subscribe.next(1)
  // This line is the difference
  subscribe.complete()
}))
conn1.subscribe(data => console.log('A:', data))
conn1.connect()
conn1.subscribe(data => console.log('B:', data))
conn1.connect()
// A: 1
// B: 1
Presumably this protects a single data stream: the next one is not allowed to start until the previous one has finished.
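The guard can be isolated in a small model to see both outcomes side by side. This is only a sketch under assumptions: MiniConnection and makeConnect are hypothetical names, the start callback stands in for "subscribe the subject to the source", and the condition is written in an equivalent early-return form rather than copied from the library.

```typescript
class MiniConnection {
  closed = false;
}

// `start` stands in for subscribing the subject to the source.
function makeConnect(start: (connection: MiniConnection) => void): () => MiniConnection {
  let connection: MiniConnection | null = null;
  return () => {
    // Reuse the current connection for as long as it is still open.
    if (connection !== null && !connection.closed) return connection;
    const fresh = new MiniConnection();
    connection = fresh;
    start(fresh);
    return fresh;
  };
}

// A source that never finishes: the connection stays open.
let openRuns = 0;
const connectOpen = makeConnect(() => {
  openRuns += 1;
});
connectOpen();
connectOpen(); // no effect, the first connection is still open

// A source that finishes immediately: the connection is closed.
let finishedRuns = 0;
const connectFinished = makeConnect((c) => {
  finishedRuns += 1;
  c.closed = true;
});
connectFinished();
connectFinished(); // starts again, the previous connection was closed

console.log(openRuns, finishedRuns); // 1 2
```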
Summary
Going from Observable to Subject is a gradual process. Because a plain Observable is limited to unicast, the multicast Subject emerged. Subject in turn has its own limitations, so RxJS also provides several variants of Subject.