Introduction to the

RxJS is a library for responsive programming using Observables, which makes it easier to write asynchronous or callback-based code and is a JavaScript version of the ReactiveX programming concept. The power of RxJS is its ability to use pure functions to generate values. This means your code is less error-prone.

The installation

The official installation

npm install rxjs
// Import the entire set of core features:
import Rx from 'rxjs/Rx';
Rx.Observable.of(1.2.3)
Copy the code

Recommended installation

According to the official installation, it is found that RXJS cannot be loaded completely, and you need to rely on the rxJs-compat package. The following installation is recommended

npm i -s Rxjs@6 rxjs-compat@6
import * as Rx from 'rxjs/Rx'
Copy the code

Core concepts of RxJS

Introduction of observables

Observable example

  Rx.Observable.of('1'.'2'.'3').map(x= >x*10).filter(x= >x>5).subscribe(x= >console.log(x))
Copy the code
  • The creation process:Rx.Observable.of(‘1’, ‘2’, ‘3’)Create an Observable that sends 1, 2, and 3 in turn
  • Logical process :*.map().filter()* multiply each value by 10, then filter out values greater than 5. If you write the filter operator first and then map, you get no data
  • Subscribe process :*subscribe()* similar to the callback function. This process gets an object subscription.
  • Implementation process:x=>console.log(x)By default, the next callback is executed
  • The cleaning process:The sample is as follows
const subscription = Rx.Observable.of('1'.'2'.'3').map(x= >x*10).filter(x= >x>5).delay(1000).subscribe(x= >console.log(x));
subscription.unsubscribe()
Copy the code

Introduction of the Subject

What is Subject? – RxJS Subject is a special type of Observable that allows values to be multicast to multiple observers, so Subject is multicast, whereas normal Observables are unicast (each Observables subscribed has a separate execution of an Observable). Each Subject is an Observable. – For Subject, you can provide an observer and use the SUBSCRIBE method to start receiving values normally. From the observer’s point of view, it can’t tell whether the Observable execution is coming from a normal Observable or a Subject. Within the Subject, SUBSCRIBE does not call a new execution of the sent value. It simply registers a given observer into a list of observers, similar to how addListener works in other libraries or languages. Each Subject is an observer. – Subject is an object with the following methods: Next (v), error(e), and complete(). To give a Subject a new value, simply call next(theValue), which will multicast theValue to observers registered to listen to the Subject. A Subject is like an Observable, but can be multicast to multiple observers. Subject also acts like EventEmitters, maintaining a registry of multiple listeners. According to the official website, we can probably understand as follows: An Observable is a kind of special Observable, which is similar to one lane, one lane, one lane, the wrong way, or multiple cars driving at the same time. It can multipush values to multiple observers. A common Observable does not have the ability to multipath push (each Observer has its own independent execution environment), while a Subject can share an execution environment

Subject:

const test = Observable.interval(1000).take(3);
const observerA = {
  v => console.log(`a:${v}`)}const observerB = {
  v => console.log(`b:${v}`)}/// Define observable
test .subscribe(observerA)
setTimeout((a)= > {test .subscribe(observerB) }, 2000)    
Observable is unicast, so it outputs a:0, a:1, b:0, a:2, b:1, b:2
const subject = new Subject()
subject.subscribe(observerA)
test.subscribe(subject)
setTimeout((a)= > {subject.subscribe(observerB)}, 2000)
/// Since the Subject is multicast and shares an execution, the output is: a:0, a:1, a:2, b:2
Copy the code

The Subject of polymorphic

Due to the particularity of subject, a variety of variations of subject are derived, which will not be elaborated in detail. Their pairing is shown in the following figure

Rxjs Whether to store data Whether an initial value is required When to release data to subscribers
Subject no no Release timely, as new data becomes available
BehaviorSubject If yes, store the last data or the initial value is Release timely, as new data becomes available
ReplaySubject If yes, store all data no Release timely, as new data becomes available
AsyncSubject Yes, store the last piece of data is Delayed publication, published only when the data source is complete

The Scheduler profile

What is Scheduler? – Scheduler controls when subscription is started and when notifications are sent. It consists of three parts

