The original link: blog.angularindepth.com/rxjs-unders…

This article was translated by the RxJS Chinese community. If you reprint it, please credit the source. Thank you for your cooperation!

Photo by Kimberly Farmer on Unsplash.

I am often asked questions about the publish operator:

What is the difference between publish and share?

How do I import the refCount operator?

When should I use an AsyncSubject?

This article answers those questions and more, starting with the basics.

The mental model of multicast

Multicast is a term used to describe the situation where each notification emitted by a single Observable is received by multiple observers. An Observable’s ability to multicast depends on whether it’s hot or cold.

Hot and cold Observables are distinguished by where the producer of the Observable’s notifications is created. In his article Hot vs Cold Observables, Ben Lesh discusses the differences in detail; they can be summarized as follows:

  • An Observable is cold if the producer of its notifications is created whenever an observer subscribes to it. A timer Observable, for example, is cold: a new timer is created for each subscription.
  • An Observable is hot if the producer of its notifications is not created each time an observer subscribes to it. An Observable created using fromEvent, for example, is hot: the element that produces the events exists in the DOM and is not created when the observer subscribes.

Cold Observables are unicast: each observer receives notifications from a different producer, one that was created when that observer subscribed.

Hot observables are multicast; each observer receives notifications from the same producer.
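
The distinction can be sketched without RxJS. In the minimal, hand-rolled sketch below, coldSubscribe, hotSubscribe and hotEmit are hypothetical helper names, not RxJS APIs. The cold version creates its producer (a counter) inside the subscribe call, while the hot version shares one producer that exists independently of any subscription.

```typescript
// Hand-rolled illustration only; none of these names are RxJS APIs.
type Listener = (value: number) => void;

// Cold: the producer (a counter) is created inside subscribe,
// so every subscriber gets its own independent sequence.
function coldSubscribe(listener: Listener): void {
  let count = 0; // producer state created per subscription
  listener(++count);
  listener(++count);
}

// Hot: the producer exists outside subscribe and is shared.
const hotListeners: Listener[] = [];
let hotCount = 0; // single, shared producer state
function hotSubscribe(listener: Listener): void {
  hotListeners.push(listener);
}
function hotEmit(): void {
  hotCount += 1;
  for (const listener of hotListeners) listener(hotCount);
}

const coldA: number[] = [];
const coldB: number[] = [];
coldSubscribe(v => coldA.push(v));
coldSubscribe(v => coldB.push(v));
// coldA and coldB are both [1, 2]: each came from a separate producer.

const hotA: number[] = [];
const hotB: number[] = [];
hotSubscribe(v => hotA.push(v));
hotEmit(); // only a is listening
hotSubscribe(v => hotB.push(v));
hotEmit(); // a and b are both listening
// hotA is [1, 2] and hotB is [2]: both observed the same producer.
```

The late hot subscriber misses the first value, which is the behaviour the publish examples later in this article rely on.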

In some cases, a cold Observable needs to behave as multicast, and RxJS introduces the Subject class to make this possible.

A subject is both an Observable and an observer. A cold Observable can be made hot by subscribing the observers to a subject and, in turn, subscribing the subject to the cold Observable. This is the primary reason RxJS introduces subjects. In his article on subjects in RxJS, Ben Lesh points out:

Multicast is the main use of Subjects in RxJS.

Let’s look at the following example:

import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";

const source = Observable.defer(() => Observable.of(
  Math.floor(Math.random() * 100)
));

function observer(name: string) {
  return {
    next: (value: number) => console.log(`observer ${name}: ${value}`),
    complete: () => console.log(`observer ${name}: complete`)
  };
}

const subject = new Subject<number>();
subject.subscribe(observer("a"));
subject.subscribe(observer("b"));
source.subscribe(subject);

The source in the example is cold. Each time an observer subscribes to the source, the factory function passed to defer creates an Observable that emits a random number.

To make the source multicast, the observers subscribe to the subject, which in turn subscribes to the source. The source sees only one subscription, so it produces a single next notification with a random number and a single complete notification. The subject forwards these notifications to its observers, with output like this:

observer a: 42
observer b: 42
observer a: complete
observer b: complete

This example serves as the basic mental model for RxJS multicasting: a source Observable, a subject subscribed to the source, and multiple observers subscribed to the subject.

Multicast operator and ConnectableObservable

RxJS introduces the multicast operator, which can be applied to an Observable to make it hot. The operator encapsulates the infrastructure involved when a subject is used to multicast an Observable.

Before we look at the multicast operator itself, let’s replace the subject in the example above with a naive implementation of a multicast function:

function multicast<T>(source: Observable<T>) {
  const subject = new Subject<T>();
  source.subscribe(subject);
  return subject;
}

const m = multicast(source);
m.subscribe(observer("a"));
m.subscribe(observer("b"));

With the code changed, the output of the example looks like this:

observer a: complete
observer b: complete

That’s not what we want. Because the subject subscribes to the source inside the function, it receives the next and complete notifications before any observer has subscribed to it, so the observers receive only complete notifications.
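
The ordering problem can be reproduced with a hand-rolled stand-in for a subject. The sketch below is an assumption-laden simplification: MiniSubject, MiniObserver, subscribeToSource and recorder are hypothetical names, not RxJS APIs, and the source is hard-coded to emit 42 synchronously. It models only the two behaviours that matter here: a subject fans notifications out to its current observers, and a subject that has already completed can only send complete to late subscribers.

```typescript
// Hand-rolled illustration only; MiniSubject is not the RxJS Subject.
interface MiniObserver {
  next(value: number): void;
  complete(): void;
}

class MiniSubject implements MiniObserver {
  private observers: MiniObserver[] = [];
  private stopped = false;

  subscribe(observer: MiniObserver): void {
    if (this.stopped) {
      observer.complete(); // too late: the values are gone
      return;
    }
    this.observers.push(observer);
  }
  next(value: number): void {
    if (this.stopped) return;
    for (const o of this.observers) o.next(value);
  }
  complete(): void {
    if (this.stopped) return;
    this.stopped = true;
    for (const o of this.observers) o.complete();
  }
}

// A synchronous cold source: one value, then complete.
function subscribeToSource(observer: MiniObserver): void {
  observer.next(42);
  observer.complete();
}

// Builds an observer that records what it receives into out.
function recorder(label: string, out: string[]): MiniObserver {
  return {
    next: value => out.push(`${label}: ${value}`),
    complete: () => out.push(`${label}: complete`)
  };
}

// Observers first, then the source: both observers see the value.
const early: string[] = [];
const s1 = new MiniSubject();
s1.subscribe(recorder("a", early));
s1.subscribe(recorder("b", early));
subscribeToSource(s1);
// early: ["a: 42", "b: 42", "a: complete", "b: complete"]

// Source first, then the observers: only complete arrives.
const late: string[] = [];
const s2 = new MiniSubject();
subscribeToSource(s2);
s2.subscribe(recorder("a", late));
s2.subscribe(recorder("b", late));
// late: ["a: complete", "b: complete"]
```

The second ordering is what the naive multicast function does, which is why the caller needs control over when the subject subscribes to the source.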

To avoid this, the caller of any function that wires up the multicast infrastructure needs to be able to control when the subject subscribes to the source Observable. The RxJS multicast operator makes this possible by returning a special type of Observable: a ConnectableObservable.

A ConnectableObservable encapsulates the multicast infrastructure, but it does not subscribe to the source Observable immediately. It subscribes only when its connect method is called.

Let’s use the multicast operator:

import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/multicast";

const source = Observable.defer(() => Observable.of(
  Math.floor(Math.random() * 100)
));

function observer(name: string) {
  return {
    next: (value: number) => console.log(`observer ${name}: ${value}`),
    complete: () => console.log(`observer ${name}: complete`)
  };
}

const m = source.multicast(new Subject<number>());
m.subscribe(observer("a"));
m.subscribe(observer("b"));
m.connect();

After the code changes, the observers now receive next notifications:

observer a: 54
observer b: 54
observer a: complete
observer b: complete

When connect is called, the subject passed to the multicast operator subscribes to the source Observable, and the subject’s observers receive the multicast notifications, in keeping with the basic mental model of RxJS multicasting.

ConnectableObservable has another method, refCount, that can be used to determine when the source Observable is subscribed to.

refCount looks like an operator (it is a method that is called on an Observable and returns another Observable), but it exists only as a method of ConnectableObservable and does not need to be imported. As its name implies, refCount returns an Observable that maintains a reference count of the subscriptions made to it.

When an observer subscribes to the reference-counted Observable, the reference count is incremented, and if the previous count was zero, the subject responsible for the multicast infrastructure subscribes to the source Observable. When an observer unsubscribes, the reference count is decremented, and if the count drops to zero, the subject unsubscribes from the source Observable.
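
The counting rule described above can be sketched in a few lines. This is a minimal sketch under stated assumptions, not how RxJS implements refCount: RefCounter is a hypothetical class, and its connect callback stands in for "the subject subscribes to the source" and returns a teardown that stands in for "the subject unsubscribes from the source".

```typescript
// Hand-rolled illustration only; this is not the RxJS refCount implementation.
class RefCounter {
  private count = 0;
  private disconnect: (() => void) | null = null;
  private connect: () => () => void;

  // connect runs on the 0 -> 1 transition and returns a teardown function.
  constructor(connect: () => () => void) {
    this.connect = connect;
  }

  // Registers one subscriber and returns its unsubscribe function.
  subscribe(): () => void {
    this.count += 1;
    if (this.count === 1) {
      this.disconnect = this.connect();
    }
    let active = true;
    return () => {
      if (!active) return; // unsubscribing twice has no further effect
      active = false;
      this.count -= 1;
      if (this.count === 0 && this.disconnect) {
        this.disconnect();
        this.disconnect = null;
      }
    };
  }
}

const events: string[] = [];
const counter = new RefCounter(() => {
  events.push("source subscribed");
  return () => events.push("source unsubscribed");
});

const unsubA = counter.subscribe(); // 0 -> 1: subscribes to the source
const unsubB = counter.subscribe(); // 1 -> 2: nothing happens
unsubA();                           // 2 -> 1: nothing happens
unsubB();                           // 1 -> 0: unsubscribes from the source
// events: ["source subscribed", "source unsubscribed"]
```

However many observers come and go, the source is touched only on the transitions to and from zero.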

Let’s use refCount:

const m = source.multicast(new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));

After the code changes, the output looks like this:

observer a: 42
observer a: complete
observer b: complete

Only the first observer received the next notification. Let’s see why.

The source Observable in the example emits its notifications immediately. That is, as soon as it is subscribed to, the source emits next and complete notifications, and the complete notification causes the first observer to unsubscribe before the second observer subscribes. When the first observer unsubscribes, the reference count drops to zero, so the subject responsible for the multicast infrastructure unsubscribes from the source Observable as well.

When the second observer subscribes, the subject subscribes to the source Observable again, but the subject has already received a complete notification and cannot be reused, so the second observer receives only a complete notification.

Passing multicast a factory function that creates the subject solves this problem:

const m = source.multicast(() => new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));

After the code changes, a new subject is created each time the source Observable is subscribed, and the output looks like this:

observer a: 42
observer a: complete
observer b: 54
observer b: complete
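
The difference between passing a subject instance and passing a factory can be sketched with the same kind of hand-rolled stand-in. Everything here is a hypothetical simplification rather than RxJS code: TinySubject and Sink are made-up names, runSource is a synchronous source that emits an incrementing counter instead of a random number, and subscribeRefCounted compresses "subscribe, count goes from 0 to 1, connect, source completes, count returns to 0" into a single call.

```typescript
// Hand-rolled illustration only; not the RxJS implementation.
type Sink = { next(v: number): void; complete(): void };

class TinySubject {
  private sinks: Sink[] = [];
  private done = false;
  subscribe(sink: Sink): void {
    if (this.done) { sink.complete(); return; }
    this.sinks.push(sink);
  }
  next(v: number): void {
    if (!this.done) this.sinks.forEach(s => s.next(v));
  }
  complete(): void {
    if (this.done) return;
    this.done = true;
    this.sinks.forEach(s => s.complete());
  }
}

// Synchronous source: emits the next counter value, then completes.
let emitted = 0;
function runSource(subject: TinySubject): void {
  subject.next(++emitted);
  subject.complete();
}

// Subscribes one observer and connects the subject from getSubject to
// the source. The source completes synchronously, so the reference
// count would be back at zero by the time the next observer arrives.
function subscribeRefCounted(
  getSubject: () => TinySubject,
  label: string,
  out: string[]
): void {
  const subject = getSubject();
  subject.subscribe({
    next: v => out.push(`${label}: ${v}`),
    complete: () => out.push(`${label}: complete`)
  });
  runSource(subject);
}

// One shared instance: the second observer meets a completed subject.
const shared = new TinySubject();
const withInstance: string[] = [];
subscribeRefCounted(() => shared, "a", withInstance);
subscribeRefCounted(() => shared, "b", withInstance);
// withInstance: ["a: 1", "a: complete", "b: complete"]

// A factory: every new connection gets a fresh subject.
const withFactory: string[] = [];
subscribeRefCounted(() => new TinySubject(), "a", withFactory);
subscribeRefCounted(() => new TinySubject(), "b", withFactory);
// withFactory: ["a: 3", "a: complete", "b: 4", "b: complete"]
```

In the shared-instance run the source is still subscribed to a second time (the counter advances to 2), but the completed subject silently drops the value, which mirrors the earlier output in which b received only a complete notification.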

Because the source Observable emits its notifications immediately, each observer receives its own, separate notifications. Let’s modify the source so that its notifications are delayed:

import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/multicast";

const source = Observable.defer(() => Observable.of(
  Math.floor(Math.random() * 100)
)).delay(0);

The observers then receive multicast notifications again, with output like this:

observer a: 42
observer b: 42
observer a: complete
observer b: complete

To summarize, the examples above show the following characteristics of the multicast operator:

  • it encapsulates the multicast infrastructure, in keeping with the multicast mental model;
  • it provides a connect method for determining when the source Observable is subscribed to;
  • it provides a refCount method for managing subscriptions to the source Observable automatically;
  • when used with refCount, it should be passed a Subject factory function rather than a Subject instance.

Next, let’s look at the publish and share operators, as well as the publish variants, and see how they build on what the multicast operator provides.

The publish operator

Let’s look at the publish operator using the following example:

import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/concat";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/publish";

function random() {
  return Math.floor(Math.random() * 100);
}

// Emits one random number immediately and another after a short delay.
const source = Observable.concat(
  Observable.defer(() => Observable.of(random())),
  Observable.defer(() => Observable.of(random())).delay(1)
);

function observer(name: string) {
  return {
    next: (value: number) => console.log(`observer ${name}: ${value}`),
    complete: () => console.log(`observer ${name}: complete`)
  };
}

const p = source.publish();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);

The source Observable in the example emits a random number immediately, emits another random number after a short delay, and then completes. The example shows what happens when an observer subscribes before connect is called, after connect is called, and after the published Observable has completed.

The publish operator is a thin wrapper around the multicast operator. It calls multicast, passing it a Subject.

The output of the example looks like this:

observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete

The notifications received by the observers can be summarized as follows:

  • a subscribed before connect was called, so it receives both next notifications and the complete notification.
  • b subscribed after connect was called, by which time the first next notification had already been emitted, so it receives only the second next notification and the complete notification.
  • c subscribed after the source Observable completed, so it receives only the complete notification.

Use refCount instead of connect:

const p = source.publish().refCount();
p.subscribe(observer("a"));
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);

The output of the example looks like this:

observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete

The output is similar to the output obtained with connect. Why is that?

b does not receive the first next notification because the source Observable emits it immediately, so only a can receive it.

c subscribes after the published Observable has completed, so the reference count is zero and a new subscription to the source is made. However, publish passes a subject, not a factory function, to multicast, and because subjects cannot be reused, c receives only a complete notification.

The publish and multicast operators both accept an optional selector function, and they behave quite differently when one is specified. This is covered in detail in another article, Secrets of the Multicast operator.

Special types of subjects

There are several variants of the publish operator. All of them wrap multicast in a similar way, passing it a subject rather than a factory function, but each passes a different type of subject.

The special types of subjects used by the publish variants are:

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

As for when to use these special types of subjects: each publish variant is associated with one of them, so use that subject whenever you want behaviour similar to that of the corresponding publish variant. Let’s see how the variants behave.

The publishBehavior operator

publishBehavior passes a BehaviorSubject, rather than a Subject, to multicast. A BehaviorSubject is similar to a Subject, except that if an observer subscribes to it before the source Observable has emitted a next notification, the subject emits a next notification containing an initial value.
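
The "current value" idea can be sketched as follows. MiniBehaviorSubject is a hypothetical, hand-rolled class, not the RxJS BehaviorSubject, and for brevity it does not model complete or error notifications. It assumes the general rule that a new subscriber is handed the most recent value, which is the initial value until the first next arrives.

```typescript
// Hand-rolled illustration only; MiniBehaviorSubject is not the RxJS class.
class MiniBehaviorSubject {
  private listeners: Array<(value: number) => void> = [];
  private current: number;

  constructor(initial: number) {
    this.current = initial;
  }
  subscribe(listener: (value: number) => void): void {
    this.listeners.push(listener);
    listener(this.current); // new subscribers get the current value
  }
  next(value: number): void {
    this.current = value;
    for (const listener of this.listeners) listener(value);
  }
}

const seenByA: number[] = [];
const seenByB: number[] = [];
const behavior = new MiniBehaviorSubject(-1);
behavior.subscribe(v => seenByA.push(v)); // receives the initial -1
behavior.next(42);
behavior.subscribe(v => seenByB.push(v)); // receives the current 42
behavior.next(54);
// seenByA: [-1, 42, 54]; seenByB: [42, 54]
```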

Let’s change the example so that the source Observable is subscribed to after a short delay and therefore does not emit a random number immediately:

const delayed = Observable.timer(1).switchMapTo(source);
const p = delayed.publishBehavior(-1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);

The output of the example looks like this:

observer a: -1
observer b: -1
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete

The notifications received by the observers can be summarized as follows:

  • a subscribed before connect was called, so it receives the next notification with the subject’s initial value, followed by the source Observable’s next notifications and the complete notification.
  • b subscribed after connect was called, but before the subject received the first next notification from the source, so it too receives the next notification with the subject’s initial value, followed by the source Observable’s next notifications and the complete notification.
  • c subscribed after the source Observable completed, so it receives only the complete notification.

The publishReplay operator

publishReplay passes a ReplaySubject, rather than a Subject, to multicast. As the name implies, a ReplaySubject replays a specified number of next notifications whenever an observer subscribes.

const p = source.publishReplay(1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);

Using publishReplay, the output of the example looks like this:

observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete

The notifications received by the observers can be summarized as follows:

  • a subscribed before connect was called, when the subject had not yet received a next notification, so a receives the source Observable’s two next notifications and the complete notification.
  • b subscribed after connect was called, when the subject had already received the first next notification from the source, so b receives the replayed next notification, the source Observable’s second next notification and the complete notification.
  • c subscribed after the source Observable completed, so it receives the replayed next notification and the complete notification.

Looking at the behaviour of c, it is clear that, unlike the publish operator, the publishReplay operator is suitable for use with the refCount method, because observers that subscribe after the source Observable has completed still receive the replayed next notifications.
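
The replay behaviour walked through above can be sketched with a small buffer. MiniReplaySubject, ReplaySink and replaySink are hypothetical names, not RxJS APIs, and the sketch assumes the rule described in the text: the subject keeps the last bufferSize values and hands them to every new subscriber, even one that arrives after completion.

```typescript
// Hand-rolled illustration only; MiniReplaySubject is not the RxJS class.
interface ReplaySink {
  next(value: number): void;
  complete(): void;
}

class MiniReplaySubject {
  private buffer: number[] = [];
  private sinks: ReplaySink[] = [];
  private done = false;
  private bufferSize: number;

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }
  subscribe(sink: ReplaySink): void {
    for (const value of this.buffer) sink.next(value); // replay first
    if (this.done) {
      sink.complete();
      return;
    }
    this.sinks.push(sink);
  }
  next(value: number): void {
    if (this.done) return;
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    for (const sink of this.sinks) sink.next(value);
  }
  complete(): void {
    if (this.done) return;
    this.done = true;
    for (const sink of this.sinks) sink.complete();
  }
}

const replayLog: string[] = [];
function replaySink(label: string): ReplaySink {
  return {
    next: value => replayLog.push(`${label}: ${value}`),
    complete: () => replayLog.push(`${label}: complete`)
  };
}

const replay = new MiniReplaySubject(1);
replay.subscribe(replaySink("a"));
replay.next(42);
replay.subscribe(replaySink("b")); // replays 42
replay.next(54);
replay.complete();
replay.subscribe(replaySink("c")); // replays 54, then completes
// replayLog: ["a: 42", "b: 42", "a: 54", "b: 54",
//             "a: complete", "b: complete", "c: 54", "c: complete"]
```

With the fixed values 42 and 54 standing in for the random numbers, the log has the same shape as the publishReplay output above.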

The publishLast operator

publishLast passes an AsyncSubject, rather than a Subject, to multicast. AsyncSubject is the most unusual of the special subject types. It emits nothing until it completes; only then does it emit a next notification (if the source emitted any), containing the last value emitted by the source Observable, followed by a complete notification.
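
A minimal sketch of that "hold everything until completion" rule is shown below. MiniAsyncSubject, AsyncSink and asyncSink are hypothetical names rather than RxJS APIs, and the values 42 and 54 stand in for the random numbers used in this article’s examples.

```typescript
// Hand-rolled illustration only; MiniAsyncSubject is not the RxJS class.
interface AsyncSink {
  next(value: number): void;
  complete(): void;
}

class MiniAsyncSubject {
  private sinks: AsyncSink[] = [];
  private last = 0;
  private hasValue = false;
  private done = false;

  subscribe(sink: AsyncSink): void {
    if (this.done) {
      // Late subscribers still get the last value, then complete.
      if (this.hasValue) sink.next(this.last);
      sink.complete();
      return;
    }
    this.sinks.push(sink);
  }
  next(value: number): void {
    if (this.done) return;
    this.last = value; // remembered, not forwarded yet
    this.hasValue = true;
  }
  complete(): void {
    if (this.done) return;
    this.done = true;
    if (this.hasValue) {
      for (const sink of this.sinks) sink.next(this.last);
    }
    for (const sink of this.sinks) sink.complete();
  }
}

const asyncLog: string[] = [];
function asyncSink(label: string): AsyncSink {
  return {
    next: value => asyncLog.push(`${label}: ${value}`),
    complete: () => asyncLog.push(`${label}: complete`)
  };
}

const lastOnly = new MiniAsyncSubject();
lastOnly.subscribe(asyncSink("a"));
lastOnly.next(42);                  // nothing is delivered yet
lastOnly.subscribe(asyncSink("b"));
lastOnly.next(54);                  // still nothing delivered
lastOnly.complete();                // a and b receive 54, then complete
lastOnly.subscribe(asyncSink("c")); // c receives 54, then complete
// asyncLog: ["a: 54", "b: 54", "a: complete", "b: complete",
//            "c: 54", "c: complete"]
```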

const p = source.publishLast();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);

Using publishLast, the output from the example looks like this:

observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete

The notifications received by the observers can be summarized as follows:

  • a and b both subscribed before the source Observable completed, but they receive no notifications until it completes; they then receive a next notification with the second random number and a complete notification.
  • c subscribed after the source Observable completed, and it receives a next notification with the second random number and a complete notification.

Like publishReplay, the publishLast operator works well with the refCount method, because observers that subscribe after the source Observable has completed still receive the final next notification.

The share operator

The share operator is similar to using publish().refCount(). However, share passes a factory function to multicast, which means that a new subject is created to subscribe to the source Observable whenever a subscription is made while the reference count is zero.

const s = source.share();
s.subscribe(observer("a"));
s.subscribe(observer("b"));
setTimeout(() => s.subscribe(observer("c")), 10);

Using share, the sample output looks like this:

observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 6
observer c: 9
observer c: complete

The notifications received by the observers can be summarized as follows:

  • a subscribes and immediately receives the first next notification, followed by the second next notification and the complete notification.
  • b receives only the second next notification and the complete notification.
  • c subscribes after the source Observable has completed, so a new subject is created and subscribes to the source Observable; c immediately receives the first next notification, followed by the second next notification and the complete notification.

The examples above show that the publish and share operators automatically unsubscribe a and b when the source Observable completes. They also unsubscribe automatically if the source Observable errors. The publish and share operators differ in one further respect:

  • If the source Observable errors, any future subscriber to the Observable returned by publish receives the error notification.
  • However, any future subscriber to the Observable returned by share triggers a new subscription to the source Observable, because the error automatically unsubscribes every subscriber, returning the reference count to zero.

That’s the end of this article. We introduced six operators, but they are all implemented in a similar way, and they all conform to the same basic mental model: a source Observable, a subject subscribed to the source, and multiple observers subscribed to the subject.

This article has only briefly covered the refCount method. For an in-depth look, see RxJS: How to Use refCount.