The original link: blog.angularindepth.com/rxjs-multic…

This article was translated by the RxJS Chinese community. If you reprint it, please credit the source. Thank you for your cooperation!


The multicast operator has a secret. So does the publish operator, which wraps multicast. And that secret is sometimes really useful.

The secret

ConnectableObservable is mentioned in the documentation for both multicast and publish. A ConnectableObservable is a special type of Observable that starts sending notifications to its subscribers only after its connect method is called. However, the multicast and publish operators do not always return a ConnectableObservable.

Let’s start with the source code of publish:

export function publish<T>(
  this: Observable<T>,
  selector?: (source: Observable<T>) => Observable<T>
): Observable<T> | ConnectableObservable<T> {
  return selector ?
    multicast.call(this, () => new Subject<T>(), selector) :
    multicast.call(this, new Subject<T>());
}

It is clear that publish is only a thin wrapper around multicast. It creates a Subject and passes it to multicast, along with the optional selector function. The most interesting part is in the multicast implementation, which contains the following code:

if (typeof selector === 'function') {
  return this.lift(new MulticastOperator(subjectFactory, selector));
}

const connectable: any = Object.create(this, connectableObservableDescriptor);
connectable.source = this;
connectable.subjectFactory = subjectFactory;
return <ConnectableObservable<T>> connectable;

multicast returns a ConnectableObservable only if no selector is passed. If a selector function is passed, the lift mechanism is used instead, so that the source Observable creates an Observable of the appropriate type. There is no need to call connect on the returned Observable, and the source Observable is shared within the scope of the selector function.

This means that the multicast (and publish) operators can be used to easily implement local sharing of source Observables.
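The two branches can be illustrated with a small, self-contained toy model. This is only a sketch of the behavior described above, not the RxJS implementation: ToyObserver, ToyObservable, ToySubject, toyMulticast and toyMulticastWithSelector are hypothetical names made up for the illustration, and everything runs synchronously with no unsubscription or error handling.

```typescript
// Toy model only: these types and functions are hypothetical stand-ins,
// not RxJS classes or operators.
type ToyObserver<T> = { next: (value: T) => void; complete: () => void };
type ToyObservable<T> = { subscribe: (observer: ToyObserver<T>) => void };

// Minimal subject: forwards each notification to all current observers.
class ToySubject<T> {
  private observers: ToyObserver<T>[] = [];
  subscribe(observer: ToyObserver<T>): void { this.observers.push(observer); }
  next(value: T): void { this.observers.slice().forEach(o => o.next(value)); }
  complete(): void { this.observers.slice().forEach(o => o.complete()); }
}

// Branch 1 (no selector): nothing reaches the subject until connect() is called.
function toyMulticast<T>(source: ToyObservable<T>, subject: ToySubject<T>) {
  return {
    subscribe: (observer: ToyObserver<T>) => subject.subscribe(observer),
    connect: () => source.subscribe(subject),
  };
}

// Branch 2 (selector): each subscription creates a subject, lets the selector
// build on it, then subscribes the subject to the source. No connect() needed.
function toyMulticastWithSelector<T, R>(
  source: ToyObservable<T>,
  subjectFactory: () => ToySubject<T>,
  selector: (shared: ToyObservable<T>) => ToyObservable<R>
): ToyObservable<R> {
  return {
    subscribe: (observer: ToyObserver<R>) => {
      const subject = subjectFactory();
      selector(subject).subscribe(observer);
      source.subscribe(subject);
    },
  };
}

// A source that counts how many times it is subscribed to.
let sourceSubscriptions = 0;
const toySource: ToyObservable<number> = {
  subscribe: observer => {
    sourceSubscriptions++;
    [1, 2, 3].forEach(v => observer.next(v));
    observer.complete();
  },
};

// Without a selector: values flow only after connect().
const connectableLog: number[] = [];
const connectable = toyMulticast(toySource, new ToySubject<number>());
connectable.subscribe({ next: v => connectableLog.push(v), complete: () => {} });
const seenBeforeConnect = connectableLog.length; // 0
connectable.connect();                           // connectableLog is now [1, 2, 3]

// With a selector that subscribes to `shared` twice: the source is still
// subscribed to only once for this subscription.
const selectorLog: number[] = [];
const countBefore = sourceSubscriptions;
toyMulticastWithSelector<number, number>(
  toySource,
  () => new ToySubject<number>(),
  shared => ({
    subscribe: observer => {
      shared.subscribe({ next: v => observer.next(v), complete: () => {} });
      shared.subscribe({ next: v => observer.next(v * 10), complete: () => observer.complete() });
    },
  })
).subscribe({ next: v => selectorLog.push(v), complete: () => {} });
const subscriptionsCaused = sourceSubscriptions - countBefore; // 1
// selectorLog is [1, 10, 2, 20, 3, 30]
```

In the selector branch the caller only subscribes. The sharing subject is created and wired to the source inside the operator, which is what makes the sharing local.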

Local sharing using publish

Let’s look at an example using publish.

RxJS includes a defaultIfEmpty operator, which takes a value and emits it if the source Observable is empty. Sometimes it’s more useful to be able to specify a default Observable rather than a single value, so let’s implement a defaultObservableIfEmpty function that can be used with the let operator.

The following marble diagram shows how it behaves when the source Observable is empty:

RxJS includes an isEmpty operator, which emits a boolean indicating whether the source Observable is empty. However, to use it in the defaultObservableIfEmpty implementation, the source Observable needs to be shared, because the source’s values also have to be emitted and isEmpty does not emit them. The publish operator makes it easy to share the source Observable:

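import { Observable } from "rxjs/Rx"; // RxJS 5; this import also patches the operators used below

function defaultObservableIfEmpty<T>(
  defaultObservable: Observable<T>
): (source: Observable<T>) => Observable<T> {

  // publish shares the source inside the selector, so merging the shared
  // source with the isEmpty branch does not subscribe to the source twice.
  return source => source.publish(shared => shared.merge(
    shared.isEmpty().mergeMap(empty => empty ?
      defaultObservable :
      Observable.empty<T>()
    )
  ));
}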

The selector passed to publish receives the shared source Observable. The Observable it returns is the shared source merged with an Observable that depends on whether the source is empty: the default Observable that was passed in if the source is empty, and an empty Observable otherwise.

The sharing of the source Observable is managed entirely by publish. Within the selector function, the shared Observable can be subscribed to as many times as needed without causing additional subscriptions to the source Observable.

Local sharing using multicast

Let’s look at another example, this time using multicast.

RxJS includes a takeWhile operator, which returns an Observable that emits the source Observable’s values until a value fails the given predicate, at which point the Observable completes. The value that fails the predicate is not emitted. Let’s implement a takeWhileInclusive function that does emit it and that can be used with the let operator.

The following marble diagram shows how it behaves when a value fails the predicate:

This can be implemented by using the takeWhile operator as a base and then concatenating the value that failed the predicate. To retrieve that value after the Observable returned by takeWhile completes, a ReplaySubject is used:

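import { Observable, ReplaySubject } from "rxjs/Rx"; // RxJS 5; this import also patches the operators used below

function takeWhileInclusive<T>(
  predicate: (value: T) => boolean
): (source: Observable<T>) => Observable<T> {

  return source => source.multicast(
    // The subject that shares the source replays its most recent value.
    () => new ReplaySubject<T>(1),
    shared => shared
      .takeWhile(predicate)
      // Append the replayed value, but only if it failed the predicate.
      .concat(shared.take(1).filter(t => !predicate(t)))
  );
}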

A ReplaySubject with a buffer size of 1 is used to share the source Observable. When the Observable returned by takeWhile completes, the shared Observable is concatenated: take(1) ensures that only the replayed value is considered, and filter ensures that it is appended only if it fails the predicate.
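Why the replayed value is the right one can be traced with a small, self-contained toy model. This is a sketch under simplifying assumptions, not RxJS code: ReplayObserver, ToyReplayOne and toyTakeWhileInclusive are hypothetical names, the source is a plain array, and the takeWhile, concat, take(1) and filter steps are written out by hand.

```typescript
// Toy model only: hypothetical stand-ins, not RxJS classes or operators.
type ReplayObserver<T> = { next: (value: T) => void; complete: () => void };

// Stand-in for a replay subject with buffer size 1: it remembers only the
// most recent value and replays it to every new subscriber.
class ToyReplayOne<T> {
  private last: T[] = [];                       // zero or one buffered value
  private observers: ReplayObserver<T>[] = [];
  private stopped = false;

  subscribe(observer: ReplayObserver<T>): void {
    this.last.forEach(v => observer.next(v));   // replay the buffered value, if any
    if (this.stopped) observer.complete(); else this.observers.push(observer);
  }
  next(value: T): void {
    this.last = [value];                        // buffer first, then notify
    this.observers.slice().forEach(o => o.next(value));
  }
  complete(): void {
    this.stopped = true;
    this.observers.slice().forEach(o => o.complete());
  }
}

function toyTakeWhileInclusive<T>(values: T[], predicate: (value: T) => boolean): T[] {
  const out: T[] = [];
  const shared = new ToyReplayOne<T>();
  let taking = true;

  // Runs when the takeWhile part is done: subscribe late to the shared
  // subject, look at the first (replayed) value only, and keep it only if
  // it fails the predicate.
  const onTakeWhileDone = () => {
    taking = false;
    let taken = false;
    shared.subscribe({
      next: v => {
        if (!taken) {
          taken = true;
          if (!predicate(v)) out.push(v);
        }
      },
      complete: () => {},
    });
  };

  // The takeWhile part: emit values until one fails the predicate.
  shared.subscribe({
    next: v => {
      if (!taking) return;
      if (predicate(v)) out.push(v); else onTakeWhileDone();
    },
    complete: () => { if (taking) onTakeWhileDone(); },
  });

  values.forEach(v => shared.next(v));
  shared.complete();
  return out;
}

// A value fails the predicate: the failing value 3 is replayed and appended.
const inclusiveResult = toyTakeWhileInclusive([1, 2, 3, 4], v => v < 3); // [1, 2, 3]
// No value fails: the replayed value 2 passes the predicate, so filter drops it.
const noFailureResult = toyTakeWhileInclusive([1, 2], v => v < 3);       // [1, 2]
```

The second call shows why the filter step matters: if the source completes without any value failing the predicate, the replayed value is the last one already emitted, and without the filter it would be emitted twice.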

How reliable is this approach?

RxJS 5 is a fairly new library and its documentation is still a work in progress, so this behavior is not yet documented and is visible only in the implementation. However, the public TypeScript signature indicates that a ConnectableObservable is not always returned, and there are corresponding unit tests.

RxJS is generally flexible, so there are other ways to implement these functions, but the examples above show that publish and multicast are easy to use and worth considering whenever a source Observable needs to be shared locally.