A scheduler is a data structure. It knows how to store and sort tasks based on priority or other criteria. The scheduler is the execution context. It indicates when and where the task is executed (immediate, for example, or another callback function mechanism (such as setTimeout or process.nexttick), or animation frames). The scheduler has a (virtual) clock. The scheduler function provides the concept of “time” through its getter method now(). Tasks scheduled on a specific scheduler will strictly follow the time represented by that clock. The scheduler lets you specify the execution context in which an Observable sends notifications to its observers.

Operator induction

RxJS provides various apis to create data streams:

Single-value: of, empty, never Multi-value: from Timing: interval, timer Created from the event: fromEvent Created from the Promise: fromPromise User-defined: create

The resulting data stream is an observable sequence that can be subscribed to or used to perform transformation operations such as:

Pluck some values: filter, skip, first, last, take Delay, timeout, throttle, debounce, audit, bufferTime cumulative: reduce, scan Exception handling: throw, catch, finally, retry, conditional execution: TakeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn Conversion: switch

Several data streams can also be combined:

Merge two data streams into merge sequence race. The default value is forkJoin for one of the data streams. The default value is Zip for all data streams. Take the last value of each source data stream and merge it into an array

RxJS difficulties

RxJS is very good at handling asynchronous logic, data flows, and events. Using Rxjs to pre-process data is generally in a ‘God’ perspective to debug data visualization. Rxjs greatly shortens the amount of code while achieving better data processing (purity). It is because of its powerful features, so learning Rxjs has the following difficulties (I think) 1, the degree of abstraction is relatively high, requires developers to have a strong ability to summarize 2, operators and miscellaneous, need to spend a lot of effort to remember and rational use of each operator

test

  • 1. The console output is a multiple of 5 every 2 seconds after the mouse click
  • 2. There are three asynchronous operations A, B, and C, please provide a method to output values at the same time after the three asynchronous operations are completed in parallel
  • 3. “And future big data” === “take the last 4 words (various methods)
  • 4. Simulate a programmer who earns the same amount of money every day without increasing his salary. When he has enough money (100N), he will buy a house and rent the house to others

Refer to the answer

1 / / / / / / subject
const timer = Rx.Observable.interval(2000);
const event = Rx.Observable.fromEvent(document.'click')
event.switchMap((a)= > timer)
 .map(x= > x * 5)
 .subscribe(x= > console.log('Problem 1 :' + x));
Copy the code
2 / / / / / subject
const fa = (cb) = > {
  setTimeout((a)= > cb('a'), 1000);
}
const fb = (cb) = > {
  setTimeout((a)= > cb('b'), 2000);
}
const fc = (cb) = > {
  setTimeout((a)= > cb('c'), 4000);
}
const oa = Rx.Observable.bindCallback(fa);
const ob = Rx.Observable.bindCallback(fb);
const oc = Rx.Observable.bindCallback(fc);

Rx.Observable.combineLatest(oa(),ob(),oc())
  .subscribe(x= > console.log('Problem 2 :' + x));
  ///// can also be used with forkJoin,zip
Copy the code
/ / / / / / title 3
const str = "People and the Future of big Data";
const param = str.split(' ');
Rx.Observable.from(param)
  .takeLast(4)
  .subscribe(x= > console.log('Problem 3 :' + x));
///////////////////////////////////////////////////////
Rx.Observable.from(param).subscribe(new ReplaySubject(3))
Copy the code
/ / / / / / / the topic 4
const house$ = new Subject()  / / / house
const houseCount$ = house$.scan((acc, num) = > acc + num, 0).startWith(0) / / / house number

// Wages have remained flat
const salary$ = Observable.interval(100).mapTo(1) // Programmer salary n
const rent$ = Observable.interval(3000)
  .withLatestFrom(houseCount$)
  .map(arr= > arr[1] * 5)

// As soon as I buy a house, I run out of cash...
const income$ = Observable.merge(salary$, rent$)
const cash$ = income$
  .scan((acc, num) = > {
    const newSum = acc + num
    const newHouse = Math.floor(newSum / 100)
    if (newHouse > 0) {
      house$.next(newHouse)
    }
    return newSum % 100
  }, 0)
houseCount$.subscribe(num= > console.log(`houseCount: ${num}`))
cash$.subscribe(num= > console.log(`cash: ${num}`))
Copy the code

Original link: tech.gtxlab.com/sth-about-r…


About the author: Zhang Shuan, Renhe future big data front-end engineer, focusing on HTML/CSS/JS learning and development.