The original link: blog.angularindepth.com/rxjs-unders…
This article is translated by RxJS Chinese community, if you need to reprint, please indicate the source, thank you for your cooperation!
If you would like to translate more quality RxJS articles with us, please click here.
Photo by Unsplash by Kimberly Farmer.
I am often asked about the publish operator:
What is the difference between Publish and share?
How do I import the refCount operator?
When to use AsyncSubject?
We’ll answer those questions and let you know more, starting with the basics.
The mental model of multicast
Multicast is a term used to describe the situation where each notification emitted by a single Observable is received by multiple observers. An Observable’s ability to multicast depends on whether it’s hot or cold.
Hot and cold Observables are characterized by where the producers of observable notifications are created. In Ben Lesh’s hot Vs Cold Observables, he discusses in detail the differences between the two, which can be summarized as follows:
- An Observable is cold if the notification producer is created when the observer subscribes to it. A Timer Observable, for example, is cold and creates a new timer every time it subscribes.
- An Observable is hot if the notification producer is not created each time an observer subscribes to it. An Observable created using fromEvent, for example, is hot. The event-generating element is in the DOM, not created when the observer subscribes.
The cold observables are unicast, and each observer receives notifications from a different producer that was created when the observer subscribed.
Hot observables are multicast; each observer receives notifications from the same producer.
In some cases, cold Observables need to have multicast behavior, and RxJS introduces the Subject class to make this possible.
Subject is an Observable and an observer. Cold Observables can be made hot by subscribing to the subject with an observer, which in turn subscribes to the cold Observable. This is the main use of RxJS for introducing Subjects. In Ben Lesh’s article on Subject in RxJS, he points out:
Multicast is the main use of Subjects in RxJS.
Let’s look at the following example:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
const source = Observable.defer((a)= > Observable.of(
Math.floor(Math.random() * 100)));function observer(name: string) {
return {
next: (value: number) = > console.log(`observer ${name}: ${value}`),
complete: (a)= > console.log(`observer ${name}: complete`)}; }const subject = new Subject<number> (); subject.subscribe(observer("a"));
subject.subscribe(observer("b"));
source.subscribe(subject);
Copy the code
The source in the example is cold. Each time the observer subscribes to the Source, the factory function passed to defer creates an Observable that emits a random number.
To make the source multicast, the observer needs to subscribe to the subject, which in turn subscribes to the source. The source sees only one subscription, which also generates only a next notification with a random number and a complete notification. The Subject sends these notifications to its observer, with output like this:
observer a: 42
observer b: 42
observer a: complete
observer b: complete
Copy the code
This example serves as the basic mental model for RxJS multicast: a source Observable, a subject of a subscribing Observable, and observers of multiple subscribing subjects.
Multicast operator and ConnectableObservable
RxJS introduces the multicast operator, which can be applied to observables to make them hot. This operator encapsulates the infrastructure that Subject uses as a multicast Observable.
Before we look at the multicast operators, we use a simple implementation of the multicast function to replace subject in the above example:
function multicast<T> (source: Observable<T>) {
const subject = new Subject<T>();
source.subscribe(subject);
return subject;
}
const m = multicast(source);
m.subscribe(observer("a"));
m.subscribe(observer("b"));
Copy the code
With the code changed, the output of the example looks like this:
observer a: complete
observer b: complete
Copy the code
That’s not what we want. Subscribing to a Subject within a function causes the subject to receive next and complete notifications before being subscribed by the observer, so the observer can only receive complete notifications.
This is avoidable, and the caller of any function that connects to the multicast infrastructure needs to be able to control when the Subject subscribes to the source Observable. The MULTICAST operator of RxJS is implemented by returning a special Observable type, ConnectableObservable.
The ConnectableObservable encapsulates the multicast infrastructure, but it does not subscribe to the source Observable immediately, only when its Connect method is called.
Let’s use the multicast operator:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/multicast";
const source = Observable.defer((a)= > Observable.of(
Math.floor(Math.random() * 100)));function observer(name: string) {
return {
next: (value: number) = > console.log(`observer ${name}: ${value}`),
complete: (a)= > console.log(`observer ${name}: complete`)}; }const m = source.multicast(new Subject<number> ()); m.subscribe(observer("a"));
m.subscribe(observer("b"));
m.connect();
Copy the code
After the code changes, the observer now receives a next notification:
observer a: 54
observer b: 54
observer a: complete
observer b: complete
Copy the code
When connect is called, the Subject passing in the multicast operator subscribes to the source Observable, and the subject’s observer receives a multicast notification, which conforms to the basic mental model of RxJS multicast.
The ConnectableObservable also has another method, refCount, that determines when the source Observable generated a subscription.
RefCount looks like an operator, that is, it’s a method called on an Observable and returns another Observable, but it’s just a ConnectableObservable method and doesn’t need to be imported. As the name implies, refCount returns an Observable that maintains a reference count of generated subscriptions.
The reference count increases when the observer subscribes to the Observable responsible for the reference count, and if the previous reference count is 0, the subject responsible for the multicast infrastructure subscribes to the source Observable. When an observer unsubscribes, the reference count decreases, and if the reference count goes to zero, the Subject unsubscribes to the source Observable.
Let’s use refCount:
const m = source.multicast(new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
Copy the code
After the code changes, the output looks like this:
observer a: 42
observer a: complete
observer b: complete
Copy the code
Only the first observer received the next notification. Let’s see why.
The source Observable in the example immediately sends a notification. That is, once subscribed, the source Observable issues next and complete notifications that cause the first observer to unsubscribe before the second observer can subscribe. The reference count goes to zero when the first one unsubscribes, so the subject responsible for the multicast infrastructure unsubscribes the source Observable as well.
When the second observer subscribes, the subject subscribes to the source Observable again, but since the subject has already received the complete notification, it cannot be reused.
The factory function passed to The subject for multicast solves this problem:
const m = source.multicast((a)= > new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
Copy the code
After the code changes, a new subject is created each time the source Observable is subscribed, and the output looks like this:
observer a: 42
observer a: complete
observer b: 54
observer b: complete
Copy the code
Because the source Observable sends notifications immediately, the observer receives notifications separately. Modify source to delay notification:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/multicast";
const source = Observable.defer((a)= > Observable.of(
Math.floor(Math.random() * 100)
)).delay(0);
Copy the code
The observer will still receive a multicast notification, with output like this:
observer a: 42
observer b: 42
observer a: complete
observer b: complete
Copy the code
To summarize, the above example shows the following characteristics of the multicast operator:
- Encapsulates the multicast infrastructure to conform to the multicast mental model;
- provides
connect
Method to determine when the source Observable generated a subscription; - provides
refCount
Method to automatically manage subscriptions to the source Observable. - If you are using
refCount
, must be passedSubject
The factory function ofSubject
Instance;
Next, let’s look at the Publish and Share operators, as well as the publish variants, and see how they build on what the multicast operator provides.
The publish operator
Let’s look at the publish operator using the following example:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/publish";
function random() {
return Math.floor(Math.random() * 100);
}
const source = Observable.concat(
Observable.defer((a)= > Observable.of(random())),
Observable.defer((a)= > Observable.of(random())).delay(1));function observer(name: string) {
return {
next: (value: number) = > console.log(`observer ${name}: ${value}`),
complete: (a)= > console.log(`observer ${name}: complete`)}; }const p = source.publish();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout((a)= > p.subscribe(observer("c")), 10);
Copy the code
The source Observable in the example immediately emits a random number, after a short delay emits another random number, and completes. This example shows what happens when the subscriber subscribes before connect, after connect, and after the Observable that called publish completes.
publish
The operator is correctmulticast
Operators are wrapped in a thin layer. It will be calledmulticast
And the incomingSubject
。
The output of the example looks like this:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
Copy the code
The notice received by the observer can be summarized as follows:
a
Is in theconnect
Subscribed before calling, so it can receive twonext
Notification andcomplete
Notice.b
Is in theconnect
Subscribed after the call, the first to be sent immediatelynext
The notification has already been sent, so it can only receive the second onenext
Notification andcomplete
Notice.c
Subscribe after the source Observable completes, so it only receivescomplete
Notice.
Use refCount instead of connect:
const p = source.publish().refCount();
p.subscribe(observer("a"));
p.subscribe(observer("b"));
setTimeout((a)= > p.subscribe(observer("c")), 10);
Copy the code
The output of the example looks like this:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
Copy the code
The output is similar to that with CONNECT. Why is that?
B doesn’t receive the first Next notification because the first Next notification is sent immediately by the source Observable, so only A can receive it.
C subscribes after a publish Observable is called, so the reference count of the subscribe is 0, and a subscribe is regenerated. However, publish is passed to multicast by Subject, not factory function, because SUBJECTS cannot be reused, so C can only receive complete notification.
Publish and multicast operators both accept an optional selector function, and the operators behave quite differently if this function is specified. This will be covered in detail in another article, Secrets of the Multicast operator.
Special types of subjects
There are several variants of the publish operator, all of which wrap Multicast in a similar way, passing in subjects rather than factory functions. However, they introduce different types of subjects.
The special types of subjects used by the publish variant include:
BehaviorSubject
ReplaySubject
AsyncSubject
The answer to how to use these special types of subjects is that each variant is associated with a special type of Subject, which you use when you want behavior similar to that of a publish variant. Let’s see how these variants behave.
PublishBehavior operator
The publishBehavior was passed to the Multicast BehaviorSubject, not Subject. The BehaviorSubject is similar to the Subject, but if the Subject’s subscription occurs before the source Observable issues a Next notification, the Subject issues a Next notification with an initial value.
Let’s change the example to give a short delay to the source Observable that generates random numbers so that it doesn’t immediately emit random numbers:
const delayed = Observable.timer(1).switchMapTo(source);
const p = delayed.publishBehavior(- 1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout((a)= > p.subscribe(observer("c")), 10);
Copy the code
The output of the example looks like this:
observer a: -1
observer b: -1
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
Copy the code
The notice received by the observer can be summarized as follows:
a
Is in theconnect
Subscribed before the call, so it can receive an initial value with subjectnext
Notification and source Observablenext
Notification andcomplete
Notice.b
Is in theconnect
After the call, the subject receives the first observable from the sourcenext
Subscribed before notification, so it can receive an initial value of subjectnext
Notification and source Observablenext
Notification andcomplete
Notice.c
Subscribe after the source Observable completes, so it only receivescomplete
Notice.
PublishReplay operator
PublishReplay passes ReplaySubject to multicast, not Subject. As the name implies, ReplaySubject replays a specified number of Next notifications each time an observer subscribes.
const p = source.publishReplay(1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout((a)= > p.subscribe(observer("c")), 10);
Copy the code
Using publishReplay, the output of the example looks like this:
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete
Copy the code
The notice received by the observer can be summarized as follows:
a
Is in theconnect
Subscribed before the call, which the Subject has not yet receivednext
Notice, soa
Two observables that receive the sourcenext
Notification andcomplete
Notice.b
Is in theconnect
The subject has received the first observable from the sourcenext
Notice, sob
You can get a replaynext
Notification, the second of the source Observablenext
Notification andcomplete
Notice.c
Subscribes after the source Observable completes, so it receives the replaynext
Notification andcomplete
Notice.
Looking at the behavior of C, it is clear that, unlike the publish operator, the publishReplay operator is suitable for using the refCount method because observers can receive any number of next notifications for replays even after the source Observable has subscribed.
PublishLast operator
PublishLast is passed to multicast AsyncSubject, not Subject. AsyncSubject is the most special special type of subjects. Only when it completes does it issue a Next notification (if any) and a complete notification, which is the last Next notification in the source Observable.
const p = source.publishLast();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout((a)= > p.subscribe(observer("c")), 10);
Copy the code
Using publishLast, the output from the example looks like this:
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete
Copy the code
The notice received by the observer can be summarized as follows:
a
和b
They subscribe before the source Observable completes, but they don’t receive notifications until the source Observable completes, and they receive one with a second random numbernext
Notification andcomplete
Notice.c
Subscribe after the source Observable completes, and it receives one with a second random numbernext
Notification andcomplete
Notice.
Like publishReplay, the publishLast operator works well with the refCount method because observers can receive any number of next notifications for replays even after the source Observable has subscribed.
Share the operator
The share operator is similar to using publish().refcount (). However, share passes to Multicast as a factory function, which means that a new Subject is created to subscribe to the source Observable if the reference count is 0.
const s = source.share();
s.subscribe(observer("a"));
s.subscribe(observer("b"));
setTimeout((a)= > s.subscribe(observer("c")), 10);
Copy the code
Using share, the sample output looks like this:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 6
observer c: 9
observer c: complete
Copy the code
The notice received by the observer can be summarized as follows:
a
Subscribe and receive the first one immediatelynext
Notice, followed by a secondnext
Notification andcomplete
Notice.b
Can only receive the secondnext
Notification andcomplete
Notice.c
Subscribe after the source Observable completes, a new subject is created to subscribe to the source Observable, which immediately receives the firstnext
Notice, followed by a secondnext
Notification andcomplete
Notice.
In the examples above, we introduced the Publish and share operators, which automatically unsubscribe a and B when the source Observable completes. They also automatically unsubscribe if the source Observable reports an error. The publish and share operators differ in another way:
- If the source Observable reports an error, call the
publish
Any future subscribers of the returned Observable will receive iterror
Notice. - However, by
share
Any future subscribers of the returned Observable generate a new subscription to the source Observable, because errors automatically cancel any subscriber’s subscription, returning its reference count to zero.
That’s it. That’s the end of this article. We introduced six operators, but they are all implemented in a similar way, and they all conform to the same basic mental model: a source Observable, a subject of a subscribing Observable, and observers of multiple subscribing subjects.
This article has only briefly covered the refCount method. For an in-depth look, see RxJS: How to Use refCount.