Data manipulation operators
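1, map
The map operator works the same way as map on an array: it applies a function to every value the Observable emits.
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

// interval(1000) emits 0, 1, 2, ... once per second; map adds 2 to each value
var source = interval(1000);
var newest = source.pipe(map(x => x + 2));
newest.subscribe(console.log);
// 2
// 3
// 4
// 5...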
2, mapTo
mapTo replaces every emitted value with the same fixed value, as follows:
import { interval } from 'rxjs';
import { mapTo } from 'rxjs/operators';

var source = interval(1000);
var newest = source.pipe(mapTo(2));
newest.subscribe(console.log);
// 2
// 2
// 2
// 2...
3, filter
filter is also used the same way as on an array: it passes each emitted value to a callback that returns a Boolean. The value is kept if the result is true and dropped if it is false, as follows:
import { interval } from 'rxjs';
import { filter } from 'rxjs/operators';

var source = interval(1000);
// Keep only the even numbers
var newest = source.pipe(filter(x => x % 2 === 0));
newest.subscribe(console.log);
// 0
// 2
// 4
// 6...
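4, scan
scan accumulates values: it applies an accumulator function to each emitted value and emits every intermediate result.
import { from, interval } from 'rxjs';
import { zip, scan } from 'rxjs/operators';

var main = from('hello').pipe(
  // Pair each letter with an interval tick so the letters of "hello" arrive one every 500 ms
  zip(interval(500), (x, y) => x)
);
const example = main.pipe(
  // scan's optional second argument is the initial value; it is omitted here,
  // so the first emitted value is used as the initial state
  scan((origin, next) => origin + next)
);
example.subscribe({
  next: value => { console.log(value); },
  error: err => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// h
// he
// hel
// hell
// hello
// complete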
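To make the accumulation concrete, here is a plain-JavaScript sketch that mimics scan over an array instead of an Observable. scanArray is a hypothetical helper written for illustration, not part of RxJS. It assumes the same rule as above: without an initial value, the first element becomes the initial state.

```javascript
// Hypothetical helper: an array-based model of scan (not an RxJS API).
function scanArray(values, accumulator, seed) {
  const results = [];
  let hasState = arguments.length >= 3; // true when an initial value was supplied
  let state = seed;
  for (const value of values) {
    // Without an initial value, the first element becomes the state unchanged
    state = hasState ? accumulator(state, value) : value;
    hasState = true;
    results.push(state); // like scan, record every intermediate state
  }
  return results;
}

const accumulated = scanArray(Array.from('hello'), (origin, next) => origin + next);
console.log(accumulated); // [ 'h', 'he', 'hel', 'hell', 'hello' ]

const withSeed = scanArray([1, 2, 3], (sum, n) => sum + n, 10);
console.log(withSeed); // [ 11, 13, 16 ]
```

The only difference from Array.prototype.reduce is that every intermediate state is kept rather than just the final one.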
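5, repeat
Sometimes we want to subscribe to an Observable again each time it completes, rather than when it fails. In this case we use repeat. It is used in basically the same way as retry, except that retry resubscribes on error while repeat resubscribes on completion. Called without an argument, repeat resubscribes indefinitely.
import { from, interval } from 'rxjs';
import { zip, repeat } from 'rxjs/operators';

var example = from(['a', 'b', 'c']).pipe(
  zip(interval(500), (x, y) => x),
  repeat()
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});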
6, groupBy
groupBy splits the source into one grouped Observable per key, much like grouping a result set by a column. The example groups the score records by name and sums each person's scores.
import { from } from 'rxjs';
import { groupBy, map, mergeAll, reduce } from 'rxjs/operators';

var people = [
  { name: "Anna", score: 100, subject: "English" },
  { name: "Anna", score: 90, subject: "Math" },
  { name: "Anna", score: 96, subject: "Chinese" },
  { name: "Jerry", score: 100, subject: "Math" },
  { name: "Jerry", score: 80, subject: "English" },
  { name: "Jerry", score: 90, subject: "Chinese" }
];
var example = from(people).pipe(
  // One grouped Observable per distinct name
  groupBy(item => item.name),
  // Sum the scores inside each group
  map(group =>
    group.pipe(
      reduce((acc, cur) => ({
        name: cur.name,
        score: acc.score + cur.score
      }))
    )
  ),
  // Flatten the per-group results back into a single stream
  mergeAll()
);
example.subscribe({
  next: value => { console.log(value); },
  error: err => { console.log("Error: " + err); },
  complete: () => { console.log("complete"); }
});
// {name: "Anna", score: 286}
// {name: "Jerry", score: 270}
// complete
Selection operators
1, take
take is a very simple operator: as its name implies, it takes the first few values and then completes, as follows:
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

var source = interval(1000);
var example = source.pipe(take(3));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete
2, first
first emits the first value sent by the Observable and then completes, behaving like take(1).
import { interval } from 'rxjs';
import { first } from 'rxjs/operators';

var source = interval(1000);
var example = source.pipe(first());
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// complete
3, takeLast
Besides taking the first few values, we can also take the last few. takeLast has to wait for the source to complete before it knows which values are last, so they all arrive together at the end:
import { interval } from 'rxjs';
import { take, takeLast } from 'rxjs/operators';

var source = interval(1000).pipe(take(6));
var example = source.pipe(takeLast(2));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 4
// 5
// complete
4, last
Just as first() is a simplified version of take(1), last() is a simplified version of takeLast(1): it emits only the final value.
import { interval } from 'rxjs';
import { take, last } from 'rxjs/operators';

var source = interval(1000).pipe(take(6));
var example = source.pipe(last());
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 5
// complete
Flow control operators
1, takeUntil
takeUntil makes an Observable complete when something else happens, as shown in the following example:
import { interval, fromEvent } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

var source = interval(1000);
var click = fromEvent(document.body, 'click');
var example = source.pipe(takeUntil(click));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// complete (after clicking on body)
We first build an Observable with interval that sends an increasing number, starting from 0, every second. We then pass another Observable to takeUntil. As soon as the Observable passed to takeUntil sends a value, the original Observable completes and sends a complete notification. In other words, the code above prints a number every second until we click on the body, at which point it prints complete.
2, concatAll
1. When the values sent by the source Observable are themselves Observables, we can use concatAll to flatten them into a single stream:
import { fromEvent, of } from 'rxjs';
import { map, concatAll } from 'rxjs/operators';

var click = fromEvent(document.body, 'click');
// Every click is mapped to an inner Observable that emits 1, 2, 3
var source = click.pipe(map(e => of(1, 2, 3)));
var example = source.pipe(concatAll());
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
2. concatAll processes the first Observable sent by source and waits until it completes before processing the next Observable sent by source, as shown in the following example.
import { interval, of } from 'rxjs';
import { take, concatAll } from 'rxjs/operators';

var obs1 = interval(1000).pipe(take(5));
var obs2 = interval(500).pipe(take(2));
var obs3 = interval(2000).pipe(take(1));
var source = of(obs1, obs2, obs3);
var example = source.pipe(concatAll());
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
3, skip
skip is used to skip the first few emitted values:
import { interval } from 'rxjs';
import { skip } from 'rxjs/operators';

var source = interval(1000);
var example = source.pipe(skip(3));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...
The values that used to start at 0 now start at 3. Remember, though, that the waiting time of the skipped values still applies: the first value received in this example arrives only after 4 seconds.
4, concat
concat joins multiple Observables into one, subscribing to each only after the previous one has completed. It has the same effect as concatAll; the difference is that concat receives the Observables directly as arguments. Here's an example:
import { interval, of } from 'rxjs';
import { take, concat } from 'rxjs/operators';

var source = interval(1000).pipe(take(3));
var source2 = of(3);
var source3 = of(4, 5, 6);
var example = source.pipe(
  concat(source2, source3)
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete
5, startWith
startWith inserts values at the very beginning of an Observable. It is somewhat like concat, except that its arguments are the values to send rather than Observables. The inserted values are sent immediately on subscription, before the first interval tick.
import { interval } from 'rxjs';
import { startWith } from 'rxjs/operators';

var source = interval(1000);
var example = source.pipe(startWith(0));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...
6, merge
merge is used in the same way as concat, except that merge subscribes to all of its Observables at once and sends their values in the order in which they arrive. In the example, source2 and source3 emit synchronously, so their values appear before the first interval tick.
import { interval, of } from 'rxjs';
import { take, merge } from 'rxjs/operators';

const source1 = interval(1000).pipe(take(3));
const source2 = of(3);
const source3 = of(4, 5);
const example = source1.pipe(merge(source2, source3));
example.subscribe({
  next: value => { console.log(value); },
  error: err => { console.log("Error: " + err); },
  complete: () => { console.log("complete"); }
});
// 3
// 4
// 5
// 0
// 1
// 2
// complete
7, delay and delayWhen
delay shifts every value sent by an Observable later by a fixed amount of time, as follows:
import { interval } from 'rxjs';
import { take, delay } from 'rxjs/operators';

// Each value arrives 1000 ms later than it otherwise would
const example = interval(100).pipe(take(5), delay(1000));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// complete
delayWhen, unlike delay, determines the delay with a function: the function receives each value emitted by the source as its argument and returns an Observable that controls when that value is released.
import { interval, timer } from 'rxjs';
import { take, delayWhen } from 'rxjs/operators';

var example = interval(300).pipe(
  take(5),
  // timer(n) emits once after n ms, which releases the held value
  // (used here in place of an empty Observable piped through delay)
  delayWhen(x => timer(100 * x * x))
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
In the example above, each value emitted by the source is passed as an argument to the delayWhen function. The value is sent on only once the Observable returned by that function emits.
8, debounceTime
Whenever debounceTime receives a value, it caches the value and waits for the given amount of time. If nothing else arrives during that time, the cached value is sent out. If a new value does arrive, the cached value is discarded, the new value takes its place, and the timer restarts. When the source completes, a value that is still pending is sent immediately, which is why 4 is printed here.
import { interval } from 'rxjs';
import { take, debounceTime } from 'rxjs/operators';

var example = interval(300).pipe(take(5), debounceTime(1000));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 4
// complete
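The caching rule described above can be modelled in plain JavaScript without timers. This is only a sketch under stated assumptions: debounceModel and the { time, value } event shape are hypothetical names for illustration, not RxJS APIs, and the model assumes no two events fall exactly one debounce period apart.

```javascript
// Hypothetical model of debounceTime over a time-ordered list of { time, value } events.
function debounceModel(events, dueTime, completeTime) {
  const output = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    // The pending value's timer is cut short by the next value or by completion
    const cutoff = next ? next.time : completeTime;
    if (event.time + dueTime <= cutoff) {
      // The quiet period elapsed, so the cached value is sent when the timer fires
      output.push({ time: event.time + dueTime, value: event.value });
    } else if (!next) {
      // The source completed while a value was pending: it is sent on completion
      output.push({ time: completeTime, value: event.value });
    }
    // Otherwise a newer value arrived first and this one is discarded
  });
  return output;
}

// interval(300) with take(5): values 0..4 at 300, 600, ..., 1500 ms, completing at 1500 ms
const debounceInput = [0, 1, 2, 3, 4].map(i => ({ time: 300 * (i + 1), value: i }));
const debounced = debounceModel(debounceInput, 1000, 1500);
console.log(debounced); // [ { time: 1500, value: 4 } ]
```

Every value except the last is replaced before its 1000 ms wait can finish, and the last one is released when the source completes.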
9, throttleTime
Unlike debounceTime, throttleTime sends a value first, then ignores further values for the given period, and sends again only once that period has passed. This prevents an event from being triggered too often and hurting performance.
import { interval } from 'rxjs';
import { take, throttleTime } from 'rxjs/operators';

var example = interval(300).pipe(
  take(5),
  throttleTime(1000)
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
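A timer-free sketch of the same rule shows which values survive. throttleModel is a hypothetical helper for illustration, not an RxJS API, and it models only the behavior described here (send first, then stay silent for the period).

```javascript
// Hypothetical model of throttleTime over a time-ordered list of { time, value } events.
function throttleModel(events, duration) {
  const output = [];
  let silentUntil = -Infinity; // end of the current silent period
  for (const event of events) {
    if (event.time >= silentUntil) {
      output.push(event.value);            // send the value right away
      silentUntil = event.time + duration; // then ignore values for `duration` ms
    }
  }
  return output;
}

// interval(300) with take(5): values 0..4 at 300, 600, ..., 1500 ms
const throttleInput = [0, 1, 2, 3, 4].map(i => ({ time: 300 * (i + 1), value: i }));
const throttled = throttleModel(throttleInput, 1000);
console.log(throttled); // [ 0, 4 ]
```

The value 0 opens a silent period lasting until 1300 ms, so 1, 2, and 3 are dropped and 4 (at 1500 ms) is the next value to get through.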
10, distinct and distinctUntilChanged
distinct compares each value with all the values sent so far and filters out the duplicates, as follows:
import { from, interval } from 'rxjs';
import { zip, distinct } from 'rxjs/operators';

var example = from(['a', 'b', 'c', 'a', 'b']).pipe(
  zip(interval(300), (x, y) => x),
  distinct()
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete
The first argument of distinct is a function that returns the value used for the comparison:
import { from, interval } from 'rxjs';
import { zip, distinct } from 'rxjs/operators';

var source = from([{ value: 'a' }, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }]).pipe(
  zip(interval(300), (x, y) => x)
);
var example = source.pipe(
  // Compare objects by their value property instead of by reference
  distinct((x) => {
    return x.value;
  })
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete
Internally, however, distinct keeps a Set of the values it has seen in order to detect duplicates. If there is a lot of data, this Set may grow too large. In that case, pass the second argument of distinct to clear the Set: the second argument is an Observable, and the Set is cleared every time it sends a value.
import { from, interval } from 'rxjs';
import { zip, distinct } from 'rxjs/operators';

// Clears the Set of seen values every 1300 ms
var flushes = interval(1300);
var example = from(['a', 'b', 'c', 'a', 'c']).pipe(
  zip(interval(300), (x, y) => x),
  distinct(null, flushes)
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// a
// b
// c
// c
// complete
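The interplay between the Set and the flush Observable can be replayed without timers. The sketch below is a simplified model, not the RxJS implementation: distinctModel, flushTimes, and the { time, value } event shape are hypothetical names chosen for illustration.

```javascript
// Hypothetical model of distinct with a flush signal, over time-ordered { time, value } events.
function distinctModel(events, flushTimes, keySelector = x => x) {
  const seen = new Set();
  const output = [];
  let flushIndex = 0;
  for (const event of events) {
    // Apply every flush that happened before this event arrived
    while (flushIndex < flushTimes.length && flushTimes[flushIndex] <= event.time) {
      seen.clear();
      flushIndex++;
    }
    const key = keySelector(event.value);
    if (!seen.has(key)) {
      seen.add(key);            // remember the key
      output.push(event.value); // and let the value through
    }
  }
  return output;
}

// 'a', 'b', 'c', 'a', 'c' arriving every 300 ms
const timedLetters = ['a', 'b', 'c', 'a', 'c'].map((value, i) => ({ time: 300 * (i + 1), value }));
const flushed = distinctModel(timedLetters, [1300]);
const unflushed = distinctModel(timedLetters, []);
console.log(flushed);   // [ 'a', 'b', 'c', 'c' ]
console.log(unflushed); // [ 'a', 'b', 'c' ]
```

The second 'a' (at 1200 ms) is still a duplicate, but the flush at 1300 ms empties the Set, so the 'c' at 1500 ms counts as new again.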
distinctUntilChanged differs from distinct in that it compares each value only with the one immediately before it, as shown in the following example:
import { from, interval } from 'rxjs';
import { zip, distinctUntilChanged } from 'rxjs/operators';

var example = from(['a', 'b', 'c', 'c', 'b']).pipe(
  zip(interval(300), (x, y) => x),
  distinctUntilChanged()
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete
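For comparison with distinct, the adjacent-only rule can be sketched over a plain array. distinctUntilChangedModel is a hypothetical helper for illustration, not an RxJS API, and it assumes values are compared with strict equality.

```javascript
// Hypothetical array model: keep a value only if it differs from its predecessor.
function distinctUntilChangedModel(values) {
  return values.filter((value, i) => i === 0 || value !== values[i - 1]);
}

const changed = distinctUntilChangedModel(['a', 'b', 'c', 'c', 'b']);
console.log(changed); // [ 'a', 'b', 'c', 'b' ]
```

Unlike distinct, no Set is needed: only the previous value has to be remembered, so the final 'b' passes even though 'b' appeared earlier.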
Coordinate multiple Observables
1, combineLatest
combineLatest coordinates several Observables: whenever any one of them sends a value, it sends the latest value from each (provided that every Observable has already sent at least one value). When combineLatest is given no projection function, the latest values are delivered as an array.
import { timer, combineLatest } from 'rxjs';

// timerOne emits the first value at 1 second and then every 4 seconds
const timerOne = timer(1000, 4000);
// timerTwo emits the first value at 2 seconds and then every 4 seconds
const timerTwo = timer(2000, 4000);
// timerThree emits the first value at 3 seconds and then every 4 seconds
const timerThree = timer(3000, 4000);
// When a timer emits values, emit the latest values of each timer as an array
const combined = combineLatest(timerOne, timerTwo, timerThree);
const subscribe = combined.subscribe(latestValues => {
  // Get the latest emitted value from timerOne, timerTwo, and timerThree
  const [timerValOne, timerValTwo, timerValThree] = latestValues;
  /*
    Example:
    timerThree first tick: 'Timer One Latest: 0, Timer Two Latest: 0, Timer Three Latest: 0'
    timerOne second tick: 'Timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0'
    timerTwo second tick: 'Timer One Latest: 1, Timer Two Latest: 1, Timer Three Latest: 0'
  */
  console.log(
    `Timer One Latest: ${timerValOne},
    Timer Two Latest: ${timerValTwo},
    Timer Three Latest: ${timerValThree}`
  );
});
2. combineLatest also accepts a projection function as its last argument; it processes the latest values before they are sent to the observer.
import { interval } from 'rxjs';
import { take, combineLatest } from 'rxjs/operators';

var source = interval(500).pipe(take(3));
var newest = interval(300).pipe(take(6));
var example = source.pipe(combineLatest(newest, (x, y) => x + y));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
In the example above, two Observables are combined, so the callback receives two parameters, x and y: x is the value sent by source and y is the value sent by newest. Whenever either source or newest sends a value, the callback is executed and a new value is sent, as long as the other Observable has already sent a value (so that it has a latest value). The program therefore runs like this:
1. newest sends 0, but source has not sent any value yet, so the callback is not executed.
2. source sends 0; newest last sent 0; passing these two numbers to the callback gives 0.
3. newest sends 1; source last sent 0; the callback gives 1.
4. newest sends 2; source last sent 0; the callback gives 2.
5. source sends 1; newest last sent 2; the callback gives 3.
6. newest sends 3; source last sent 1; the callback gives 4.
7. source sends 2; newest last sent 3; the callback gives 5.
8. source completes, but newest has not, so example does not complete.
9. newest sends 4; source last sent 2; the callback gives 6.
10. newest sends 5; source last sent 2; the callback gives 7.
11. newest completes; source has already completed, so example completes.
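The walkthrough above can be replayed as a small simulation over timestamped events. This is a plain-JavaScript sketch, not the RxJS implementation: combineLatestModel and the { time, value } event shape are hypothetical, and events with equal timestamps are assumed to be handled first-stream-first, matching the order used in the walkthrough.

```javascript
// Hypothetical model of combineLatest for two streams of { time, value } events.
function combineLatestModel(first, second, project) {
  // Tag each event with its stream and replay everything in time order.
  // Array.prototype.sort is stable, so ties keep the first stream's event first.
  const timeline = [
    ...first.map(e => ({ ...e, stream: 0 })),
    ...second.map(e => ({ ...e, stream: 1 })),
  ].sort((a, b) => a.time - b.time);

  const latest = [undefined, undefined];
  const hasValue = [false, false];
  const output = [];
  for (const event of timeline) {
    latest[event.stream] = event.value;
    hasValue[event.stream] = true;
    // Send only once both streams have a latest value
    if (hasValue[0] && hasValue[1]) output.push(project(latest[0], latest[1]));
  }
  return output;
}

// source: 0, 1, 2 every 500 ms; newest: 0..5 every 300 ms
const sourceEvents = [0, 1, 2].map(i => ({ time: 500 * (i + 1), value: i }));
const newestEvents = [0, 1, 2, 3, 4, 5].map(i => ({ time: 300 * (i + 1), value: i }));
const combinedSums = combineLatestModel(sourceEvents, newestEvents, (x, y) => x + y);
console.log(combinedSums); // [ 0, 1, 2, 3, 4, 5, 6, 7 ]
```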
2, zip
zip passes the values that share the same index in each Observable to the callback together.
import { interval } from 'rxjs';
import { take, zip } from 'rxjs/operators';

var source = interval(500).pipe(take(3));
var newest = interval(300).pipe(take(6));
var example = source.pipe(zip(newest, (x, y) => x + y));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete
In the example above, zip waits until both source and newest have sent their first value and then passes the pair to the callback; next it waits until both have sent their second value, and so on. The program therefore runs like this:
1. newest sends its first value, 0, but source has not sent its first value yet, so the callback is not executed.
2. source sends its first value, 0; the first value newest sent was 0; passing these two numbers to the callback gives 0.
3. newest sends its second value, 1, but source has not sent its second value yet, so the callback is not executed.
4. newest sends its third value, 2, but source has not sent its third value yet, so the callback is not executed.
5. source sends its second value, 1; the second value newest sent was 1; the callback gives 2.
6. newest sends its fourth value, 3, but source has not sent a fourth value, so the callback is not executed.
7. source sends its third value, 2; the third value newest sent was 2; the callback gives 4.
8. source completes, so example completes as well: the remaining values of newest have no counterpart with the same index.
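Because zip pairs values purely by index, its result does not depend on arrival times at all. The sketch below models that with arrays; zipModel is a hypothetical helper for illustration, not an RxJS API.

```javascript
// Hypothetical array model of zip: pair values by index, stop at the shorter input.
function zipModel(first, second, project) {
  const length = Math.min(first.length, second.length);
  const output = [];
  for (let i = 0; i < length; i++) {
    output.push(project(first[i], second[i]));
  }
  return output;
}

// source sends 0, 1, 2 and newest sends 0..5
const zipped = zipModel([0, 1, 2], [0, 1, 2, 3, 4, 5], (x, y) => x + y);
console.log(zipped); // [ 0, 2, 4 ]
```

What the timing does affect is when each pair is sent: a pair can only go out once the slower Observable has delivered its value for that index.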
3, withLatestFrom
withLatestFrom is used in a similar way to combineLatest, but it distinguishes a main Observable from a secondary one: a value is sent only when the main Observable sends a value, and nothing is sent as long as the secondary Observable has not sent any value yet. Example:
import { from, interval } from 'rxjs';
import { zip, withLatestFrom } from 'rxjs/operators';

var main = from('hello').pipe(
  zip(interval(500), (x, y) => x)
);
var some = from([0, 1, 0, 0, 0, 1]).pipe(
  zip(interval(300), (x, y) => x)
);
var example = main.pipe(
  // Upper-case the letter when the latest value from some is 1
  withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
  })
);
example.subscribe({
  next: value => { console.log(value); },
  error: err => { console.log("Error: " + err); },
  complete: () => { console.log("complete"); }
});
// h
// e
// l
// L
// O
// complete
withLatestFrom executes the callback when main sends a value, but note that if some has not sent a value before main does, the callback is still not executed. The program runs like this:
1. main sends h; some last sent 0; passing these two arguments to the callback gives h.
2. main sends e; some last sent 0; the callback gives e.
3. main sends l; some last sent 0; the callback gives l.
4. main sends l; some last sent 1; the callback gives L.
5. main sends o; some last sent 1; the callback gives O.
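The same steps can be replayed over timestamped events in plain JavaScript. This is a sketch for illustration only: withLatestFromModel and the { time, value } event shape are hypothetical, and the secondary events are assumed to be sorted by time.

```javascript
// Hypothetical model of withLatestFrom over time-ordered { time, value } events.
function withLatestFromModel(mainEvents, otherEvents, project) {
  const output = [];
  for (const event of mainEvents) {
    // Everything the secondary stream has sent up to this moment
    const earlier = otherEvents.filter(o => o.time <= event.time);
    if (earlier.length === 0) continue; // no latest value yet: main's value is dropped
    output.push(project(event.value, earlier[earlier.length - 1].value));
  }
  return output;
}

// main: the letters of "hello" every 500 ms; some: 0, 1, 0, 0, 0, 1 every 300 ms
const mainEvents = Array.from('hello').map((value, i) => ({ time: 500 * (i + 1), value }));
const someEvents = [0, 1, 0, 0, 0, 1].map((value, i) => ({ time: 300 * (i + 1), value }));
const letters = withLatestFromModel(mainEvents, someEvents, (x, y) => (y === 1 ? x.toUpperCase() : x));
console.log(letters.join('')); // helLO
```

Only main's events drive the loop; the secondary stream is just looked up, which is the key difference from combineLatest.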
4, concatMap
concatMap is map followed by concatAll. The following example uses the two operators separately:
import { fromEvent, interval } from 'rxjs';
import { map, take, concatAll } from 'rxjs/operators';

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  map(e => interval(1000).pipe(take(3))),
  concatAll()
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
The same pattern written with concatMap looks like this (here with a shorter 100 ms interval):
import { fromEvent, interval } from 'rxjs';
import { take, concatMap } from 'rxjs/operators';

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  concatMap(
    e => interval(100).pipe(take(3))
  )
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
5, mergeMap
Likewise, mergeMap is map followed by mergeAll:
import { fromEvent, interval } from 'rxjs';
import { take, mergeMap } from 'rxjs/operators';

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  mergeMap(
    e => interval(100).pipe(take(3))
  )
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
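6, switchMap
RxJS 6 no longer has an operator named switch (it was renamed switchAll); the mapping variant is switchMap. Compared with merge and concat, switch has one distinguishing feature: as soon as a new inner Observable arrives, it unsubscribes from the previous one, so only the most recent inner Observable stays active.
import { fromEvent, interval } from 'rxjs';
import { take, switchMap } from 'rxjs/operators';

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  // Each click cancels the interval started by the previous click
  switchMap(
    e => interval(100).pipe(take(3))
  )
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});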
Change the data flow structure
1, concatAll
import { interval, of } from 'rxjs';
import { take, concatAll } from 'rxjs/operators';

var obs1 = interval(1000).pipe(take(5));
var obs2 = interval(500).pipe(take(2));
var obs3 = interval(2000).pipe(take(1));
var source = of(obs1, obs2, obs3);
var example = source.pipe(concatAll());
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
In this example obs1, obs2, and obs3 are executed one by one: each starts only after the previous one has completed.
2, mergeAll
mergeAll is used in basically the same way as concatAll. The difference is that mergeAll processes the inner Observables in parallel.
import { fromEvent, interval } from 'rxjs';
import { map, mergeAll } from 'rxjs/operators';

var click = fromEvent(document.body, 'click');
var source = click.pipe(
  // Every click starts another interval; all of them run at the same time
  map(_ => interval(1000)),
  mergeAll()
);
source.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
A special feature of mergeAll is that it accepts a parameter giving the maximum number of inner Observables processed in parallel. When that many inner Observables are already running, any further ones have to wait until one of the running Observables completes. mergeAll(1) therefore works just like concatAll.
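The effect of the concurrency parameter can be sketched as a small scheduling model. It is a simplification under stated assumptions, not the RxJS implementation: startTimes is a hypothetical helper, each inner Observable is reduced to how long it runs (in ms), and all of them are assumed to arrive at time 0, as with of(obs1, obs2, obs3).

```javascript
// Hypothetical model: when does each inner Observable start, given a concurrency limit?
function startTimes(durations, concurrent) {
  const running = []; // end times of the inner Observables currently being processed
  const starts = [];
  for (const duration of durations) {
    let start = 0;
    if (running.length >= concurrent) {
      // All slots are taken: wait for the earliest running one to complete
      running.sort((a, b) => a - b);
      start = running.shift();
    }
    starts.push(start);
    running.push(start + duration);
  }
  return starts;
}

// obs1 runs for 5000 ms, obs2 for 1000 ms, obs3 for 2000 ms
const sequential = startTimes([5000, 1000, 2000], 1);      // like concatAll
const twoAtATime = startTimes([5000, 1000, 2000], 2);
const unlimited = startTimes([5000, 1000, 2000], Infinity); // like mergeAll()
console.log(sequential); // [ 0, 5000, 6000 ]
console.log(twoAtATime); // [ 0, 0, 1000 ]
console.log(unlimited);  // [ 0, 0, 0 ]
```

With a limit of 1 each Observable starts when the previous one ends, which is the concatAll behavior; with a limit of 2 the third one starts as soon as the shorter of the first two has finished.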