DevUI is a team with both design and engineering perspectives, serving huawei DevCloud platform and huawei internal background systems, as well as designers and front-end engineers. Add devui Helper (Devui-Official) DevUIHelper plugin: Devuihelper-lsp (welcome Star)

The introduction

This is the third RxJS source code analysis article, using the source version is 6.6.0. The content of this article may be quite extensive, please read patiently. In order to facilitate reading, the relevant code has been trimmed and processed. If there is anything wrong, please correct it.

In RxJS, Creation operators are mainly divided into the following two categories:

  • The Normal Creation Operator that performs the Normal Creation operations.
  • Join Creation Operator that performs complex create operations.
Operators used in PIPES, which I call Pipe operators, fall into the following categories:

  • Transformation Operators for data mapping
  • Filtering Operators for Filtering
  • Multicasting Operators of the current Observable
  • Error Handling Operators for Handling errors
  • Utility Operators
  • Conditional and Boolean Operators
  • Mathematical and Aggregate Operators
This article introduces the Normal Creation Operator, which helps developers quickly create observables.

of , empty & throwError

Of, Empty, and throwError. The important reason to start with these three operators is that they provide a basic set of operations: Next, complete, and error.

const observableA = of(1); const observableB = empty(); const observableC = throwError(Error('test')); observableA.subscribe({ next: (v) => console.log('A: ' + v), complete: () => console.log('A: complete'); error: (e) => console.log('A: error is ' + e); }); observableB.subscribe({ next: (v) => console.log('B: ' + v), complete: () => console.log('B: complete'); error: (e) => console.log('B: error is ' + e); }); observableC.subscribe({ next: (v) => console.log('C: ' + v), complete: () => console.log('C: complete'), error: (e) => console.log(`C: error is (${e}).`), }); // Prints the result // A: 1 // A: complete // B: complete // C: error is error: testCopy the code

of source code

It is constructed as follows, with the scheduler as the last parameter.

export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> {
  let scheduler = args[args.length - 1] as SchedulerLike;
  if (isScheduler(scheduler)) {
    args.pop();
    return scheduleArray(args as T[], scheduler);
  } else {
    return fromArray(args as T[]);
  }
}
Copy the code
Of consists of two functions fromArray and scheduleArray. FromArray is a simple looping function that loops data to Observable subscribers.

