DevUI is a team with both design and engineering perspectives. It serves the Huawei DevCloud platform and Huawei's internal back-office systems, as well as designers and front-end engineers. Add the DevUI helper (Devui-Official). DevUIHelper plugin: Devuihelper-lsp (stars welcome).
Introduction
This is the third article in the RxJS source code analysis series, based on version 6.6.0. It covers a lot of ground, so please read patiently. For readability, the code shown has been trimmed and simplified. If you spot any mistakes, corrections are welcome.
In RxJS, creation operators fall into two main categories:
- Normal Creation Operators, which perform ordinary creation operations.
- Join Creation Operators, which perform more complex creation operations.
Other operator categories include:
- Transformation Operators, for mapping data
- Filtering Operators, for filtering data
- Multicasting Operators, for multicasting the current Observable
- Error Handling Operators, for handling errors
- Utility Operators
- Conditional and Boolean Operators
- Mathematical and Aggregate Operators
of, empty & throwError
import { of, empty, throwError } from 'rxjs';

const observableA = of(1);
const observableB = empty();
const observableC = throwError(Error('test'));

observableA.subscribe({
  next: (v) => console.log('A: ' + v),
  complete: () => console.log('A: complete'),
  error: (e) => console.log('A: error is ' + e),
});
observableB.subscribe({
  next: (v) => console.log('B: ' + v),
  complete: () => console.log('B: complete'),
  error: (e) => console.log('B: error is ' + e),
});
observableC.subscribe({
  next: (v) => console.log('C: ' + v),
  complete: () => console.log('C: complete'),
  error: (e) => console.log(`C: error is (${e}).`),
});

// Output:
// A: 1
// A: complete
// B: complete
// C: error is (Error: test).
of source code
export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> {
  // A trailing scheduler argument, if present, is split off from the values.
  let scheduler = args[args.length - 1] as SchedulerLike;
  if (isScheduler(scheduler)) {
    args.pop();
    return scheduleArray(args as T[], scheduler);
  } else {
    return fromArray(args as T[]);
  }
}

export function fromArray<T>(input: ArrayLike<T>) {
  return new Observable<T>((subscriber: Subscriber<T>) => {
    // Loop over the data, stopping early if the subscriber has been closed.
    for (let i = 0, len = input.length; i < len && !subscriber.closed; i++) {
      subscriber.next(input[i]);
    }
    subscriber.complete();
  });
}
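The branch in of depends on whether the last argument looks like a scheduler. Below is a minimal sketch of that check, assuming (as isScheduler does in RxJS 6) that a scheduler is any value with a schedule method. The names looksLikeScheduler and splitArgs are hypothetical helpers for illustration, not RxJS APIs, and the SchedulerLike interface here is a simplified stand-in.

```typescript
// Simplified stand-in for RxJS's SchedulerLike: anything with a schedule() method.
interface SchedulerLike {
  schedule(work: () => void, delay?: number): void;
}

// Duck-type check on the schedule method (hypothetical helper).
function looksLikeScheduler(value: unknown): value is SchedulerLike {
  return typeof value === 'object' && value !== null &&
    typeof (value as { schedule?: unknown }).schedule === 'function';
}

// Separates the values to emit from an optional trailing scheduler (hypothetical helper).
function splitArgs<T>(args: Array<T | SchedulerLike>): { values: T[]; scheduler?: SchedulerLike } {
  const last = args[args.length - 1];
  if (looksLikeScheduler(last)) {
    return { values: args.slice(0, -1) as T[], scheduler: last };
  }
  return { values: args as T[] };
}

const fakeScheduler: SchedulerLike = { schedule: work => work() };
const withScheduler = splitArgs<number>([1, 2, fakeScheduler]);
const withoutScheduler = splitArgs<number>([1, 2, 3]);
// withScheduler.values holds [1, 2]; withoutScheduler.values holds [1, 2, 3].
```

Unlike the trimmed source above, this sketch copies the array with slice instead of mutating it with pop; the outcome is the same for the caller.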
empty source code
export function empty(scheduler?: SchedulerLike) {
  if (scheduler) {
    return new Observable<never>(subscriber => scheduler.schedule(
      () => subscriber.complete()
    ));
  } else {
    return new Observable<never>(subscriber => subscriber.complete());
  }
}
throwError source code
export function throwError(error: any, scheduler?: SchedulerLike): Observable<never> {
  if (!scheduler) {
    return new Observable(subscriber => subscriber.error(error));
  } else {
    return new Observable(subscriber => scheduler.schedule(
      dispatch, 0, { error, subscriber }
    ));
  }
}

// The state handed to the scheduler for the scheduled work.
interface DispatchArg {
  error: any;
  subscriber: Subscriber<any>;
}

// In the end, the subscriber's error method is called.
function dispatch({ error, subscriber }: DispatchArg) {
  subscriber.error(error);
}
iif & defer
- The main role of defer is to delay creating the actual Observable until subscription time; it is a lazy Observable factory.
- iif narrows the scope of defer, mainly to give Rx an imperative-style conditional.
import { iif, defer, of } from 'rxjs';

let test = false;

const observableA = iif(
  () => test,
  of('1'),
  of('2'),
);

const observableB = defer(function() {
  return test ? of('1') : of('2');
});
iif Source Code
export function iif<T = never, F = never>(
  condition: () => boolean,
  trueResult: SubscribableOrPromise<T> = EMPTY,
  falseResult: SubscribableOrPromise<F> = EMPTY
): Observable<T | F> {
  // Delegate directly to defer.
  return defer(() => condition() ? trueResult : falseResult);
}
defer Source Code
- Call the factory to get the input data.
- Call from to convert the input into an Observable.
- Return the subscription to that Observable.
export function defer<R extends ObservableInput<any> | void>(
  observableFactory: () => R
): Observable<ObservedValueOf<R>> {
  return new Observable<ObservedValueOf<R>>(subscriber => {
    let input: R | void;
    try {
      // Call the factory function to get the input data.
      input = observableFactory();
    } catch (err) {
      subscriber.error(err);
      return undefined;
    }
    // Convert the input to an Observable via from.
    const source = input ? from(input as ObservableInput<ObservedValueOf<R>>) : empty();
    // Return the subscription to the caller.
    return source.subscribe(subscriber);
  });
}

export type ObservedValueOf<OV> = OV extends ObservableInput<infer T> ? T : never;
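To make the "lazy factory" idea concrete, here is a minimal model that does not use RxJS at all. It assumes an "observable" can be reduced to a function that pushes values into a callback; MiniObservable, miniOf and miniDefer are hypothetical names for illustration only. The point is that the factory runs at subscription time, which is also why the condition passed to iif is evaluated late.

```typescript
// A deliberately tiny model: an "observable" is a function that pushes values to a callback.
type MiniObservable<T> = (next: (value: T) => void) => void;

// Emits the given values synchronously on every subscription.
const miniOf = <T>(...values: T[]): MiniObservable<T> =>
  next => values.forEach(v => next(v));

// Calls the factory on each subscription, not at creation time.
const miniDefer = <T>(factory: () => MiniObservable<T>): MiniObservable<T> =>
  next => factory()(next);

let flag = false;

// The branch is chosen right here, while flag is still false.
const eager = flag ? miniOf<string>('1') : miniOf<string>('2');
// The branch is chosen each time someone subscribes.
const lazy = miniDefer<string>(() => (flag ? miniOf<string>('1') : miniOf<string>('2')));

flag = true;

const eagerOut: string[] = [];
const lazyOut: string[] = [];
eager(v => eagerOut.push(v));
lazy(v => lazyOut.push(v));
// eagerOut is ['2'] because the branch was fixed while flag was false;
// lazyOut is ['1'] because the factory ran after flag became true.
```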
from
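from accepts the following kinds of input:
- Native arrays and iterables
- DOM iterators
- Promises
- Observables
Depending on the input type, it dispatches to a different subscribe helper:
- If the input is an Observable, call subscribeToObservable.
- If the input is a native array, call subscribeToArray.
- If the input is a Promise, call subscribeToPromise.
- If the input is an iterable (for example, a generator object), call subscribeToIterable.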
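The dispatch rules above amount to duck-typing the input. Here is a rough sketch of such a classifier, assuming the checks run in the order listed. classifyInput is a hypothetical helper, not an RxJS function, and the string key '@@observable' stands in for the Symbol_observable used in the source (RxJS 6 falls back to that string when Symbol.observable is not defined).

```typescript
type InputKind = 'observable' | 'array' | 'promise' | 'iterable' | 'unknown';

// Stand-in for Symbol_observable (assumption: the string fallback is in use).
const observableKey = '@@observable';

// Hypothetical helper: decides which subscribe helper an input would be routed to.
function classifyInput(input: unknown): InputKind {
  if (input === null || input === undefined) return 'unknown';
  const obj: any = input;
  if (typeof obj[observableKey] === 'function') return 'observable';
  // Array-likes have a numeric length; functions are excluded because they have one too.
  if (typeof obj['length'] === 'number' && typeof input !== 'function') return 'array';
  if (typeof obj['then'] === 'function') return 'promise';
  if (typeof obj[Symbol.iterator] === 'function') return 'iterable';
  return 'unknown';
}

const kinds: InputKind[] = [
  classifyInput({ '@@observable': () => ({}) }),
  classifyInput([1, 2, 3]),
  classifyInput(Promise.resolve(1)),
  classifyInput(new Set([1, 2])),
  classifyInput(42),
];
// kinds is ['observable', 'array', 'promise', 'iterable', 'unknown']
```

The order matters: a string, for example, is both array-like and iterable, and with this ordering it is treated as an array.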
subscribeToArray
export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {
  for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
    subscriber.next(array[i]);
  }
  subscriber.complete();
};
subscribeToObservable
export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>) => {
  const obs = obj[Symbol_observable]();
  if (typeof obs.subscribe !== 'function') {
    throw new TypeError('Provided object does not correctly implement Symbol.observable');
  } else {
    return obs.subscribe(subscriber);
  }
};
subscribeToPromise
export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => {
  promise.then(
    (value) => {
      if (!subscriber.closed) {
        subscriber.next(value);
        subscriber.complete();
      }
    },
    (err: any) => subscriber.error(err)
  );
  return subscriber;
};
subscribeToIterable
export const subscribeToIterable = <T>(iterable: Iterable<T>) => (subscriber: Subscriber<T>) => {
  const iterator = (iterable as any)[Symbol_iterator]();
  do {
    let item: IteratorResult<T>;
    try {
      item = iterator.next();
    } catch (err) {
      subscriber.error(err);
      return subscriber;
    }
    if (item.done) {
      subscriber.complete();
      break;
    }
    subscriber.next(item.value);
    if (subscriber.closed) {
      break;
    }
  } while (true);
  return subscriber;
};
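The closed check after each next call is what lets a consumer stop an endless iterable. A small self-contained sketch of the same loop follows, assuming a stripped-down subscriber shape; MiniSubscriber, drain and naturals are hypothetical names for illustration, and error handling is omitted.

```typescript
// Hypothetical minimal subscriber: only the members the loop relies on.
interface MiniSubscriber<T> {
  closed: boolean;
  next(value: T): void;
  complete(): void;
}

// Pulls from the iterator until it is done or the subscriber is closed.
function drain<T>(iterable: Iterable<T>, subscriber: MiniSubscriber<T>): void {
  const iterator = iterable[Symbol.iterator]();
  while (true) {
    const item = iterator.next();
    if (item.done) {
      subscriber.complete();
      break;
    }
    subscriber.next(item.value);
    if (subscriber.closed) {
      break; // the consumer has unsubscribed, so stop pulling
    }
  }
}

// An endless iterable of natural numbers.
const naturals: Iterable<number> = {
  [Symbol.iterator]() {
    let n = 0;
    return { next: () => ({ done: false, value: n++ }) };
  },
};

const received: number[] = [];
let completed = false;
const takeThree: MiniSubscriber<number> = {
  closed: false,
  next(value) {
    received.push(value);
    if (received.length === 3) this.closed = true; // simulate unsubscribing
  },
  complete() { completed = true; },
};

drain(naturals, takeThree);
// received is [0, 1, 2]; complete() was never called because the source never ended.
```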
generate
const observable = generate({
  initialState: 1,
  condition: x => x < 5,
  iterate: x => x + 1,
});

observable.subscribe((value) => {
  console.log(value);
});

// Output:
// 1
// 2
// 3
// 4
- Check the end condition. If it returns false, complete the subscription; otherwise, go to the next step.
- Send the current state to the subscriber.
- Call the iterate function to produce the next state, then repeat from the first step.
Here is the source code:
export function generate<S>(options: GenerateOptions<S>): Observable<S> {
  const initialState = options.initialState;
  const condition = options.condition;
  const iterate = options.iterate;

  // Return an Observable.
  return new Observable<S>(subscriber => {
    let state = initialState;
    try {
      while (true) {
        // Check the end condition.
        if (condition && !condition(state)) {
          subscriber.complete();
          break;
        }
        // Send the current state to the subscriber.
        subscriber.next(state);
        // Iterate to produce the next state.
        state = iterate(state);
        if (subscriber.closed) {
          break;
        }
      }
    } catch (err) {
      subscriber.error(err);
    }
    return undefined;
  });
}

export interface GenerateOptions<S> {
  // Initial state
  initialState: S;
  // End condition
  condition?: (x: S) => boolean;
  // Iteration function
  iterate: (x: S) => S;
}
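The three steps of generate can also be read as a plain loop. The sketch below collects the emitted states into an array instead of pushing them to a subscriber. GenerateSpec and collect are hypothetical names, and the limit parameter is an added safety guard that the real operator does not have (without a condition, generate runs until the subscriber is closed).

```typescript
// Same shape as the options object above, redeclared so the sketch is self-contained.
interface GenerateSpec<S> {
  initialState: S;
  condition?: (state: S) => boolean;
  iterate: (state: S) => S;
}

// Hypothetical helper: runs the generate loop and returns the states it would emit.
function collect<S>(spec: GenerateSpec<S>, limit = 1000): S[] {
  const out: S[] = [];
  let state = spec.initialState;
  while (out.length < limit) {
    // Step 1: stop when the end condition fails.
    if (spec.condition && !spec.condition(state)) break;
    // Step 2: emit the current state.
    out.push(state);
    // Step 3: compute the next state.
    state = spec.iterate(state);
  }
  return out;
}

const counted = collect({ initialState: 1, condition: x => x < 5, iterate: x => x + 1 });
// counted is [1, 2, 3, 4]

const powers = collect({ initialState: 1, iterate: x => x * 2 }, 5);
// powers is [1, 2, 4, 8, 16]: no condition, so only the limit stops the loop
```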
range
export function range(start: number = 0, count?: number): Observable<number> {
  return new Observable<number>(subscriber => {
    // With a single argument, treat it as the count and start from 0.
    if (count === undefined) {
      count = start;
      start = 0;
    }
    for (let index = 0; index < count; ++index) {
      subscriber.next(start + index);
      if (subscriber.closed) {
        break;
      }
    }
    // Signal completion once every value has been sent.
    subscriber.complete();
    return undefined;
  });
}
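The argument handling in range is easy to misread, so here is the same logic as a plain function that returns an array. rangeValues is a hypothetical helper for illustration only; it follows the trimmed source shown here rather than adding any behavior of its own.

```typescript
// Hypothetical helper: the values range(start, count) would emit, as an array.
function rangeValues(start: number = 0, count?: number): number[] {
  // A single argument is interpreted as the count, starting from 0.
  if (count === undefined) {
    count = start;
    start = 0;
  }
  const out: number[] = [];
  for (let index = 0; index < count; ++index) {
    out.push(start + index);
  }
  return out;
}

const oneArg = rangeValues(3);     // [0, 1, 2]
const twoArgs = rangeValues(2, 3); // [2, 3, 4]
const noArgs = rangeValues();      // []
```

Note that the second parameter is a count, not an end value: rangeValues(2, 3) yields three numbers starting at 2.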
fromEvent & fromEventPattern
fromEvent
const clicksA = fromEvent(document, 'click');
const clicksB = fromEvent($(document), 'click');

clicksA.subscribe(x => console.log('A: ', x));
clicksB.subscribe(x => console.log('B: ', x));

// Each click on the page prints the click event.

export interface HasEventTargetAddRemove<E> {
  addEventListener(type: string, listener: ((evt: E) => void) | null, options?: boolean | AddEventListenerOptions): void;
  removeEventListener(type: string, listener?: ((evt: E) => void) | null, options?: EventListenerOptions | boolean): void;
}

// fromEvent
export function fromEvent<T>(
  target: HasEventTargetAddRemove<T>,
  eventName: string,
  options?: EventListenerOptions
): Observable<T> {
  return new Observable<T>(subscriber => {
    // The handler forwards each event to the subscriber.
    const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);
    // Call addEventListener so that events are processed by handler.
    target.addEventListener(eventName, handler, options);
    // Return a function that runs on unsubscribe; it calls removeEventListener
    // on the target to stop listening.
    return () => {
      target.removeEventListener(eventName, handler, options);
    };
  });
}
- Create a handler function in the closure that ultimately calls subscriber.next.
- Add the handler to target as a listener for the specified event.
- Return teardown logic that removes the listener from target when the subscriber unsubscribes.
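The three steps above can be exercised without a browser. The sketch below assumes a small in-memory event target; FakeTarget and listen are hypothetical names for illustration and are not part of RxJS or the DOM. It shows that the returned teardown function really detaches the listener.

```typescript
type Listener<E> = (evt: E) => void;

// Hypothetical in-memory event target with the add/remove pair that fromEvent relies on.
class FakeTarget<E> {
  private listeners = new Map<string, Set<Listener<E>>>();

  addEventListener(type: string, listener: Listener<E>): void {
    if (!this.listeners.has(type)) this.listeners.set(type, new Set());
    this.listeners.get(type)!.add(listener);
  }

  removeEventListener(type: string, listener: Listener<E>): void {
    this.listeners.get(type)?.delete(listener);
  }

  // Test helper: deliver an event to every listener of the given type.
  dispatch(type: string, evt: E): void {
    this.listeners.get(type)?.forEach(l => l(evt));
  }

  // Test helper: number of listeners currently attached for the given type.
  count(type: string): number {
    return this.listeners.get(type)?.size ?? 0;
  }
}

// Steps 1 to 3: build the handler, register it, return the teardown.
function listen<E>(target: FakeTarget<E>, eventName: string, next: Listener<E>): () => void {
  const handler: Listener<E> = evt => next(evt);
  target.addEventListener(eventName, handler);
  return () => target.removeEventListener(eventName, handler);
}

const fakeTarget = new FakeTarget<string>();
const seen: string[] = [];
const unsubscribe = listen(fakeTarget, 'click', evt => seen.push(evt));

fakeTarget.dispatch('click', 'first');
unsubscribe();
fakeTarget.dispatch('click', 'second'); // ignored: the listener is gone
// seen is ['first']
```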
fromEventPattern
function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

const clicks = fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));

// Each click prints the click event.

export type NodeEventHandler = (...args: any[]) => void;

export function fromEventPattern<T>(
  addHandler: (handler: NodeEventHandler) => any,
  removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
): Observable<T | T[]> {
  return new Observable<T | T[]>(subscriber => {
    const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);
    // The one difference from fromEvent: the return value of addHandler is kept
    // and later passed to removeHandler.
    let retValue: any;
    try {
      retValue = addHandler(handler);
    } catch (err) {
      subscriber.error(err);
      return undefined;
    }
    if (!isFunction(removeHandler)) {
      return undefined;
    }
    return () => removeHandler(handler, retValue);
  });
}
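Passing the return value of addHandler on to removeHandler is useful for APIs that identify a registration by a token rather than by the handler itself. A minimal sketch of that flow follows, assuming a hypothetical token-based registry; registry, addTokenHandler, removeTokenHandler and subscribePattern are illustrative names, not RxJS APIs.

```typescript
type Handler = (...args: any[]) => void;

// Hypothetical registry that hands out numeric tokens, much like setInterval returns an id.
const registry = new Map<number, Handler>();
let nextToken = 1;

function addTokenHandler(handler: Handler): number {
  const token = nextToken++;
  registry.set(token, handler);
  return token;
}

// The token returned by addTokenHandler arrives here as the second argument.
function removeTokenHandler(_handler: Handler, token: number): void {
  registry.delete(token);
}

// Mirrors the core of fromEventPattern: keep the return value, hand it back on teardown.
function subscribePattern(next: (value: unknown) => void): () => void {
  const handler: Handler = (...e) => next(e.length === 1 ? e[0] : e);
  const retValue = addTokenHandler(handler);
  return () => removeTokenHandler(handler, retValue);
}

const log: unknown[] = [];
const stop = subscribePattern(v => log.push(v));

registry.forEach(h => h('a'));      // one argument is passed through as-is
registry.forEach(h => h('b', 'c')); // several arguments arrive as an array
stop();
// log is ['a', ['b', 'c']] and the registry is empty again
```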
interval Source Code
export function interval(period = 0): Observable<number> {
  if (!isNumeric(period) || period < 0) {
    period = 0;
  }
  const scheduler = async;
  return new Observable<number>(subscriber => {
    // Add the scheduler's subscription to the subscriber.
    subscriber.add(
      scheduler.schedule(dispatch, period, { subscriber, counter: 0, period })
    );
    return subscriber;
  });
}
export function interval(
  period = 0,
  scheduler: SchedulerLike = async,
): Observable<number> {
  if (!isNumeric(period) || period < 0) {
    period = 0;
  }
  return timer(period, period, scheduler);
}
Summary
Join us
ZCX (官 码 : Coder)