Pull and Push
Pull and Push are two different protocols that describe how a data producer communicates with a data consumer.
What is Pull? In a Pull system, the consumer decides when to receive data from the producer. The producer itself does not know when the data will be delivered.
Every JavaScript function, the construct we know best, is a simple Pull system. The function is the data producer, and the code that calls it consumes the data by pulling out the function's single return value.
ES2015 introduced generator functions and iterators (function*), another way to implement a Pull system. Code that calls iterator.next() is the consumer, pulling data from the iterator, which is the producer.
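The Pull relationship described above can be sketched with a plain generator. This is a minimal illustration, not code from any library; the names counter and produced are made up for the example.

```typescript
// Records every value the producer actually computes.
const produced: number[] = [];

// Producer: a generator that computes a value only when one is requested.
function* counter(limit: number): Generator<number, void, undefined> {
  for (let i = 1; i <= limit; i++) {
    produced.push(i);
    yield i;
  }
}

// Creating the iterator does not run any of the generator body.
const iterator = counter(3);
const producedBeforePull = produced.length;

// Consumer: each next() call pulls exactly one value.
const first = iterator.next();
const second = iterator.next();
const producedAfterTwoPulls = produced.length;

console.log(producedBeforePull, first.value, second.value, producedAfterTwoPulls);
// prints: 0 1 2 2
```

The producer never runs ahead of the consumer: the third value is not computed until next() is called a third time.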
| | Producer | Consumer |
| --- | --- | --- |
| Pull | Passive: produces data when requested | Active: decides when data is requested |
| Push | Active: produces data at its own pace | Passive: reacts to the data it receives |
What is Push? In a Push system, the producer decides when to send data to the consumer. The consumer itself does not know when it will receive data.
Promise is the most common Push system in JavaScript today. The consumer registers callbacks with the then method, and the Promise, as the producer, delivers a resolved value to them. Unlike a function, it is the Promise that determines exactly when that value is pushed to the callbacks.
RxJS introduces Observable as a new way to implement a Push system. Each Observable is a producer of multiple, possibly infinitely many, values that it pushes to an Observer (the consumer).
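The Promise case can be sketched with the standard Promise API alone. The events array is only there to make the ordering visible and is not part of any library.

```typescript
const events: string[] = [];

// Producer: the promise decides when its value reaches the consumer.
const answer: Promise<number> = new Promise<number>((resolve) => {
  events.push('producer started'); // the executor itself runs immediately
  resolve(42);
});

// Consumer: registers a callback, then waits passively.
answer.then((value) => {
  events.push(`consumer received ${value}`);
  console.log(events);
});

// This line runs before the callback above, even though resolve(42)
// was already called: delivery is scheduled by the promise, not the consumer.
events.push('callback registered');
```

Even though the value is available straight away, the consumer cannot pull it out synchronously. It only receives it when the Promise pushes it.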
Summary
The two communication modes above are implemented in several different ways:
- A Function is a lazily evaluated computation that synchronously returns a single value when it is called
- A generator is itself a Function, and the iterator it creates also executes lazily, but unlike a Function it can synchronously return (yield) anywhere from zero to infinitely many values
- A Promise is a computation that may eventually return a single value, or may never return anything
- An Observable is a lazily evaluated computation that can synchronously or asynchronously return (next) anywhere from zero to infinitely many values
Subscription
Functions and Observables are both lazy. If you never call the Function or never subscribe to the Observable, no value is returned and no side effect occurs.
Subscribing to an Observable is analogous to calling a Function. In other words, observable.subscribe is the entry point that directly triggers the Observable's data push, which is key to understanding how RxJS works as a whole.
In the rest of this article, "Observable data push" always refers to an observer subscribing to an Observable instance.
The difference between a Function and an Observable is that an Observable can "return" multiple values over time. A Function cannot:
function foo() {
  return 'one value';
  return 'another value'; // dead code: 'another value' is never returned
}
console.log(foo());
// console
// 'one value'
An Observable, however, can:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
  subscriber.next('one value');
  subscriber.next('another value'); // "return" another value
  // ...
});
foo.subscribe(console.log.bind(null));
// console
// 'one value'
// 'another value'
An Observable can deliver data synchronously or asynchronously:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
  subscriber.next('one value');
  setTimeout(() => {
    subscriber.next('another value'); // "return" another value, asynchronously
  });
  // ...
});
foo.subscribe(console.log.bind(null));
console.log('after subscribe');
// console
// 'one value'
// 'after subscribe'
// 'another value' // delivered asynchronously
Summary
- func.call() means "give me one value, synchronously"
- observable.subscribe() means "give me any number of values, synchronously or asynchronously"
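To make the analogy between calling a function and subscribing concrete, here is a deliberately tiny stand-in for an Observable. TinyObservable is a hypothetical class written for this sketch, not the RxJS implementation. It only shows that nothing runs until subscribe is called, and that each subscription runs the producer again.

```typescript
type NextFn<T> = (value: T) => void;

// A minimal stand-in for an Observable (an assumption for illustration only).
class TinyObservable<T> {
  constructor(private readonly producer: (next: NextFn<T>) => void) {}

  // Subscribing runs the producer, much like calling a function runs its body.
  subscribe(next: NextFn<T>): void {
    this.producer(next);
  }
}

let runs = 0;
const greetings = new TinyObservable<string>((next) => {
  runs += 1;
  next('one value');
  next('another value'); // a second "return", which a function cannot do
});

// Nothing has executed yet: the producer is lazy.
const runsBeforeSubscribe = runs;

const received: string[] = [];
greetings.subscribe((value) => received.push(value));
greetings.subscribe((value) => received.push(value));

console.log(runsBeforeSubscribe, runs, received);
// prints: 0 2 [ 'one value', 'another value', 'one value', 'another value' ]
```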
Observable
Observables are lazy Push collections of multiple values. They make it easier to compose asynchronous or callback-based code.
Anatomy of an Observable
Observables are created using new Observable or a creation operator, are subscribed to with an Observer, and execute to deliver next/error/complete notifications to that Observer.
As a data producer, an Observable takes a subscribe function as its constructor parameter. That function receives a subscriber, which implements the following interface:
{
  next: (value) => any,
  error: (err) => any,
  complete: () => any
}
Then, observable.subscribe receives an observer as the data consumer, either an object that implements the interface above or just a next function. Together they form a data communication pipeline based on the observer (subscription) pattern. The power of this pipeline is that an Observable can pipe through various operators that control the data while it is being pushed: limiting quantity (take, takeUntil), transforming values (map, mapTo, and so on), controlling timing, and switching execution context (mainly done by a scheduler).
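Accepting either a full observer object or a bare next function can be sketched as a small normalization step. The helper toFullObserver and the FullObserver interface below are hypothetical names for this illustration. RxJS does something similar internally, but this is not its code.

```typescript
interface FullObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Hypothetical helper: turns a partial observer or a bare next function
// into an object that always has all three callbacks.
function toFullObserver<T>(
  observerOrNext?: Partial<FullObserver<T>> | ((value: T) => void)
): FullObserver<T> {
  const partial: Partial<FullObserver<T>> =
    typeof observerOrNext === 'function' ? { next: observerOrNext } : (observerOrNext ?? {});
  return {
    next: (value) => partial.next?.(value),
    error: (err) => partial.error?.(err),
    complete: () => partial.complete?.(),
  };
}

const seen: string[] = [];
const fromFunction = toFullObserver<number>((value) => seen.push(`next ${value}`));
const fromObject = toFullObserver<number>({ complete: () => seen.push('complete') });

fromFunction.next(1);
fromFunction.complete(); // no handler was supplied, so nothing happens
fromObject.next(2);      // no handler was supplied, so nothing happens
fromObject.complete();

console.log(seen);
// prints: [ 'next 1', 'complete' ]
```

With this shape, the producer side can always call next, error, and complete without checking which ones the consumer provided.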
Observable pipeline mechanism
From the analysis above, we know that an Observable is a Push system: it keeps pushing data to each subscribed consumer (the subscriber) by calling subscriber.next(value). To implement the pipeline mechanism, the only problem to solve is how to pass the data, in order, through each data processor (operator) and finally to the subscriber.
A subscriber has a next method (in RxJS, Subscriber extends Subscription and implements the Observer interface). As the name implies, next means "the next one", which is exactly what a pipeline needs: all operators just have to be connected through their next methods. The natural approach is to call the downstream subscriber's next method inside the upstream subscriber's next method. RxJS does this neatly: each operator's subscriber receives the downstream subscriber in its constructor and stores it in its destination property. The downstream subscriber may be another operator's subscriber or the final subscriber passed to subscribe. The result is a linked list of subscribers connected through destination.
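The destination chain can be sketched in a few lines. Sink and MapSink are simplified, hypothetical stand-ins for RxJS's Subscriber and MapSubscriber. Only the next path is modelled, and error, complete, and unsubscription are left out.

```typescript
interface Sink<T> {
  next(value: T): void;
}

// Hypothetical simplified subscriber for a map-like operator.
class MapSink<T, R> implements Sink<T> {
  constructor(
    private readonly destination: Sink<R>, // the downstream subscriber
    private readonly project: (value: T) => R
  ) {}

  // The upstream next transforms the value and calls the downstream next.
  next(value: T): void {
    this.destination.next(this.project(value));
  }
}

const results: string[] = [];
const finalSink: Sink<string> = { next: (value) => results.push(value) };

// The chain is built from downstream to upstream:
// each sink receives the next one in line as its destination.
const toLabel = new MapSink<number, string>(finalSink, (n) => `value ${n}`);
const addOne = new MapSink<number, number>(toLabel, (n) => n + 1);

addOne.next(1);
addOne.next(2);

console.log(results);
// prints: [ 'value 2', 'value 3' ]
```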
Take observable.pipe(map(x => x + 1)) as an example.
The details are explained in the comments in the code below:
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    if (typeof project !== 'function') {
      throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
    }
    // "source" is the Observable on which "pipe" was called.
    // "source.lift" creates a new Observable instance, attaches the operator to it,
    // and links it back to "source".
    return source.lift(new MapOperator(project, thisArg));
  };
}
In the Observable pipe method:
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  if (operations.length === 0) {
    return this as any;
  }
  return pipeFromArray(operations)(this); // pass "this", the current Observable, through
}

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (!fns) {
    return noop as UnaryFunction<any, any>;
  }
  if (fns.length === 1) {
    return fns[0];
  }
  // "input" is the initially instantiated Observable;
  // each "fn(prev)" ends up calling "source.lift" as shown above
  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}
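The reduce at the heart of pipeFromArray is ordinary left-to-right function composition. The sketch below shows the same idea on plain numbers. pipeSketch, increment, and double are names invented for this example, and all functions are restricted to a single type T to keep the typing simple.

```typescript
type Unary<T, R> = (input: T) => R;

// Composes functions left to right: the output of one is the input of the next.
function pipeSketch<T>(...fns: Array<Unary<T, T>>): Unary<T, T> {
  return (input) => fns.reduce<T>((prev, fn) => fn(prev), input);
}

const increment: Unary<number, number> = (n) => n + 1;
const double: Unary<number, number> = (n) => n * 2;

const incrementThenDouble = pipeSketch<number>(increment, double);
const untouched = pipeSketch<number>(); // no functions: the input passes through

console.log(incrementThenDouble(3), untouched(3));
// prints: 8 3
```

In RxJS the values flowing through the reduce are Observables rather than numbers, so each step returns a new Observable that wraps the previous one.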
Through lift, the originally instantiated Observable is linked to the newly created Observable via the source property, producing a linked list of Observables, and the current operator instance is mounted on the new Observable. In the example above, that operator instance is the new MapOperator(project, thisArg) created in the map function.
lift(operator) {
  const observable = new Observable();
  observable.source = this;
  observable.operator = operator;
  return observable;
}
When we finally subscribe on the last Observable in this linked list, each operator instance creates its own subscriber. In the code below, the operator's call method creates new MapSubscriber(subscriber, this.project, this.thisArg), passing in the downstream subscriber, and subscribes it to source. In this way the subscription walks back through the linked list of Observables until it reaches the one that was originally instantiated.
export class MapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => R, private thisArg: any) {
  }
  call(subscriber: Subscriber<R>, source: any): any {
    // "source" is the upstream Observable in the linked list of Observables
    return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
  }
}

subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
          error?: ((error: any) => void) | null,
          complete?: (() => void) | null): Subscription {
  const { operator } = this;
  const sink = toSubscriber(observerOrNext, error, complete);
  // When an observer subscribes to the last Observable in the linked list,
  // the operator's "call" method creates the operator's own subscriber
  if (operator) {
    sink.add(operator.call(sink, this.source));
  } else {
    sink.add(
      this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
      this._subscribe(sink) :
      this._trySubscribe(sink)
    );
  }
  // ...
  return sink;
}
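Putting lift, operator.call, and subscribe together, the whole mechanism fits in a short sketch. MiniObservable, MiniOperator, and miniMap are hypothetical, heavily simplified versions written for this illustration. They model only next, with no error handling, completion, or teardown.

```typescript
type Next<T> = (value: T) => void;

interface MiniOperator<T, R> {
  // Wraps the downstream consumer and subscribes the wrapper to the upstream source.
  call(downstream: Next<R>, source: MiniObservable<T>): void;
}

class MiniObservable<T> {
  source?: MiniObservable<any>;
  operator?: MiniOperator<any, T>;

  constructor(private readonly producer?: (next: Next<T>) => void) {}

  // Creates a new Observable linked to this one through "source".
  lift<R>(operator: MiniOperator<T, R>): MiniObservable<R> {
    const lifted = new MiniObservable<R>();
    lifted.source = this;
    lifted.operator = operator;
    return lifted;
  }

  // Walks back through "source" until it reaches the original producer.
  subscribe(next: Next<T>): void {
    if (this.operator && this.source) {
      this.operator.call(next, this.source);
    } else if (this.producer) {
      this.producer(next);
    }
  }
}

function miniMap<T, R>(project: (value: T) => R): (source: MiniObservable<T>) => MiniObservable<R> {
  return (source) =>
    source.lift<R>({
      call: (downstream, upstream) => upstream.subscribe((value) => downstream(project(value))),
    });
}

const numbers = new MiniObservable<number>((next) => {
  next(1);
  next(2);
});
const labelled = miniMap((n: number) => `#${n}`)(miniMap((n: number) => n + 1)(numbers));

const out: string[] = [];
labelled.subscribe((value) => out.push(value));

console.log(out);
// prints: [ '#2', '#3' ]
```

Subscribing to labelled first reaches the labelling operator, then the incrementing operator, and only then the original producer. The values then flow back down in the opposite direction.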
Relationship between Observable and Scheduler
What is a Scheduler
A Scheduler controls when a subscription starts and when notifications are delivered. It consists of three main parts:
- A Scheduler is a data structure. It stores actions and orders them by priority or by other criteria such as timer delay.
- A Scheduler is an execution context. It represents where and when an action is executed (immediately, or through a callback mechanism such as setTimeout, setInterval, process.nextTick, or an animation frame).
- A Scheduler has an internal (virtual) clock. Its now() method returns the current timestamp, and any scheduled action is tied to the time given by that clock.
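The three parts listed above (a queue of actions, an execution context, and a clock) can be sketched with a purely virtual scheduler. SketchScheduler is a hypothetical class for illustration, not one of the RxJS schedulers. Its "execution context" is simply a manual flush call, and its clock only advances while flushing.

```typescript
interface ScheduledAction {
  due: number;      // virtual time at which the action should run
  work: () => void; // the work to execute
}

class SketchScheduler {
  private clock = 0;
  private readonly actions: ScheduledAction[] = [];

  // The internal clock.
  now(): number {
    return this.clock;
  }

  // The data structure: actions are stored ordered by their due time.
  schedule(work: () => void, delay = 0): void {
    this.actions.push({ due: this.clock + delay, work });
    this.actions.sort((a, b) => a.due - b.due);
  }

  // The execution context: here, actions run when flush() is called.
  flush(): void {
    let action = this.actions.shift();
    while (action) {
      this.clock = action.due;
      action.work();
      action = this.actions.shift();
    }
  }
}

const clockDemo = new SketchScheduler();
const trace: string[] = [];
clockDemo.schedule(() => trace.push(`late@${clockDemo.now()}`), 100);
clockDemo.schedule(() => trace.push(`early@${clockDemo.now()}`), 10);
clockDemo.schedule(() => trace.push(`now@${clockDemo.now()}`));
clockDemo.flush();

console.log(trace);
// prints: [ 'now@0', 'early@10', 'late@100' ]
```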
How a Scheduler works
A Scheduler lets you define in what execution context an Observable will deliver notifications to its Observer. In other words, the main function of a Scheduler is to determine the execution context of Observable data push.
The schedulers built into RxJS are mainly asyncScheduler, queueScheduler, asapScheduler, and animationFrameScheduler.
queueScheduler, asapScheduler, and animationFrameScheduler all inherit from asyncScheduler. The delay they receive determines which mechanism is used: with a delay greater than 0 they fall back to the setInterval timer of asyncScheduler to trigger the Observable data push asynchronously, and otherwise they use their own mechanism.
- asyncScheduler maintains an action queue and uses AsyncAction to make subscribe execute asynchronously through a setInterval timer. The main operator that works with asyncScheduler is subscribeOn. Its work handler subscribes the downstream subscriber to the upstream source Observable, and the timer in AsyncAction triggers the Observable data push asynchronously.
- queueScheduler only uses the action queue feature of asyncScheduler. The main operator that works with it is observeOn.
- asapScheduler implements asynchrony internally through the promise-based microtask queue.
- animationFrameScheduler implements asynchrony internally through requestAnimationFrame.
The Scheduler and the Action
QueueAction, AsapAction, and AnimationFrameAction all inherit from AsyncAction.
An Action is a unit of scheduled work. It contains the work handler to execute and the state that the work handler processes.
- asyncScheduler's AsyncAction: its work handler is triggered by the setInterval timer callback.
protected requestAsyncId(scheduler: AsyncScheduler, id?: any, delay: number = 0): any {
  return setInterval(scheduler.flush.bind(scheduler, this), delay);
}
- queueScheduler's QueueAction: uses the scheduler's action queue to store actions in order and triggers the Observable data push according to their order in the queue.
- asapScheduler's AsapAction: uses a promise to trigger the work handler. Internally, Immediate.setImmediate emulates setImmediate with Promise.resolve().then(cb).
protected requestAsyncId(scheduler: AsapScheduler, id?: any, delay: number = 0): any {
  // If delay is greater than 0, request as an async action.
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // Push the action to the end of the scheduler queue.
  scheduler.actions.push(this);
  // If a microtask has already been scheduled, don't schedule another
  // one. If a microtask hasn't been scheduled yet, schedule one now. Return
  // the current scheduled microtask id.
  return scheduler.scheduled || (scheduler.scheduled = Immediate.setImmediate(
    scheduler.flush.bind(scheduler, undefined)
  ));
}
- animationFrameScheduler's AnimationFrameAction: uses requestAnimationFrame to trigger the work handler.
protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  // If delay is greater than 0, request as an async action.
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // Push the action to the end of the scheduler queue.
  scheduler.actions.push(this);
  // If an animation frame has already been requested, don't request another
  // one. If an animation frame hasn't been requested yet, request one. Return
  // the current animation frame request id.
  return scheduler.scheduled || (scheduler.scheduled = requestAnimationFrame(
    () => scheduler.flush(undefined)
  ));
}
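Both requestAsyncId implementations above share one pattern: push the action onto the queue, and request a flush only if one is not already pending. The sketch below reproduces that pattern with a promise microtask. MicrotaskQueueSketch and its flushRequests counter are hypothetical and exist only to make the coalescing visible.

```typescript
class MicrotaskQueueSketch {
  readonly actions: Array<() => void> = [];
  flushRequests = 0; // how many microtasks were actually requested
  private scheduled = false;

  schedule(work: () => void): void {
    // Push the action to the end of the queue.
    this.actions.push(work);
    // Request a flush only if none is pending yet.
    if (!this.scheduled) {
      this.scheduled = true;
      this.flushRequests += 1;
      Promise.resolve().then(() => this.flush());
    }
  }

  private flush(): void {
    this.scheduled = false;
    let work = this.actions.shift();
    while (work) {
      work();
      work = this.actions.shift();
    }
  }
}

const asapSketch = new MicrotaskQueueSketch();
const order: string[] = [];
asapSketch.schedule(() => order.push('first'));
asapSketch.schedule(() => order.push('second'));
order.push('sync code');

// Logged from a later microtask, after the single flush has run.
Promise.resolve().then(() => console.log(order, asapSketch.flushRequests));
// prints: [ 'sync code', 'first', 'second' ] 1
```

Two actions were scheduled, but only one microtask was requested. Both actions then run in queue order during the same flush.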
As mentioned above, the main function of a Scheduler is to determine the execution context of Observable data push. What is actually scheduled is defined by a work handler supplied by the caller. RxJS includes several operators designed to cooperate with a Scheduler, such as subscribeOn and observeOn.
Here is a brief walkthrough using subscribeOn as an example.
As an operator, subscribeOn also creates its own subscriber through lift and then call:
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  return new SubscribeOnObservable<T>(
    source, this.delay, this.scheduler
  ).subscribe(subscriber);
}
Inside SubscribeOnObservable, SubscribeOnObservable.dispatch is passed as the work handler and the state is { source, subscriber }. So when it is used with asyncScheduler, source.subscribe(subscriber) runs inside the setInterval timer callback, which triggers the Observable data push asynchronously.
static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
  const { source, subscriber } = arg;
  return this.add(source.subscribe(subscriber));
}
_subscribe(subscriber: Subscriber<T>) {
  const delay = this.delayTime;
  const source = this.source;
  const scheduler = this.scheduler;
  return scheduler.schedule<DispatchArg<any>>(SubscribeOnObservable.dispatch as any, delay, {
    source, subscriber
  });
}
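The essence of subscribeOn, handing the act of subscribing to a scheduler instead of performing it immediately, can be sketched without RxJS. In the sketch below, subscribeOnSketch is a hypothetical helper, an Observable is reduced to its subscribe function, and the "scheduler" is a manual queue so that the ordering is easy to follow.

```typescript
type Push<T> = (value: T) => void;
type Subscribe<T> = (next: Push<T>) => void;
type ScheduleFn = (work: () => void) => void;

// Hypothetical helper: the work handler is "subscribe to the source",
// and the scheduling function decides when that work runs.
function subscribeOnSketch<T>(source: Subscribe<T>, schedule: ScheduleFn): Subscribe<T> {
  return (next) => schedule(() => source(next));
}

// A stand-in scheduler: work is queued and only runs when drained manually.
const pending: Array<() => void> = [];
const manualSchedule: ScheduleFn = (work) => {
  pending.push(work);
};

// A source that would normally push synchronously on subscribe.
const letters: Subscribe<string> = (next) => {
  next('a');
  next('b');
};

const timeline: string[] = [];
subscribeOnSketch(letters, manualSchedule)((value) => timeline.push(value));
timeline.push('after subscribe');

// Draining the queue plays the role of the timer callback firing.
pending.forEach((work) => work());

console.log(timeline);
// prints: [ 'after subscribe', 'a', 'b' ]
```

Without the wrapper, 'a' and 'b' would be recorded before 'after subscribe'. With it, the source is not even subscribed to until the scheduled work runs.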
Subject
What is a Subject? It is a special type of Observable that can deliver values to many observers at once, whereas a plain Observable is unicast: each subscribed observer gets its own independent execution.
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
Every Subject is an Observable, so it can be subscribed to. Unlike a plain Observable, however, subscribing to a Subject does not immediately trigger a data push. It simply registers the observer in the Subject's own list of observers, much like addEventListener.
_subscribe(subscriber: Subscriber<T>): Subscription {
  // ...
  this.observers.push(subscriber);
  return new SubjectSubscription(this, subscriber);
  // ...
}
At the same time, every Subject is an Observer, with next(v), error(e), and complete() methods. When next(value) is called, the value is passed to all registered observers. You could say that a Subject is a data system that both produces and consumes its own data.
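The two faces of a Subject, registering observers on one side and pushing to all of them on the other, can be sketched as follows. MiniSubject is a hypothetical simplification that models only next and unsubscription, not the RxJS Subject class.

```typescript
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private readonly observers: Array<Listener<T>> = [];

  // Observable side: only registers the observer; nothing is pushed yet.
  subscribe(observer: Listener<T>): () => void {
    this.observers.push(observer);
    // Returns a function that removes the observer again.
    return () => {
      const index = this.observers.indexOf(observer);
      if (index >= 0) this.observers.splice(index, 1);
    };
  }

  // Observer side: pushes the value to every registered observer.
  next(value: T): void {
    // Iterate over a copy so that unsubscribing during delivery is safe.
    for (const observer of this.observers.slice()) observer(value);
  }
}

const clicks = new MiniSubject<number>();
const receivedByA: number[] = [];
const receivedByB: number[] = [];

const unsubscribeA = clicks.subscribe((value) => receivedByA.push(value));
clicks.subscribe((value) => receivedByB.push(value));

clicks.next(1);  // delivered to both observers
unsubscribeA();
clicks.next(2);  // delivered to B only

console.log(receivedByA, receivedByB);
// prints: [ 1 ] [ 1, 2 ]
```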
Since a Subject is a special Observable, it also has a pipeline mechanism, though one that differs somewhat from that of a plain Observable. The lift method of the Subject class shows this clearly.
lift<R>(operator: Operator<T, R>): Observable<R> {
  const subject = new AnonymousSubject(this, this);
  subject.operator = <any>operator;
  return <any>subject;
}
In the lift method above, the source property that implements the Observable pipeline mechanism is not assigned directly on the newly created AnonymousSubject instance. Instead, the AnonymousSubject class, which inherits from Subject, receives the current Subject instance as a constructor argument, and its constructor does only one thing with it: assign the current Subject instance to the source property of the new Subject.
constructor(protected destination?: Observer<T>, source?: Observable<T>) {
  super();
  this.source = source;
}
next(value: T) {
  const { destination } = this;
  if (destination && destination.next) {
    destination.next(value);
  }
}
_subscribe(subscriber: Subscriber<T>): Subscription {
  const { source } = this;
  if (source) {
    return this.source!.subscribe(subscriber);
  } else {
    return Subscription.EMPTY;
  }
}
As the code above shows, in the chain of AnonymousSubject instances produced by pipe, both subscribe and next ultimately trace back to the original Subject instance. Because a Subject is both an Observable and an Observer, the AnonymousSubject class overrides the _subscribe and next methods of its parent class Subject.
Observable and Redux
The pipeline mechanism of Observable fits well with the action processing flow of Redux, and the community's redux-observable library uses this mechanism of RxJS to provide another kind of Redux middleware.
const result$ = epic$.pipe(
  map(epic => {
    const output$ = epic(action$, state$, options.dependencies!);
    // ...
    return output$;
  }),
  mergeMap(output$ =>
    from(output$).pipe(
      subscribeOn(uniqueQueueScheduler),
      observeOn(uniqueQueueScheduler)
    )
  )
);
result$.subscribe(store.dispatch);
epic$ is a Subject instance. When the middleware is started as shown below, epic$.next(rootEpic) pushes the root epic through the pipeline above, where it is called with the action$ Observable. actionSubject$ is the upstream source of action$, so subscribing to the pipeline traces back up through action$ to actionSubject$ and the whole pipeline is subscribed. redux-observable also adds a queueScheduler to the pipeline so that all dispatched actions are queued.
epicMiddleware.run = rootEpic => {
  // ...
  epic$.next(rootEpic);
};
In the resulting middleware's next function, calling actionSubject$.next(action) passes the action to action$ and on through the pipeline above, which sets the whole action processing stream in motion.
action => {
  // ...
  actionSubject$.next(action);
  // ...
};
In fact, the Redux store itself has an observable method, which is the simplest implementation of the above.
Conclusion
The observer (subscription) pattern is one of the most important design patterns in JavaScript, and it is not limited to the JavaScript world: it is used in a wide range of scenarios. Most of the view libraries and view frameworks familiar to front-end programmers today, such as Redux, React-Redux, and Vue, are built on it. Looking at its technical nature, my personal understanding is that the subscription pattern is a mode of data communication, one that can be synchronous or asynchronous, blocking or non-blocking. It appears everywhere, from small view libraries to large system software, and a deeper understanding of it helps you write more robust applications.