export function fromArray<T>(input: ArrayLike<T>) { return new Observable<T>((subscriber: Subscriber<T>) => {// loop to get data for (let I = 0, len = array.length; i < len && ! subscriber.closed; i++) { subscriber.next(array[i]); } subscriber.complete(); }); }Copy the code

empty source code

This part of the code is simple and the Scheduler part can be ignored. You essentially call subscriber.complete() in Observable.

export function empty(scheduler?: SchedulerLike) {
  if (scheduler) {
    return new Observable<never>(subscriber => scheduler.schedule(
      () => subscriber.complete()
    ));
  } else {
    return new Observable<never>(subscriber => subscriber.complete());
  }
}
Copy the code

throwError source code

ThrowError is the same implementation as Empty, except that complete is replaced with error.

export function throwError(error: any, scheduler? : SchedulerLike): Observable<never> { if (! scheduler) { return new Observable(subscriber => subscriber.error(error)); } else { return new Observable(subscriber => scheduler.schedule( dispatch, 0, { error, subscriber } )); }} // Here are the states that the scheduler wants to execute. interface DispatchArg { error: any; subscriber: Subscriber<any>; } // Subcriber's error method is finally executed. function dispatch({ error, subscriber }: DispatchArg) { subscriber.error(error); }Copy the code

iif & defer

Iif and defer have the same performance.

  • The main role of defer is to delay the generation of the specific Observable, which is a Lazy Observable Factory.
  • Iif narrates the scope of defer, primarily enhancing the imperative semantics of Rx.

    let test = false; const observableA = iif( () => test, of(‘1’), of(‘2’), ); const observableB = defer(function() { return test ? of(‘1’) : of(‘2’); });

iif Source Code

I was shocked when I saw the source code for iIF.

export function iif<T = never, F = never>( condition: () => boolean, trueResult: SubscribableOrPromise<T> = EMPTY, falseResult: SubscribableOrPromise<F> = EMPTY ): Observables | F < T > {/ / direct call defer return defer (() = > condition ()? trueResult : falseResult); }Copy the code

defer Source Code

Defer is simple in principle: When constructing an Observable, it returns a Subscription in the incoming Subscription function. In the incoming subscription function, then, the process of defer is divided into three steps:

  • Call the factory to get the input data.
  • Calling from converts the data into an Observable
  • Returns the observable subscription.

    export function defer<R extends ObservableInput | void>(observableFactory: () => R): Observable<ObservedValueOf> { return new Observable<ObservedValueOf>(subscriber => { let input: R | void; Try {// Call the factory function to get the input data. input = observableFactory(); } catch (err) { subscriber.error(err); return undefined; } // Convert input to Observable via FROM. const source = input ? from(input as ObservableInput<ObservedValueOf>) : empty();

    // Return a subscriber to the outside. return source.subscribe(subscriber);Copy the code

    }); }

Where ObservedValueOf is defined as follows, the infer of TS is used to derive the specific type of T in ObservableInput\.

export type ObservedValueOf<OV> = OV extends ObservableInput<infer T> ? T : never;
Copy the code

from

From provides a mapping capability to map incoming data to Observables. It can accept the following arguments:

  • Native arrays and Iterable
  • Dom iterator
  • Promise
  • Observable
A little trim, the source code is as follows:

export function from(input: ObservableInput): Observable { return new Observable(subscribeTo(input)); }

It directly creates a new Observable and calls subscribeTo, which processes the input differently according to the input type.

  • If the input is Observable, call subscribeToObservable.
  • If the input is a native array, call subscribeToArray.
  • If the input is a Promise, call subscribeToPromise.
  • If the input is a generator, call subscribeToIterable.

subscribeToArray

If the input is a native array or a data structure that implements an array, then call subscriber.next directly to send all the data in turn to the subscriber.

export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => { for (let i = 0, len = array.length; i < len && ! subscriber.closed; i++) { subscriber.next(array[i]); } subscriber.complete(); };Copy the code

subscribeToObservable

If the input is Obervable, retrieve the Observable via a specific Symbol and subscribe to it.

(Due to the nature of Symbol, many current projects use a fixed Symbol value for a particular data, to verify that the data is type).

export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>) => {
  const obs = obj[Symbol_observable]();
  if (typeof obs.subscribe !== 'function') {
    throw new TypeError('Provided object does not correctly implement Symbol.observable');
  } else {
    return obs.subscribe(subscriber);
  }
};
Copy the code

subscribeToPromise

If the input is a Promise, then the content of the Promise is retrieved through then and sent to the subscriber.

export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => { promise.then( (value) => { if (! subscriber.closed) { subscriber.next(value); subscriber.complete(); } }, (err: any) => subscriber.error(err) ); return subscriber; };Copy the code

subscribeToIterable

Generators, like arrays, send data to subscribers in a loop.

export const subscribeToIterable = <T>(iterable: Iterable<T>) => (subscriber: Subscriber<T>) => {
  const iterator = (iterable as any)[Symbol_iterator]();

  do {
    let item: IteratorResult<T>;
    try {
      item = iterator.next();
    } catch (err) {
      subscriber.error(err);
      return subscriber;
    }
    if (item.done) {
      subscriber.complete();
      break;
    }
    subscriber.next(item.value);
    if (subscriber.closed) {
      break;
    }
  } while (true);

  return subscriber;
};
Copy the code

generate

Generate lets you stream data in a similar way to a for loop. However, I haven’t yet come across a situation where this is a great way to generate streams, so if you do, feel free to talk. In general, I’m used to calling it this way.

const observable = generate({ initialState: 1, condition: x => x < 5, iterate: x => x + 1, }); observable.subscribe((value) => { console.log(value); }); // Prints the result // 1 // 2/3 // 4Copy the code
The original source code contains more parameter judgment, the internal logic comb, is actually divided into three big steps:

  • Judge the end condition. If it is false, the subscription should be completed; otherwise, proceed to the next step.
  • Send data subscriptions to subscribers.
  • Call the iterative method to generate the next set of data and repeat the first step.

Here is the source code:

export function generate<S>(options: GenerateOptions<S>): Observable<S> { const initialState = options.initialState; const condition = options.condition; const iterate = options.iterate; Observable return new Observable<S>(subscriber => {let state = initialState; Try {while (true) {// Judge the end condition if (condition &&! condition(state)) { subscriber.complete(); break; } // Send data to subscriber. Next (state); // Iterate for next set of data state = iterate(state); if (subscriber.closed) { break; } } } catch (err) { subscriber.error(err); } return undefined; }); }Copy the code
GenerateOptions contains three members, initialState, condition and iterate.

Export interface GenerateOptions<S> {// initialState: S; // End condition? : (x: S) => boolean; Iterate: (x: S) => S; }Copy the code

range

Range creates a stream of numbers for a given range. The main thing is to provide a simple semantic function that feeds the subscriber data through a loop.

export function range(start: number = 0, count? : number): Observable<number> { return new Observable<number>(subscriber => { if (count === undefined) { count = start; start = 0; } for (let index = 0; index < count; ++index) { subscriber.next(start + index); if (subscriber.closed) { break; } } return undefined; }); }Copy the code

fromEvent & fromEventPattern

fromEvent

FromEvent is an Observable that encapsulates a series of system events. It can accept NodeJS EventEmitter, DOM EventTarget, jquery-like EventTarget, NodeList, HTMLCollection, etc.

const clicksA = fromEvent(document, 'click'); const clicksB = fromEvent($(document), 'click'); clicksA.subscribe(x => console.log('A: ', x)); clicksB.subscribe(x => console.log('B: ', x)); // Every time a page is clicked, an event is printed.Copy the code
Its implementation is very simple, call its corresponding event listener function according to the object type of target, and then call next to get the output of the subscription. To make it easier to read, I changed fromEvent slightly to only support DOM EventTarget.

export interface HasEventTargetAddRemove<E> { addEventListener(type: string, listener: ((evt: E) => void) | null, options? : boolean | AddEventListenerOptions): void; removeEventListener(type: string, listener? : ((evt: E) => void) | null, options? : EventListenerOptions | boolean): void; } // fromEvent export function fromEvent<T>(target: HasEventTargetAddRemove<T>, eventName: string, options? Observable<T> {return new Observable<T>(subscriber => {// const handler = (... e: T[]) => subscriber.next(e.length === 1 ? e[0] : e); // Call addEventListener and let it be processed in handler. target.addEventListener(eventName, handler, options); When unsubscribing, call removeEventListener directly to dom to unsubscribe. // Returns a function that takes care of what happens when the subscription is unsubscribed. return () => { target.removeEventListener(eventName, handler, options); }; }); }Copy the code
The above code can be broken down into these three steps:

  • Create a handler function in the closure that eventually calls subscriber.next.
  • Adds the specified event listener for target.
  • Add a logic to destroy target event listening for subscriber.
For other event listeners, the process is exactly the same.

fromEventPattern

FromEventPattern is a generalization of fromEvent.

function addClickHandler(handler) { document.addEventListener('click', handler); } function removeClickHandler(handler) { document.removeEventListener('click', handler); } const clicks = fromEventPattern( addClickHandler, removeClickHandler ); clicks.subscribe(x => console.log(x)); // When clicked, the click event is output.Copy the code
Its source code is similar to fromEvent.

export type NodeEventHandler = (... args: any[]) => void; export function fromEventPattern<T>( addHandler: (handler: NodeEventHandler) => any, removeHandler? : (handler: NodeEventHandler, signal? : any) => void, ): Observable<T | T[]> { return new Observable<T | T[]>(subscriber => { const handler = (... e: T[]) => subscriber.next(e.length === 1 ? e[0] : e); // A little bit different is that the return value of addHandler let retValue: any; try { retValue = addHandler(handler); } catch (err) { subscriber.error(err); return undefined; } if (! isFunction(removeHandler)) { return undefined; } return () => removeHandler(handler, retValue); }); }Copy the code

interval Source Code

Here is the source code for Interval.

export function interval(period = 0): Observable<number> { if (! isNumeric(period) || period < 0) { period = 0; } const scheduler = async; Return New Observable<number>(subscriber => {// The subscriber receives scheduler subscriptions. subscriber.add( scheduler.schedule(dispatch, period, { subscriber, counter: 0, period }) ); return subscriber; }); }Copy the code
Looking closely at the code above, I found that the implementation of interval is actually a constrained version of timer that can be rewritten like this.

export function interval( period = 0, scheduler: SchedulerLike = async, ): Observable<number> { if (! isNumeric(period) || period < 0) { period = 0; } return timer(period, period, sch); }Copy the code

This summary

Overall, this part of the source code is not too convoluted, and the logic becomes clear immediately after scheduler-related content is removed. Also, the style of the source code shows that they are written by different people.

Finally, limited to my level, if there are mistakes in the article, welcome to correct.

Join us

We are [DevUI team](http://devui.design/), welcome to come here and build elegant and efficient human-computer design/ research and development system with us. Recruitment email: [[email protected]](mailto:[email protected]).

ZCX (官 码 : Coder)

Multicast Observable (RxJS

Previous articles are recommended

RxJS: The Muticasted Observable

Dark Mode and Thematic Development of Web Interfaces

“How to build a grayscale Publishing environment”