Author: Energetically intelligent technology team – front-end bean keeper

Preface:

(1) When I first started working on a data dashboard platform, I had a year and a half of Angular experience. The platform needed complex state management between dashboards and charts, and Angular's built-in RxJS data flow management, combined with dependency injection, handled that complexity well.

(2) I have also used RxJS in React projects, mainly for activity flow design, data flow management in base components, and communication between components at different levels, using data-driven design to decouple logic.

1. Introduction

1.1 Background

  • Programming paradigms

Imperative programming: modifies program state through statements or commands. (e.g., JS, Java, Python)

Functional programming: functions are first-class citizens, and complex operations are built from function calls, composition, and so on. (e.g., currying, function composition)

Declarative programming: describes program logic rather than control flow. (e.g., SQL, HTML, CSS)

Reactive programming: a declarative programming paradigm based on data streams and the propagation of change. (e.g., React: UI = f(state))

Reactive Extensions (Rx): RxJS is Reactive Extensions for JavaScript, the JS implementation of Rx.
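
To make the contrast between paradigms concrete, here is a minimal sketch of one task (summing the squares of the even numbers in a list) written first imperatively and then in a functional, declarative style. The function names and sample data are illustrative assumptions, not part of any library.

```typescript
// Imperative style: step-by-step statements mutate local state.
function sumEvenSquaresImperative(values: number[]): number {
  let total = 0;
  for (let i = 0; i < values.length; i++) {
    if (values[i] % 2 === 0) {
      total += values[i] * values[i];
    }
  }
  return total;
}

// Functional / declarative style: the result is described as a
// composition of small pure functions.
const isEven = (n: number): boolean => n % 2 === 0;
const square = (n: number): number => n * n;
const add = (a: number, b: number): number => a + b;

function sumEvenSquaresFunctional(values: number[]): number {
  return values.filter(isEven).map(square).reduce(add, 0);
}

const sample = [1, 2, 3, 4];
const imperativeResult = sumEvenSquaresImperative(sample); // 4 + 16 = 20
const functionalResult = sumEvenSquaresFunctional(sample); // 20
```

RxJS applies the second style to values that arrive over time rather than values that already sit in an array.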

  • What can RxJS do?

Think of RxJS as Lodash for events.

RxJS abstracts synchronous and asynchronous operations into streams, focuses on how the data flow changes, and provides a set of operators for processing it. Driving logic by data flow effectively reduces coupling in the code.

1.2 Concepts

1.2.1 Stream

In computer science, a stream is a sequence of data elements made available over time.

A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.

Examples include event streams, live data streams, text-editing streams, and WebSocket messages.

Streams smooth over the difference between synchronous and asynchronous data: both can be put into a stream and handled the same way.

1.2.2 Observer pattern

The observer pattern defines a one-to-many dependency between objects, so that whenever one object's state changes, its dependent objects are notified and updated automatically.

✅ Advantages

  • Loose coupling through abstraction: the UI layer can be separated from the data layer;
  • An abstract data interface defines a stable message propagation mechanism;
  • Supports one-to-many broadcast communication.

❌ Disadvantages

  • When the observer hierarchy is deep, broadcasting is costly;
  • A circular dependency between observer and subject causes an infinite loop and a crash;
  • The observer perceives only that the subject has changed, not how it changed.
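
The one-to-many dependency described above can be sketched in a few lines of plain TypeScript. This is a minimal illustration, not how RxJS is implemented; `ObservableState`, `counter`, and the two observer arrays are hypothetical names.

```typescript
type Listener<T> = (state: T) => void;

// Hypothetical subject that keeps a registry of observers.
class ObservableState<T> {
  private listeners = new Set<Listener<T>>();

  constructor(private state: T) {}

  get current(): T {
    return this.state;
  }

  // Register an observer; the returned function removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Change the state and notify every registered observer.
  setState(next: T): void {
    this.state = next;
    this.listeners.forEach((listener) => listener(next));
  }
}

const counter = new ObservableState<number>(0);
const seenByChart: number[] = [];
const seenByTable: number[] = [];

const detachChart = counter.subscribe((value) => seenByChart.push(value));
counter.subscribe((value) => seenByTable.push(value));

counter.setState(1); // both observers are notified
detachChart();       // the chart stops observing
counter.setState(2); // only the table is notified
```

Note that the observers receive only the new state, which matches the last disadvantage listed above: they cannot see how the change came about.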

2. Core Concepts

2.1 Observable & Observer

  1. Observable

Observables are lazy Push collections of multiple values.

  • Multiple values: an Observable can push any number of values over time
  • Lazy push: values are pushed only once the Observable is subscribed to; nothing runs before that

  2. Pull vs Push

Pull: the consumer decides when to fetch data; the producer is unaware of when the data will be delivered.

Push: the producer decides when to send data; the consumer is unaware of when it will receive it.

        Single value    Multiple values
Pull    Function        Iterator
Push    Promise         Observable

  • Function: returns a single value synchronously when called;
  • Iterator: returns multiple values, one per call to next();
  • Promise: returns a single value asynchronously;
  • Observable: returns multiple values, synchronously or asynchronously.
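
The four quadrants of the table can be placed side by side in code. The sketch below assumes RxJS 6 or later is installed; `getOne`, `getMany`, and `many$` are illustrative names.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// Pull, single value: the caller decides when to invoke the function.
function getOne(): number {
  return 1;
}
log.push(`function: ${getOne()}`);

// Pull, multiple values: the caller asks the iterator for each next value.
function* getMany(): Generator<number> {
  yield 1;
  yield 2;
}
const iterator = getMany();
log.push(`iterator: ${iterator.next().value}`);
log.push(`iterator: ${iterator.next().value}`);

// Push, single value: the promise decides when the callback runs
// (always asynchronously, after the current synchronous code).
Promise.resolve(1).then((value) => log.push(`promise: ${value}`));

// Push, multiple values: the observable decides when each value is delivered.
// This one happens to deliver synchronously during subscribe().
const many$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});
many$.subscribe((value) => log.push(`observable: ${value}`));
```

When the synchronous part of this script finishes, `log` already holds the function, iterator, and observable entries; the promise entry is appended afterwards.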
  3. Observer

An Observer is a consumer of values delivered by an Observable.

An Observer is a plain object, defined as follows:

interface Observer<T> {
    /** Whether the observer is closed */
    closed?: boolean;
    /** Called when the stream emits a value */
    next: (value: T) => void;
    /** Called when the stream errors */
    error: (err: any) => void;
    /** Called when the stream completes */
    complete: () => void;
}

// Subscribe to the stream
// observable.subscribe(observer);
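
As a small illustration of an Observer object in use, and of the lazy behaviour described earlier, the following sketch subscribes a full observer to a hand-built Observable. It assumes RxJS 6 or later; `numbers$` and `events` are made-up names.

```typescript
import { Observable } from 'rxjs';
import type { Observer } from 'rxjs';

const events: string[] = [];

// Nothing inside this function runs until someone subscribes.
const numbers$ = new Observable<number>((subscriber) => {
  events.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

const observer: Observer<number> = {
  next: (value) => events.push(`next ${value}`),
  error: (err) => events.push(`error ${err}`),
  complete: () => events.push('complete'),
};

events.push('before subscribe');
numbers$.subscribe(observer);
```

The 'producer started' entry is recorded only after 'before subscribe', which shows that creating the Observable did not start it.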
  4. Subscription

A Subscription is an object that represents a disposable resource, usually the execution of an Observable.

// Subscribe to the stream
const subscription = stream$.subscribe({
    next: (val) => console.info(val),
    error: console.error,
    complete: () => console.info('completed'),
});

// Cancel the subscription
subscription.unsubscribe();

❗️❗️ Note: call unsubscribe() once a subscription is no longer needed, to release resources and prevent memory leaks.
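
To show what "releasing resources" means in practice, here is a sketch of an Observable that owns a timer and returns teardown logic. It assumes RxJS 6 or later; `ticks$` and `lifecycle` are hypothetical names.

```typescript
import { Observable } from 'rxjs';

const lifecycle: string[] = [];
const received: number[] = [];

// Hypothetical stream that owns a timer. The function returned from the
// subscribe callback is the teardown logic, run on unsubscribe().
const ticks$ = new Observable<number>((subscriber) => {
  let count = 0;
  const id = setInterval(() => subscriber.next(count++), 1000);
  lifecycle.push('timer started');

  return () => {
    clearInterval(id);
    lifecycle.push('timer cleared');
  };
});

const subscription = ticks$.subscribe((value) => received.push(value));

// Without this call the interval would keep firing indefinitely.
subscription.unsubscribe();
```

After `unsubscribe()` the subscription reports `closed === true` and the interval no longer exists, so no values reach the observer.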

2.2 Subject

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

A Subject is both an Observable and an Observer:

  • Observable: it can be subscribed to directly to receive data
  • Observer: calling next(v), error(e), or complete() broadcasts to all registered observers

Subject types and their characteristics:

  • Subject: the general-purpose subject; supports multicast
  • BehaviorSubject: represents a value that changes over time; caches the current value; pushes the latest value immediately to each new subscriber
  • ReplaySubject: caches the most recent values; the size of the cache window can be configured
  • AsyncSubject: emits only its last value, and only when it completes
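
The differences between the four types show up most clearly in what a late subscriber receives. The sketch below assumes RxJS 6 or later; `record` is a small helper invented for the example.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

const log: string[] = [];
const record = (label: string) => (value: number) => log.push(`${label}: ${value}`);

// Subject: a late subscriber only sees values emitted after it subscribes.
const subject = new Subject<number>();
subject.next(1);
subject.subscribe(record('subject'));
subject.next(2);

// BehaviorSubject: a new subscriber immediately receives the current value.
const behavior = new BehaviorSubject<number>(0);
behavior.next(1);
behavior.subscribe(record('behavior'));
behavior.next(2);

// ReplaySubject: replays up to the last N values (here N = 2) to new subscribers.
const replay = new ReplaySubject<number>(2);
replay.next(1);
replay.next(2);
replay.next(3);
replay.subscribe(record('replay'));

// AsyncSubject: emits only the last value, and only once it completes.
const asyncSubject = new AsyncSubject<number>();
asyncSubject.subscribe(record('async'));
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.complete();
```

Every call here is synchronous, so `log` can be inspected right after the last line to compare the four behaviours.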

2.3 Operator

Operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner.

📌 RxJS ships with operators. —— Ben Lesh

Operators in RxJS are pure functions. The built-in operators are generally enough for day-to-day development, and you can write custom operators for special requirements.

  1. Pipeable operators

A pipeable operator takes the source Observable as its argument and returns a new Observable. It is a pure operation: the source Observable is not modified.

Pipeable operators fall into the following categories (category: explanation 📖; examples 🌰):

  • Combination: joins information from multiple streams; choose an operator based on the order, timing, and structure of the values. Examples: merge (merges streams), concat (concatenates streams)
  • Transformation: transforms values from the source Observable into a new Observable. Examples: map (transforms each value), switchMap (switches to a new stream)
  • Filtering: filters the values of the source Observable. Examples: filter (filters values), debounce (debouncing)
  • Multicasting: converts a unicast stream into a multicast stream. Example: multicast (multicasts through a Subject)
  • Error handling: handles errors in the stream. Examples: retry (retries on error), catchError (catches the error)
  • Utility: helper operations. Examples: tap (side effects, e.g. for debugging), timestamp (attaches a timestamp to each value)
  • Conditional: evaluates a condition on the source Observable. Examples: every (whether every value satisfies a predicate), isEmpty (whether the stream is empty)
  • Aggregate: aggregates the values of a stream into a new stream. Examples: count (counts the values), reduce (similar to Array.prototype.reduce)
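
A short pipeline can illustrate several of these categories at once, along with a custom operator. The sketch assumes RxJS 6 or later (operators imported from 'rxjs/operators'); `squareOdds` is a hypothetical operator written only for this example.

```typescript
import { of } from 'rxjs';
import type { Observable } from 'rxjs';
import { filter, map, reduce, tap } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

// A custom operator is a function from Observable to Observable.
const squareOdds = (input$: Observable<number>): Observable<number> =>
  input$.pipe(
    filter((n) => n % 2 === 1), // filtering: keep 1, 3, 5
    map((n) => n * n)           // transformation: 1, 9, 25
  );

const seen: number[] = [];
const totals: number[] = [];

source$
  .pipe(
    squareOdds,
    tap((n) => seen.push(n)),      // utility: side effect, e.g. for debugging
    reduce((acc, n) => acc + n, 0) // aggregate: emit the sum on completion
  )
  .subscribe((total) => totals.push(total));

// The source is unchanged: subscribing to it directly still yields 1..5.
const untouched: number[] = [];
source$.subscribe((n) => untouched.push(n));
```

Because `pipe` returns a new Observable each time, the same `source$` can feed any number of independent pipelines.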
  2. Creation operators

Functions used to create Observables, such as:

  • of: emits any number of values in order
  • interval: emits a sequence of numbers at a given time interval
  • fromEvent: converts DOM events into an Observable
  • fromPromise: converts a Promise into an Observable (in RxJS 6 and later, from accepts a Promise directly)
  • ...
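
The creation operators that do not need a DOM can be tried directly in Node. This sketch assumes RxJS 6 or later and uses `from` for the Promise case; fromEvent is left out because it requires an event target.

```typescript
import { from, interval, of } from 'rxjs';
import { take } from 'rxjs/operators';

const log: string[] = [];

// of: emits its arguments in order, then completes.
of('a', 'b').subscribe((value) => log.push(`of: ${value}`));

// from: converts an array (or other iterable) into an Observable.
from([1, 2]).subscribe((value) => log.push(`from(array): ${value}`));

// from also accepts a Promise; its value arrives asynchronously.
from(Promise.resolve('done')).subscribe((value) => log.push(`from(promise): ${value}`));

// interval: emits 0, 1, 2, ... every 10 ms; take(3) completes it after three values.
interval(10)
  .pipe(take(3))
  .subscribe((value) => log.push(`interval: ${value}`));
```

The `of` and `from(array)` values are delivered synchronously, while the Promise and interval values are appended to `log` later.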

2.4 Scheduler

A Scheduler controls when a subscription starts and when notifications are delivered.

  1. A scheduler provides:
  • A data structure for storing and queuing tasks
  • An execution context for subscriptions
  • A virtual clock
  2. Scheduler types

💡 In most scenarios, use the default scheduler.

  • null: default (no scheduler); synchronous, recursive (the common case)
  • queueScheduler: queue scheduler; synchronous, queued
  • asapScheduler: microtask scheduler; asynchronous, microtask
  • asyncScheduler: asynchronous scheduler; asynchronous, macrotask
  • animationFrameScheduler: animation scheduler; window.requestAnimationFrame
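
The effect of a scheduler is easiest to see by comparing delivery order with and without one. The sketch below assumes RxJS 6 or later and uses observeOn, one common way to apply a scheduler to an existing stream.

```typescript
import { asyncScheduler, of } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const order: string[] = [];

// Default (no scheduler): values are delivered synchronously during subscribe().
order.push('before default');
of(1, 2).subscribe((value) => order.push(`default: ${value}`));
order.push('after default');

// asyncScheduler: delivery is deferred to later macrotasks,
// so subscribe() returns before any value arrives.
order.push('before async');
of(1, 2)
  .pipe(observeOn(asyncScheduler))
  .subscribe((value) => order.push(`async: ${value}`));
order.push('after async');
```

At the end of the synchronous code, `order` contains both default values between their markers but no 'async' entries yet; those are appended once the event loop runs the scheduled tasks.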

3. Simple Practice

3.1 Dragging a ball

StackBlitz project RXJS – drag

  1. Drag-and-drop process

Dragging the ball can be described as follows:

  • Mouse down (mousedown): dragging starts; record the position where the mouse was pressed
  • Mouse move (mousemove): record the ball's initial position, and update the ball's position as the mouse moves
  • Mouse up (mouseup): dragging ends
  2. Data flow

Mouse events triggered during dragging correspond to three streams: mousedown$, mousemove$, and mouseup$.

The drag stream drag$ can be described as follows:

  • mousedown$ triggers the drag and records the mouse-down position
  • Switch (switchMap) to mousemove$: record the ball's initial position, and compute the ball's new position as the mouse moves
  • When mouseup$ fires, takeUntil ends mousemove$

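import { fromEvent } from 'rxjs';
import { map, switchMap, takeUntil } from 'rxjs/operators';

// `ball` (the draggable element) and `getMouseEventPos` are defined elsewhere in the
// project; getMouseEventPos is assumed to return the pointer position as { x, y }.
const mousedown$ = fromEvent<MouseEvent>(ball, 'mousedown').pipe(
  map(getMouseEventPos)
);

const mousemove$ = fromEvent<MouseEvent>(document, 'mousemove').pipe(
  map(getMouseEventPos)
);

const mouseup$ = fromEvent<MouseEvent>(document, 'mouseup');

const drag$ = mousedown$.pipe(
  switchMap(initialPos => {
    // Position of the ball at the moment the drag starts
    const { top, left } = ball.getBoundingClientRect();

    return mousemove$.pipe(
      // Offset the ball by how far the mouse has moved since mousedown
      map(({ x, y }) => ({
        top: y - initialPos.y + top,
        left: x - initialPos.x + left
      })),
      // Stop following the mouse once the button is released
      takeUntil(mouseup$)
    );
  })
);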

The ball subscribes to drag$ to update its position.

drag$.subscribe(({ top, left }) => {
  ball.style.top = `${top}px`;
  ball.style.left = `${left}px`;
  ball.style.bottom = '';
  ball.style.right = '';
});
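
The drag pipeline can also be exercised without a browser by replacing the DOM event streams with Subjects. This is a sketch under that assumption (RxJS 6 or later); `Point` and `ballStart` are stand-ins invented for the example, with `ballStart` replacing the call to getBoundingClientRect().

```typescript
import { Subject } from 'rxjs';
import { map, switchMap, takeUntil } from 'rxjs/operators';

interface Point {
  x: number;
  y: number;
}

// Stand-ins for the three DOM event streams.
const mousedown$ = new Subject<Point>();
const mousemove$ = new Subject<Point>();
const mouseup$ = new Subject<void>();

// Assumed starting position of the ball.
const ballStart = { top: 100, left: 50 };

const drag$ = mousedown$.pipe(
  switchMap((initialPos) =>
    mousemove$.pipe(
      map(({ x, y }) => ({
        top: y - initialPos.y + ballStart.top,
        left: x - initialPos.x + ballStart.left,
      })),
      takeUntil(mouseup$)
    )
  )
);

const positions: Array<{ top: number; left: number }> = [];
drag$.subscribe((pos) => positions.push(pos));

mousemove$.next({ x: 0, y: 0 });   // ignored: no drag in progress
mousedown$.next({ x: 10, y: 10 }); // drag starts
mousemove$.next({ x: 15, y: 30 }); // top: 30 - 10 + 100, left: 15 - 10 + 50
mouseup$.next();                   // drag ends
mousemove$.next({ x: 99, y: 99 }); // ignored: the drag has ended
```

Only the move between mousedown and mouseup produces a position, which is the behaviour switchMap and takeUntil are meant to give.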

3.2 Managing toasts

StackBlitz project RXJS – toast

  1. Toast requirements

A toast can be described as follows (assume each toast stays for 3s and cannot be dismissed manually):

  • It shows a piece of text
  • If no new toast appears within 3s, it is hidden automatically
  • If a new toast appears within 3s, the 3s timer restarts and the new toast replaces the old one

  2. Data flow

Split toast management into:

  • show (show$): triggers display
  • hide (hide$): the duration elapses, or a new toast replaces the current one

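import { concat, fromEvent, of, timer } from 'rxjs';
import { finalize, mapTo, switchMap, tap } from 'rxjs/operators';

const click$ = fromEvent(document.getElementById('btn'), 'click');
const toast$ = click$.pipe(
  switchMap(() => {
    let hideByDuration = false;

    // Emits once after 3s, the display duration agreed above
    const duration$ = timer(3000).pipe(
      mapTo('hide by duration'),
      tap(() => (hideByDuration = true))
    );

    return concat(of('show'), duration$).pipe(
      // Runs when this toast completes or when switchMap replaces it with a new one
      finalize(() => {
        if (!hideByDuration) {
          console.log('hide by next');
        }
      })
    );
  })
);

toast$.subscribe(console.info);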

4. Conclusion

RxJS performs well in scenarios such as flow control and coordinating multiple asynchronous operations, and code written with it tends to be concise. We hope you will consider RxJS when choosing your technology stack.
