
mini RxJs

Haha, a bit of clickbait in the title 😝

D2 Front-end Summit – Wujie was held in Hangzhou a few weeks ago, with a lineup of well-known engineers invited to share their work. Several of the international speakers live-coded remotely, which was extremely satisfying to watch.

Ben Lesh, the maintainer of RxJs, walked through how he is refactoring the next generation of the library. That caught my interest, so I tried to implement a mini version myself.

Introduction

1. Basic Concepts

  1. Functional programming is a programming paradigm in which functions are “first-class citizens”. Programs are built by composing functions in a structured way, so tasks can be declared and managed without spelling out every detail.

  2. Reactive programming builds on functional programming and combines the observer pattern and the iterator pattern to manage sequences of tasks (see the RxJs docs).

2. Callbag

  1. Ben Lesh mentioned that the original inspiration for RxJs came from Callbag, so I studied Callbag first as a way into the RxJs source code.

  2. Callbag is a lightweight standard for reactive programming, not an implementation. Its author also implemented callbag-basics according to this standard.

  3. It introduces “observables” and “observers”, as well as the important concept of communication between the two. We can combine these with the summary on the RxJs official website:

3. Core Concepts

  1. Observable: an observable object. There are two kinds, pullable and pushable.
  • A pullable Observable is one the observer pulls data from each time it needs some.

    • Every JavaScript function can be seen as an instance of the pull pattern; think about how we call JS functions.

    • The observer is active and the Observable is passive: the Observable does not know when the observer will request data.

  • A pushable Observable actively pushes data to the observers that subscribe to it.

    • The Promise + then pattern is a good analogy: calling resolve or reject in the Promise constructor passes the result on to the subsequent then callbacks.

    • The Observable is active and the observer is passive: the observer does not know when the Observable will push data.

  2. Observer: subscribes to an Observable by passing its own callbacks to it; you can think of this as the communication between the two.

  3. Operators: a set of functions that let us operate on and control the flow in a structured way. Common ones are:

  • Creation operators, such as from, of, and interval, create an Observable.

  • Transformation operators, such as map and pluck, transform an Observable.

  • Filtering operators, such as filter and take, filter an Observable.
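
The pull/push distinction above can be sketched without any library. This is only an illustration: `createPuller` and `pushNumbers` are hypothetical names, not part of RxJs or Callbag.

```typescript
// Pull: the consumer decides when to ask; the producer only answers.
function createPuller(values: number[]): () => number | undefined {
  let i = 0;
  // Each call hands out the next value, or undefined when exhausted.
  return () => (i < values.length ? values[i++] : undefined);
}

const pulled: number[] = [];
const pull = createPuller([1, 2, 3]);
let value = pull();
while (value !== undefined) {
  pulled.push(value); // the consumer is the active side
  value = pull();
}

// Push: the producer decides when to deliver; the consumer only registers a callback.
type Listener<T> = (value: T) => void;

function pushNumbers(listener: Listener<number>): void {
  // The producer is the active side and calls the listener when it has data.
  [1, 2, 3].forEach((n) => listener(n));
}

const pushed: number[] = [];
pushNumbers((n) => pushed.push(n));

console.log(pulled, pushed);
```

Both arrays end up with the same values; what differs is which side drives the exchange.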

Implementing it with Callbag

Before tackling RxJs, let’s take a look at Callbag.

1. Sink

  1. The basic building block. A sink can act as an observable (called a source here) or as a listener.

  2. A sink is a function that takes two parameters, type and payload, and returns void.

  • type has only three values:

    • 0: start, the communication begins

    • 1: data, data is transmitted

    • 2: end, the communication ends

  • payload carries either specific data or another sink.

  3. To establish a connection, every sink (source or listener) must first greet the other party with type 0. That is, sink1 sends type 0 to sink2, and sink2 must talk back with type 0 once to sink1. This process is called the handshake.

  4. After type 0, either side can use type 1 to pass a payload.

  5. Either party can use type 2 to tell the other that it is ending the communication; no talkback is needed.
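
To make the message flow concrete, here is a minimal sketch of one handshake followed by a single pull. The source that emits just the value 42, the `Sink` type, and the `log` array are assumptions made for illustration; they are not taken from the Callbag spec or callbag-basics.

```typescript
const START = 0;
const DATA = 1;
const END = 2;

type MsgType = 0 | 1 | 2;
// A sink takes a message type and an optional payload (data or another sink).
type Sink = (type: MsgType, payload?: unknown) => void;

const log: string[] = [];

// Hypothetical source: delivers one value when pulled, then ends.
const source: Sink = (type, payload) => {
  if (type !== START) return; // nothing happens before the greeting
  const peer = payload as Sink;
  let done = false;
  const talkback: Sink = (t) => {
    if (done) return;
    if (t === DATA) {
      peer(DATA, 42);
      done = true;
      peer(END); // tell the listener we are finished
    }
    if (t === END) done = true; // the listener hung up; no reply needed
  };
  log.push("source -> listener: 0");
  peer(START, talkback); // greet back: the handshake is complete
};

const listener: Sink = (type, payload) => {
  if (type === START) {
    const talkback = payload as Sink;
    log.push("listener -> source: 1 (pull)");
    talkback(DATA);
  }
  if (type === DATA) log.push("source -> listener: 1 (" + String(payload) + ")");
  if (type === END) log.push("source -> listener: 2");
};

log.push("listener -> source: 0");
source(START, listener);
console.log(log.join("\n"));
```

The log shows the two type 0 messages first, then the pull and the data, and finally the type 2 message with no reply.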

2. Source

  1. Usage: suppose we want to process an array with the from and map operators, then iterate over the results and print them.
const source = pipe(
  from([3, 5, 8]),
  map((n, i) => n * 2 + "-map-" + i)
);

each((n, i) => console.log(n, i))(source);

// logs:
// 6-map-0 0
// 10-map-1 1
// 16-map-2 2

  2. from: creates a source from an iterable

Until it receives type 0 from a listener sink, the source sink does nothing, whether it is pullable or pushable.

const START = 0;
const DATA = 1;
const END = 2;

// Takes an iterable as its parameter and returns the source sink
// The source sink's second parameter is the payload
// That payload is itself a sink: it gives from a way to send data out
const from = (iter) => (type, sink) => {
  // Until the listener sends 0, the source ignores it
  if (type !== START) return;

  // A more general approach would use an iterator
  if (Array.isArray(iter)) {
    const len = iter.length;
    let inLoop = true;
    let i = 0;

    // The data is ready; as a sink, we must establish communication first
    sink(START, (t) => {
      if (i === len) return;
      // Wait for type 1 before sending data
      if (t === DATA) {
        sink(DATA, { v: iter[i], i: i++, o: iter });
        if (i === len && inLoop) {
          inLoop = false;
          // Traversal is complete: end the communication
          sink(END);
        }
      }
      // The listener ended the communication
      if (t === END) sink(END);
    });
  }

  // if (toString.call(source) === "[object Object]") {}
};

  3. map: transforms a source
// Accepts the user-level callback and thisArg, the familiar signature
// Then accepts the upstream source sink (here, the one created by from) as the pullable data source
// The returned sink's second parameter is again a sink, for the same reason as in from: the processed data has to be passed out
const map = (callback, thisArg) => (source) => (type, sink) => {
  if (type !== START) return;
  let i = 0;

  // When the data is ready, establish communication with source
  source(START, (t, d) => {
    // When type 1 arrives, run the user-level callback and pass the result on;
    // other message types are forwarded unchanged
    sink(t, t === DATA ? callback.call(thisArg, d.v, i++, d.o) : d);
  });
};

3. Listener

So far nothing has actually sent type 1 as an input; the operators above only check for it. Once we write a listener, everything is wired together.

  1. each: consumes a source
// Accepts the user-level callback and thisArg
// Communicates with the source directly instead of returning a sink as from and map do, because nothing needs to be passed further on
const each = (callback, thisArg) => (source) => {
  let pullable;
  let i = 0;

  // Establish communication
  source(START, (t, d) => {
    // After receiving type 0 from the listener, the source sends back its internal sink
    if (t === START) pullable = d;
    // Execute the user logic
    if (t === DATA) callback.call(thisArg, d, i++);
    // Data traversal is complete
    if (t === END) pullable = null;

    // After receiving 0 or 1, pull the next value from the source
    if (t !== END) pullable(DATA);
  });
};

4. pipe

One line of code: the first argument is a source sink, which is passed through each operator in turn.

const pipe = (source, ...callbacks) => callbacks.reduce((prev, cb) => cb(prev), source);
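
Nothing in pipe is specific to sinks: it is ordinary left-to-right function composition. A small sketch with plain functions shows the reduce at work; `double` and `label` are hypothetical helpers, and the loose `any` typing is only there to keep the example short.

```typescript
// Same reduce as in the article, loosely typed for illustration.
type Fn = (input: any) => any;

const pipe = (source: any, ...fns: Fn[]): any =>
  fns.reduce((prev, fn) => fn(prev), source);

const double = (n: number): number => n * 2;
const label = (n: number): string => "value:" + n;

// Equivalent to label(double(4)).
const result = pipe(4, double, label);
console.log(result);
```

In the Callbag version the initial value is a source sink and every function takes a source and returns a new source, so the final result is still something a listener such as each can consume.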

Now go back to the usage example above, or run the demo.

RxJs

With the appetizer out of the way, let’s move on to the main course.

1. Upgrading Callbag

The idea behind Callbag is fascinating, but it is still not convenient to use. Why?

  1. As the library developer, I have to deal with 0, 1, and 2 all the time, which is uncomfortable.

  2. Sinks nested inside sink parameters add a lot of cognitive overhead.

  3. As the user, I find this functional style of organizing code neither comfortable nor elegant enough; I want chained calls.

  4. I run into the same problems when I write my own operators.

// I would like to write it like this
from([3, 5, 8]).pipe(
  map((n, i) => n * 2 + "-map-" + i),
  filter(...)
).subscribe((n) => console.log(n));

2. Observable

Observable becomes the basic building block of RxJs. It combines data generation, piping, and chaining in one object, so writing it feels as comfortable as using a Promise.

  1. The basic skeleton
interface Observer<T> {
  next: (value: T, index?: number) => void;
  error: (err: any) => void;
  complete: () => void;
}

type Producer<T> = (subscriber: Subscriber<T>) => void;

class Observable<T> {
  // The constructor accepts a data producer, which takes a subscriber as its parameter
  // At the user level, the producer calls the subscriber's next or error method to generate data
  constructor(public producer: Producer<T> = noop) {}

  // subscribe receives the next, error, and complete callbacks
  subscribe(next, error, complete) {
    const subscriber = new Subscriber({ next, error, complete });
    this.producer(subscriber);
    return subscriber;
  }

  // pipe passes the operators straight through
  // (first attempt: every operator receives `this`, not the previous result)
  pipe(...operators): Observable<any> {
    return operators.reduce((prev, operator) => operator(this), this);
  }
}

  2. So the Subscriber is created and the producer executed only when the user actively calls subscribe. This is especially important to remember.
const subscription = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.complete();
}).subscribe(
  (n) => console.log("next: get num", n),
  (err) => console.log("error:", err),
  () => console.log("complete")
);
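
The laziness described above is easy to check in isolation. The sketch below uses a stripped-down stand-in, `LazyObservable`, that models only the next callback; it is an assumption made for illustration, not the class from the article or from RxJs.

```typescript
type Next<T> = (value: T) => void;

// Stripped-down stand-in for the Observable above: only `next` is modeled.
class LazyObservable<T> {
  private producer: (next: Next<T>) => void;

  constructor(producer: (next: Next<T>) => void) {
    this.producer = producer; // stored, not executed
  }

  subscribe(next: Next<T>): void {
    // The producer runs here, once per subscribe call, and nowhere else.
    this.producer(next);
  }
}

let producerRuns = 0;
const received: number[] = [];

const source = new LazyObservable<number>((next) => {
  producerRuns++;
  next(1);
});

const runsBeforeSubscribe = producerRuns; // constructing the Observable ran nothing

source.subscribe((n) => received.push(n));
source.subscribe((n) => received.push(n));

console.log(runsBeforeSubscribe, producerRuns, received);
```

Constructing the object does no work, and each subscribe call triggers its own run of the producer.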

3. Subscriber

  1. For types 0, 1, and 2, RxJs introduces Observer and Subscription; the Observer contains the methods next, error, and complete.

  2. The correspondence is:

type 0 <-> observable.subscribe

type 1 <-> observer.next

type 2 <-> subscription.unsubscribe

  3. Subscriber is the concrete implementation of Observer and wraps these three methods
interface Observer<T> {
  next: (value: T, index?: number) => void;
  error: (err: any) => void;
  complete: () => void;
}

class Subscriber<T> implements Observer<T> {
  public isClosed = false;
  protected destination: Observer<T>;

  constructor(destination: Partial<Observer<T>>) {
    // Fall back to the default for any callback left undefined; spreading
    // { error: undefined } over the defaults would overwrite them
    this.destination = {
      next: destination.next ?? defaultDestination.next,
      error: destination.error ?? defaultDestination.error,
      complete: destination.complete ?? defaultDestination.complete,
    };
  }

  next(value: T, index?: number) {
    if (this.isClosed) return;
    this.destination.next(value, index);
  }

  error(err: any) {
    if (this.isClosed) return;
    this.isClosed = true;
    this.destination.error(err);
    this.unsubscribe();
  }

  complete() {
    if (this.isClosed) return;
    this.isClosed = true;
    this.destination.complete();
    this.unsubscribe();
  }

  unsubscribe() {
    this.isClosed = true;
    this.destination = defaultDestination;
  }
}
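
The isClosed guard is what makes complete final: once it has fired, later calls are dropped. A compact sketch of that behavior follows; `MiniSubscriber` and `MiniObserver` are stand-ins written for this example (error handling and unsubscribe are left out), not the article's class.

```typescript
interface MiniObserver<T> {
  next: (value: T) => void;
  complete: () => void;
}

// Compact stand-in for the Subscriber above; the closed flag is the point.
class MiniSubscriber<T> {
  isClosed = false;
  private destination: MiniObserver<T>;

  constructor(destination: MiniObserver<T>) {
    this.destination = destination;
  }

  next(value: T): void {
    if (this.isClosed) return; // late values are silently dropped
    this.destination.next(value);
  }

  complete(): void {
    if (this.isClosed) return; // complete fires at most once
    this.isClosed = true;
    this.destination.complete();
  }
}

const events: string[] = [];
const subscriber = new MiniSubscriber<number>({
  next: (n) => events.push("next " + n),
  complete: () => events.push("complete"),
});

subscriber.next(1);
subscriber.complete();
subscriber.next(2);    // ignored: already closed
subscriber.complete(); // ignored: already closed

console.log(events);
```

Only the first next and the first complete reach the observer.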

4. Observable lift!

Of course, the skeleton above will not work yet, which is why I stressed that point. Think back to what you went through when hand-writing a Promise. In plain terms: each chained call has to return a new Promise instance.

  1. Likewise, Observable.pipe has to return a new Observable instance.

  2. For RxJs this is exactly where the difficulty lies. As pointed out earlier, the Subscriber is created and the producer executed only when the user actively calls Observable.subscribe (in a chain, on the last Observable produced). The whole process is lazy.

  3. In addition, each Observable corresponds to one subscriber, and an Observable cannot call another Observable’s subscriber.

  4. The problem is that we only pass a producer to the constructor of the first new Observable; the constructor call of the final Observable that pipe returns is invisible to us.

  5. RxJs does a lot of work here, including collecting and managing asynchronous state and keeping references between the Observables in the chain.

  6. Observable lift

export default class Observable<T> {
  // Set by lift: the previous Observable in the chain
  protected origin: Observable<any> | null = null;
  // The operator (transfer function)
  protected operator: Operator<any, any> | null = null;

  constructor(public producer: Producer<T> = noop) {}

  subscribe(next, error, complete) {
    const subscriber = new Subscriber({ next, error, complete });

    // Lifted Observable: hand nextSubscriber and origin to the operator
    if (this.operator) {
      this.operator(subscriber, this.origin!);
    } else {
      this.producer(subscriber);
    }
    return subscriber;
  }

  protected lift<R>(operator: Operator<R, T>): Observable<R> {
    const observable = new Observable<R>();
    // This is the key step
    observable.origin = this;
    observable.operator = operator;
    return observable;
  }

  pipe(...operators: Operator<any, any>[]): Observable<any> {
    return operators.reduce(
      // Returns a new Observable; subscribing to it subscribes to origin internally
      (prev: Observable<any>, operator) => prev.lift(operator),
      this
    );
  }
}

5. Operators

  1. With Observable doing so much of the work, operators become much simpler. As before, we start with from and map
export const from = <T>(input: T[]) => {
  // Simple implementation
  if (Array.isArray(input)) {
    return new Observable<T>((subscriber) => {
      input.some((n, i) => {
        if (subscriber.isClosed) return true;
        // Use type 1 to send data out
        // Of course, the whole producer function runs only after the user calls .subscribe (type 0)
        subscriber.next(n, i);
        return false;
      });

      // type 2
      subscriber.complete();
    });
  }
};

  2. map operator
// Accepts the user-level callback and thisArg
// map returns a function that takes two parameters: nextSubscriber, the subscriber of the next (downstream) Observable, and origin, the previous (upstream) Observable
// If it is unclear why these two are needed, see the Observable lift! section above
export const map = <V, R>(
  project: (value: V, index: number) => R,
  thisArg?: any
): Operator<V, R> => (nextSubscriber, origin) => {
  let i = 0;
  // Subscribe to origin, the upstream Observable
  const _subscription = origin.subscribe((value) => {
    try {
      // Execute the user logic
      nextSubscriber.next(project.call(thisArg, value, i), i++);
    } catch (err) {
      nextSubscriber.error(err);
    }
  });
};

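
Other operators can follow the same (nextSubscriber, origin) shape. As a sketch, here is a hypothetical filter operator together with compact stand-ins (`MiniObservable`, `MiniObserver`, `fromArray`) so the example runs on its own. These are assumptions made for illustration: the error channel and unsubscribe are omitted, and unlike the map above this filter also forwards complete.

```typescript
type MiniObserver<T> = { next: (value: T) => void; complete: () => void };
type Operator<T, R> = (nextSubscriber: MiniObserver<R>, origin: MiniObservable<T>) => void;

// Compact stand-in for the lift-based Observable (no error channel, no unsubscribe).
class MiniObservable<T> {
  private origin: MiniObservable<any> | null = null;
  private operator: Operator<any, T> | null = null;
  private producer: (subscriber: MiniObserver<T>) => void;

  constructor(producer: (subscriber: MiniObserver<T>) => void = () => {}) {
    this.producer = producer;
  }

  subscribe(next: (value: T) => void, complete: () => void = () => {}): void {
    const subscriber: MiniObserver<T> = { next, complete };
    // A lifted Observable delegates to its operator; a root one runs its producer.
    if (this.operator && this.origin) this.operator(subscriber, this.origin);
    else this.producer(subscriber);
  }

  pipe(...operators: Operator<any, any>[]): MiniObservable<any> {
    return operators.reduce((prev: MiniObservable<any>, op) => prev.lift(op), this);
  }

  private lift<R>(operator: Operator<T, R>): MiniObservable<R> {
    const observable = new MiniObservable<R>();
    observable.origin = this;
    observable.operator = operator;
    return observable;
  }
}

// Hypothetical filter operator: forwards only values that pass the predicate.
const filter = <T>(predicate: (value: T, index: number) => boolean): Operator<T, T> =>
  (nextSubscriber, origin) => {
    let i = 0;
    origin.subscribe(
      (value) => {
        if (predicate(value, i++)) nextSubscriber.next(value);
      },
      () => nextSubscriber.complete()
    );
  };

const fromArray = <T>(input: T[]) =>
  new MiniObservable<T>((subscriber) => {
    input.forEach((n) => subscriber.next(n));
    subscriber.complete();
  });

const out: string[] = [];
fromArray([3, 5, 8])
  .pipe(filter<number>((n) => n % 2 === 1))
  .subscribe((n) => out.push("next " + n), () => out.push("complete"));

console.log(out);
```

Subscribing to the lifted Observable calls the operator, which in turn subscribes to origin; only then does the root producer run.
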
  3. Add an interval creation operator to test unsubscribe

RxJs uses Schedulers to manage asynchronous tasks; here we make do with a simple implementation.

const timers: Record<string, ReturnType<typeof setInterval>> = {};

const interval = (delay = 1000) => {
  return new Observable<number>((subscriber) => {
    let tick = 0;
    timers.interval = setInterval(() => {
      if (subscriber.isClosed) {
        clearInterval(timers.interval);
        return;
      }
      // Emit one tick per delay (every second by default)
      subscriber.next(tick++);
    }, delay);
  });
};

// Of course, Subscriber needs a slight change
export default class Subscriber<T> implements Observer<T> {
  public isClosed = false;
  protected destination: Observer<T>;

  next(value: T, index?: number) {}
  error(err: any) {}
  complete() {}

  unsubscribe() {
    this.isClosed = true;
    this.destination = defaultDestination;
    // The relationship between each timer and the current Subscriber should also be tracked
    // nextSource -> pipe -> nextSource
    Object.values(timers).forEach((timer) => clearInterval(timer));
  }
}

6. Trying out minrx

const subscription = interval(1000)
  .pipe(map((n, i) => n * 2 + "-map-" + i))
  .subscribe((n) => console.log("get", n));

setTimeout(() => {
  subscription.unsubscribe();
}, 3100);

// logs
// get 0-map-0
// get 2-map-1
// get 4-map-2
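
Clearing every entry in a shared timers object stops all intervals at once, including those of other subscribers. One possible refinement, sketched here as an assumption rather than as what minrx does, is to let the producer return a teardown function that releases only its own resources. `TeardownObservable`, `ticks`, and the manual `tickAll` ticker (used instead of setInterval so the run is deterministic) are hypothetical names.

```typescript
type Teardown = () => void;
type Next<T> = (value: T) => void;

// Stand-in Observable whose producer returns a teardown function.
class TeardownObservable<T> {
  private producer: (next: Next<T>) => Teardown;

  constructor(producer: (next: Next<T>) => Teardown) {
    this.producer = producer;
  }

  subscribe(next: Next<T>): { unsubscribe: () => void } {
    let closed = false;
    const teardown = this.producer((value) => {
      if (!closed) next(value);
    });
    return {
      unsubscribe: () => {
        if (closed) return;
        closed = true;
        teardown(); // release only what this subscription created
      },
    };
  }
}

// Manual ticker standing in for setInterval.
let listeners: Array<() => void> = [];
const tickAll = (): void => listeners.slice().forEach((fn) => fn());

const ticks = () =>
  new TeardownObservable<number>((next) => {
    let tick = 0;
    const onTick = () => next(tick++);
    listeners.push(onTick);
    // Teardown removes this subscription's listener and nothing else.
    return () => {
      listeners = listeners.filter((fn) => fn !== onTick);
    };
  });

const seenA: number[] = [];
const seenB: number[] = [];
const subA = ticks().subscribe((n) => seenA.push(n));
const subB = ticks().subscribe((n) => seenB.push(n));

tickAll();          // both subscriptions receive 0
subA.unsubscribe(); // stops A only
tickAll();          // only B receives 1
const remaining = listeners.length;
subB.unsubscribe();

console.log(seenA, seenB, remaining, listeners.length);
```

With a real timer, the producer would keep the id returned by setInterval and return a function that calls clearInterval on that id.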

Finally

  1. Through two examples we got a practical feel for “observables” and “observers” in reactive programming and for the basics of how they communicate.

  2. As you can see, RxJs rests on the same basic ideas as Callbag but presents them in a more familiar form, which keeps the code structure clear and makes it more readable and extensible.

  3. However, we have only covered the most basic concepts of reactive programming; there is still a lot to learn about both reactive programming itself and RxJs.

  4. Hopefully, in your future programming, you can give meaning to processes and soul to operations.

  5. Source code: Minrx Demo

  6. Writing this up took effort, so if you liked it, remember to leave a ❤️ ~

References

  • A probe into functional programming

  • callbag

  • callbag-basics

  • RxJs