RxJS, Reactive Extensions for JavaScript, is a library that uses Observable sequences to write asynchronous and event-based applications.

Here’s a look at how to use the four types of Subject in RxJS.

Introduction

A Subject is a multicast Observable: all subscribers share a single execution, so each value is delivered identically to every observer that is subscribed when the value is emitted.

Before introducing them one by one, let’s compare the four Subject types with a plain Observable:

| Observable | Subject | BehaviorSubject | ReplaySubject | AsyncSubject |
| --- | --- | --- | --- | --- |
| Data producer | Data producer and consumer | Data producer and consumer | Data producer and consumer | Data producer and consumer |
| Unicast | Multicast | Multicast | Multicast | Multicast |
| Each subscription starts a new execution, so every subscriber independently receives the sequence from its first value | Multicasts each value to the observers currently subscribed to the Subject | Immediately sends the latest value to each new observer (an initial value is required at instantiation) | Stores multiple old values and replays them to new subscribers | Sends only the last value to all subscribers, and only once complete is called |
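
The unicast/multicast distinction in the comparison can be illustrated with a minimal sketch, assuming RxJS 6 or later. The names plain$, shared$, and executions are illustrative, not taken from the article:

```typescript
import { Observable, Subject } from 'rxjs';

// Plain Observable (unicast): the producer function runs once per subscription.
let executions = 0;
const plain$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions); // each execution produces its own value
  subscriber.complete();
});

const plainValues: number[] = [];
plain$.subscribe((value) => plainValues.push(value));
plain$.subscribe((value) => plainValues.push(value));
console.log(executions, plainValues); // two independent executions

// Subject (multicast): one next() call reaches every current subscriber.
const shared$ = new Subject<number>();
const sharedA: number[] = [];
const sharedB: number[] = [];
shared$.subscribe((value) => sharedA.push(value));
shared$.subscribe((value) => sharedB.push(value));
shared$.next(42);
console.log(sharedA, sharedB); // both arrays hold the same single value
```

Here the plain Observable runs its producer twice, while the Subject pushes the one value it was given to both observers.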

Subject

Subject is an implementation of the observer pattern: when an observer subscribes to a Subject, the Subject adds it to its internal observer list. Each time the Subject receives a new value, it traverses that list and calls each observer’s next method in turn to deliver the value.

A typical usage runs as follows:

  1. We create a Subject
  2. We emit the value 1, but since there are no subscribers yet, nobody receives it
  3. We create subscriber A
  4. We emit another value, 2, and subscriber A receives it
  5. We create another subscriber, B
  6. Finally, we emit the value 3, which is received by all subscribers
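
The steps above can be sketched as follows, assuming RxJS 6 or later. The variable name subject$ and the log array are assumptions made for illustration:

```typescript
import { Subject } from 'rxjs';

const log: string[] = [];

// 1. Create a Subject
const subject$ = new Subject<number>();

// 2. Emit 1: there are no subscribers yet, so the value is dropped
subject$.next(1);

// 3. Create subscriber A
subject$.subscribe((value) => log.push(`Subscriber A: ${value}`));

// 4. Emit 2: only subscriber A receives it
subject$.next(2);

// 5. Create subscriber B
subject$.subscribe((value) => log.push(`Subscriber B: ${value}`));

// 6. Emit 3: both subscribers receive it
subject$.next(3);

console.log(log);
// ['Subscriber A: 2', 'Subscriber A: 3', 'Subscriber B: 3']
```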

BehaviorSubject

We often want a Subject to represent the current state rather than simply relay events. In other words, when a new subscription arrives, the Subject should hand over the latest value immediately instead of staying silent. This is what BehaviorSubject is for.

The BehaviorSubject inherits from Subject and additionally stores the current value, so you can always read the last emitted value directly from it.

A typical sequence looks like this:

  1. We create an instance of BehaviorSubject named behavior$ and pass in the initial value 0 at instantiation.
  2. We subscribe to behavior$. Because a BehaviorSubject publishes its latest value to every new subscriber, subscriber A immediately receives the initial value 0 and prints "Subscriber A, subscription value: 0".
  3. behavior$ emits a new value, 1, through its built-in next method. Subscriber A receives it and prints "Subscriber A, subscription value: 1".
  4. We add subscriber B, which immediately receives the latest value, 1, and prints "Subscriber B, subscription value: 1".
  5. Finally, we call next on behavior$ again. Since both subscriptions are active, subscriber A and subscriber B both receive the new value.
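
A minimal sketch of these steps, assuming RxJS 6 or later. The final emitted value (2) and the log array are illustrative assumptions, since the steps do not name them:

```typescript
import { BehaviorSubject } from 'rxjs';

const log: string[] = [];

// 1. Create a BehaviorSubject with the initial value 0
const behavior$ = new BehaviorSubject<number>(0);

// 2. Subscriber A immediately receives the current value, 0
behavior$.subscribe((value) => log.push(`Subscriber A, subscription value: ${value}`));

// 3. Emit 1: subscriber A receives it
behavior$.next(1);

// 4. Subscriber B immediately receives the latest value, 1
behavior$.subscribe((value) => log.push(`Subscriber B, subscription value: ${value}`));

// 5. Emit again (2 is an assumed value): both subscribers receive it
behavior$.next(2);

// The stored value can also be read synchronously
const current = behavior$.getValue();
console.log(log, current);
```

Reading the stored value with getValue() is what makes a BehaviorSubject convenient for state: no subscription is needed to learn the current value.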

ReplaySubject

We can use ReplaySubject when we want a Subject to resend the last few values to each new subscriber.

Like BehaviorSubject, ReplaySubject sends old data to new subscribers. In addition, it can record part of the Observable execution, so it can store multiple old values and replay them to new subscribers.

When you create a ReplaySubject, you can specify how many values to store and for how long. The first parameter, bufferSize, sets a "space limit" on the cache; it defaults to Infinity, which caches every emitted value. The second parameter, windowTime, sets a "time limit" on the cache; it also defaults to Infinity, meaning cached values never expire.

A typical sequence looks like this:

  1. We create an instance of ReplaySubject named replay$ and specify that we only want to store the last two values
  2. We create subscriber A
  3. We call next on replay$ three times to publish values to subscribers. Subscriber A prints three times
  4. Now for the magic of ReplaySubject. We create a new subscriber, B, on replay$. Because we told the ReplaySubject to store two values, it immediately sends those last two values to subscriber B, which prints them out.
  5. replay$ emits another value; both subscriber A and subscriber B receive it and print it
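
The steps above can be sketched as follows, assuming RxJS 6 or later. The emitted values 1 to 4 and the log array are illustrative assumptions:

```typescript
import { ReplaySubject } from 'rxjs';

const log: string[] = [];

// 1. Keep only the last two values in the buffer
const replay$ = new ReplaySubject<number>(2);

// 2. Create subscriber A
replay$.subscribe((value) => log.push(`Subscriber A: ${value}`));

// 3. Emit three values: subscriber A receives all of them
replay$.next(1);
replay$.next(2);
replay$.next(3);

// 4. Subscriber B is immediately replayed the last two values (2 and 3)
replay$.subscribe((value) => log.push(`Subscriber B: ${value}`));

// 5. Emit one more value: both subscribers receive it
replay$.next(4);

console.log(log);
```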

As mentioned earlier, you can also limit how long values are kept in the ReplaySubject. Consider the following scenario:

  1. We create a ReplaySubject and specify that it stores only the last two values, and none for longer than 100ms
  2. We create subscriber A
  3. We start emitting a new value every 200ms. Subscriber A receives all emitted values
  4. We create subscriber B after 1000ms. By then replay$ has emitted five values. Because the ReplaySubject keeps at most two values and none for longer than 100ms, and values arrive 200ms apart, only the most recent value can still be inside the time window. Subscriber B is therefore replayed only 1 value.
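
A deterministic sketch of this scenario, assuming RxJS 7, where the third constructor argument of ReplaySubject is a timestamp provider with a now() method. Instead of real setInterval and setTimeout calls, it uses a hypothetical manual clock (manualClock) so the timing is reproducible, and it assumes the fifth emission is processed before subscriber B subscribes at 1000ms:

```typescript
import { ReplaySubject } from 'rxjs';

// Hypothetical manual clock standing in for real time (in milliseconds)
let now = 0;
const manualClock = { now: () => now };

// 1. Keep at most two values, and none older than 100ms
const replay$ = new ReplaySubject<number>(2, 100, manualClock);

// 2. Create subscriber A
const receivedByA: number[] = [];
replay$.subscribe((value) => receivedByA.push(value));

// 3. Emit a value at 200ms, 400ms, 600ms, 800ms, and 1000ms
for (let i = 1; i <= 5; i++) {
  now = i * 200;
  replay$.next(i);
}

// 4. Subscriber B subscribes at 1000ms: value 4 (emitted at 800ms)
//    is already outside the 100ms window, so only value 5 is replayed
const receivedByB: number[] = [];
replay$.subscribe((value) => receivedByB.push(value));

console.log(receivedByA, receivedByB);
```

With real timers the outcome depends on whether the interval tick or the timeout fires first at the 1000ms mark, which is why the sketch fixes the order explicitly.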

You might think that a BehaviorSubject is the same as ReplaySubject(1). However, they differ both in concept and in behavior.

The conceptual difference is fundamental. The ReplaySubject merely caches the most recent values; it still represents a stream in which values are continuously produced ("multi-value"). The BehaviorSubject represents a single value that changes over time ("single-value"). That is why the BehaviorSubject requires an initial value, which then changes, and why we only ever see its current value.

In terms of behavior, because the ReplaySubject focuses on caching, we can still observe its cached values after it has completed. If we subscribe to a ReplaySubject after complete has been called, we still receive the cached values, whereas subscribing to a BehaviorSubject after complete has been called delivers no value at all.
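
This difference can be checked with a short sketch, assuming RxJS 6 or later. The names lateReplay and lateBehavior are illustrative:

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';

const replay$ = new ReplaySubject<number>(1);
const behavior$ = new BehaviorSubject<number>(0);

// Emit one value on each, then complete both
replay$.next(1);
behavior$.next(1);
replay$.complete();
behavior$.complete();

// Subscribe only after completion
const lateReplay: number[] = [];
const lateBehavior: number[] = [];
replay$.subscribe((value) => lateReplay.push(value));
behavior$.subscribe((value) => lateBehavior.push(value));

// The ReplaySubject still replays its cached value;
// the BehaviorSubject only signals completion and delivers no value
console.log(lateReplay, lateBehavior);
```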

AsyncSubject

Although both BehaviorSubject and ReplaySubject store values, AsyncSubject works differently. AsyncSubject is a Subject variant that sends only the last value of the execution to its subscribers, and only when the execution completes (similar to the last operator from rxjs/operators).

Not much happens until completion. The execution steps are:

  1. We create an instance of AsyncSubject named async$
  2. We subscribe through subscriber A
  3. async$ emits 2 values; nothing is delivered yet
  4. We create subscriber B
  5. A new value is emitted, but neither subscriber reacts
  6. async$ calls complete, at which point the last value is sent to all subscribers
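
The steps above can be sketched as follows, assuming RxJS 6 or later. The emitted values 1 to 3, the log array, and the deliveredBeforeComplete counter are illustrative assumptions:

```typescript
import { AsyncSubject } from 'rxjs';

const log: string[] = [];

// 1. Create an AsyncSubject
const async$ = new AsyncSubject<number>();

// 2. Create subscriber A
async$.subscribe((value) => log.push(`Subscriber A: ${value}`));

// 3. Emit two values: nothing is delivered yet
async$.next(1);
async$.next(2);

// 4. Create subscriber B
async$.subscribe((value) => log.push(`Subscriber B: ${value}`));

// 5. Emit another value: still nothing is delivered
async$.next(3);
const deliveredBeforeComplete = log.length;

// 6. Complete: the last value (3) is sent to every subscriber
async$.complete();

console.log(deliveredBeforeComplete, log);
```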

As the steps above show, AsyncSubject does not send its last value until complete is called, much like a Promise, which delivers its value only once the task has finished. With a Promise, we signal completion through resolve(value) and then handle the resulting value in promise.then().
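
The Promise analogy extends to late subscribers: much like calling then on an already resolved Promise, subscribing to an already completed AsyncSubject still yields the final value. A minimal sketch, assuming RxJS 6 or later, with the illustrative names result$ and late:

```typescript
import { AsyncSubject } from 'rxjs';

const result$ = new AsyncSubject<string>();

// Only the last value before completion is kept
result$.next('partial');
result$.next('done');
result$.complete();

// Subscribing after completion still delivers the final value
const late: string[] = [];
result$.subscribe((value) => late.push(value));

console.log(late); // ['done']
```

One difference from a Promise is that the AsyncSubject delivers this value synchronously during subscribe, whereas a then callback always runs asynchronously.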

Conclusion

This article covered how Subject works and the distinguishing features of BehaviorSubject, ReplaySubject, and AsyncSubject. I hope you found it useful; if you did, feel free to like and follow.