What is an Observable?

Concept

The RxJS documentation defines an Observable as follows:

Observables are lazy Push collections of multiple values.

That may sound opaque at first, but it doesn’t matter. Let’s take the terms one at a time. The most confusing one is Push:

In Push systems, the Producer determines when to send data to the Consumer. The Consumer is unaware of when it will receive that data.

In JavaScript, a Promise is a typical Push system: we don’t know when the Promise will deliver the value we need (when it is resolved), so all we can do is register a callback and wait for the “future value”. An Observable is also a Push system. The opposite, of course, is Pull:

In Pull systems, the Consumer determines when it receives data from the data Producer. The Producer itself is unaware of when the data will be delivered to the Consumer.

In JavaScript, every function is a Pull system. Next comes lazy: an Observable is a Push system, but that doesn’t mean it “actively” pushes data to us. Only once we subscribe to an Observable do we receive its data. Here’s an example:

import { Observable } from 'rxjs';

// Create a lazy Push system
const observable = new Observable<number>((subscriber) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
});

// Subscribe to the lazy Push system
observable.subscribe({
    next: (value: number) => console.log('we got', value),
    error: (error: any) => console.error(error),
    complete: () => console.log('completed'),
});

/**
 * Output:
 * we got 1
 * we got 2
 * we got 3
 * completed
 */
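To make the "lazy" part concrete, here is a minimal sketch that does not use RxJS at all. `TinyObservable` and `TinyObserver` are hypothetical names invented for this illustration; the only point is that the producer function is stored at construction time and runs only when someone subscribes.

```typescript
// Hypothetical minimal stand-in for an Observable; not the RxJS class.
type TinyObserver<T> = {
  next: (value: T) => void;
  complete: () => void;
};

class TinyObservable<T> {
  private readonly producer: (observer: TinyObserver<T>) => void;

  constructor(producer: (observer: TinyObserver<T>) => void) {
    // The producer function is only stored here, never invoked.
    this.producer = producer;
  }

  // Each call to subscribe() runs the producer once for that observer.
  subscribe(observer: TinyObserver<T>): void {
    this.producer(observer);
  }
}

let producerRuns = 0;
const received: number[] = [];

const lazy = new TinyObservable<number>((observer) => {
  producerRuns += 1;
  observer.next(1);
  observer.next(2);
  observer.complete();
});

// Creating the observable has not run the producer yet.
const runsBeforeSubscribe = producerRuns;

lazy.subscribe({
  next: (value) => received.push(value),
  complete: () => console.log('completed'),
});

// Prints "completed" first, then "0 1 1,2".
console.log(runsBeforeSubscribe, producerRuns, received.join(','));
```

The producer decides when values are delivered (Push), but nothing is delivered until a consumer subscribes (lazy).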

How it works

Those of you who know the observer (publish-subscribe) pattern will find this code familiar. Our topic, though, is not that pattern but the RxJS source code. So how does RxJS implement this logic? First, we create an Observable instance with the new operator. Its constructor looks like this in the source:

export class Observable<T> implements Subscribable<T> {
    /** ... */

    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }

    protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
        return this.source?.subscribe(subscriber);
    }

    /** ... */
}

As you can see, when an Observable is instantiated with new, the function passed in overrides the internal _subscribe method. If no argument is passed, the default _subscribe always returns undefined, because the internal source property is initially undefined. So for the observable variable in our example, the effect is as if we had written:

observable._subscribe = (subscriber: Subscriber<number>) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
};
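The override behavior described above can be reproduced in a few lines. This is a simplified sketch, not the RxJS source: `SketchObservable`, `SketchSubscriber`, and `Teardown` are hypothetical names, and the default method delegates to `source._subscribe` rather than `source.subscribe` to keep the example small.

```typescript
// Hypothetical, simplified mirror of the constructor logic; not the RxJS source.
type Teardown = (() => void) | void;

interface SketchSubscriber<T> {
  next(value: T): void;
  complete(): void;
}

class SketchObservable<T> {
  // Starts out undefined, like `source` in the real class.
  source: SketchObservable<T> | undefined = undefined;

  constructor(subscribe?: (subscriber: SketchSubscriber<T>) => Teardown) {
    if (subscribe) {
      // The function passed to `new` replaces the default method on this instance.
      this._subscribe = subscribe;
    }
  }

  // Default: delegate to `source`, which yields undefined when there is none.
  _subscribe(subscriber: SketchSubscriber<T>): Teardown {
    return this.source?._subscribe(subscriber);
  }
}

const calls: string[] = [];
const sink: SketchSubscriber<number> = {
  next: (value) => calls.push(`next:${value}`),
  complete: () => calls.push('complete'),
};

// No argument: the default method runs and returns undefined.
const emptyResult = new SketchObservable<number>()._subscribe(sink);

// With an argument: the passed-in function is what runs.
const withProducer = new SketchObservable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.complete();
});
withProducer._subscribe(sink);

console.log(emptyResult === undefined); // true
console.log(calls.join(', ')); // next:1, complete
```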

We won’t worry about how Subscriber is implemented for now; let’s move on to Observable’s subscribe method, whose source code is as follows:

export class Observable<T> implements Subscribable<T> {
    /** ... */
    constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
        if (subscribe) {
            this._subscribe = subscribe;
        }
    }

    protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
        return this.source?.subscribe(subscriber);
    }

    subscribe(
        observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
        error?: ((error: any) => void) | null,
        complete?: (() => void) | null
    ): Subscription {
        const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
        if (config.useDeprecatedSynchronousErrorHandling) {
            this._deprecatedSyncErrorSubscribe(subscriber);
        } else {
            const { operator, source } = this;
            subscriber.add(
                operator
                    ? // We're dealing with a subscription in the
                      // operator chain to one of our lifted operators.
                      operator.call(subscriber, source)
                    : source
                    ? // If `source` has a value, but `operator` does not, something that
                      // had intimate knowledge of our API, like our `Subject`, must have
                      // set it. We're going to just call `_subscribe` directly.
                      this._subscribe(subscriber)
                    : // In all other cases, we're likely wrapping a user-provided initializer
                      // function, so we need to catch errors and handle them appropriately.
                      this._trySubscribe(subscriber)
            );
        }
        return subscriber;
    }
    /** ... */
}

Observable’s subscribe method accepts three optional parameters. In the example above we clearly did not pass error or complete, only observerOrNext, which can be one of three things: a (partial) Observer object, a function of type (value: T) => void, or null. RxJS defines the Observer interface as:

export interface Observer<T> {
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

So the argument we passed in is an Observer object. How does the code decide what to do with it? First, it checks whether the argument is already a Subscriber instance, using the isSubscriber function:

function isSubscriber<T>(value: any): value is Subscriber<T> {
    return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value));
}

function isObserver<T>(value: any): value is Observer<T> {
    return value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete);
}

function isSubscription(value: any): value is Subscription {
    return (
        value instanceof Subscription ||
        (value && 'closed' in value && isFunction(value.remove) && isFunction(value.add) && isFunction(value.unsubscribe))
    );
}
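The duck-typing half of these checks can be tried out without RxJS. The sketch below leaves out the `instanceof` branches (they need the real classes) and uses hypothetical helper names (`looksLikeObserver`, `looksLikeSubscription`, `looksLikeSubscriber`); it only illustrates why a plain observer object does not pass as a subscriber.

```typescript
// Hypothetical stand-ins; the real Subscriber and Subscription classes live in RxJS.
const isFunction = (value: unknown): value is Function => typeof value === 'function';

function looksLikeObserver(value: any): boolean {
  return !!value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete);
}

function looksLikeSubscription(value: any): boolean {
  return (
    typeof value === 'object' &&
    value !== null &&
    'closed' in value &&
    isFunction(value.remove) &&
    isFunction(value.add) &&
    isFunction(value.unsubscribe)
  );
}

// A subscriber must be both an observer and a subscription.
function looksLikeSubscriber(value: any): boolean {
  return looksLikeObserver(value) && looksLikeSubscription(value);
}

const plainObserver = {
  next: (value: number) => console.log('we got', value),
  error: (err: unknown) => console.error(err),
  complete: () => console.log('completed'),
};

const fullSubscriber = {
  ...plainObserver,
  closed: false,
  add: () => {},
  remove: () => {},
  unsubscribe: () => {},
};

console.log(looksLikeSubscriber(plainObserver)); // false
console.log(looksLikeSubscriber(fullSubscriber)); // true
```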

Clearly, an Observer object that contains only the next, error, and complete functions is not a Subscriber object, so RxJS wraps it in a new SafeSubscriber. Because our observable has neither an operator nor a source, subscribe then calls the _trySubscribe method, which in essence just calls this._subscribe:

protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
    try {
        return this._subscribe(sink);
    } catch (err) {
        // We don't need to return anything in this case,
        // because it's just going to try to `add()` to a subscription
        // above.
        sink.error(err);
    }
}
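The effect of that try/catch is that an exception thrown inside the user-provided function is delivered to the subscriber's error handler instead of escaping to the caller. A minimal sketch, assuming a hypothetical `Sink` interface and a free-standing `trySubscribe` function rather than the real method:

```typescript
// Hypothetical minimal sink; only the members used below.
interface Sink<T> {
  next(value: T): void;
  error(err: unknown): void;
}

// Mirrors the shape of _trySubscribe: run the initializer, forward thrown errors.
function trySubscribe<T>(initializer: (sink: Sink<T>) => void, sink: Sink<T>): void {
  try {
    initializer(sink);
  } catch (err) {
    sink.error(err);
  }
}

const seen: string[] = [];

trySubscribe<number>(
  (sink) => {
    sink.next(1);
    throw new Error('boom');
  },
  {
    next: (value) => seen.push(`next:${value}`),
    error: (err) => seen.push(`error:${(err as Error).message}`),
  },
);

console.log(seen.join(', ')); // next:1, error:boom
```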

As mentioned earlier, this._subscribe was overridden by the function passed in when the object was instantiated, so creating and subscribing to the observable variable boils down to the following:

// pseudo code

// create the Observable
observable._subscribe = (subscriber: Subscriber<number>) => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
};

// create the Subscriber
subscriber = {
    next: (value: number) => console.log('we got', value),
    error: (error: any) => console.error(error),
    complete: () => console.log('completed'),
};

// subscribe to the Observable
observable._subscribe(subscriber);

It suddenly looks just like the subscriber pattern! That is enough about subscribers for now; we will study them in more depth later. Back to the main topic: everything about Observables!

What else does Observable offer?

.pipe()

Anyone who uses RxJS day to day will be familiar with pipe(). I first met the concept of a pipe in Linux programming:

A pipe is a mechanism for connecting standard input and standard output between two processes, providing a means for multiple processes to communicate.

In JavaScript, functional programming borrows this idea: a pipe function combines several functions so that the output of one becomes the input of the next. A typical implementation looks like this:

const pipe = (...args) => x =>
  args.reduce(
    (outputValue, currentFunction) => currentFunction(outputValue),
    x
  );
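As a quick usage sketch, here is the same reduce-based pipe with type annotations added, restricted to number-to-number functions for simplicity. `NumberFn`, `addOne`, and `double` are names made up for this example.

```typescript
// Same reduce-based pipe as above, typed for number-to-number functions only.
type NumberFn = (x: number) => number;

const pipe = (...fns: NumberFn[]) => (x: number): number =>
  fns.reduce((outputValue, currentFunction) => currentFunction(outputValue), x);

const addOne: NumberFn = (x) => x + 1;
const double: NumberFn = (x) => x * 2;

// Functions run left to right: the output of one is the input of the next.
const addThenDouble = pipe(addOne, double);
const doubleThenAdd = pipe(double, addOne);

console.log(addThenDouble(3)); // 8
console.log(doubleThenAdd(3)); // 7
console.log(pipe()(3)); // 3 (no functions: the input comes back unchanged)
```

The order of the arguments matters, which is why the two compositions give different results for the same input.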

The RxJS source code for pipe() is more concise:

export class Observable<T> implements Subscribable<T> {
    /** ... */
    pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
        return operations.length ? pipeFromArray(operations)(this) : this;
    }
    /** ... */
}

function identity<T>(x: T): T {
    return x;
}

function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

pipe() accepts zero to N operator functions. If at least one is passed, it applies them in order through pipeFromArray; otherwise it returns the Observable itself. The pipeFromArray function takes an array of unary functions as its argument, and UnaryFunction has the following interface:

export interface UnaryFunction<T, R> {
  (source: T): R;
}

It takes a parameter of type T and returns a value of type R, which is very much in the style of Scala and functional programming. If fns is empty, the identity function is returned, so the input comes back unchanged. If it contains only one element, that element is returned directly. Otherwise, a proper pipe function is returned, essentially the same as the functional-programming pipe function written above.
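The three branches can be checked with a standalone copy of the function. This sketch uses hypothetical names (`UnaryFn`, `identitySketch`, `pipeFromArraySketch`) and loosens the element type to `UnaryFn<any, any>` so that functions with different input and output types can be chained; the real RxJS signature is the one shown earlier.

```typescript
type UnaryFn<T, R> = (source: T) => R;

function identitySketch<T>(x: T): T {
  return x;
}

// Standalone copy of the branching logic, loosely typed for the demo.
function pipeFromArraySketch<T, R>(fns: Array<UnaryFn<any, any>>): UnaryFn<T, R> {
  if (fns.length === 0) {
    return identitySketch as UnaryFn<any, any>;
  }
  if (fns.length === 1) {
    return fns[0] as UnaryFn<T, R>;
  }
  return function piped(input: T): R {
    return fns.reduce((prev: any, fn) => fn(prev), input as any);
  };
}

const trim = (s: string): string => s.trim();
const countChars = (s: string): number => s.length;

const noop = pipeFromArraySketch<string, string>([]);
const single = pipeFromArraySketch<string, string>([trim]);
const composed = pipeFromArraySketch<string, number>([trim, countChars]);

console.log(noop('  hi  ') === '  hi  '); // true (identity)
console.log(single === trim); // true (the single function itself is returned)
console.log(composed('  hi  ')); // 2 (trim first, then count)
```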

.forEach()

Some people regard the forEach method as a simplification of subscribe, and the source code supports that view. Let’s look at it directly:

export class Observable<T> implements Subscribable<T> {
    /** ... */
    forEach(next: (value: T) => void, promiseCtor?: PromiseConstructorLike): Promise<void> {
        promiseCtor = getPromiseCtor(promiseCtor);
        return new promiseCtor<void>((resolve, reject) => {
            // Must be declared in a separate statement to avoid a ReferenceError when
            // accessing subscription below in the closure due to Temporal Dead Zone.
            let subscription: Subscription;

            subscription = this.subscribe(
                (value) => {
                    try {
                        next(value);
                    } catch (err) {
                        reject(err);
                        subscription?.unsubscribe();
                    }
                },
                reject,
                resolve
            );
        }) as Promise<void>;
    }
    /** ... */
}

Here we don’t need to pay much attention to getPromiseCtor; just treat its result as JavaScript’s Promise constructor. As you can see, forEach returns a Promise and subscribes to the Observable internally, with reject and resolve handling subscriber.error and subscriber.complete respectively. So when we know what values the Observable will push and that it will eventually complete, we can call forEach to handle each value and then continue once the returned Promise resolves, after all values have been pushed. In that sense, forEach is a simplification of subscribe.
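The same idea can be sketched without RxJS by modelling the source as a plain function that pushes into three callbacks. `SubscribeFn` and `forEachValue` are hypothetical names, and unlike the real method this sketch has no subscription to unsubscribe when `next` throws.

```typescript
// Hypothetical minimal source: a function that pushes values into three callbacks.
type SubscribeFn<T> = (
  next: (value: T) => void,
  error: (err: unknown) => void,
  complete: () => void,
) => void;

// Sketch of the forEach idea: resolve on complete, reject on error or when next throws.
function forEachValue<T>(subscribe: SubscribeFn<T>, next: (value: T) => void): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    subscribe(
      (value) => {
        try {
          next(value);
        } catch (err) {
          reject(err);
        }
      },
      reject,
      () => resolve(),
    );
  });
}

const collected: number[] = [];

// This source pushes synchronously, so the values arrive during the call itself.
const oneTwoThree: SubscribeFn<number> = (next, _error, complete) => {
  next(1);
  next(2);
  next(3);
  complete();
};

const done = forEachValue(oneTwoThree, (value) => collected.push(value));

// Runs once the source has completed.
done.then(() => console.log('done with', collected.join(',')));
```

The caller gets a single Promise to wait on instead of having to supply error and complete callbacks.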

.toPromise()

As shown above, forEach in essence turns an Observable into a Promise that settles once the Observable has finished pushing its values. The toPromise method also converts an Observable into a Promise, and its source code is very similar:

export class Observable<T> implements Subscribable<T> {
    /** ... */
    toPromise(promiseCtor?: PromiseConstructorLike): Promise<T | undefined> {
        promiseCtor = getPromiseCtor(promiseCtor);
        return new promiseCtor((resolve, reject) => {
            let value: T | undefined;
            this.subscribe(
                (x: T) => (value = x),
                (err: any) => reject(err),
                () => resolve(value)
            );
        }) as Promise<T | undefined>;
    }
    /** ... */
}
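One consequence of the code above is easy to miss: each pushed value overwrites the previous one, so the Promise resolves with the last value, or with undefined if nothing was pushed before completion. The sketch below illustrates this with a hypothetical `PushSource` function type and a free-standing `toPromiseSketch`; it is not the RxJS method.

```typescript
// Hypothetical minimal source: a function that pushes values into three callbacks.
type PushSource<T> = (
  next: (value: T) => void,
  error: (err: unknown) => void,
  complete: () => void,
) => void;

// Keeps only the most recent value and resolves with it on completion.
function toPromiseSketch<T>(subscribe: PushSource<T>): Promise<T | undefined> {
  return new Promise<T | undefined>((resolve, reject) => {
    let value: T | undefined;
    subscribe(
      (x) => {
        value = x;
      },
      (err) => reject(err),
      () => resolve(value),
    );
  });
}

const threeValues: PushSource<number> = (next, _error, complete) => {
  next(1);
  next(2);
  next(3);
  complete();
};

const noValues: PushSource<number> = (_next, _error, complete) => complete();

const lastOfThree = toPromiseSketch(threeValues);
const lastOfNone = toPromiseSketch(noValues);

lastOfThree.then((v) => console.log(v)); // 3
lastOfNone.then((v) => console.log(v)); // undefined
```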

It is worth noting that this method is deprecated as of RxJS 7. The official recommendation is to use one of two functions instead, firstValueFrom or lastValueFrom, whose source code we will study later.

What’s next?

We have now covered the basics of the Observable object in RxJS, but careful readers will have noticed that we skipped over Subscriber entirely. Next, we’ll focus on Subscriber and Subscription to fully understand the core concepts behind Observable, and then come back to the more complex parts of Observable.

That’s all!