Preface
Good programmers know how to get rid of repetitive work:
- jQuery was born out of manipulating the DOM.
- Lodash was born out of working with JS.
- Rx was born out of handling events.
The concept of RxJS itself is not complicated: it is essentially an encapsulation of the Observer pattern, which is so common on the front end that you have certainly met it, whether you use a framework or native JS.
In my opinion, both the power and the difficulty of RxJS lie in the flexible use of its nearly 120 operators.
Unfortunately, the descriptions of these operators on the official website are hard to follow, so many people who clearly understand the concepts of RxJS still struggle to use the operators.
This article aims to explain what the common RxJS operators do in the most concise and approachable way. It can be used for learning and also as a quick-reference index.
Reading notes
The concept of a stream:
- Subscribe: call subscribe
- Emit: call next
- Complete: call complete
Pay attention to when a stream is subscribed and when it completes; some operators must wait for the stream to complete before they emit anything.
If an operator needs the full set of data to make its decision, it has to wait until the stream completes.
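To make the three calls concrete, here is a minimal sketch in plain JavaScript. It is a hand-rolled model, not the RxJS implementation; createStream and countValues are hypothetical helper names introduced only for this illustration. It shows why a count-like operator can emit only once complete has been called.

```javascript
// Hand-rolled model of a stream (an assumption for illustration, not RxJS).
function createStream(values) {
  return {
    subscribe(observer) {
      for (const v of values) observer.next(v); // emit: call next
      observer.complete();                      // complete: call complete
    },
  };
}

// A count-like operator: it needs every value, so it stays silent
// until the upstream stream completes.
function countValues(source) {
  return {
    subscribe(observer) {
      let n = 0;
      source.subscribe({
        next: () => { n += 1; },
        complete: () => {
          observer.next(n);
          observer.complete();
        },
      });
    },
  };
}

const lifecycleLog = [];
countValues(createStream(['a', 'b', 'c'])).subscribe({
  next: (v) => lifecycleLog.push(`next ${v}`),
  complete: () => lifecycleLog.push('complete'),
});
console.log(lifecycleLog); // [ 'next 3', 'complete' ]
```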
Creation operators
Creation operators are the starting point of a stream. They are not complicated, so they are only grouped here; see the official website for details.
Synchronous streams
- create: new Observable
- of
- range
- repeat
- empty
- never
- throw
- generate
Asynchronous streams
- interval/timer
- from: string/array/array-like object/Promise/generator
- fromPromise
- fromEvent
- fromEventPattern
- ajax
- repeatWhen
- defer: creates the stream only when it is subscribed
Merge operators
Subscribe to multiple streams and pass the data received from them downstream.
concat
Joined end to end
- Subscribes sequentially: the next stream is subscribed only after the previous one completes.
- The concat stream completes when the last stream completes.
concat(source1$, source2$)
merge
First come, first served
- Subscribes to all streams; whenever any stream emits, the merge stream emits.
- Only meaningful for asynchronous data.
- The merge stream completes when all streams have completed.
merge(source1$, source2$)
zip
One-to-one merging (like a zipper)
- Subscribes to all streams, waits until every stream has emitted its i-th value, and emits those i-th values together as an array.
- After one stream completes, zip waits for the other streams to deliver the same number of values and then completes.
- With zip, we expect the first output to hold the first value of every stream, the second output to hold the second value of every stream, and so on.
zip(source1$, source2$)
combineLatest
Merges the latest value of every stream
- Subscribes to all streams; whenever any stream emits, its value is combined with the latest values of all other streams.
- Because the latest values of the other streams are needed, nothing is passed downstream until every stream has emitted at least once.
- The combineLatest stream completes only when all streams have completed.
combineLatest(source1$, source2$)
withLatestFrom
Merges the latest value of every stream. It works like combineLatest, except that:
- combineLatest: once all streams are ready, an emission from any stream causes data to be emitted downstream.
- withLatestFrom: once all streams are ready (each has a latest value), only emissions from the stream that calls withLatestFrom are passed downstream; the other streams only record their latest value when they emit.
source1$.pipe(withLatestFrom(source2$, source3$))
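The difference between the two can be sketched with a plain JavaScript model over timestamped events. This is an illustration under assumptions, not RxJS itself: the latestEvents list and the simulateLatest helper are hypothetical, and only two streams named a and b are modeled.

```javascript
// Each event: { t: time, src: which stream emitted, v: value }.
const latestEvents = [
  { t: 1, src: 'a', v: 'a1' },
  { t: 2, src: 'b', v: 'b1' },
  { t: 3, src: 'b', v: 'b2' },
  { t: 4, src: 'a', v: 'a2' },
];

// emitOn lists the streams whose emissions trigger an output.
function simulateLatest(events, emitOn) {
  const latest = {};
  const out = [];
  for (const e of events) {
    latest[e.src] = e.v; // every stream always records its latest value
    const ready = 'a' in latest && 'b' in latest;
    if (ready && emitOn.includes(e.src)) out.push([latest.a, latest.b]);
  }
  return out;
}

// Models combineLatest(a$, b$): any stream triggers an output.
const combinedOut = simulateLatest(latestEvents, ['a', 'b']);
// Models a$.pipe(withLatestFrom(b$)): only a triggers an output.
const withLatestOut = simulateLatest(latestEvents, ['a']);

console.log(combinedOut);   // [ [ 'a1', 'b1' ], [ 'a1', 'b2' ], [ 'a2', 'b2' ] ]
console.log(withLatestOut); // [ [ 'a2', 'b2' ] ]
```

Note that a1 never appears in the withLatestFrom output of this model: it arrived before b had any value, so there was nothing to combine it with.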
race
Winner takes all
- Subscribes to all streams; as soon as one stream emits first, the other streams are unsubscribed.
race(source1$, source2$)
startWith
Prepends data to the front of the stream
source1$.pipe(startWith(1))
forkJoin
Merges the final value of every stream
- Subscribes to all streams, waits for all of them to complete, then emits the last value of each stream together.
forkJoin(source1$, source2$)
Utility operators
count
After the current stream completes, emits how many values the stream sent.
source$.pipe(count())
min/max
After the current stream completes, emits the minimum/maximum value.
source$.pipe(max())
reduce
Works like the array method: after the current stream completes, all received values are fed into the accumulator function in order.
source$.pipe(reduce((acc, value) => acc + value, 0))
Boolean operators
every
Works like the array method. Note that if every value satisfies the condition, the result is not emitted until the stream completes.
The reason is simple: until the stream completes, how could we know that all values satisfy the condition?
source$.pipe(every((x) => x > 0))
find, findIndex
Work like the array methods. The same caveat as for every applies: if no value matches, the result is known only when the stream completes.
source$.pipe(find((x) => x > 0))
isEmpty
Determines whether the stream completed without emitting any data.
source$.pipe(isEmpty())
defaultIfEmpty
If the stream satisfies isEmpty, emits the default value.
source$.pipe(defaultIfEmpty(1))
Filter operators
filter
Works like the array method.
source$.pipe(filter((x) => x > 0))
first
Takes the first value that satisfies the condition; if no condition is passed in, takes the first value.
source$.pipe(first((x) => x > 0))
last
Takes the last value that satisfies the condition; if no condition is passed in, takes the last value. Nothing is emitted until the stream completes.
source$.pipe(last((x) => x > 0))
take
Takes the first N values, then completes.
source$.pipe(take(N))
takeLast
Takes the last N values; they are emitted only once the stream completes.
source$.pipe(takeLast(N))
takeWhile
Pass in a predicate function: values are taken while it returns true, and the stream completes as soon as it returns false.
source$.pipe(takeWhile((x) => x > 0))
takeUntil
Pass in a stream (A): when stream (A) emits, the output completes.
source$.pipe(takeUntil(timer(1000)))
skip
Skips the first N values.
source$.pipe(skip(N))
skipWhile
Pass in a predicate function: values are skipped while it returns true; from the first false onward everything is passed through.
source$.pipe(skipWhile((x) => x > 0))
skipUntil
Pass in a stream (A): values are skipped until stream (A) emits.
source$.pipe(skipUntil(timer(1000)))
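The predicate-based operators are easy to confuse, so here is a small sketch that models filter, takeWhile and skipWhile over a plain array. The helper names takeWhileModel and skipWhileModel and the sample data are assumptions made for this illustration; they are not RxJS functions.

```javascript
const filterInput = [1, 2, 5, 1, 6];
const isSmall = (x) => x < 3;

// takeWhile: stop for good at the first value that fails the predicate.
function takeWhileModel(values, predicate) {
  const out = [];
  for (const v of values) {
    if (!predicate(v)) break;
    out.push(v);
  }
  return out;
}

// skipWhile: drop values until the predicate fails once, then pass everything.
function skipWhileModel(values, predicate) {
  let skipping = true;
  return values.filter((v) => {
    if (skipping && predicate(v)) return false;
    skipping = false;
    return true;
  });
}

const filtered = filterInput.filter(isSmall);              // every value is tested
const taken = takeWhileModel(filterInput, isSmall);
const skipped = skipWhileModel(filterInput, isSmall);

console.log(filtered); // [ 1, 2, 1 ]
console.log(taken);    // [ 1, 2 ]
console.log(skipped);  // [ 5, 1, 6 ]
```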
Transformation operators
map
- Receives the value passed from upstream and returns another value to downstream. (Returning the upstream value unchanged would be pointless.)
source$.pipe(map((x) => x * 2))
mapTo
- Ignores the upstream value and passes the given constant downstream.
source$.pipe(mapTo('a'))
pluck
- Extracts one key from the object emitted upstream and passes its value downstream.
source$.pipe(pluck('v'))
Lossy backpressure control
If you are not familiar with debouncing and throttling, please read up on them first.
throttle
Pass in a function that returns a stream (A). After a value is emitted, upstream values are dropped until stream (A) emits; then the process repeats.
source$.pipe(throttle(() => interval(1000)))
throttleTime
Throttles by time (ms)
source$.pipe(throttleTime(1000))
debounce
Pass in a function that returns a stream (A). The latest upstream value is held back until stream (A) emits; a newer upstream value arriving before that replaces it and restarts the wait. Then the process repeats.
source$.pipe(debounce(() => interval(1000)))
debounceTime
Debounces by time (ms)
source$.pipe(debounceTime(1000))
audit
audit and throttle differ as follows:
- throttle: emits the first value received in each throttling window
- audit: emits the last value received in each throttling window
source$.pipe(audit(() => interval(1000)))
auditTime
Same as above, but driven by time (ms)
source$.pipe(auditTime(1000))
sample
- In a normal stream, the downstream receives data whenever upstream emits.
- With sample, the stream caches the latest value emitted upstream and reads from the cache at its own pace.
- In other words, no matter how fast or slow upstream emits, sample takes a value from the cache at its own pace: if the cache holds a new value, it is passed downstream; otherwise nothing is emitted.
Pass in a stream (A). The latest upstream value is cached until stream (A) emits, at which point the cached value is taken out and passed downstream; then the process repeats.
source$.pipe(sample(interval(1000)))
sampleTime
Samples by time (ms)
source$.pipe(sampleTime(1000))
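To see how the time-based variants differ on the same input, here is a rough plain JavaScript model over timestamped events. It is a sketch under assumptions rather than the RxJS implementation: the clicks data and the three *Model helpers are hypothetical, throttling is leading-edge only (the RxJS default for throttleTime), and the source is assumed to stay open long enough for any pending timer to fire.

```javascript
const clicks = [
  { t: 0, v: 'a' }, { t: 600, v: 'b' }, { t: 1200, v: 'c' },
  { t: 1800, v: 'd' }, { t: 4000, v: 'e' },
];

// throttleTime: emit a value, then ignore everything for `ms`.
function throttleTimeModel(events, ms) {
  const out = [];
  let openAt = -Infinity;
  for (const e of events) {
    if (e.t >= openAt) { out.push(e.v); openAt = e.t + ms; }
  }
  return out;
}

// auditTime: a value opens a window of `ms`; when the window ends,
// the LAST value seen inside it is emitted.
function auditTimeModel(events, ms) {
  const out = [];
  let windowEnd = null;
  let last;
  for (const e of events) {
    if (windowEnd !== null && e.t >= windowEnd) { out.push(last); windowEnd = null; }
    if (windowEnd === null) windowEnd = e.t + ms;
    last = e.v;
  }
  if (windowEnd !== null) out.push(last); // the final window also fires
  return out;
}

// debounceTime: a value is emitted only if `ms` of silence follows it.
function debounceTimeModel(events, ms) {
  return events
    .filter((e, i) => i === events.length - 1 || events[i + 1].t - e.t >= ms)
    .map((e) => e.v);
}

const throttled = throttleTimeModel(clicks, 1000);
const audited = auditTimeModel(clicks, 1000);
const debounced = debounceTimeModel(clicks, 1000);
console.log(throttled); // [ 'a', 'c', 'e' ]
console.log(audited);   // [ 'b', 'd', 'e' ]
console.log(debounced); // [ 'd', 'e' ]
```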
distinct
- Operators with the distinct prefix perform de-duplication.
De-duplicates across all elements: only values that have never appeared before in the current stream are emitted.
When a function is passed in, its return value is used as the key for comparison.
source$.pipe(distinct())
of({ age: 4, name: 'Foo' }).pipe(distinct((p) => p.name))
distinctUntilChanged
De-duplicates adjacent elements: only values that differ from the previous one are emitted.
When a function is passed in, it is used as the comparator that decides whether two adjacent values count as equal.
source$.pipe(distinctUntilChanged())
distinctUntilKeyChanged
- A simplified version of distinctUntilChanged that takes care of reading the object key for you.
source$.pipe(distinctUntilKeyChanged('id'))
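The difference between global and adjacent de-duplication can be sketched over plain arrays. The helpers distinctModel and distinctUntilChangedModel and the sample data are assumptions made for this illustration, not RxJS functions.

```javascript
// distinct: remember every key ever seen.
function distinctModel(values, keyOf = (v) => v) {
  const seen = new Set();
  return values.filter((v) => {
    const key = keyOf(v);
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}

// distinctUntilChanged: compare only with the previously emitted value.
function distinctUntilChangedModel(values, same = (a, b) => a === b) {
  const out = [];
  for (const v of values) {
    if (out.length === 0 || !same(out[out.length - 1], v)) out.push(v);
  }
  return out;
}

const dupes = [1, 1, 2, 1, 3, 3];
const globallyUnique = distinctModel(dupes);
const adjacentUnique = distinctUntilChangedModel(dupes);
console.log(globallyUnique); // [ 1, 2, 3 ]
console.log(adjacentUnique); // [ 1, 2, 1, 3 ]

// With a key function, two different objects sharing a name count as duplicates.
const people = [
  { name: 'Foo', age: 4 },
  { name: 'Bar', age: 7 },
  { name: 'Foo', age: 5 },
];
const uniqueByName = distinctModel(people, (p) => p.name);
console.log(uniqueByName.map((p) => p.age)); // [ 4, 7 ]
```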
ignoreElements
ignoreElements ignores all upstream data and completes when upstream completes. (I don't care what you emit, just tell me when you are done.)
source$.pipe(ignoreElements())
elementAt
Takes only the N-th value emitted by upstream (the index is zero-based).
The second parameter acts as a default value: it is emitted downstream when upstream completes without having sent the N-th value.
source$.pipe(elementAt(4, null))
single
- Checks all upstream data; if exactly one value satisfies the condition, that value is passed downstream. Otherwise an error is passed downstream.
source$.pipe(single((x) => x > 0))
Lossless backpressure control
- buffer prefix: caches values into an array and passes it downstream.
- window prefix: caches values into a stream and passes it downstream.
bufferTime, windowTime
Caches upstream data, emits the cache when the specified time has elapsed, then repeats.
source$.pipe(bufferTime(1000))
bufferCount, windowCount
Caches upstream data, emits the cache once it holds the specified number of values, then repeats.
The second parameter controls how often a new cache is opened: a new cache starts every that many values. When it is omitted, a new cache starts only after the previous one has been emitted, which is usually what we expect.
source$.pipe(bufferCount(10))
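The effect of the second parameter is easier to see on concrete data. The sketch below models bufferCount over a plain array; bufferCountModel is a hypothetical helper written for this illustration, and its parameters size and startEvery stand in for the two arguments described above.

```javascript
function bufferCountModel(values, size, startEvery = size) {
  const open = []; // buffers currently being filled
  const out = [];
  values.forEach((v, i) => {
    if (i % startEvery === 0) open.push([]); // open a new buffer every `startEvery` values
    for (const buf of open) buf.push(v);
    while (open.length > 0 && open[0].length === size) out.push(open.shift());
  });
  out.push(...open); // flush leftovers when the input ends
  return out;
}

const plainBuffers = bufferCountModel([1, 2, 3, 4, 5], 2);
const overlapping = bufferCountModel([1, 2, 3, 4, 5, 6], 3, 2);
const skipping = bufferCountModel([1, 2, 3, 4, 5, 6], 2, 3);

console.log(plainBuffers); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
console.log(overlapping);  // [ [ 1, 2, 3 ], [ 3, 4, 5 ], [ 5, 6 ] ]
console.log(skipping);     // [ [ 1, 2 ], [ 4, 5 ] ]
```

When startEvery is smaller than size the buffers overlap; when it is larger, some values fall between buffers and are dropped.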
bufferWhen, windowWhen
Pass in a factory function that returns a stream (A).
The process is as follows:
- On subscription, the factory function is called to get stream (A), and caching starts.
- When stream (A) emits, the cached values are passed downstream.
- The factory function is called again to get a new stream (A), caching restarts, and the cycle repeats.
const randomSeconds = () => timer(Math.random() * 10000 | 0)
source$.pipe(bufferWhen(randomSeconds))
bufferToggle, windowToggle
The first argument is the stream (O) that opens the cache; the second is a factory function that returns the stream (C) that closes it.
The process is as follows:
- When the opening stream (O) emits, the factory function is called to get the closing stream (C), and caching starts.
- When the closing stream (C) emits, the cached values are passed downstream.
- Wait for the opening stream (O) to emit again, then repeat from step 1.
source$.pipe(bufferToggle(interval(1000), randomSeconds))
buffer, window
Pass in a closing stream (C). The difference from bufferWhen: the stream itself is passed in, not a factory function that returns it.
Caching starts on subscription; when the closing stream (C) emits, the cached values are passed downstream and caching restarts.
source$.pipe(buffer(interval(1000)))
Accumulating data
scan
The differences between scan and reduce are as follows:
- reduce: emits only once, after the stream completes
- scan: emits every time the stream receives a value
Unlike other operators, scan can keep and remember state.
source$.pipe(scan((acc, value) => acc + value, 0))
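The contrast with reduce can be sketched over a plain array. scanModel is a hypothetical helper written for this illustration; the built-in Array.prototype.reduce plays the role of the reduce operator.

```javascript
// scan: one output per input, each carrying the accumulated state so far.
function scanModel(values, accumulator, seed) {
  const out = [];
  let acc = seed;
  for (const v of values) {
    acc = accumulator(acc, v);
    out.push(acc);
  }
  return out;
}

const add = (acc, v) => acc + v;
const runningTotals = scanModel([1, 2, 3, 4], add, 0);
const finalTotal = [1, 2, 3, 4].reduce(add, 0); // reduce: a single result at the end

console.log(runningTotals); // [ 1, 3, 6, 10 ]
console.log(finalTotal);    // 10
```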
mergeScan
Same as scan, but the accumulator function returns a stream instead of a value.
- When upstream emits, the accumulator function is called to get stream (A), which is then subscribed. The values emitted by stream (A) are passed downstream, and the last of them is cached. When upstream emits again, the cached value is passed to the accumulator function, and the cycle repeats.
source$.pipe(mergeScan((acc, value) => interval(1000), 0))
Error handling
catch
Catches the error. In the pipeable form the operator is named catchError.
source$.pipe(catchError((err) => of('I', 'II', 'III', 'IV', 'V')))
retry
Pass in a number N; when an error occurs, upstream is re-subscribed, with at most N retries.
source$.pipe(retry(3))
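As a rough sketch of what "retry N times" means, the plain JavaScript model below treats one subscription as one function call. retryModel and flakySource are hypothetical names introduced for this illustration, and the model is synchronous, unlike real streams. It shows that N retries allow up to N + 1 attempts in total.

```javascript
// subscribeOnce stands in for "subscribe to upstream"; it may throw.
function retryModel(subscribeOnce, maxRetries) {
  let attempts = 0;
  for (;;) {
    attempts += 1;
    try {
      return { value: subscribeOnce(attempts), attempts };
    } catch (err) {
      if (attempts > maxRetries) throw err; // retries used up: pass the error on
    }
  }
}

// Fails on the first two attempts, succeeds on the third.
function flakySource(attempt) {
  if (attempt < 3) throw new Error('boom');
  return 'ok';
}

const retryResult = retryModel(flakySource, 3);
console.log(retryResult); // { value: 'ok', attempts: 3 }

// A source that always fails is tried 1 + 3 times before the error escapes.
let alwaysFailingCalls = 0;
try {
  retryModel(() => { alwaysFailingCalls += 1; throw new Error('boom'); }, 3);
} catch (err) {
  // expected: the error surfaces after the last retry
}
console.log(alwaysFailingCalls); // 4
```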
retryWhen
Pass in a function that returns a stream (A). When an error occurs, stream (A) is subscribed, and upstream is retried each time stream (A) emits. When stream (A) completes, retryWhen completes as well.
source$.pipe(retryWhen((errors$) => interval(1000)))
finally
Runs a callback when the stream terminates. In the pipeable form the operator is named finalize.
source$.pipe(finalize(() => console.log('done')))
Multicast operators
multicast
Receives a factory function that returns a Subject, and returns a hot Observable (HO).
When the link starts, it subscribes upstream to get data and calls the factory function to get the Subject; the data emitted upstream is then multicast through the Subject.
- The returned HO has connect and refCount methods.
- Only after connect is called does it actually subscribe to the upstream and start emitting data.
- After refCount is called, connect and unsubscribe are performed automatically based on the number of subscribers.
- It is the root of the multicast operators: a relatively low-level design that is rarely used day to day.
- The subsequent multicast operators are all built on this operator.
source$.pipe(multicast(() => new Subject()))
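The core idea, one upstream subscription shared by several observers through a Subject, can be sketched without the library. TinySubject and coldSource below are hand-rolled, hypothetical stand-ins written for this illustration; they omit completion, errors and unsubscription entirely.

```javascript
// Minimal stand-in for a Subject: it forwards each value to all observers.
class TinySubject {
  constructor() { this.observers = []; }
  subscribe(observer) { this.observers.push(observer); }
  next(value) { this.observers.forEach((observer) => observer(value)); }
}

let upstreamSubscriptions = 0;
// A cold source: every subscription runs the producer again.
function coldSource(observer) {
  upstreamSubscriptions += 1;
  [1, 2].forEach((v) => observer(v));
}

const subject = new TinySubject();
const received = [];
subject.subscribe((v) => received.push(`A:${v}`));
subject.subscribe((v) => received.push(`B:${v}`));

// The "connect" step: the subject subscribes to upstream exactly once.
coldSource((v) => subject.next(v));

console.log(received);              // [ 'A:1', 'B:1', 'A:2', 'B:2' ]
console.log(upstreamSubscriptions); // 1
```

Nothing flows until the connect step runs, which mirrors the behavior described in the list above.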
publish
- Wraps multicast so that you do not have to pass in the Subject factory function; otherwise identical.
source$.pipe(publish())
share
A wrapper around publish that returns the result of calling refCount (see the code).
source$.pipe(share())
// is equivalent to
source$.pipe(publish(), refCount())
publishLast
When upstream completes, multicasts the last upstream value and completes the current stream.
source$.pipe(publishLast())
publishReplay
Pass in a cache size N; the latest N upstream values are cached and replayed to each new subscriber.
- Upstream is subscribed only once.
source$.pipe(publishReplay(1))
publishBehavior
Caches the latest value emitted upstream and emits it to each new subscriber. If upstream has not emitted anything yet at subscription time, the default value passed in is emitted instead.
source$.pipe(publishBehavior(0))
Higher-order merge operators
- Higher-order operators are not "more advanced" operators.
- When a stream emits streams instead of data, it is a higher-order stream, just as a function whose return value is another function is called a higher-order function.
- Higher-order operators are operators that operate on higher-order streams.
In the following code example, the top-level stream emits not ordinary data but two streams that produce data. To get the data, the downstream has to subscribe to each stream it receives, as follows:
of(of(1, 2, 3), of(4, 5, 6))
.subscribe(
ob => ob.subscribe((num) => {
console.log(num)
})
)
The code above simply takes the data out of the streams. What if we want to apply the operators introduced earlier to the emitted streams?
const cache = []
of(of(1, 2, 3), of(4, 5, 6))
  .subscribe({
    next: ob => cache.push(ob),
    complete: () => {
      concat(...cache).subscribe(console.log)
      zip(...cache).subscribe(console.log)
    }
  })
Leaving aside whether this implementation is reasonable, we can now apply operators to the emitted streams. Doing it by hand is too cumbersome, though, so RxJS provides dedicated operators that do this for us.
To summarize: higher-order operators operate on streams, while ordinary operators operate on data.
concatAll
Counterpart of concat: caches every stream emitted by the higher-order stream and subscribes to them one after another; when all of them have completed, concatAll completes.
source$.pipe(concatAll())
mergeAll
Counterpart of merge: subscribes to every stream emitted by the higher-order stream; whenever any of them emits, mergeAll emits.
source$.pipe(mergeAll())
zipAll
Counterpart of zip: subscribes to every stream emitted by the higher-order stream and emits together the values that share the same index.
source$.pipe(zipAll())
combineAll
Counterpart of combineLatest: subscribes to every stream emitted by the higher-order stream and emits the combined latest values of all of them.
source$.pipe(combineAll())
Higher-order switching operators
switch
Switching streams: out with the old, in with the new
Each time the higher-order stream emits a stream, the previous stream is unsubscribed and the newest one is subscribed. In the pipeable form the operator is named switchAll.
source$.pipe(switchAll())
exhaust
Switching streams: faithful to the end
When the higher-order stream emits a stream, that stream is subscribed. Until it completes, every other stream emitted by the higher-order stream is ignored. Once it completes, the next stream emitted by the higher-order stream is subscribed, and the cycle repeats.
source$.pipe(exhaust())
Higher-order map operators
Once you have seen the example, the definition is obvious.
Example
Implement the following: after the mousedown event fires, listen for mousemove events.
Plain implementation
const mousedown$ = fromEvent(document, 'mousedown')
const mousemove$ = fromEvent(document, 'mousemove')
mousedown$.pipe(
  map(() => mousemove$),
  mergeAll()
)
- When the mousedown event fires, the map operator converts the emitted value into the mousemove event stream.
- Because a stream is returned rather than data, the mergeAll operator is needed to flatten the data out of that stream.
- So what we finally receive is the event object of the mousemove event.
Note: since there is only one event stream here, any of the higher-order merge operators described above would have the same effect.
Higher-order map implementation
mousedown$.pipe(
  mergeMap(() => mousemove$)
)
It is easy to see that the so-called higher-order map operators are simply:
concatMap = map + concatAll
mergeMap = map + mergeAll
switchMap = map + switch
exhaustMap = map + exhaust
concatMapTo = mapTo + concatAll
mergeMapTo = mapTo + mergeAll
switchMapTo = mapTo + switch
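The four flattening strategies differ only in what happens when a new inner stream arrives while an earlier one is still running. The plain JavaScript model below sketches this over timestamped events. Everything in it is an assumption for illustration: outerEvents, the innerEvents factory (each inner stream emits twice, 100 and 200 time units after it starts, then completes) and the four *Model helpers are hypothetical, not RxJS code.

```javascript
const outerEvents = [{ t: 0, v: 'A' }, { t: 150, v: 'B' }, { t: 500, v: 'C' }];
const INNER_LENGTH = 200; // each inner stream completes 200 units after it starts
const innerEvents = (v, start) => [
  { t: start + 100, v: v + '1' },
  { t: start + 200, v: v + '2' },
];
const inTimeOrder = (events) => events.sort((a, b) => a.t - b.t).map((e) => e.v);

// mergeMap: run every inner stream, interleave the results.
const mergeMapModel = (outer) =>
  inTimeOrder(outer.flatMap((o) => innerEvents(o.v, o.t)));

// switchMap: a new outer value cuts off the previous inner stream.
const switchMapModel = (outer) =>
  inTimeOrder(outer.flatMap((o, i) => {
    const cutoff = i + 1 < outer.length ? outer[i + 1].t : Infinity;
    return innerEvents(o.v, o.t).filter((e) => e.t < cutoff);
  }));

// exhaustMap: outer values arriving while an inner stream runs are ignored.
const exhaustMapModel = (outer) => {
  let busyUntil = -Infinity;
  return inTimeOrder(outer.flatMap((o) => {
    if (o.t < busyUntil) return [];
    busyUntil = o.t + INNER_LENGTH;
    return innerEvents(o.v, o.t);
  }));
};

// concatMap: outer values wait in line until the previous inner stream completes.
const concatMapModel = (outer) => {
  let freeAt = -Infinity;
  return inTimeOrder(outer.flatMap((o) => {
    const start = Math.max(o.t, freeAt);
    freeAt = start + INNER_LENGTH;
    return innerEvents(o.v, start);
  }));
};

const merged = mergeMapModel(outerEvents);
const switched = switchMapModel(outerEvents);
const exhausted = exhaustMapModel(outerEvents);
const concatenated = concatMapModel(outerEvents);
console.log(merged);       // [ 'A1', 'A2', 'B1', 'B2', 'C1', 'C2' ]
console.log(switched);     // [ 'A1', 'B1', 'B2', 'C1', 'C2' ]
console.log(exhausted);    // [ 'A1', 'A2', 'C1', 'C2' ]
console.log(concatenated); // [ 'A1', 'A2', 'B1', 'B2', 'C1', 'C2' ]
```

In this model mergeMap and concatMap produce the same values, but at different times: concatMap delays B until A has completed, while mergeMap lets them overlap.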
expand
Similar to mergeMap, except that every value passed downstream is also fed back into expand itself, so expand is a recursive operator.
source$.pipe(expand((x) => x === 8 ? EMPTY : of(x * 2)))
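The recursion can be sketched with a queue over plain arrays. expandModel is a hypothetical helper written for this illustration; an empty array stands in for an empty stream, and a one-element array stands in for a stream that emits a single value.

```javascript
function expandModel(seeds, project) {
  const out = [];
  const queue = [...seeds];
  while (queue.length > 0) {
    const v = queue.shift();
    out.push(v);               // every value goes downstream...
    queue.push(...project(v)); // ...and is also fed back into the projection
  }
  return out;
}

// Keep doubling until 8 is reached, then stop by returning nothing.
const doubled = expandModel([1], (x) => (x === 8 ? [] : [x * 2]));
console.log(doubled); // [ 1, 2, 4, 8 ]
```

Without a stopping condition such as the check for 8, the recursion would never end.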
Grouping data
groupBy
Emits streams: upstream values are classified by key, a stream is created for each category, and those streams are passed downstream.
The key is determined by the function passed as the first argument.
source$.pipe(groupBy((i) => i % 2))
partition
A simplified version of groupBy: pass in a predicate, and the values that satisfy it go into the first stream while the rest go into the second stream.
In short:
- groupBy may pass N streams downstream, depending on how the keys classify the data.
- partition always produces exactly two streams: values that satisfy the condition and values that do not.
const [matched$, rest$] = partition(source$, (i) => i % 2 === 0)
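The relationship between the two can be sketched over a plain array, with arrays standing in for the emitted streams. groupByModel and partitionModel are hypothetical helpers written for this illustration, not RxJS functions.

```javascript
// groupBy: one group per distinct key, so the number of groups depends on the data.
function groupByModel(values, keyOf) {
  const groups = new Map();
  for (const v of values) {
    const key = keyOf(v);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(v);
  }
  return groups;
}

// partition: always exactly two groups, matching and not matching.
function partitionModel(values, predicate) {
  return [values.filter(predicate), values.filter((v) => !predicate(v))];
}

const numbers = [1, 2, 3, 4, 5, 6];
const byRemainder = groupByModel(numbers, (i) => i % 3);
const [evens, odds] = partitionModel(numbers, (i) => i % 2 === 0);

console.log([...byRemainder]); // [ [ 1, [ 1, 4 ] ], [ 2, [ 2, 5 ] ], [ 0, [ 3, 6 ] ] ]
console.log(evens);            // [ 2, 4, 6 ]
console.log(odds);             // [ 1, 3, 5 ]
```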
Conclusion
That is all for this article; I hope you found it useful.
If anything is unclear, bring it up in the comments section so we can learn together.
I wish you all the best in mastering RxJS soon.
References
- The official documentation
- Rxjs in Plain English