The best way to learn a library is to read its source code and understand the principles behind its API calls. Once you do, using it feels natural, and you know exactly what each call is doing. In that spirit, and to understand RxJS better, I took the time to read through its source code again.

This series of articles does not aim to explain concepts. It focuses on interpreting the source code, based on RxJS v7.4.0, with small examples where appropriate.

Starting from new Observable

import { Observable } from 'rxjs'

const observable = new Observable<number>(subscriber => {
  subscriber.next(1)
  subscriber.next(2)
  subscriber.complete()
})
observable.subscribe({
  next: data => console.log('next data:', data),
  complete: () => {
    console.log('complete')
  }
})

The output is as follows:

// Output:
// next data: 1
// next data: 2
// complete

An Observable is created with new Observable() and then subscribed to via the subscribe method. Calling subscribe executes the function that was passed to new Observable. So what does new Observable actually do?

// /src/internal/Observable.ts
export class Observable<T> implements Subscribable<T> {
  // ...
  constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
    if (subscribe) {
      this._subscribe = subscribe;
    }
  }
  // ...
}

Initializing an Observable is simple: the callback is bound to the instance's _subscribe property.
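One consequence of only storing the callback is that nothing runs until subscribe is called, and the callback runs again for each subscription. The sketch below illustrates this with a hypothetical MiniObservable class; it is a simplified stand-in written for this article, not the RxJS implementation.

```typescript
// Simplified stand-in for illustration; not the RxJS implementation.
type SubscribeFn<T> = (subscriber: { next: (value: T) => void }) => void;

class MiniObservable<T> {
  // Mirrors the idea of `this._subscribe = subscribe` in the constructor.
  private readonly subscribeFn: SubscribeFn<T>;

  constructor(subscribeFn: SubscribeFn<T>) {
    this.subscribeFn = subscribeFn;
  }

  subscribe(next: (value: T) => void): void {
    // The stored callback only runs here, once per subscribe call.
    this.subscribeFn({ next });
  }
}

let runs = 0;
const received: number[] = [];

const lazy = new MiniObservable<number>((subscriber) => {
  runs += 1;
  subscriber.next(runs);
});

const runsBeforeSubscribe = runs; // 0: constructing does not execute the callback
lazy.subscribe((value) => received.push(value));
lazy.subscribe((value) => received.push(value));
// runs is now 2 and received is [1, 2]
```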

subscribe

Observable implements the Subscribable interface

// /src/internal/types.ts
export interface Subscribable<T> {
  subscribe(observer: Partial<Observer<T>>): Unsubscribable;
}

This subscribe is the method used to subscribe, and it is what we look at next. In the current version, subscribe has three overload signatures, all of which end up being normalized into the same kind of object. We focus on the first one.

subscribe(observer?: Partial<Observer<T>>): Subscription;

For the first signature, the parameter is based on the Observer interface, which has three method properties.

export interface Observer<T> {
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

The argument to subscribe can be an object with the three method properties next, error, and complete. If you don't care about error and complete, you can pass a single function instead, using the second signature. That function is treated as the next handler.
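To make the "object or function" idea concrete, here is a minimal sketch of how the two argument forms could be normalized into one observer object. The helper toObserver and the FullObserver interface are hypothetical names used only here. The no-op fallbacks are also a simplification, since RxJS itself reports unhandled errors rather than ignoring them.

```typescript
// Hypothetical helper for illustration; not an RxJS API.
interface FullObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

const noop = (): void => {};

function toObserver<T>(
  observerOrNext?: Partial<FullObserver<T>> | ((value: T) => void) | null
): FullObserver<T> {
  if (typeof observerOrNext === 'function') {
    // A bare function is treated as the `next` handler.
    return { next: observerOrNext, error: noop, complete: noop };
  }
  // Missing handlers fall back to no-ops so every handler can be called safely.
  return {
    next: observerOrNext?.next ?? noop,
    error: observerOrNext?.error ?? noop,
    complete: observerOrNext?.complete ?? noop,
  };
}

const seen: string[] = [];
const fromFunction = toObserver<number>((value) => seen.push(`next ${value}`));
const fromObject = toObserver<number>({ complete: () => seen.push('complete') });

fromFunction.next(1);
fromFunction.complete(); // no-op: only `next` was supplied
fromObject.next(2); // no-op: only `complete` was supplied
fromObject.complete();
// seen is ['next 1', 'complete']
```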

Go into the subscribe method

subscribe(
  observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
  error?: ((error: any) => void) | null,
  complete?: (() => void) | null
): Subscription {
  const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
  errorContext(() => {
    // ...
  });
  return subscriber;
}

The first parameter of subscribe can itself be a Subscriber (it has the next, error, and complete properties, so the type is valid), but that form is generally used inside the library. Normally we pass a plain object or a function, which means new SafeSubscriber(observerOrNext, error, complete) is executed.

// node_modules/rxjs/src/internal/Subscriber.ts
export class SafeSubscriber<T> extends Subscriber<T> {
  // ...
}

SafeSubscriber extends Subscriber and mainly wraps the next, error, and complete handlers to provide better error handling.

subscriber.add(
  operator
    ? // We're dealing with a subscription in the
      // operator chain to one of our lifted operators.
      operator.call(subscriber, source)
    : source
    ? // If `source` has a value, but `operator` does not, something that
      // had intimate knowledge of our API, like our `Subject`, must have
      // set it. We're going to just call `_subscribe` directly.
      this._subscribe(subscriber)
    : // In all other cases, we're likely wrapping a user-provided initializer
      // function, so we need to catch errors and handle them appropriately.
      this._trySubscribe(subscriber)
);

errorContext is also a wrapper for error handling. Inside it, only subscriber.add is called, and its argument is two nested ternary expressions.

When an Observable is produced by one of RxJS's built-in operators, this.operator has a value, so operator.call(subscriber, source) is invoked. Some Subjects inside RxJS reach the second branch, this._subscribe(subscriber), in certain cases. All other cases (normal use by developers) execute this._trySubscribe(subscriber). The first two branches involve operators and Subject, and their final flow is almost the same as executing the third directly, so we only look at the third here.

this._subscribe is the function originally passed to new Observable, so it is executed every time a subscribe call happens.

protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
  try {
    return this._subscribe(sink);
  } catch (err) {
    // We don't need to return anything in this case,
    // because it's just going to try to `add()` to a subscription
    // above.
    sink.error(err);
  }
}
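The practical effect of this try/catch is that a synchronous throw inside the subscribe function becomes an error notification instead of escaping to the caller. A minimal sketch of that behaviour follows, using the hypothetical names trySubscribeSketch and Sink rather than RxJS internals.

```typescript
// Simplified stand-in; names are illustrative, not RxJS internals.
interface Sink<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
}

function trySubscribeSketch<T>(
  subscribeFn: (sink: Sink<T>) => void,
  sink: Sink<T>
): void {
  try {
    subscribeFn(sink);
  } catch (err) {
    // A synchronous throw is routed to the error handler.
    sink.error(err);
  }
}

const events: string[] = [];

trySubscribeSketch<number>(
  (sink) => {
    sink.next(1);
    throw new Error('boom');
  },
  {
    next: (value) => events.push(`next ${value}`),
    error: (err) => events.push(`error ${(err as Error).message}`),
  }
);
// events is ['next 1', 'error boom']
```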

In this example, subscriber.next and subscriber.complete are called inside the function passed to new Observable. In Subscriber, those calls end up in the following methods:

protected _next(value: T): void {
  this.destination.next(value);
}
protected _error(err: any): void {
  try {
    this.destination.error(err);
  } finally {
    this.unsubscribe();
  }
}
protected _complete(): void {
  try {
    this.destination.complete();
  } finally {
    this.unsubscribe();
  }
}

this.destination is the object set up in new SafeSubscriber with the next, error, and complete handlers. The three custom handlers passed in at subscribe time are called here.
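The forwarding role of the subscriber can be sketched as below. MiniSubscriber and its isStopped flag are names assumed for this illustration. The sketch only shows the behaviour that values are forwarded to the destination and that nothing is delivered after complete. Error handling and unsubscription are left out.

```typescript
// Simplified stand-in; omits error handling and unsubscribe.
interface Destination<T> {
  next: (value: T) => void;
  complete: () => void;
}

class MiniSubscriber<T> {
  private isStopped = false; // assumed flag name for this sketch
  private readonly destination: Destination<T>;

  constructor(destination: Destination<T>) {
    this.destination = destination;
  }

  next(value: T): void {
    if (!this.isStopped) {
      this.destination.next(value);
    }
  }

  complete(): void {
    if (!this.isStopped) {
      this.isStopped = true;
      this.destination.complete();
    }
  }
}

const delivered: string[] = [];
const miniSubscriber = new MiniSubscriber<number>({
  next: (value) => delivered.push(`next ${value}`),
  complete: () => delivered.push('complete'),
});

miniSubscriber.next(1);
miniSubscriber.complete();
miniSubscriber.next(2); // ignored: the subscriber has already completed
// delivered is ['next 1', 'complete']
```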

// The simplified code
subscriber.add(this._trySubscribe(subscriber));

This collects the teardown, the cleanup logic that runs when a subscription is unsubscribed. For example, if you start polling inside a subscription and want the polling to stop when the subscription ends, you can return a function containing the stop logic from the function passed to new Observable. That teardown is called automatically on unsubscribe, much like the cleanup function returned from a React.useEffect callback.
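The polling scenario can be sketched without timers. In the example below, FakePoller is a hypothetical stand-in for setInterval so that the code runs synchronously, and subscribeSketch is a simplified subscribe that just keeps whatever teardown the subscribe function returns. Neither is part of RxJS.

```typescript
// Hypothetical stand-in for a timer so the sketch runs synchronously.
class FakePoller {
  private callbacks: Array<() => void> = [];

  start(callback: () => void): () => void {
    this.callbacks.push(callback);
    // The returned function stops this particular poll.
    return () => {
      this.callbacks = this.callbacks.filter((cb) => cb !== callback);
    };
  }

  tick(): void {
    this.callbacks.forEach((cb) => cb());
  }
}

// Simplified subscribe: run the subscribe function and keep the teardown it returns.
function subscribeSketch(
  subscribeFn: (next: (value: number) => void) => () => void,
  next: (value: number) => void
): { unsubscribe: () => void } {
  const teardown = subscribeFn(next);
  return { unsubscribe: teardown };
}

const poller = new FakePoller();
const polled: number[] = [];
let pollCount = 0;

const pollingSubscription = subscribeSketch(
  (next) => poller.start(() => next(pollCount++)), // the return value is the teardown
  (value) => polled.push(value)
);

poller.tick();
poller.tick();
pollingSubscription.unsubscribe(); // runs the teardown, which stops the polling
poller.tick(); // nothing is delivered any more
// polled is [0, 1]
```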

add(teardown: TeardownLogic): void {
  // Only add the teardown if it's not undefined
  // and don't add a subscription to itself.
  if (teardown && teardown !== this) {
    if (this.closed) {
      // If this subscription is already closed,
      // execute whatever teardown is handed to it automatically.
      execTeardown(teardown);
    } else {
      if (teardown instanceof Subscription) {
        // We don't add closed subscriptions, and we don't add the same subscription
        // twice. Subscription unsubscribe is idempotent.
        if (teardown.closed || teardown._hasParent(this)) {
          return;
        }
        teardown._addParent(this);
      }
      (this._teardowns = this._teardowns ?? []).push(teardown);
    }
  }
}

this.closed indicates whether the subscription has already been unsubscribed (it becomes true on complete, error, and unsubscribe), and this._teardowns stores all teardowns associated with the current subscription. As you can see, a teardown can be a custom cleanup function or another Subscription.

One subscription (the parent) can be connected to another (the child) through add. When the parent calls unsubscribe, everything in this._teardowns is executed, which includes calling unsubscribe on the child subscription, so all descendant subscriptions are unsubscribed as well.

Because this looks like a parent-child relationship, the private _parentage property is used to prevent a subscription from being added to the same parent more than once. Subscription defines several methods to manage the parentage data, such as _hasParent, _addParent, and _removeParent.
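The rules described above (run immediately if already closed, skip duplicates, cascade unsubscribe to children) can be sketched in a few lines. MiniSubscription below is a hypothetical, simplified model. It tracks parents in a plain array and does not remove a child from its parent when the child unsubscribes on its own, which the real class handles.

```typescript
// Simplified model for illustration; not the RxJS Subscription class.
type TeardownFn = () => void;

class MiniSubscription {
  isClosed = false;
  private parents: MiniSubscription[] = [];
  private teardowns: Array<TeardownFn | MiniSubscription> = [];

  add(teardown: TeardownFn | MiniSubscription): void {
    if (teardown === this) return; // never add a subscription to itself
    if (this.isClosed) {
      // Already closed: run the teardown right away.
      runTeardown(teardown);
      return;
    }
    if (teardown instanceof MiniSubscription) {
      // Skip closed children and children already attached to this parent.
      if (teardown.isClosed || teardown.parents.indexOf(this) !== -1) return;
      teardown.parents.push(this);
    }
    this.teardowns.push(teardown);
  }

  unsubscribe(): void {
    if (this.isClosed) return; // idempotent
    this.isClosed = true;
    const pending = this.teardowns;
    this.teardowns = [];
    pending.forEach((teardown) => runTeardown(teardown));
  }
}

function runTeardown(teardown: TeardownFn | MiniSubscription): void {
  if (teardown instanceof MiniSubscription) {
    teardown.unsubscribe();
  } else {
    teardown();
  }
}

const order: string[] = [];
const parentSub = new MiniSubscription();
const childSub = new MiniSubscription();

childSub.add(() => order.push('child teardown'));
parentSub.add(() => order.push('parent teardown'));
parentSub.add(childSub);
parentSub.add(childSub); // duplicate, ignored
parentSub.unsubscribe(); // also unsubscribes childSub
parentSub.add(() => order.push('late teardown')); // parent is closed: runs immediately
// order is ['parent teardown', 'child teardown', 'late teardown']
```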

import { interval } from 'rxjs'

const observable1 = interval(100)
const observable2 = interval(200)

const subscription1 = observable1.subscribe(x => console.log('first: ' + x))
const subscription2 = observable2.subscribe(x => console.log('second: ' + x))

subscription2.add(subscription1)
setTimeout(() => {
  subscription2.unsubscribe()
}, 400)

In the code above, subscription1 is attached to subscription2 through the add method, so when subscription2 calls unsubscribe, subscription1 is unsubscribed at the same time. The output is:

// Output:
// first: 0
// first: 1
// second: 0
// first: 2
// first: 3
// second: 1

unsubscribe

Where there is subscribe, there is unsubscribe. unsubscribe mainly performs cleanup, such as executing the teardowns collected during subscribe and updating the _parentage data.

// node_modules/rxjs/src/internal/Subscription.ts
unsubscribe(): void {
  // ...
  const { _parentage } = this;
  if (_parentage) {
    // update _parentage
  }

  const { initialTeardown } = this;
  if (isFunction(initialTeardown)) {
    // execute initialTeardown
  }

  const { _teardowns } = this;
  if (_teardowns) {
    // ...
    // execute the teardowns
  }
  // ...
}

There is also an initialTeardown, which can be understood as a function passed in when the Subscription is created and executed when it is unsubscribed. As a user you do not need to care about it; it is used inside the library.

import { Observable, Subscription } from 'rxjs'

const subscription = new Subscription(() => {
  console.log('initialTeardown runs on unsubscribe')
})
const observable = new Observable<number>(subscriber => {
  subscriber.next(1)
  return subscription
})
const subscription1 = observable.subscribe(d => console.log(d))
subscription1.unsubscribe()
// Output:
// 1
// initialTeardown runs on unsubscribe

At this point, the source logic behind the example at the beginning of this article has been covered, along with Subscription. Now back to the parts of Observable we have not looked at yet.

lift

lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

lift creates a new Observable with new Observable and sets source and operator on it to support chained operations. In the current version, developers are advised not to call this method directly; it is mainly used by the many operators inside RxJS.
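To see how source and operator work together at subscribe time, here is a rough sketch. LiftObservable, MiniOperator, and mapOperator are hypothetical names, and the model is heavily simplified: a subscriber is reduced to a single next function and no teardown is returned.

```typescript
// Simplified model for illustration; not the RxJS implementation.
type Next<T> = (value: T) => void;

interface MiniOperator<T, R> {
  // Wires the downstream handler to the upstream source.
  call(downstream: Next<R>, source: LiftObservable<T>): void;
}

class LiftObservable<T> {
  source?: LiftObservable<any>;
  operator?: MiniOperator<any, T>;
  private readonly producer?: (next: Next<T>) => void;

  constructor(producer?: (next: Next<T>) => void) {
    this.producer = producer;
  }

  lift<R>(operator: MiniOperator<T, R>): LiftObservable<R> {
    const result = new LiftObservable<R>();
    result.source = this; // remember where the values come from
    result.operator = operator; // remember how to transform them
    return result;
  }

  subscribe(next: Next<T>): void {
    if (this.operator && this.source) {
      // Lifted Observable: let the operator subscribe to the source.
      this.operator.call(next, this.source);
    } else if (this.producer) {
      this.producer(next);
    }
  }
}

// A map-like operator built on the sketch above.
function mapOperator<T, R>(project: (value: T) => R): MiniOperator<T, R> {
  return {
    call(downstream, source) {
      source.subscribe((value) => downstream(project(value)));
    },
  };
}

const doubled: number[] = [];
const base = new LiftObservable<number>((next) => {
  next(1);
  next(2);
});
const lifted = base.lift(mapOperator((value: number) => value * 2));
lifted.subscribe((value) => doubled.push(value));
// doubled is [2, 4]
```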

forEach

forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void> {
  promiseCtor = getPromiseCtor(promiseCtor);

  return new promiseCtor<void>((resolve, reject) => {
    // Must be declared in a separate statement to avoid a ReferenceError when
    // accessing subscription below in the closure due to Temporal Dead Zone.
    let subscription: Subscription;
    subscription = this.subscribe(
      (value) => {
        try {
          next(value);
        } catch (err) {
          reject(err);
          subscription?.unsubscribe();
        }
      },
      reject,
      resolve
    );
  }) as Promise<void>;
}

getPromiseCtor can be understood as returning the JS Promise constructor. Inside the promise, this.subscribe is called using the following signature:

subscribe(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): Subscription;

The three arguments to subscribe correspond to next, error, and complete. next keeps being called until complete runs, at which point the promise resolves. So if you want to use this method, you must make sure the Observable eventually calls complete. Otherwise the promise never settles and the forEach call hangs forever.

In general we do not use this method, because many scenarios that call for forEach can be handled with operators instead. Take the example given in the forEach source code:

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));
async function getTotal() {
  let total = 0;
  await source$.forEach(value => {
    total += value;
    console.log('observable -> ', value);
  });
  return total;
}
getTotal().then(
  total => console.log('Total:', total)
)

This is more intuitive if you use the reduce operator

import { interval } from 'rxjs';
import { reduce, take } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(4));
source$.pipe(
  reduce((acc, value) => {
    console.log('observable -> ', value);
    return acc + value;
  }, 0)
).subscribe(total => console.log('Total:', total));

pipe

pipe has a large number of overload signatures to help with automatic type inference. As long as nine or fewer arguments are passed to pipe, the types can be inferred correctly. Once more than nine are passed, automatic inference fails and the user must specify the types manually.

// node_modules/rxjs/src/internal/Observable.ts
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return pipeFromArray(operations)(this);
}

// node_modules/rxjs/src/internal/util/identity.ts
export function identity<T>(x: T): T {
  return x;
}

// node_modules/rxjs/src/internal/util/pipe.ts
/** @internal */
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

pipe calls pipeFromArray, whose argument fns holds all the arguments passed to pipe, that is, the operators.

If no operators are passed in, pipeFromArray returns the identity function, so pipe simply returns the original Observable. If exactly one operator is passed in, that operator is returned directly. Otherwise it returns a function whose body runs all the operators through reduce: the return value of one operator becomes the argument of the next, like a pipe connecting all of the operators. This borrows an idea from functional programming, where functions are composed through a pipe function so that the output of the previous function becomes the input of the next.
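The three cases are easy to check with plain functions in place of operators. The sketch below restates the same reduce-based composition under the hypothetical name pipeSketch, restricted to functions from T to T to keep the types simple.

```typescript
// Same composition idea as pipeFromArray, restated with plain functions.
type Unary<T, R> = (input: T) => R;

function pipeSketch<T>(fns: Array<Unary<T, T>>): Unary<T, T> {
  if (fns.length === 0) return (input) => input; // identity
  if (fns.length === 1) return fns[0];
  // Each function receives the previous function's result.
  return (input) => fns.reduce<T>((prev, fn) => fn(prev), input);
}

const addOne: Unary<number, number> = (n) => n + 1;
const triple: Unary<number, number> = (n) => n * 3;

const noOps = pipeSketch<number>([])(5); // 5
const single = pipeSketch([addOne])(5); // 6
const chained = pipeSketch([addOne, triple])(5); // (5 + 1) * 3 = 18
const reversed = pipeSketch([triple, addOne])(5); // 5 * 3 + 1 = 16
```

The last two results differ because the functions are applied left to right, which is why the order of operators in pipe matters.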

In the end, no matter how many operators are passed in, pipe returns an Observable instance, so subscribe can still be called on the result.

toPromise

// node_modules/rxjs/src/internal/Observable.ts
toPromise(promiseCtor?: PromiseConstructorLike): Promise<T | undefined> {
  promiseCtor = getPromiseCtor(promiseCtor);

  return new promiseCtor((resolve, reject) => {
    let value: T | undefined;
    this.subscribe(
      (x: T) => (value = x),
      (err: any) => reject(err),
      () => resolve(value)
    );
  }) as Promise<T | undefined>;
}

The toPromise implementation is similar to forEach above. It converts an Observable into a Promise that resolves with the Observable's last value, which you receive in .then. This method has been marked deprecated and will be removed in v8.x; in the source code comments the author suggests using firstValueFrom and lastValueFrom instead.

import { interval, lastValueFrom } from 'rxjs'
import { take } from 'rxjs/operators'

const source$ = interval(100).pipe(take(4))
source$.toPromise().then(total => console.log(total))

// equivalent to
lastValueFrom(source$).then(total => console.log(total))

// Output (for each call):
// 3

The usage is not very different, and the implementation of lastValueFrom is in fact similar to toPromise, but the method name is clearly more descriptive.

// node_modules/rxjs/src/internal/lastValueFrom.ts
export function lastValueFrom<T, D>(source: Observable<T>, config?: LastValueFromConfig<D>): Promise<T | D> {
  const hasConfig = typeof config === 'object';
  return new Promise<T | D>((resolve, reject) => {
    let _hasValue = false;
    let _value: T;
    source.subscribe({
      next: (value) => {
        _value = value;
        _hasValue = true;
      },
      error: reject,
      complete: () => {
        if (_hasValue) {
          resolve(_value);
        } else if (hasConfig) {
          resolve(config!.defaultValue);
        } else {
          reject(new EmptyError());
        }
      }
    });
  });
}

summary

The Observable and Subscription code is quite simple and contains no complicated logic, and the official source includes detailed comments (with examples). These are just the two most basic concepts in the RxJS system, and they are not what you normally work with directly when using RxJS; Subject and operators are what you use regularly.