DevUI is a team with both design and engineering perspectives. It serves the Huawei DevCloud platform and Huawei's internal back-office systems, and works for designers and front-end engineers. Add the devui Helper (Devui-Official). DevUIHelper plugin: Devuihelper-lsp (stars welcome).
Introduction
This is the fourth article in the RxJS source code analysis series; the source version used is 6.6.0. This article is fairly long, so please bear with me. To make it easier to read, the code shown has been trimmed and simplified. If you find a mistake, please point it out.
Before starting, let me define a few custom terms to make the article easier to read.
- Top stream: a stream that calls the operator.
- Upstream stream: the stream to which the operator’s internal subscriber subscribes.
- Downstream stream: a stream managed by the operator’s internal subscriber.
- Terminating subscription: the subscriber that subscribes to the stream generated by the operator.
Unlike in the previous article, I am not going to pick a few operators and simply walk through them. From this article on, both the Join Operators and the Transformation Operators follow strong common rules. I therefore want to summarize those rules first and then analyze the operators.
How to control downstream flow
To give the operator control over the downstream streams, RxJS uses the delegation pattern: the operator creates a dedicated Subscriber that internally receives every notification coming from the downstream streams. This is where OuterSubscriber and InnerSubscriber come in.
- OuterSubscriber: the principal. It provides three notify interfaces: notifyNext, notifyError, and notifyComplete.
- InnerSubscriber: the delegate. It is constructed with an OuterSubscriber; once it subscribes to a stream, it invokes the corresponding notify interface of that OuterSubscriber.
Internally, InnerSubscriber simply forwards its own next, error, and complete to the OuterSubscriber.
export class InnerSubscriber<T, R> extends Subscriber<R> {
  private index = 0;

  constructor(
    private parent: OuterSubscriber<T, R>,
    public outerValue: T,
    public outerIndex: number
  ) {
    super();
  }

  protected _next(value: R): void {
    this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this);
  }

  protected _error(error: any): void {
    this.parent.notifyError(error, this);
    this.unsubscribe();
  }

  protected _complete(): void {
    this.parent.notifyComplete(this);
    this.unsubscribe();
  }
}
The default implementation of OuterSubscriber, in turn, simply forwards the data to the terminating subscription.
export class OuterSubscriber<T, R> extends Subscriber<T> {
  notifyNext(
    outerValue: T,
    innerValue: R,
    outerIndex: number,
    innerIndex: number,
    innerSub: InnerSubscriber<T, R>
  ): void {
    this.destination.next(innerValue);
  }

  notifyError(error: any, innerSub: InnerSubscriber<T, R>): void {
    this.destination.error(error);
  }

  notifyComplete(innerSub: InnerSubscriber<T, R>): void {
    this.destination.complete();
  }
}
Different operators may need different Subscribers, and they create the inner subscription by calling subscribeToResult. This function checks the type of the incoming ObservableInput and returns a suitably handled subscription. For reuse, it calls subscribeTo, the same function used earlier by from. subscribeTo handles array, Promise, and iterable (generator) inputs and returns a subscription.
export function subscribeToResult<T, R>(
  outerSubscriber: OuterSubscriber<T, R>,
  result: any,
  outerValue?: T,
  outerIndex: number = 0,
  innerSubscriber: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | undefined {
  if (innerSubscriber.closed) {
    return undefined;
  }
  if (result instanceof Observable) {
    return result.subscribe(innerSubscriber);
  }
  return subscribeTo(result)(innerSubscriber) as Subscription;
}
With this design, the generated Subscriber can control the state of the downstream streams. Boxing and unboxing of data can live in the same Subscriber, and side effects are concentrated in that one Subscriber, so operators behave like pure functions.
subscribeToResult appears many times in the rest of this article and in the next one. All we need to remember is that it forwards the subscribed data and notifications to the matching interfaces of OuterSubscriber; its behavior will not be explained again.
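The delegation described above can be sketched without RxJS at all. The following is a minimal model, not the library's implementation: MiniObserver, MiniOuter, MiniInner, and TaggingOuter are hypothetical names, and error handling and unsubscription are left out on purpose.

```typescript
// Hypothetical stand-ins for the real classes; only next/complete are modeled.
interface MiniObserver<T> {
  next(value: T): void;
  complete(): void;
}

// The principal: receives notifications from every inner subscriber it hands out.
class MiniOuter<R> {
  constructor(protected destination: MiniObserver<R>) {}

  notifyNext(innerValue: R, outerIndex: number): void {
    this.destination.next(innerValue); // default behavior: forward unchanged
  }

  notifyComplete(outerIndex: number): void {
    this.destination.complete();
  }
}

// The delegate: an observer whose only job is to forward to its parent.
class MiniInner<R> implements MiniObserver<R> {
  constructor(private parent: MiniOuter<R>, private outerIndex: number) {}

  next(value: R): void {
    this.parent.notifyNext(value, this.outerIndex);
  }

  complete(): void {
    this.parent.notifyComplete(this.outerIndex);
  }
}

// An operator customizes behavior by overriding the notify methods.
// This one tags each value with the index of the stream it came from.
class TaggingOuter extends MiniOuter<string> {
  notifyNext(innerValue: string, outerIndex: number): void {
    this.destination.next(`${outerIndex}:${innerValue}`);
  }
}

const received: string[] = [];
const taggingOuter = new TaggingOuter({
  next: value => { received.push(value); },
  complete: () => { received.push('complete'); }
});
const innerA = new MiniInner(taggingOuter, 0);
const innerB = new MiniInner(taggingOuter, 1);

innerA.next('a');
innerB.next('b');
innerA.next('c');
// received is now ['0:a', '1:b', '0:c']
```

The point of the sketch is that all decisions sit in one place (the outer object), while the inner objects stay dumb forwarders. That is the property the operators below rely on.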
With that, we return to the source analysis of the operators. Now that the overall rules and design are clear, it is easier to see why each operator is designed the way it is.
Following on from the previous article, we analyze the Join Creation Operators first.
race
race means that all streams compete: the stream that wins stays and keeps sending data, while the streams that lose are unsubscribed.
import { interval, race } from 'rxjs';
import { mapTo, take } from 'rxjs/operators';

const first = interval(1000).pipe(take(1), mapTo('first'));
const second = interval(2000).pipe(take(1), mapTo('second'));
const race$ = race(first, second);
race$.subscribe((v) => console.log(v));

// Output:
// first
race hands the incoming Observables to an internal subscriber through fromArray.
export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
  return fromArray(observables).lift(new RaceOperator<T>());
}
RaceSubscriber
RaceSubscriber keeps several pieces of state.
private hasFirst: boolean = false;
private observables: Observable<any>[] = [];
private subscriptions: Subscription[] = [];
The observables array caches the Observables emitted by the upstream stream. When the upstream stream completes, RaceSubscriber subscribes to each of them and saves the resulting subscription objects.
protected _complete() {
  const observables = this.observables;
  const len = observables.length;

  if (len === 0) {
    this.destination.complete();
  } else {
    for (let i = 0; i < len && !this.hasFirst; i++) {
      let observable = observables[i];
      let subscription = subscribeToResult(this, observable, observable as any, i);

      if (this.subscriptions) {
        this.subscriptions.push(subscription);
      }
      this.add(subscription);
    }
    this.observables = null;
  }
}
notify
In notifyNext, RaceSubscriber receives the data of the downstream streams and checks hasFirst. If this is the first value to arrive, it sets hasFirst and unsubscribes from all the other downstream streams, so that only the winning stream's data reaches the terminating subscription.
notifyNext(
  outerValue: T,
  innerValue: T,
  outerIndex: number,
  innerIndex: number,
  innerSub: InnerSubscriber<T, T>
): void {
  if (!this.hasFirst) {
    // Update the state
    this.hasFirst = true;
    // Unsubscribe from every other downstream stream
    for (let i = 0; i < this.subscriptions.length; i++) {
      if (i !== outerIndex) {
        let subscription = this.subscriptions[i];
        subscription.unsubscribe();
        this.remove(subscription);
      }
    }
    this.subscriptions = null;
  }
  this.destination.next(innerValue);
}
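To see the first-value-wins rule in isolation, here is a small sketch that does not depend on RxJS. MiniSource and miniRace are hypothetical helpers invented for this illustration; the sources are plain push emitters, and completion and errors are not modeled.

```typescript
type Unsubscribe = () => void;
type Listener<T> = (value: T) => void;

// Hypothetical push source: subscribe returns an unsubscribe function.
class MiniSource<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): Unsubscribe {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  emit(value: T): void {
    // Iterate over a copy so that unsubscribing during emission is safe.
    this.listeners.slice().forEach(l => l(value));
  }

  listenerCount(): number {
    return this.listeners.length;
  }
}

function miniRace<T>(sources: MiniSource<T>[], destination: Listener<T>): void {
  let hasFirst = false;
  let subscriptions: Unsubscribe[] | null = [];

  sources.forEach((source, outerIndex) => {
    const unsubscribe = source.subscribe(value => {
      if (!hasFirst) {
        hasFirst = true;
        // The first value decides the winner: drop every other subscription.
        subscriptions!.forEach((u, i) => {
          if (i !== outerIndex) u();
        });
        subscriptions = null;
      }
      destination(value);
    });
    // Safe in this sketch because MiniSource never emits during subscribe.
    subscriptions!.push(unsubscribe);
  });
}

const fast = new MiniSource<string>();
const slow = new MiniSource<string>();
const raceOutput: string[] = [];

miniRace([fast, slow], value => { raceOutput.push(value); });
fast.emit('first');  // fast wins, slow is unsubscribed
slow.emit('second'); // nobody is listening any more
fast.emit('again');
// raceOutput is ['first', 'again'] and slow.listenerCount() is 0
```

As in RaceSubscriber, the index of the emitting stream is the only thing needed to tell the winner from the losers.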
zip
zip is an operator that takes the downstream stream with the least data as the baseline and combines its values, in order, with the values at the same position in the other downstream streams.
zip, too, uses fromArray to hand its inputs to an internal subscriber.
export function zip<O extends ObservableInput<any>, R>(
  ...observables: O[]
): Observable<ObservedValueOf<O>[] | R> {
  // fromArray feeds the incoming parameters to the subscriber as a stream
  return fromArray(observables, undefined).lift(new ZipOperator());
}
ZipSubscriber
When the subscription starts, a ZipSubscriber is created and its _next is called. Each input is wrapped in a different iterator according to its type. The input types are:
- Array
- Generator or iterator
- Observable
protected _next(value: any) {
  const iterators = this.iterators;
  if (isArray(value)) {
    iterators.push(new StaticArrayIterator(value));
  } else if (typeof value[Symbol_iterator] === 'function') {
    iterators.push(new StaticIterator(value[Symbol_iterator]()));
  } else {
    iterators.push(new ZipBufferIterator(this.destination, this, value));
  }
}
Observables deserve more attention than static data. The role of OuterSubscriber was covered above, so I will not repeat it. ZipBufferIterator extends OuterSubscriber, implements the notify operations, and holds and subscribes to the Observable.
In zip, the upstream stream is the Observable generated by fromArray. When it completes, _complete loops through the iterators stored by _next. As _next shows, every ObservableInput has a matching iterator. If the iterator wraps an Observable that has not been subscribed yet, it is subscribed; otherwise it is treated as static data.
protected _complete() {
  const iterators = this.iterators;
  const len = iterators.length;

  this.unsubscribe();

  if (len === 0) {
    this.destination.complete();
    return;
  }

  this.active = len;
  for (let i = 0; i < len; i++) {
    let iterator: ZipBufferIterator<any, any> = <any>iterators[i];
    if (iterator.stillUnsubscribed) {
      const destination = this.destination as Subscription;
      // Hold and manage the iterator's subscription
      destination.add(iterator.subscribe(iterator, i));
    } else {
      // Not an Observable
      this.active--;
    }
  }
}
ZipBufferIterator extends OuterSubscriber, so it must also keep an InnerSubscriber internally to forward the data of its downstream stream.
class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAheadIterator<T> {
  ...
  subscribe(value: any, index: number) {
    const subscriber = new InnerSubscriber(this, index as any, undefined);
    return subscribeToResult<any, any>(this, this.observable, undefined, undefined, subscriber);
  }
  ...
}
notify
Because ZipBufferIterator keeps an InnerSubscriber internally, the data ends up in its notifyNext, which caches it in an array.
notifyNext(outerValue: T, innerValue: any,
           outerIndex: number, innerIndex: number,
           innerSub: InnerSubscriber<T, R>): void {
  this.buffer.push(innerValue);
  this.parent.checkIterators();
}
It then calls ZipSubscriber's checkIterators. This method decides what data the terminating subscription receives, and it also defines the conditions under which the terminating subscription completes.
checkIterators() {
  const iterators = this.iterators;
  const len = iterators.length;
  const destination = this.destination;

  // Do all iterators have data? If not, wait.
  for (let i = 0; i < len; i++) {
    let iterator = iterators[i];
    if (typeof iterator.hasValue === 'function' && !iterator.hasValue()) {
      return;
    }
  }

  let shouldComplete = false;
  const args: any[] = [];
  for (let i = 0; i < len; i++) {
    let iterator = iterators[i];
    let result = iterator.next();

    // The data may not be at its end yet, but the iterator may already be finished.
    if (iterator.hasCompleted()) {
      shouldComplete = true;
    }

    // If the result has reached the end, the shortest input has been fully output.
    if (result.done) {
      destination.complete();
      return;
    }

    // Collect the data from every iterator.
    args.push(result.value);
  }

  // Send to the terminating subscription
  destination.next(args);

  // Complete if any iterator has finished
  if (shouldComplete) {
    destination.complete();
  }
}
When a downstream stream completes, whether its buffer still holds data determines whether the terminating subscription completes right away.
notifyComplete() {
  if (this.buffer.length > 0) {
    this.isComplete = true;
    this.parent.notifyInactive();
  } else {
    this.destination.complete();
  }
}
If data is still buffered, ZipSubscriber.notifyInactive is called to report this back to ZipSubscriber. It means that one of the downstream streams has finished sending data, so the active count must be updated. If active ends up at 0, the terminating subscription is notified that the stream is complete.
notifyInactive() {
  this.active--;
  if (this.active === 0) {
    this.destination.complete();
  }
}
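The buffering rules above can be condensed into a small standalone sketch. It assumes that every input is a pushed stream (no static arrays or iterators) and it leaves out the active counter; MiniBuffer and MiniZip are hypothetical names, not RxJS classes.

```typescript
// Hypothetical buffer for one downstream stream.
class MiniBuffer<T> {
  values: T[] = [];
  completed = false;
}

class MiniZip<T> {
  private buffers: MiniBuffer<T>[] = [];
  private done = false;

  constructor(
    count: number,
    private onNext: (row: T[]) => void,
    private onComplete: () => void
  ) {
    for (let i = 0; i < count; i++) {
      this.buffers.push(new MiniBuffer<T>());
    }
  }

  // Called when downstream stream `index` emits a value.
  push(index: number, value: T): void {
    if (this.done) return;
    this.buffers[index].values.push(value);
    this.check();
  }

  // Called when downstream stream `index` completes.
  complete(index: number): void {
    if (this.done) return;
    const buffer = this.buffers[index];
    buffer.completed = true;
    // A completed stream with an empty buffer can never contribute again.
    if (buffer.values.length === 0) this.finish();
  }

  private check(): void {
    // Wait until every buffer holds at least one value.
    if (this.buffers.some(b => b.values.length === 0)) return;
    const row = this.buffers.map(b => b.values.shift() as T);
    this.onNext(row);
    // If a completed stream has just been drained, no further row is possible.
    if (this.buffers.some(b => b.completed && b.values.length === 0)) this.finish();
  }

  private finish(): void {
    this.done = true;
    this.onComplete();
  }
}

const rows: string[] = [];
const zipper = new MiniZip<number>(
  2,
  row => { rows.push(row.join('-')); },
  () => { rows.push('complete'); }
);

zipper.push(0, 1);
zipper.push(0, 2);
zipper.push(0, 3);
zipper.complete(0); // values are still buffered, so the result stays open
zipper.push(1, 10); // emits the row 1-10
zipper.push(1, 20); // emits the row 2-20
zipper.push(1, 30); // emits the row 3-30, then completes
zipper.push(1, 40); // ignored
// rows is ['1-10', '2-20', '3-30', 'complete']
```

The stream with the fewest values sets the length of the output: once it is complete and drained, nothing else can be paired, which mirrors the shouldComplete check in checkIterators.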
combineLatest
Unlike zip, combineLatest combines every new value from a downstream stream with the current values of all the other downstream streams, and forwards the combined value through the new stream.
export function combineLatest<O extends ObservableInput<any>, R>(
  ...observables: O[]
): Observable<R> {
  return fromArray(observables).lift(new CombineLatestOperator<ObservedValueOf<O>, R>());
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
  constructor() {}

  call(subscriber: Subscriber<R>, source: any): any {
    // Pass the terminating subscriber along as the destination
    return source.subscribe(new CombineLatestSubscriber(subscriber));
  }
}
The starting point is the same as for zip: fromArray feeds each ObservableInput to CombineLatestSubscriber as data of the upstream stream. Let's focus on the Subscriber and walk through how it works.
CombineLatestSubscriber
When data arrives, CombineLatestSubscriber caches each downstream stream in the observables array.
protected _next(observable: any) {
  this.values.push(NONE);
  this.observables.push(observable);
}
Once all downstream streams are cached and the upstream stream has finished its output, _complete is called. All it does is subscribe to every downstream stream and record the subscription state of those streams.
protected _complete() {
  const observables = this.observables;
  const len = observables.length;
  if (len === 0) {
    this.destination.complete();
  } else {
    this.active = len;
    this.toRespond = len;
    for (let i = 0; i < len; i++) {
      const observable = observables[i];
      const innerSub = new InnerSubscriber(this, observable, i);
      this.add(subscribeToResult(this, observable, undefined, undefined, innerSub));
    }
  }
}
After all downstream streams are subscribed, their data all flows into the notify methods.
notify
toRespond records the number of downstream streams that have not emitted yet. Combining starts only when every downstream stream has emitted. values caches the current value of each downstream stream at the index assigned during initialization; whenever any downstream stream emits, the cached value at its index in values is updated.
notifyNext(
  outerValue: T,
  innerValue: R,
  outerIndex: number,
  innerIndex: number,
  innerSub: InnerSubscriber<T, R>
): void {
  const values = this.values;
  const oldVal = values[outerIndex];
  let toRespond = 0;

  if (this.toRespond) {
    // A NONE placeholder means this downstream stream had not emitted before
    if (oldVal === NONE) {
      this.toRespond -= 1;
    }
    toRespond = this.toRespond;
  }

  values[outerIndex] = innerValue;
  if (toRespond === 0) {
    this.destination.next(values.slice());
  }
}
That is the core of combineLatest's design.
notifyComplete handles the relationship between the downstream streams that are still running and the terminating subscription. When active drops to zero, the terminating subscription must be notified that all data has been output.
notifyComplete(unused: Subscriber<R>): void {
  this.active -= 1;
  if (this.active === 0) {
    this.destination.complete();
  }
}
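The interplay of values, NONE, toRespond, and active can be tried out in a few lines. This is a simplified sketch under the assumption that all downstream streams are pushed by hand; MiniCombineLatest is a hypothetical class that only mirrors the counters discussed above.

```typescript
// Placeholder meaning "this stream has not emitted yet".
const NONE = {};

class MiniCombineLatest<T> {
  private values: unknown[] = [];
  private toRespond: number; // streams that have not emitted yet
  private active: number;    // streams that have not completed yet

  constructor(
    count: number,
    private onNext: (values: T[]) => void,
    private onComplete: () => void
  ) {
    for (let i = 0; i < count; i++) {
      this.values.push(NONE);
    }
    this.toRespond = count;
    this.active = count;
  }

  notifyNext(outerIndex: number, innerValue: T): void {
    // The first value from a stream replaces its NONE placeholder.
    if (this.values[outerIndex] === NONE) {
      this.toRespond -= 1;
    }
    this.values[outerIndex] = innerValue;
    if (this.toRespond === 0) {
      // Emit a copy so later updates do not mutate what was already sent.
      this.onNext(this.values.slice() as T[]);
    }
  }

  notifyComplete(): void {
    this.active -= 1;
    if (this.active === 0) {
      this.onComplete();
    }
  }
}

const combos: string[] = [];
const combine = new MiniCombineLatest<string>(
  2,
  values => { combos.push(values.join('')); },
  () => { combos.push('complete'); }
);

combine.notifyNext(0, 'a'); // nothing is emitted: stream 1 has no value yet
combine.notifyNext(0, 'b');
combine.notifyNext(1, 'x'); // emits 'bx'
combine.notifyNext(0, 'c'); // emits 'cx'
combine.notifyComplete();
combine.notifyComplete();   // both streams are done
// combos is ['bx', 'cx', 'complete']
```

Note that 'a' never appears in the output: until every stream has emitted, earlier values are simply overwritten.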
forkJoin
forkJoin is a more radical variant of combineLatest. It is radical because the condition for combining changes from "every downstream stream has emitted data" to "every downstream stream has finished emitting data". Its implementation is simple: each time a downstream stream finishes, it increments completed and compares it with the total number of downstream streams to decide when to end. Note that forkJoin emits a value only if every stream has emitted at least once.
function forkJoinInternal(sources: ObservableInput<any>[], keys: string[] | null): Observable<any> {
  return new Observable(subscriber => {
    const len = sources.length;
    if (len === 0) {
      subscriber.complete();
      return;
    }
    const values = new Array(len);
    let completed = 0;
    let emitted = 0;
    // Loop over all downstream streams
    for (let i = 0; i < len; i++) {
      // Convert the input to an Observable
      const source = from(sources[i]);
      let hasValue = false;
      subscriber.add(source.subscribe({
        next: value => {
          if (!hasValue) {
            hasValue = true;
            emitted++;
          }
          values[i] = value;
        },
        error: err => subscriber.error(err),
        // The work to be done when a downstream stream completes
        complete: () => {
          // Update the number of completed downstream streams
          completed++;
          // Check whether all downstream streams have completed
          if (completed === len || !hasValue) {
            if (emitted === len) {
              // If all downstream streams have sent data,
              // the terminating subscription receives the data of all of them.
              subscriber.next(values);
            }
            subscriber.complete();
          }
        }
      }));
    }
  });
}
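The two counters are easier to follow with concrete inputs. Below is a minimal sketch of the same bookkeeping without Observables; MiniForkJoin is a hypothetical class, and its done flag is an assumption that stands in for the closed state of the real subscriber.

```typescript
class MiniForkJoin<T> {
  private values: T[] = [];
  private hasValue: boolean[] = [];
  private completed = 0; // streams that have completed
  private emitted = 0;   // streams that have emitted at least once
  private done = false;  // stands in for the subscriber being closed

  constructor(
    private len: number,
    private onNext: (values: T[]) => void,
    private onComplete: () => void
  ) {}

  next(index: number, value: T): void {
    if (this.done) return;
    if (!this.hasValue[index]) {
      this.hasValue[index] = true;
      this.emitted++;
    }
    this.values[index] = value; // only the last value of each stream is kept
  }

  complete(index: number): void {
    if (this.done) return;
    this.completed++;
    // Finish when everything is complete, or when a stream ends without a value.
    if (this.completed === this.len || !this.hasValue[index]) {
      if (this.emitted === this.len) {
        this.onNext(this.values.slice());
      }
      this.done = true;
      this.onComplete();
    }
  }
}

// Case 1: every stream emits before completing.
const joinedLog: string[] = [];
const joined = new MiniForkJoin<number>(
  2,
  values => { joinedLog.push(values.join(',')); },
  () => { joinedLog.push('complete'); }
);
joined.next(0, 1);
joined.next(0, 2);
joined.complete(0);
joined.next(1, 10);
joined.complete(1);
// joinedLog is ['2,10', 'complete']

// Case 2: one stream completes without emitting anything.
const emptyLog: string[] = [];
const emptyJoin = new MiniForkJoin<number>(
  2,
  values => { emptyLog.push(values.join(',')); },
  () => { emptyLog.push('complete'); }
);
emptyJoin.next(0, 1);
emptyJoin.complete(1);
// emptyLog is ['complete']
```

The second case shows why the `!hasValue` branch exists: once one stream ends empty, a full set of values can never be produced, so the result completes immediately without emitting.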
merge & concat
merge creates the merged stream by calling mergeMap, and concat creates it through mergeMap as well; mergeMap will be covered in the next chapter. The only difference between them is the concurrency: merge can subscribe to several downstream streams at the same time, whereas concat subscribes to only one downstream stream at a time.
merge source
type Any = ObservableInput<any>;

export function merge<T, R>(...observables: Array<ObservableInput<any> | number>): Observable<R> {
  let concurrent = Number.POSITIVE_INFINITY;
  let last: any = observables[observables.length - 1];
  if (typeof last === 'number') {
    concurrent = <number>observables.pop();
  }
  return mergeMap<Any, Any>(x => x, concurrent)(fromArray<any>(observables));
}
concat source
export function concat<O extends ObservableInput<any>, R>(...observables: Array<O>): Observable<R> {
  return mergeMap<O, O>(x => x, 1)(of(...observables));
}
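The concurrency difference between merge and concat can be illustrated with a small queue. This sketch is not how mergeMap is implemented; MiniInnerStream, manualStream, and miniMerge are hypothetical helpers, and only the order of subscriptions and completions is recorded.

```typescript
// Hypothetical inner stream: `start` subscribes and must call `done` on completion.
interface MiniInnerStream {
  label: string;
  start(done: () => void): void;
}

// A stream whose completion is triggered by hand through finish().
function manualStream(label: string): MiniInnerStream & { finish(): void } {
  let done: () => void = () => {};
  return {
    label,
    start: d => { done = d; },
    finish: () => done()
  };
}

// Subscribes to at most `concurrent` streams at a time; the rest wait in a queue.
function miniMerge(streams: MiniInnerStream[], concurrent: number, log: string[]): void {
  const queue = streams.slice();
  let active = 0;

  const subscribeNext = (): void => {
    while (active < concurrent && queue.length > 0) {
      const stream = queue.shift() as MiniInnerStream;
      active++;
      log.push(`subscribe ${stream.label}`);
      stream.start(() => {
        active--;
        log.push(`complete ${stream.label}`);
        subscribeNext(); // a free slot may let a queued stream start
      });
    }
  };

  subscribeNext();
}

// Concurrency 1 behaves like concat: b waits until a has completed.
const concatLog: string[] = [];
const a = manualStream('a');
const b = manualStream('b');
miniMerge([a, b], 1, concatLog);
a.finish();
b.finish();
// concatLog: subscribe a, complete a, subscribe b, complete b

// Unlimited concurrency behaves like merge: both are subscribed up front.
const mergeLog: string[] = [];
const c = manualStream('c');
const d = manualStream('d');
miniMerge([c, d], Number.POSITIVE_INFINITY, mergeLog);
c.finish();
d.finish();
// mergeLog: subscribe c, subscribe d, complete c, complete d
```

The same function produces both behaviors; the single number passed as concurrent is the whole difference, which matches the two source snippets above.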
partition
export function partition<T>(
  predicate: (value: T, index: number) => boolean,
  thisArg?: any
): UnaryFunction<Observable<T>, [Observable<T>, Observable<T>]> {
  return (source: Observable<T>) => [
    filter(predicate, thisArg)(source),
    // The second stream uses the negated predicate
    filter(not(predicate, thisArg) as any)(source)
  ] as [Observable<T>, Observable<T>];
}
Summary
To sum up, this chapter first presented some general rules of the operators and then analyzed the Join Creation Operators. To conclude, here is each of them summarized in one sentence.
- zip combines streams based on the downstream stream with the least data.
- combineLatest combines streams based on the downstream stream with the most data.
- forkJoin combines streams based on the completion of the downstream streams.
- merge and concat are covered in the next chapter.
- partition splits the input stream into two streams.