RxJS operator field guide
1. Utility operators
count
- Counts the source's emissions (optionally only those matching a predicate) and emits the total when the source completes
import { range } from 'rxjs';
import { count } from 'rxjs/operators';

// range(1, 7) emits the seven integers 1..7.
const numbers = range(1, 7);
// With a predicate, count() only counts the values for which it returns true.
const result = numbers.pipe(count(i => i % 2 === 1));
result.subscribe(x => console.log(x));
// Results in:
// 4
reduce
- Accumulates the emitted values and emits the final result when the source completes
import { fromEvent, interval } from 'rxjs';
import { reduce, takeUntil, mapTo } from 'rxjs/operators';

// Listen to document clicks until the 5-second interval first fires,
// turn every click into a 1 and add the ones up starting from 0.
const initialTotal = 0;
const clickTotal = fromEvent(document, 'click').pipe(
  takeUntil(interval(5000)),
  mapTo(1),
  reduce((sum, one) => sum + one, initialTotal),
);
clickTotal.subscribe(total => console.log(total));
Copy the code
max\min
import { of, merge } from 'rxjs';
import { max, min, tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
// max() and min() each emit a single value; merge() forwards both results
// and tap() logs each one before the subscriber receives it.
merge(
  obs$.pipe(max()),
  obs$.pipe(min()),
).pipe(tap((val) => {
  console.log("result....", val);
})).subscribe(console.log);
// output:
// result.... 8
// 8
// result.... 2
// 2
Copy the code
tap
- Performs a side effect (here: logging) for every notification, one by one, without changing the stream
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

const obs$ = of(5, 4, 7, 2, 8);
// tap() accepts an observer object; only `next` does anything here,
// the `error` and `complete` handlers are intentionally empty.
obs$.pipe(tap({
  next: (val) => {
    console.log("val", val);
  },
  error: () => {
  },
  complete: () => {
  },
})).subscribe(console.log);
// output (one "val" line from tap, then the value from the subscriber):
// val 5
// 5
// val 4
// 4
// val 7
// 7
// val 2
// 2
// val 8
// 8
Copy the code
delay
- Delays each emitted value by the given time; error notifications are not delayed
import { of } from 'rxjs';
import { tap, delay } from 'rxjs/operators';

const obs$ = of([1, 2]);
obs$.pipe(
  // Logged as soon as the source emits.
  tap(res => {
    console.log("get value from of....", new Date().toLocaleTimeString());
  }),
  // Delay the emission by 2s. For a random delay use
  // delayWhen(event => interval(Math.random() * 5000)) instead.
  delay(2000),
  // Logged once the delayed value comes through.
  tap(() => {
    console.log("get value from of....", new Date().toLocaleTimeString());
  }),
).subscribe(res => {
  console.log("of res... ; .", res);
});
// output:
// get value from of.... 7:52:27 AM
// get value from of.... 7:52:29 AM
// of res... ; . [ 1, 2 ]
Copy the code
delayWhen
import { interval, timer } from 'rxjs';
import { delayWhen } from 'rxjs/operators';

// Source emits 0, 1, 2, ... once per second.
const message = interval(1000);
// Duration selector: every value is held back until this 5s timer fires.
const delayForFiveSeconds = () => timer(5000);
const delayWhenExample = message.pipe(delayWhen(delayForFiveSeconds));
const subscribe = delayWhenExample.subscribe(val => console.log(val));
// output (after the 5s delay):
// 0
// 1
// 2
repeat
- repeat(n) is equivalent to subscribing to the source n times in a row (it re-subscribes each time the source completes)
// RxJS v6+
import { tap, repeat, delay } from 'rxjs/operators';
import { of } from 'rxjs';

// Logs when it is subscribed to, then emits its value 2s later.
const delayedThing = of('delayed value').pipe(
  tap(() => {
    console.log("time.. 1.", new Date().toLocaleTimeString());
  }),
  delay(2000)
);

delayedThing
  .pipe(
    tap(() => {
      console.log("time... 2.", new Date().toLocaleTimeString());
    }),
    // Run the whole source three times in total.
    repeat(3)
  )
  .subscribe(console.log);
// output:
// time.. 1. 4:42:45 PM
// time... 2. 4:42:47 PM
// delayed value
// time.. 1. 4:42:47 PM
// time... 2. 4:42:49 PM
// delayed value
// time.. 1. 4:42:49 PM
// time... 2. 4:42:51 PM
// delayed value
subscribeOn, observeOn
- Change the scheduler, and therefore the timing, on which subscription (subscribeOn) or notification delivery (observeOn) happens
import { of, merge } from 'rxjs';

// Baseline without any scheduler: both sources emit synchronously.
const firstGroup = of(1, 2, 3, 4);
const secondGroup = of(5, 6, 7, 8, 9);
const everything = merge(firstGroup, secondGroup);
everything.subscribe(console.log);
// 1 2 3 4 5 6 7 8 9
import { of, merge, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';

// The first source is subscribed to on the async scheduler,
// so the second, synchronous source is logged first.
const deferredGroup = of(1, 2, 3, 4).pipe(subscribeOn(asyncScheduler));
const immediateGroup = of(5, 6, 7, 8, 9);
const everything = merge(deferredGroup, immediateGroup);
everything.subscribe(console.log);
//5 6 7 8 9 1 2 3 4
import { interval, animationFrameScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Intervals are scheduled with the async scheduler by default...
const intervals = interval(10);
intervals.pipe(
  // ...but we will observe on the animationFrame scheduler
  // to ensure smooth animation.
  observeOn(animationFrameScheduler),
)
.subscribe(val => {
  // NOTE(review): someDiv is not defined in this snippet; presumably an existing DOM element.
  someDiv.style.height = val + 'px';
});
Copy the code
materialize
- Wraps every notification (next, error, complete) in a Notification object; dematerialize unwraps them again
import { of } from 'rxjs';
import { materialize, map } from 'rxjs/operators';

const letters = of('a', 'b', '13', 'd');
const upperCase = letters.pipe(map(x => x.toUpperCase()));
// materialize() turns each value, and the completion, into a Notification object.
const materialized = upperCase.pipe(materialize());
materialized.subscribe(x => console.log(x));
// output:
// Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
// Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
// Notification { kind: 'N', value: '13', error: undefined, hasValue: true }
// Notification { kind: 'N', value: 'D', error: undefined, hasValue: true }
// Notification { kind: 'C', value: undefined, error: undefined, hasValue: false }
Copy the code
timestamp
- Add a timestamp
import { of } from 'rxjs';
import { timestamp } from 'rxjs/operators';

const letters = of('a', 'b', '13', 'd');
// timestamp() wraps each value together with the time at which it was emitted.
const times = letters.pipe(timestamp());
times.subscribe(res => {
  console.log("res...", res);
});
// output:
// res... Timestamp { value: 'a', timestamp: 1594074567694 }
// res... Timestamp { value: 'b', timestamp: 1594074567700 }
// res... Timestamp { value: '13', timestamp: 1594074567700 }
// res... Timestamp { value: 'd', timestamp: 1594074567700 }
Copy the code
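timeout, timeInterval
- timeInterval emits objects holding the value and the time elapsed since the previous emission; timeout passes the values through unchanged and errors if no value arrives within the given time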
import { timeInterval, timeout } from "rxjs/operators";
import { interval } from "rxjs";

const seconds = interval(1000);

// timeInterval(): each value is wrapped with the time since the previous emission.
seconds.pipe(timeInterval())
  .subscribe(
    value => {
      console.log("time.....");
      console.log(value);
    },
    err => console.log(err),
  );

// timeout(1100): values pass through because they arrive every 1000ms.
// A limit below the 1000ms interval (e.g. 900) would error instead.
seconds.pipe(timeout(1100))
  .subscribe(
    value => {
      console.log("out.....");
      console.log(value);
    },
    err => console.log(err),
  );
// output:
// time.....
// TimeInterval { value: 0, interval: 1007 }
// out.....
// 0
// time.....
// TimeInterval { value: 1, interval: 1005 }
// out.....
// 1
Copy the code
timeoutWith
import { interval, of } from 'rxjs';
import { timeoutWith } from 'rxjs/operators';

const first$ = interval(3000);
const second$ = of('go to the default');
// A value must arrive within 2s, otherwise switch to the fallback observable.
// first$ only emits after 3s, so the default is used.
first$.pipe(timeoutWith(2000, second$)).subscribe(console.log);
// output:
// go to the default
Copy the code
toArray
import { interval } from 'rxjs';
import { toArray, take } from 'rxjs/operators';

// Take the first ten ticks of a 1s interval and collect them into one array,
// which is emitted when the truncated source completes.
const firstTenTicks = interval(1000).pipe(take(10));
const collected = firstTenTicks.pipe(toArray());
const subscribe = collected.subscribe(values => console.log(values));
// output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Copy the code
2. Creation operators
of – emits each argument as a single value
Generates data synchronously
import { of } from 'rxjs';

// The array is passed as ONE argument, so it is emitted as one value.
of([1, 2, 3]).subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);
// result:
// 'next: [1, 2, 3]'
Copy the code
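from – emits the elements of its input one by one
Generates data synchronously by default (the example below passes asyncScheduler, so the values arrive asynchronously)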
import { from, asyncScheduler } from 'rxjs';

console.log('start');

// Emissions are scheduled on the async scheduler, so 'end' is logged
// before any of the array elements.
const values = [10, 20, 30];
from(values, asyncScheduler).subscribe(item => console.log(item));

console.log('end');

// Logs:
// start
// end
// 10
// 20
// 30
Copy the code
ajax
Generates data asynchronously
- ajax
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

// ajax(url) performs a GET request and emits the response.
const obs$ = ajax(`https://api.github.com/users?per_page=5`).pipe(
  map(userResponse => console.log('users: ', userResponse)),
  // On failure, log the error and emit it as a regular value.
  catchError(error => {
    console.log('error: ', error);
    return of(error);
  })
);
- getJSON
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

// ajax.getJSON(url) performs a GET request for JSON data.
const obs$ = ajax.getJSON(`https://api.github.com/users?per_page=5`).pipe(
  map(userResponse => console.log('users: ', userResponse)),
  // On failure, log the error and emit it as a regular value.
  catchError(error => {
    console.log('error: ', error);
    return of(error);
  })
);
- jQuery-style configuration object
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
import { of } from 'rxjs';

// ajax() also accepts a request configuration object.
const users = ajax({
  url: 'https://httpbin.org/delay/2',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'rxjs-custom-header': 'Rxjs'
  },
  body: {
    rxjs: 'Hello World! '
  }
}).pipe(
  map(response => console.log('response: ', response)),
  // On failure, log the error and emit it as a regular value.
  catchError(error => {
    console.log('error: ', error);
    return of(error);
  })
);
interval
Generates data asynchronously
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// Tick once per second and stop after the first four values.
const firstFourTicks = interval(1000).pipe(take(4));
firstFourTicks.subscribe(tick => console.log('Next: ', tick));

// Logs:
// Next: 0
// Next: 1
// Next: 2
// Next: 3
Copy the code
3. Transformation operators
mergeMap
import { of, interval } from 'rxjs';
import { mergeMap, map } from 'rxjs/operators';

const letters = of('a', 'b', 'c');
// Each letter starts its own 1s interval; all inner intervals run concurrently
// and their values are interleaved in the output.
const result = letters.pipe(
  mergeMap(x => interval(1000).pipe(map(i => x + i))),
);
result.subscribe(x => console.log(x));
// output:
// a0
// b0
// c0
// a1
// b1
// c1
// (continues indefinitely)
Copy the code
concatMap
import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

const source = of(2000, 1000);
// map value from source into inner observable, when complete emit result and move to next
const example = source.pipe(
  concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
const subscribe = example.subscribe(val =>
  console.log(`With concatMap: ${val}`)
);

const mergeMapExample = source
  .pipe(
    // just so we can log this after the first example has run
    delay(5000),
    mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
  )
  .subscribe(val => console.log(`With mergeMap: ${val}`));
// output:
// With concatMap: Delayed by: 2000ms
// With concatMap: Delayed by: 1000ms
// With mergeMap: Delayed by: 1000ms   (shorter 1s delay, so it is output first)
// With mergeMap: Delayed by: 2000ms
switchMap
// RxJS v6+
import { timer, interval } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

console.log("time 0....", new Date().toLocaleTimeString());
// Outer source: first value after 1s, then an incrementing integer every 4s.
timer(1000, 4000).pipe(
  tap(() => {
    console.log("time 1....", new Date().toLocaleTimeString());
  }),
  switchMap(
    // Inner source: an incrementing integer every 1s. Each new outer value
    // cancels the previous inner interval and starts a fresh one.
    _ => interval(1000).pipe(tap((rs) => { console.log('inner value....', rs); })),
    // Result selector combining outer and inner values and indexes.
    (outerValue, innerValue, outerIndex, innerIndex) => ({
      outerValue,
      innerValue,
      outerIndex,
      innerIndex
    })
  )
).subscribe((res) => {
  console.log("final res....", res);
});
// output:
// time 0.... 5:21:59 PM
// time 1.... 5:22:00 PM
// inner value.... 0
// final res.... { outerValue: 0, innerValue: 0, outerIndex: 0, innerIndex: 0 }
// inner value.... 1
// final res.... { outerValue: 0, innerValue: 1, outerIndex: 0, innerIndex: 1 }
// inner value.... 2
// final res.... { outerValue: 0, innerValue: 2, outerIndex: 0, innerIndex: 2 }
// time 1.... 5:22:04 PM
// inner value.... 0
// final res.... { outerValue: 1, innerValue: 0, outerIndex: 1, innerIndex: 0 }
Copy the code
exhaustMap
// RxJS v6+
import { interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

const firstInterval = interval(1000).pipe(take(10));
const secondInterval = interval(1000).pipe(take(2));

// While an inner observable is still running, exhaustMap ignores new values
// from the outer source; that is why only every second outer value is logged.
const exhaustSub = firstInterval
  .pipe(
    exhaustMap(f => {
      console.log(`Emission Corrected of first interval: ${f}`);
      return secondInterval;
    })
  )
  .subscribe(val => console.log(val));
// output:
// Emission Corrected of first interval: 0
// 0
// 1
// Emission Corrected of first interval: 2
// 0
// 1
// Emission Corrected of first interval: 4
// 0
// 1
// Emission Corrected of first interval: 6
// 0
// 1
// Emission Corrected of first interval: 8
// 0
// 1
mapTo
- Maps every emission to the same constant value
import { fromEvent } from 'rxjs';
import { mapTo } from 'rxjs/operators';

// Every click on the document is replaced by the constant string 'Hi'.
const greetings = fromEvent(document, 'click').pipe(mapTo('Hi'));
greetings.subscribe(greeting => console.log(greeting));
Copy the code
map
import { combineLatest, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const weight = of(70, 72, 76, 79, 75);
const height = of(1.76, 1.77, 1.78, 1.8);
// weight emits all of its values before height starts, so every combination
// pairs the LAST weight (75) with each height in turn.
const bmi = combineLatest(weight, height).pipe(
  tap(([w, h]) => {
    console.log(`w:${w},h:${h}`);
  }),
  map(([w, h]) => w / (h * h)),
);
bmi.subscribe(x => console.log('BMI is ' + x));
// With output to console:
// w:75,h:1.76
// BMI is 24.212293388429753
// w:75,h:1.77
// BMI is 23.93948099205209
// w:75,h:1.78
// BMI is 23.671253629592222
// w:75,h:1.8
// BMI is 23.148148148148145
4. Combination operators
combineLatest
Use case: deriving a new result from the latest values of several inputs
Variant 1: pass the latest values downstream as an array and combine them there
import { timer, combineLatest } from 'rxjs';

// timerOne emits first value at 1s, then once every 4s
const timerOne$ = timer(1000, 4000);
// timerTwo emits first value at 2s, then once every 4s
const timerTwo$ = timer(2000, 4000);
// timerThree emits first value at 3s, then once every 4s
const timerThree$ = timer(3000, 4000);

// when one timer emits, emit the latest values from each timer as an array
combineLatest(timerOne$, timerTwo$, timerThree$).subscribe(
  ([timerValOne, timerValTwo, timerValThree]) => {
    /*
      Example:
      timerThree first tick: 'Timer One Latest: 0, Timer Two Latest: 0, Timer Three Latest: 0'
      timerOne second tick: 'Timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0'
      timerTwo second tick: 'Timer One Latest: 1, Timer Two Latest: 1, Timer Three Latest: 0'
    */
    console.log(
      `Timer One Latest: ${timerValOne}, Timer Two Latest: ${timerValTwo}, Timer Three Latest: ${timerValThree}`
    );
  }
);
Variant 2: combine the values with a projection function first, then pass the result downstream
// RxJS v6+
import { timer, combineLatest } from 'rxjs';

const timerOne$ = timer(1000, 4000);
const timerTwo$ = timer(2000, 4000);
const timerThree$ = timer(3000, 4000);

// Projection applied to the latest value of each timer; combineLatest
// accepts it as an optional trailing argument.
const describeLatest = (one, two, three) => `Timer One (Proj) Latest: ${one},
Timer Two (Proj) Latest: ${two},
Timer Three (Proj) Latest: ${three}`;

combineLatest(timerOne$, timerTwo$, timerThree$, describeLatest).subscribe(console.log);
Copy the code
withLatestFrom
Applies to: multiple input sources, but only one dominant one
import { withLatestFrom } from "rxjs/operators";
import { interval } from "rxjs";

const seconds = interval(1000);
const first = interval(500);
// `first` drives the output: each of its values is paired with the latest
// value of `seconds`. Nothing is emitted until `seconds` has produced a value.
const obs$ = first.pipe(withLatestFrom(seconds));
obs$.subscribe(res => {
  console.log("res...", res);
});
// output:
// res... [ 1, 0 ]
// res... [ 2, 0 ]
// res... [ 3, 1 ]
// res... [ 4, 1 ]
// res... [ 5, 2 ]
// res... [ 6, 2 ]
// res... [ 7, 3 ]
// res... [ 8, 3 ]
// res... [ 9, 4 ]
concat
Suitable for: first come, first served — processing sources one after another, like a queue
If the first source never completes, the second one is never subscribed to
// RxJS v6+
import { interval, of, concat } from 'rxjs';

// when source never completes, any subsequent observables never run
concat(interval(1000), of('This', 'Never', 'Runs'))
  // log: 0, 1, 2, 3...
  .subscribe(console.log);
Example: a countdown
// RxJS v6+
import { concat, empty } from 'rxjs';
import { delay, startWith } from 'rxjs/operators';

// elems
const userMessage = document.getElementById('message');

/**
 * Builds an observable that emits `message` after `delayedTime` milliseconds.
 * @param {*} message - value to emit
 * @param {number} [delayedTime=1000] - delay in milliseconds
 */
const delayedMessage = (message, delayedTime = 1000) => {
  return empty().pipe(startWith(message), delay(delayedTime));
};

// concat() runs the delayed messages strictly one after another,
// which produces the countdown.
concat(
  delayedMessage('Get Ready! '),
  delayedMessage(3),
  delayedMessage(2),
  delayedMessage(1),
  delayedMessage('Go! '),
  delayedMessage(' ', 2000)
).subscribe((message) => (userMessage.innerHTML = message));
Copy the code
merge
Whichever source emits first is output first, regardless of the order in which the sources are listed; values are emitted one by one, not as an array
import { of, merge } from 'rxjs';
import { mapTo, delay } from 'rxjs/operators';

//emit one item
const example = of(null);
// The output order follows the delays, not the order of the arguments.
merge(
  example.pipe(mapTo('Hello --- 1')),
  example.pipe(mapTo('World1!--- 1'), delay(1300)),
  example.pipe(mapTo('Goodbye --- 1'), delay(500)),
  example.pipe(mapTo('World!2 --- 1'), delay(300))
).subscribe(val => console.log(val));
// output:
// Hello --- 1
// World!2 --- 1
// Goodbye --- 1
// World1!--- 1
Copy the code
startWith
Use case: emitting given values before the source's own values
Prepending a single value – "Hello World"
// RxJS v6+
import { startWith, scan } from 'rxjs/operators';
import { of } from 'rxjs';

//emit ('World!', 'Goodbye', 'World!')
const source = of('World!', 'Goodbye', 'World!');
//start with 'Hello', concat current string to previous
const example = source.pipe(
  startWith('Hello'),
  scan((acc, curr) => `${acc} ${curr}`)
);
/*
  output:
  "Hello"
  "Hello World!"
  "Hello World! Goodbye"
  "Hello World! Goodbye World!"
*/
const subscribe = example.subscribe(val => console.log(val));
Copy the code
Prepending multiple values
// RxJS v6+
import { startWith } from 'rxjs/operators';
import { interval } from 'rxjs';

// A 1s ticker, prefixed with -3, -2 and -1 before its own values.
const ticker = interval(1000);
const prefixed = ticker.pipe(startWith(-3, -2, -1));
const subscribe = prefixed.subscribe(value => console.log(value));
//output: -3, -2, -1, 0, 1, 2....
Copy the code
5. Filtering operators
filter
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

const source = from([
  { name: 'Joe', age: 31 },
  { name: 'Bob', age: 25 }
]);
// The predicate receives the value and its index; only people aged 30+ pass.
const example = source.pipe(filter((person, index) => {
  const res = person.age >= 30;
  console.log(`person info.....`, person, index);
  return res;
}));
const subscribe = example.subscribe(val => console.log(`final result Over 30: ${val.name}`));
// output:
// person info..... { name: 'Joe', age: 31 } 0
// final result Over 30: Joe
// person info..... { name: 'Bob', age: 25 } 1
Copy the code
6. Conditional and boolean operators
every
- Each one needs to satisfy the condition to be true
import { of } from 'rxjs';
import { every } from 'rxjs/operators';

// every() emits a single boolean: whether all values satisfy the predicate.
const allBelowFive = of(1, 2, 3, 4, 5, 6).pipe(every(n => n < 5));
allBelowFive.subscribe(verdict => console.log(verdict)); // -> false
Copy the code
find, findIndex
- Just find the first one that meets the criteria
import { fromEvent } from 'rxjs';
import { find } from 'rxjs/operators';

// Emit the first click event whose target is a DIV element.
const isDivClick = ev => ev.target.tagName === 'DIV';
const firstDivClick = fromEvent(document, 'click').pipe(find(isDivClick));
firstDivClick.subscribe(ev => console.log(ev));
Copy the code
isEmpty
- Checks whether Observable is empty
import { Subject } from 'rxjs';
import { isEmpty } from 'rxjs/operators';

const source = new Subject();
const result = source.pipe(isEmpty());

source.subscribe(x => console.log(x));
result.subscribe(x => console.log(x));

source.next('a');
source.next('b');
source.next('c');
source.complete();

// isEmpty() emits false as soon as the first value arrives,
// hence "false" is logged right after "a".
// Results in:
// a
// false
// b
// c
Copy the code
iif
import { iif, of, interval } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const r$ = of('R');
const x$ = of('X');

// For every tick, iif() picks r$ when the tick is a multiple of 4, otherwise x$.
interval(1000)
  .pipe(mergeMap(v => iif(() => v % 4 === 0, r$, x$)))
  .subscribe(console.log);
// output:
// R
// X
// X
// X
// R
// X
// X
// X
// ...
Copy the code
defaultIfEmpty
import { defaultIfEmpty } from 'rxjs/operators';
import { empty, of, merge } from 'rxjs';

// Both sources complete without emitting, so each falls back to its default value.
const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty! '));
const example = empty().pipe(defaultIfEmpty('Observable.empty()! '));
merge(
  example,
  exampleOne
).subscribe(console.log);
// output:
// Observable.empty()!
// Observable.of() Empty!
Copy the code
7. Exception handling
catchError
Handling a single error
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

//emit error
const source = throwError('This is an error! ');
//gracefully handle error, returning observable with error message
const example = source.pipe(catchError(val => of(`I caught: ${val}`)));
//output: 'I caught: This is an error! '
const subscribe = example.subscribe(val => console.log(val));
Copy the code
Combining catchError with other operators
import { throwError, fromEvent, of } from 'rxjs';
import {
  catchError,
  tap,
  switchMap,
  mergeMap,
  concatMap,
  exhaustMap
} from 'rxjs/operators';

// NOTE(review): throwError is passed to pipe() in operator position here;
// presumably this is a shortcut to obtain an observable that always errors.
const fakeRequest$ = of().pipe(
  tap(_ => console.log('fakeRequest')),
  throwError
);

// catchError sits on the INNER observable, so the click stream itself never
// sees the error and keeps listening.
const iWillContinueListening$ = fromEvent(
  document.getElementById('continued'),
  'click'
).pipe(
  switchMap(_ => fakeRequest$.pipe(catchError(_ => of('keep on clicking!!! '))))
);

// catchError sits on the OUTER stream, so the first error replaces the click
// stream with the fallback and later clicks are no longer handled.
const iWillStopListening$ = fromEvent(
  document.getElementById('stopped'),
  'click'
).pipe(
  switchMap(_ => fakeRequest$),
  catchError(_ => of('no more requests!!! '))
);

iWillContinueListening$.subscribe(console.log);
iWillStopListening$.subscribe(console.log);
retry
Setting the number of retries
// RxJS v6+
import { interval, of, throwError } from 'rxjs';
import { mergeMap, retry } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
const example = source.pipe(
  mergeMap(val => {
    //throw error for demonstration
    if (val > 5) {
      return throwError('Error! ');
    }
    return of(val);
  }),
  //retry 2 times on error
  retry(2)
);
/*
  output:
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  "Error! : Retried 2 times then quit!"
*/
const subscribe = example.subscribe({
  next: val => console.log(val),
  error: val => console.log(`${val}: Retried 2 times then quit!`)
});
Copy the code
retryWhen
Combined with delayWhen
// RxJS v6+
import { timer, interval } from 'rxjs';
import { map, tap, retryWhen, delayWhen } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
const example = source.pipe(
  map(val => {
    if (val > 5) {
      //error will be picked up by retryWhen
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      //log error message
      tap(val => console.log(`Value ${val} was too high!`)),
      //restart in 6 seconds (the thrown value, 6, times 1000ms)
      delayWhen(val => timer(val * 1000))
    )
  )
);
/*
  output:
  0
  1
  2
  3
  4
  5
  "Value 6 was too high!"
  --Wait 6 seconds then repeat
*/
const subscribe = example.subscribe(val => console.log(val));
Copy the code