preface
Curious about the RxJS technology, THE author spent a few days researching this technology, and liver a bag of wolfberry to complete this article, it is not easy. But also it is through this period of learning, I found that the technology to a certain extent, can solve some pain points, and I met in daily business, and have a kind of want to immediately to your new project of desire, and indeed this to control by the concept of data streams of data in a large project brings a kind of very elegant programming experience.
If read feel have harvest, hope to give the author a praise 😘
concept
RxJS is the abbreviation of Reactive Extensions for JavaScript. Derived from Reactive Extensions, RxJS is an asynchronous programming application library based on observable data streams combined with observer and iterator patterns. RxJS is an implementation of Reactive Extensions in JavaScript.
Attention! It has nothing to do with React, and I initially thought it was an acronym for React.js.
When it comes to unfamiliar technology, the usual way to think about it is to go to Google, search, and then look at official documents or scattered blogs to find information that makes sense of the technology. But in many cases, it is difficult to really understand the context of a technology from just a few words.
This article will analyze the value of this technology from a learning perspective and the benefits it can bring to our existing projects.
background
From a developer’s point of view, for any technology, we often talk about the following:
- Application Scenario?
- How to land?
- How easy is it to get started?
- Why is it needed? What problem does it solve?
In view of the above problems, we can analyze the related concepts of RxJS from the shallow to the deep.
Application Scenario?
Suppose we have a requirement like this:
After we upload a large file, we need to monitor its progress in real time, and stop listening when the progress reaches 100.
For general practices we can adopt the way of short polling, in the packaging to the asynchronous request, if we adopt the way of Promise, so we can use common practice to write one for polling method, to obtain the return value, if the schedule does not complete a certain time delay call this method again, Errors also need to be caught and handled when they occur.
Obviously, this approach is to a certain extent, cause the development and maintenance costs for developers, because this process is more like we are observing an event, the event will be triggered many times and let I perceive, moreover also have the ability to unsubscribe, Promise way actually not friendly when dealing with this kind of thing, RxJS’s management of asynchronous data flows is more consistent with this paradigm.
To quote especially large words:
My personal preference is to use Rx where Rx is appropriate, but not for everything. A good example would be a multi-server real-time message flow, which is processed at a high level through Rx, and then the View layer is a clear Observable, but the View layer itself can still handle user events using the existing paradigm.
How to land?
For the existing project, how to combine with the actual and ensure the stability of the original project is indeed our priority. After all, if any technology cannot be put into practice, it will inevitably bring us limited benefits.
If you’re an Angular developer, you probably know that Angular is deeply integrated with Rxjs, and that whenever you use the Angular framework, you will inevitably be exposed to Rxjs.
In some cases where more precise control of events is required, for example we want to listen for a click event but stop listening after three clicks.
It is very convenient and effective to introduce RxJS for functional development at this time, so that we can save on listening to events and recording the state of clicks, as well as dealing with some of the psychological burden of canceling the listening logic.
You can also choose to introduce RxJS to your larger projects for unified management of data flows, but don’t impose it on scenarios that don’t fit the RxJS concept, where the actual effect may not be obvious.
How easy is it to get started?
If you are an experienced JavaScript developer, you may be able to apply RxJS to some simple practices in a few minutes.
Why is it needed? What problem does it solve?
If you’re a developer using JavaScript, is it easy to write code that is very inefficient, or bloated with judgment and a lot of dirty logic when dealing with lots of events and complex data parsing and conversion?
In addition, in the JavaScript world, the word “trouble” seems to be often mentioned in the context of handling asynchronous events. We can first savor the value of RxJS from the history of handling asynchronous events.
The callback era
Usage scenario:
- Event callback
Ajax
requestNode API
setTimeout
,setInterval
And async event callback
In the above scenario, we started by passing in a callback function at the time of the function call, which was executed after the synchronous or asynchronous event had completed. We can say that in most simple scenarios, using the callback method is undoubtedly very convenient, such as we are familiar with several higher-order functions:
forEach
map
filter
[1.2.3].forEach(function (item, index) {
console.log(item, index);
})
Copy the code
The way they are used only requires us to pass in a callback function to complete batch processing of a set of data, very convenient and clear.
However, in some complex business processing, if we still adhere to the idea of not abandoning the stubborn way of using callback functions, the following situation may occur:
fs.readFile('a.txt'.'utf-8'.function(err, data) {
fs.readFile('b.txt'.'utf-8'.function(err, data1) {
fs.readFile('c.txt'.'utf-8'.function(err, data2) {
/ /...})})})Copy the code
Of course, as a writer, you may think this is very clear, there is nothing wrong with it. But what if it’s a little bit more complicated, what if all the functions that are called are different, what if the contents of each callback are very complex. In the short term, you may know why you wrote it and why you wrote it, but after a month, three months, a year, are you sure you’ll be able to find your original heart in a lot of business code?
You can’t wait to find submit records, this is which han Batch write, with shit…… Oh, my God, I wrote that.
At this time, in the face of a lot of developers suffering back to the region, finally someone out to benefit mankind……
Promise time
Promises were first proposed by the community (we’re not going to be able to resist callbacks for those of us who deal with weird business code every day), but they were officially added to the language standard in ES6, and the uniform specification allowed us to make new promises natively.
On the plus side, Promises offer a different coding approach than callbacks, with chained calls that throw data back one layer at a time, and the ability to consistently catch exceptions, rather than simply using callbacks and having to try and catch through multiple codes.
No more words, look at the size!
function readData(filePath) {
return new Promise((resolve, reject) = > {
fs.readFile(filePath, 'utf-8'.(err, data) = > {
if(err) reject(err); resolve(data); })}); } readData('a.txt').then(res= > {
return readData('b.txt');
}).then(res= > {
return readData('c.txt');
}).then(res= > {
return readData('d.txt');
}).catch(err= > {
console.log(err);
})
Copy the code
By contrast, this writing method will not be more in line with our normal thinking logic, this order, people look very comfortable, but also more conducive to the maintenance of the code.
Advantages:
- If you change your state, you’re not going to change it again, you’re going to get the same result all the time
- The asynchronous event processing process, writing more convenient
Disadvantages:
- Can’t cancel
- Error cannot be
try catch
(But it can be used.catch
Way) - When in
pending
You can’t tell what stage you’re in
Although the emergence of Promise improves the efficiency of handling asynchronous events to some extent, we often have to face some unfriendly code migration when we need to mix with some synchronous events. We need to move the code from the outer layer to the inside of the Promise to ensure that an asynchronous event completes before continuing execution.
The Generator function
The introduction of Generator functions in ES6 provides a solution for asynchronous programming by allowing the yield keyword to suspend the execution flow of functions. Formally, it is also an ordinary function, but has several notable features:
function
There is an asterisk “*” between the keyword and the function name (next to each other is recommendedfunction
Keywords)- Function body use
Yield · expressions that define different internal states (there can be more than one)
Yield `) - Direct call
Generator
The function does not execute and does not return the result. Instead, it returns a traverser object (Iterator Object
) - Which, in turn, calls the traverser objects
next
Method, traversalGenerator
Every state inside the function
function* read(){
let a= yield '666';
console.log(a);
let b = yield 'ass';
console.log(b);
return 2
}
let it = read();
console.log(it.next()); // { value:'666',done:false }
console.log(it.next()); // { value:'ass',done:false }
console.log(it.next()); // { value:2,done:true }
console.log(it.next()); // { value: undefined, done: true }
Copy the code
In this mode, we can freely control the execution mechanism of functions and let the functions be executed when needed. However, for daily projects, this writing method is not friendly enough to give users the most intuitive feelings.
async / await
I believe that after a lot of interview questions, you more or less should also know that this thing is actually a syntax sugar, which is the combination of Generator function and automatic actuator CO, so that we can write asynchronous code in a synchronous way, very carefree.
One way or another, it works so well that if it weren’t for compatibility, you’d want to use it on a large scale.
Let’s take a look at how happy the code is:
async readFileData() {
const data = await Promise.all([
'Asynchronous Event 1'.'Asynchronous Event 2'.'Asynchronous event three'
]);
console.log(data);
}
Copy the code
Just write it as if it were synchronous, and don’t worry about copying and pasting a bunch of code inside an other asynchronous function.
RxJS
It is similar to Promise in the way of use, but it is much more powerful than Promise in the ability. It can not only control data in the form of stream, but also has many built-in tools and methods so that we can easily deal with various data level operations, making our code as smooth as silk.
Advantage:
- Significant reduction in code volume
- Improved code readability
- Good for handling asynchrony
- Event management, scheduling engine
- Very rich operator
- Declarative programming style
function readData(filePath) {
return new Observable((observer) = > {
fs.readFile(filePath, 'utf-8'.(err, data) = > {
if(err) observer.error(err); observer.next(data); })}); } Rx.Observable .forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt'))
.subscribe(data= > console.log(data));
Copy the code
This is just the tip of the iceberg of the energy that RxJS can express. There are many ways to approach this scene. RxJS is good at handling asynchronous data streams and is rich in library functions. For RxJS, it can convert arbitrary Dom events, or promises, into Observables.
Advance knowledge
Before entering the world of RxJS, we first need to clarify and understand several concepts:
- Responsive programming (
Reactive Programming
) - Flow (
Stream
) - Observer mode
- Iterator pattern
Reactive Programming
Reactive Programming, which is an event-based model. In the asynchronous programming pattern above, we described two ways to get the results of the previous task execution. One is Proactive rotation, which we call Proactive. The other one is Reactive. We call it Reactive. In Reactive mode, the feedback of the result of the previous task is an event whose arrival will trigger the execution of the next task.
The idea behind reactive programming is this: you can create a Data stream (also known as a “stream”, more on this in a later section) from anything including Click and Hover events. Streams are cheap and common. Anything can be a Stream: variables, user input, properties, Cache, data structures, and so on. For example, imagine that your Twitter feed is like a Data Stream like Click Events. You can listen to it and respond accordingly.
Combined with the actual, if you use a Vue, inevitably can immediately think of, the design concept of Vue Fan Shime, not also is a kind of reactive programming in the process of writing code, we only need to pay attention to the change of the data, don’t need to manual operation to view change, the Dom layer will change with the change of the relevant data and automatically change and to render.
Flow (Stream
)
Flow as a concept should be language independent. The concept of a stream can be seen in file I/O streams, standard INPUT/output streams for Unix systems, standard error streams (STdin, STdout, stderr), as well as TCP streams mentioned earlier, as well as in the abstraction of HTTP request/response flows by some Web backend technologies such as Nodejs.
At the heart of reactive programming, the essence of a flow is a chronologically sequential set of ongoing events.
For first-class or multiple streams, we can transform them, merge them, etc., to create a new stream. During this process, the stream is immutable, that is, only a new stream is returned.
Observer mode
Among many design patterns, the observer pattern can be said to have obvious effects in many scenarios.
The Observer pattern is a behavioral design pattern that allows you to define a subscription mechanism to notify multiple other objects that “observe” an object when an event occurs on that object.
With the actual examples to understand, just like you booked a bank card balance SMS notification service change, so this time, every time you transfer or buy goods after using this piece of bank card consumption, the banking system will give you push a text message, inform you of how much consumption how many money, this is actually a kind of observer pattern.
In this process, the bank card balance is the object being observed, and the user is the observer.
Advantages:
- The coupling relationship between the object and the observer is abstract.
- Conforms to the principle of dependence inversion.
- A triggering mechanism is established between the target and the observer.
- Support for broadcast communications
Inadequate:
- The dependency between the target and the observer is not completely broken, and circular references are possible.
- When there are many observer objects, the release of notifications takes a lot of time and affects the efficiency of the program.
Iterator pattern
The Iterator pattern is also called the Sursor pattern. In object-oriented programming, the Iterator pattern is a design pattern. It is one of the simplest and most common design patterns. The iterator pattern separates the process of iteration from the business logic by allowing users to traverse each element of a container through a specific interface without knowing the underlying implementation.
const iterable = [1.2.3];
const iterator = iterable[Symbol.iterator]();
iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}
iterator.next(); // => { value: "3", done: false}
iterator.next(); // => { value: undefined, done: true}
Copy the code
As front-end developers, the most common data structures we encounter with an iterator interface are: Map, Set, Array, array-like, etc. When we use them, we can all use the same interface to access each element, which is the iterator pattern.
The Iterator function:
- Provide a unified and simple access interface for various data structures;
- Enable the members of a data structure to be arranged in a certain order;
- For new traversal syntax
for... of
Implement loop traversal
In many articles, one would like to mix iterator and eraser for conceptual resolution, when the meaning is the same, or so to speak (iterator equals eraser).
Observable
Represents a concept that is a callable collection of future values or events. It can be subscribed to by multiple observers, each of which is independent of the other.
Here’s an example:
Suppose you subscribe to a blog or a service number that pushes articles (wechat official account, etc.), and then whenever the official account updates new content, the official account will push new articles to you. In this relationship, the official account is an Observable, which is used to generate data data source.
Now that you have a good idea of what an Observable is, let’s look at how to create an Observable in RxJS.
const Rx = require('rxjs/Rx')
const myObservable = Rx.Observable.create(observer= > {
observer.next('foo');
setTimeout(() = > observer.next('bar'), 1000);
});
Copy the code
We can create an Observable by calling the Observable.create method. This method takes a function, called the producer function, that generates the Observable value. This function takes an observer, and calls observer.next() inside the function generate an Observable with a series of values.
Regardless of what an observer is, creating an Observable is simply a matter of calling an API, and a simple Observable is created.
Observer
A collection of callback functions that know how to listen for the value provided by an Observable. An Observer plays the role of a sentinel in a signal flow. It is responsible for observing the state of task execution and transmitting signals into the flow.
Here we simply implement the internal structure:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')}}Copy the code
In RxJS, the Observer is optional. In the absence of parts of the next, Error, and complete processing logic, the Observable still works, and the processing logic for the particular notification type contained is automatically ignored.
For example, we can define it like this:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
}
}
Copy the code
It still works.
So how does it fit in with what we use in actual combat:
const myObservable = Rx.Observable.create((observer) = > {
observer.next('111')
setTimeout(() = > {
observer.next('777')},3000)
})
myObservable.subscribe((text) = > console.log(text));
Copy the code
Here we use the subscribe method to make an Observer subscribe to an Observable. We can take a look at the subscribe function definition to see how it works:
subscribe(next? :(value: T) = > void, error? :(error: any) = > void, complete? :() = > void): Subscription;
Copy the code
Source code is written in TS, the code is the document, very clear, here the author to you to interpret, we from the view of the input, from left to right is next, error, complete, and is optional, we can choose to pass in the relevant callback, This is where we see that next, Error, and complete can still work without part of the logic, because they are all optional.
The Subscription and the Subject
Subscription
Subscription is an Observable execution that can be cleaned up. The most common method of this object is the unsubscribe method, which does not take any arguments and is used to clear the resources occupied by the Subscription. It also has an add method that allows us to unsubscribe from multiple subscriptions.
const myObservable = Rx.Observable.create(observer= > {
observer.next('foo');
setTimeout(() = > observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x= > console.log(x));
/ / later:
// This will cancel the Observable execution in progress
Observable execution is initiated by calling the SUBSCRIBE method using the observer
subscription.unsubscribe();
Copy the code
Subject = Subject
It is a proxy object that is both an Observable and an Observer. It can receive data from both an Observable and an Observer that subscribes to it. At the same time, Subject will multicast the internal Observers list.
Subjects are the only way to share any Observable execution with multiple observers
At this point eagle-eyed readers will notice that a new concept has come into being — multicasting.
- What about multicast?
- Is there unicast with multicasting?
- What’s the difference?
Let me give you a good analysis of these two concepts.
unicast
Ordinary Observables are unicast, so what is unicast?
Unicast means that each ordinary Observables instance can only be subscribed to by one observer, and when it is subscribed to by another observer, a new instance is created. In other words, when an ordinary Observables is subscribed to by different observers, there will be multiple instances. No matter when the observer started subscribing, each instance sends the value to the corresponding observer from the beginning.
const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);
source.subscribe((value) = > console.log('A ' + value))
setTimeout(() = > {
source.subscribe((value) = > console.log('B ' + value))
}, 1000)
// A 0
// A 1
// B 0
// A 2
// B 1
// B 2
Copy the code
The source is an Observable that sends an integer increment from 0 every second and only three times. (The take operator is an Observable that stops sending data when it takes an integer.) .
From this we can see that two different observers subscribe to the same source, one directly subscribing, and the other subscribing after a delay of one second.
From the print result, A prints an increasing number every second from 0, while B delays by one second and then prints again from 0. Thus, the execution of A and B is completely separate, that is, each subscription creates A new instance.
In many scenarios, we might want B to be able to accept not the data initially, but the data that is currently being sent at the moment of subscription, where multicast capability is needed.
multicast
What about the ability to multicast, where we only receive real-time data whenever we subscribe.
Maybe at this time, A partner will jump out, directly to A middleman to subscribe to the source, and then forward the data to A and B?
const source = Rx.Observable.interval(1000).take(3);
const subject = {
observers: [].subscribe(target) {
this.observers.push(target);
},
next: function(value) {
this.observers.forEach((next) = > next(value))
}
}
source.subscribe(subject);
subject.subscribe((value) = > console.log('A ' + value))
setTimeout(() = > {
subject.subscribe((value) = > console.log('B ' + value))
}, 1000)
// A 0
// A 1
// B 1
// A 2
// B 2
Copy the code
The only change is that the object that A and B subscribe to is changed from source to subject. Then look at what the subject contains. This simplifies things A bit by removing the error and complete functions and keeping only next. It then contains a built-in Observer array, which includes all subscribers, and exposes a subscribe for observers to subscribe to.
In use, having the middleman subject subscribe to the Source ensures unified management and real-time data, since there is essentially only one subscriber to the source.
The middle man can be directly replaced by the RxJS Subject class instance. The effect is the same
const source = Rx.Observable.interval(1000).take(3);
const subject = new Rx.Subject();
source.subscribe(subject);
subject.subscribe((value) = > console.log('A ' + value))
setTimeout(() = > {
subject.subscribe((value) = > console.log('B ' + value))
}, 1000)
Copy the code
Similarly, let’s first see whether the printed result meets the expectation. First, the printed result of A does not change, and the number printed for the first time of B now starts from 1, which is the data currently being transmitted. This satisfies our need to obtain real-time data.
Unlike unicast subscribers who always need to get data from scratch, multicast mode ensures real-time data.
In addition to the above, RxJS provides three variants of Subject:
BehaviorSubject
ReplaySubject
AsyncSubject
BehaviorSubject
The BehaviorSubject is a Subject that emits an extra value at the last emitted value when a new subscription is available.
]
Also we combine reality to understand, suppose you have we need to use it to maintain a state, after it changes to all subscription can send a current state of the data, it’s like we’re going to implement a computational attributes, we are only concerned with the calculation of properties of the final state, and don’t care about the process of the number of change, and what to do then?
We know that the normal Subject will only send the current data when there is new data, but will not send the sent data after the sending, so at this point we can introduce the BehaviorSubject for terminal maintenance. Because the observer who subscribes to the object receives the last value sent by the object while subscribing, this satisfies our requirement above.
We then analyze this Subject application scenario in combination with the code:
const subject = new Rx.Subject();
subject.subscribe((value) = > console.log('A:' + value))
subject.next(1);
/ / A: 1
subject.next(2);
/ / A: 2
setTimeout(() = > {
subject.subscribe((value) = > console.log('B:' + value)); // Failed to receive the value after 1s
}, 1000)
Copy the code
First demo is using common Subject to as subscription object, then the observer is A Subject in the instance objects call next subscription before send A new value, then the observer is A second delay after subscription, so A normal receiving data, so this time due to B at the time of sending the data haven’t subscribe, so it didn’t receive the data.
So let’s look at the BehaviorSubject implementation again:
const subject = new Rx.BehaviorSubject(0); // You need to pass in the initial value
subject.subscribe((value: number) = > console.log('A:' + value))
/ / A: 0
subject.next(1);
/ / A: 1
subject.next(2);
/ / A: 2
setTimeout(() = > {
subject.subscribe((value: number) = > console.log('B:' + value))
/ / B: 2
}, 1000)
Copy the code
Also from the printed results, the difference between the source object and the ordinary Subject is that the source object sends the most recently changed value (or the initial value if there is no change) while subscribing. At this time, our B also gets the latest state as expected.
Here we need to pass in an initial value when instantiating the BehaviorSubject.
ReplaySubject
Once you understand the BehaviorSubject, it’s easy to understand the ReplaySubject. ReplaySubject saves all the values and then plays them back to the new subscriber, and it provides input parameters to control the number of replay values (default replay is all).
]
What? Don’t get it? See the code:
const subject = new Rx.ReplaySubject(2);
subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe((value: number) = > console.log('A:' + value))
/ / A: 1
/ / A: 2
subject.next(3);
/ / A: 3
subject.next(4);
/ / A: 4
setTimeout(() = > {
subject.subscribe((value: number) = > console.log('B:' + value))
/ / B: 3
/ / B: 4
}, 1000)
// The overall print order:
/ / A: 1
/ / A: 2
/ / A: 3
/ / A: 4
/ / B: 3
/ / B: 4
Copy the code
From the constructor argument, the BehaviorSubject and ReplaySubject both need to pass a parameter, which for the BehaviorSubject is the initial value, and for the ReplaySubject is the number of previous replays. If we don’t pass the number of replays, Then it will replay all the values that have been emitted.
As a result, if you do not pass in a definite number of replays, the effect is almost the same as the unicast effect described earlier.
So we can reanalyze the code to see how many times observers receive the value before the source object is sent at the moment of subscription.
AsyncSubject
The AsyncSubject only sends the last value of the Observable execution to the Observer when it completes (executing complete()). If the AsyncSubject terminates due to an exception, it does not release any data, but it does pass an exception notification to the Observer.
]
AsyncSubject is generally used less, but more often than not, the first three are used.
const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res= > {
console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res= > {
console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res= > {
console.log('C:' + res);
});
subject.complete();
subject.next(4);
// Print the whole result:
// A:3
// B:3
// C:3
Copy the code
The source object will return the last data to all observers only after all data has been sent, i.e. after calling the complete method.
This is like the novel often some, when you want to put skills, first to play a set of hand style, after the game will release your big move.
Cold – Observables and Hot – Observables
Cold Observables
Cold Observables start producing values only when they are obsered by Observers. It is unicast and generates as many subscription instances as there are subscriptions. Each subscription receives values starting with the first generated value, so each subscription receives the same value.
If you want to refer to the code related to the Cold Observables, just look at the previous unicast example.
As with the ability of unicast description, the source object sends all the numbers from the initial value to the observer whenever the observer starts to subscribe.
Hot Observables
Hot Observables generates a value whether or not it is subscribed. Is multicast, in which multiple subscriptions share the same instance and receive values at the beginning of the subscription. Each subscription receives different values depending on when the subscription was started.
Here are several scenarios that we can analyze one by one to help understand:
“Heat”
You can first ignore unfamiliar functions in your code, which will be explained later.
const source = Rx.Observable.of(1.2).publish();
source.connect();
source.subscribe((value) = > console.log('A:' + value));
setTimeout(() = > {
source.subscribe((value) = > console.log('B:' + value));
}, 1000);
Copy the code
An Observable is created using the Rx operator of, followed by a publish function. Connect is called to start sending data.
Final code execution result is without any print data, analyze the reasons and are better understood, because haven’t subscribe to the open data, and it is A Hot Observables, it won’t notice if you have any subscription, open after can send data directly, so A and B are not to receive data.
Of course, if you put the connect method last, the final result is that A receives it, but B still can’t, because A subscribed before opening the data, and B has to wait one second.
A more intuitive scenario
As described above in multicast, in fact, what we want to see more is that both observers A and B can receive the data, and then observe the difference of the data, so that it is easy to understand.
Here we directly change to another source:
const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) = > console.log('A:' + value));
setTimeout(() = > {
source.subscribe((value: number) = > console.log('B:' + value));
}, 3000);
source.connect();
/ / A: 0
/ / A: 1
/ / A: 2
/ / B: 2
Copy the code
In this case, we use interval together with the take operator to emit an increasing number every second, up to three, and then the print result is clearer. A normally receives three numbers, and B receives only the last number, 2, after three seconds. This method is the same as described above in multicast.
Both comparisons
Cold Observables
For example, if we go to website B to watch an episode with new updates, whenever we go to watch it, we can watch the whole episode from the beginning, regardless of whether other people watch it or not.Hot Observables
: This is just like when we go to B station to watch a live broadcast, it starts to play directly after the live broadcast, regardless of whether or not there are subscribers. That is to say, if you don’t subscribe to it at the beginning, then you don’t know the content of the previous live broadcast when you go back to watch it after a while.
Operator parsing that appears in the above code
When creating the Hot Observables we used a combination of the publish and connect functions. The result of calling the Publish operator is a ConnectableObservable, The connect method is then provided on the object to allow us to control when the data is sent.
-
Publish: This operator converts normal Cold Observables into ConnectableObservables.
-
ConnectableObservable: A multicasting shared Observable that is available for concurrent subscription by multiple Observers, ConnectableObservable is a Hot Observables. ConnectableObservable is the middleman between the subscriber and the actual source Observables (the interval in the example above, which sends a value every second, which is the source Observables), The ConnectableObservable receives the value from the source Observables and forwards it to the subscriber.
-
Connect () : A ConnectableObservable does not actively send values. It has a connect method. By calling connect, you can start a shared ConnectableObservable to send values. When we call ConnectableObservable. Prototype. The connect method, regardless of being subscription, will send the value. Subscribers share the same instance, and the value that subscribers receive depends on when they start their subscription.
In fact, this manual control is quite troublesome, is there a more convenient way to operate, such as listening to a subscriber to start sending data, once all subscribers have cancelled, stop sending data? Well, there is. Let’s look at the reference count:
Reference counting
Publish and refCount are used to achieve an automatic effect.
const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() = > {
source.subscribe(data= > { console.log("A:" + data) });
setTimeout(() = > {
source.subscribe(data= > { console.log("B:" + data) });
}, 1000);
}, 2000);
/ / A: 0
/ / A: 1
/ / B: 1
/ / A: 2
/ / B: 2
Copy the code
By looking at the essence of the results, we can easily find that only when A subscribes, it starts to send data (the data A gets starts from 0), and when B subscribes, it can only get the data currently sent, but not the previous data.
What’s more, the automatic transmission stops sending data when all subscribers unsubscribe.
Schedulers
Used to control concurrency and is a centralized dispatcher, allowing us to coordinate when calculations occur, such as setTimeout or requestAnimationFrame or others.
- A scheduler is a data structure. It knows how to store and sort tasks by priority or other criteria.
- The scheduler is the execution context. It indicates when and where the task is executed (for example, immediately, or another callback function mechanism (for example
setTimeout
或process.nextTick
), or animation frames). - The scheduler has a (virtual) clock. The scheduler functions through its
getter
methodsnow()
Provides the concept of “time”. Tasks scheduled on a specific scheduler will follow exactly the time indicated by that clock.
Learn this believe that you have more or less to RxJS have a certain understanding, I do not know that you have found a question, the code examples shown in the previous have synchronous and asynchronous, and the author did not show the control of their execution, their execution mechanism in the end is what?
In fact, their internal scheduling relies on Schedulers to control when data is sent. Many operators preset different Schedulers, so we don’t need to handle them in a special way to run both synchronously and asynchronously.
const source = Rx.Observable.create(function (observer: any) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('Before subscription');
source.observeOn(Rx.Scheduler.async) / / set to async
.subscribe({
next: (value) = > { console.log(value); },
error: (err) = > { console.log('Error: ' + err); },
complete: () = > { console.log('complete'); }});console.log('After subscription');
/ / subscription
/ / subscription
/ / 1
/ / 2
/ / 3
// complete
Copy the code
From the point of view of the print results, the data transmission time has indeed changed from synchronous to asynchronous. If the scheduling mode is not changed, the “subscribed” printing should be performed after the data is sent.
After looking at the example, let’s look at what kinds of scheduling this scheduler can do:
queue
asap
async
animationFrame
queue
Place each next task in the queue instead of executing it immediately
Queue latency behaves the same as the async scheduler when it uses the scheduler.
When no delay is used, it synchronously schedules a given task – executes as soon as the task is scheduled. However, when called recursively (that is, within a scheduled task), the queue scheduler is used to schedule another task, and instead of executing immediately, the task is put on the queue and waits for the current task to complete.
This means that when you execute a task using the Queue scheduler, you are certain that it will end before any other tasks scheduled by that scheduler begin.
This synchronization and our common understanding of synchronization may not be the same, the author was also confused for a while.
Let’s use an official example to explain how this type of scheduling is understood:
import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() = > {
queueScheduler.schedule(() = > console.log('second'));
console.log('first');
});
Copy the code
Instead of focusing on unfamiliar function calls, we’ll focus on the differences between this type of scheduling and normal synchronous scheduling.
First we call queueScheduler’s schedule method to start execution, and then we call it again inside the function in the same way (again, this could be a recursion, but it might be better to use this example here) and pass in a function that prints second.
Then look at the following statement, a normal console.log(‘first’), and let’s look at the print:
// first
// second
Copy the code
If you don’t see why, you can go back and see how the queue handles recursion. If it is called recursively, it maintains an internal queue and waits for the first queued task to complete (i.e. Console.log (‘first’) above before executing console.log(‘second’)). Because console.log(‘second’) is a task that was added to the queue later.
asap
The internal promise-based implementation (process.nextTick on the Node side) uses the fastest asynchronous transport mechanism available, NextTick or the Web Worker’s MessageChannel may also call setTimeout if the Promise or process.nextTick is not supported.
async
Much like ASAP, except that setInterval is used internally for scheduling, mostly for time-based operators.
animationFrame
From name actually believe that everyone has to know a little, internal to achieve scheduling based on the requestAnimationFrame, so the timing of the execution with the window. The requestAnimationFrame consistent, suitable for frequently apply colours to a drawing or operating animation scene.
Operators
Operator concept
Use pure functions in a functional programming style, using operators such as map, filter, concat, flatMap, and so on to process collections. Because of its pure function definition, we know that calling any operator does not change the existing Observable instance, but instead returns a new Observable.
Although RxJS is based on An Observable, its operators are the most useful. Operators are the basic units of code that allow complex asynchronous code to be easily combined in a declarative manner.
Implementing an Operator
Assuming we don’t use the filter operators provided by RxJS, what if we let you implement them yourself?
function filter(source, callback) {
return Rx.Observable.create(((observer) = > {
source.subscribe(
(v) = > callback(v) && observer.next(v),
(err) = > observer.error(err),
(complete) = >observer.complete(complete) ); }}))const source = Rx.Observable.interval(1000).take(3);
filter(source, (value) = > value < 2).subscribe((value) = > console.log(value));
/ / 0
/ / 1
Copy the code
This implements a simple filter operator. The main idea is to return a new Observable based on the incoming Observable.
Code first creates a observables, with a new observer then subscribe to the incoming source, and call the callback function to judge whether the value need to continue, and if it is false, skip, according to our incoming source and filter function, the source object will eventually sent three Numbers 0, 1, 2, print the result of 0, 1, 2 is filtered.
Of course we can also place it on rX.Observable. Prototype so that we can get the source using this:
Rx.Observable.prototype.filter = function (callback) {
return Rx.Observable.create(((observer) = > {
this.subscribe(
(v) = > callback(v) && observer.next(v),
(err) = > observer.error(err),
(complete) = > observer.complete(complete)
);
}))
}
Rx.Observable.interval(1000).take(3).filter((value) = > value < 2).subscribe((value) = > console.log(value));
/ / 0
/ / 1
Copy the code
This would not be more concise, just as we used the filter method of the native array.
The difference between the two approaches is easy to understand: one is in Prototype and can be called directly by the instantiated object, while the other defines a new function that can be exported to the caller (or mounted directly to the static property of the Observable).
Readers may have guessed what I’m going to talk about next.
Instance operators – Static operators
- Instance operator: Usually an operator that can be called directly by an instantiated object. We tend to use the instance operators a little bit more, for example
filter
,map
,concat
And so on. The example operator can be used more happilythis
, while eliminating one argument and maintaining the chained call. - Static operators:
Observable
Is aclass
Class, we can mount the operator directly onto its static property. The advantage is that it can be called without instantiation, but the disadvantage is that it can no longer be usedthis
Instead, you need to pass in the target object.
If you add an instantiation property, we already have an example above, so I won’t go into that here.
Modify the filter example above by mounting it to a static property:
Rx.Observable.filter = (source, callback) = > {
return Rx.Observable.create(((observer) = > {
source.subscribe(
(v) = > callback(v) && observer.next(v),
(err) = > observer.error(err),
(complete) = >observer.complete(complete) ); }}))Copy the code
Creational Operators
The first thing we need to focus on when dealing with or using any data is where it came from, how it came from, and how we get it.
create
Definition:
public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable
After the baptism of the previous code, I believe you are not unfamiliar with the operator.
Create converts the onSubscription function to an actual Observable. The onSubscription function takes an Observer instance as the only parameter to execute every time someone subscribing to the Observable. OnSubscription should call the next, Error, and complete methods of the observer object.
When someone subscribes to an Observable, it passes a set of values by calling the subscriber’s own methods.
The figure above is not directly related to the demo code.
const source = Rx.Observable.create(((observer: any) = > {
observer.next(1);
observer.next(2);
setTimeout(() = > {
observer.next(3);
}, 1000)}))/ / way
source.subscribe(
{
next(val) {
console.log('A:'+ val); }});2 / / way
source.subscribe((val) = > console.log('B:' + val));
/ / A: 1
/ / A: 2
/ / B: 1
/ / B: 2
After the / / - 1 s:
/ / A: 3
/ / B: 3
Copy the code
The printed result is needless to mention, first A and B will be printed, 1, 2, and 3 after 1 second.
Note that we can use either of these methods to call subscribe, either as an object with the next, error, and complete methods (all optional), or as a function. The parameters are next, error, and complete respectively.
empty
Definition:
public static empty(scheduler: Scheduler): Observable
As the name implies, this operator creates an operator that emits no data, but simply a completion notification.
Some readers here may ask what this thing is for.
In fact, it works well with some operators, such as mergeMap, which will be explained later. If you can’t wait, skip to mergeMap.
from
Definition:
public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>
Create an Observable from an array, an array-like object, a Promise, an iterator object, or an Observable like object.
This method is a bit like the Array.from method in JS (you can create a new Array from a class Array or an iterable), but in RxJS it turns into an Observable for the consumer to use.
const source = Rx.Observable.from([10.20.30]);
source.subscribe(v= > console.log(v));
/ / 10
/ / 20
/ / 30
Copy the code
From the point of view of the example code, this is actually quite simple to use, if you want to use RxJS to manage some data (such as arrays or arrays of classes) for an existing project, then the from operation is a good choice.
fromEvent
Definition:
public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>
Creates an Observable that emits an event of the specified type from the given event object. This can be used for Dom events in a browser environment or EventEmitter events in a Node environment.
Suppose we have a requirement that listens for button click events and prints them out:
const click = Rx.Observable.fromEvent(document.getElementById('btn'), 'click');
click.subscribe(x= > console.log(x));
Copy the code
This is a smoother way of writing than if we were using addEventListener.
fromPromise
Definition:
public static fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T>
It’s pretty obvious from the name, which is to turn a Promise into an Observable so that we don’t have to write chained calls like. Then,. Catch.
If the Promise is at three similarities, the output Observable issues that value and finishes. If the Promise is rejected, the output Observable issues an error.
const source = Rx.Observable.fromPromise(fetch('http://localhost:3000'));
source.subscribe(x= > console.log(x), e= > console.error(e));
Copy the code
Here to demonstrate the effect, the local up a service for testing, self-testing can use another.
So we can easily get the return value of the request.
interval
Definition:
public static interval(period: number, scheduler: Scheduler): Observable
An Observable created with this operator can emit consecutive numbers over a specified period of time, similar to the pattern we use with setInterval. We can use this operator when we need to retrieve a sequence of numbers, or when we need to do something on a regular basis.
const source = Rx.Observable.interval(1000);
source.subscribe(v= > console.log(v));
Copy the code
By default, it starts at 0. The time set here is 1 second, and it continues to emit data at specified intervals. In general, we can use the take operator to limit the amount of data emitted.
of
Definition:
public static of(values: ... T, scheduler: Scheduler): Observable<T>
It is not much different from from, but it is called by passing a parameter, similar to the CONCat method in JS. It also returns an Observable that, in turn, merges the parameters you pass in and emits the data synchronously.
const source = Rx.Observable.of(1.2.3);
source.subscribe(v= > console.log(v));
/ / 1
/ / 2
/ / 3
Copy the code
Print 1, 2, 3 in sequence.
repeat
Definition:
public repeat(count: number): Observable
Repeat the data source n times for the numeric type parameter you passed in.
const source = Rx.Observable.of(1.2.3).repeat(3);
source.subscribe(v= > console.log(v));
Copy the code
With the “of” operator, the result is to print 1, 2, 3, 1, 2, 3, 1, 2, 3, instead of printing 1, 2, 3 once.
range
Definition:
public static range(start: number, count: number, scheduler: Scheduler): Observable
Creates an Observable that emits a sequence of numbers in a specified range.
Those of you who have studied Python have a sense of deja vu.
const source = Rx.Observable.range(1.4);
source.subscribe(v= > console.log(v));
Copy the code
Print results: 1, 2, 3, 4.
Is it easy?
Conversion operator
So what is conversion operators, as is known to all, in the daily business, we always need to deal with all kinds of data, most of the time we are not to directly transfer to come over to use the data, but do some transformation of its own, let him become more fit our demand shape, that is the role of the conversion operators.
buffer
Definition:
public buffer(closingNotifier: Observable<any>): Observable<T[]>
Collect past values into an array and emit the array only when another Observable emits a notification. It’s like having a buffer, collecting data, waiting for a signal, and then releasing it.
;
The change operator is a bit like a big dam, sometimes we will choose to store water, wait until the specific time, then the leader ordered to open the dam, let the water out.
Here’s an example:
Suppose that we have such a demand, we have an interface is designed to retrieve specific data, but the interface one-time return only a data, it makes us very upset, because the product of data to a specific value to control the operation, also is his one click a button, to render the data comes out, that what should I do?
This is where our buffer operator comes in:
const btn = document.createElement('button');
btn.innerText = 'You point at me! '
document.body.appendChild(btn);
const click = Rx.Observable.fromEvent(btn, 'click');
const interval = Rx.Observable.interval(1000);
const source = interval.buffer(click);
source.subscribe(x= > console.log(x));
Copy the code
Here we directly use Interval to demonstrate that the interface gets data, and then implement the function with buffer.
Here we wait four seconds and then click the button. The printed value is [0, 1, 2, 3]. Then wait another eight seconds and click the button: [4, 5, 6, 7, 8, 9, 10, 11].
From the phenomenon, it is not difficult for us to see that we have realized the transmission of data through button control. At the same time, we can find another phenomenon, the sent data will be directly emptied in the buffer, and then collect new data.
It’s not hard to understand. Let’s take the example of a dam. We open the dam and release water for a period of time, and then we close it and continue to store water.
concatMap
Definition:
public concatMap(project: function(value: T, ? index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
This operator is interesting. Let’s look at the description on the website:
Project the source value into an Observable that is merged into the output Observable, waiting serially for the previous one to complete before merging the next Observable.
I don’t know if you readers feel “a little bit” is not easy to understand, but the author will be able to easily understand a small example:
Suppose you encounter such a scene, you and your girlfriend are shopping in the snack street, but her girlfriend has a bad habit, she always likes to buy a bite and then leave for you to eat, and then another shop to buy a little bite and then leave for you to eat, and you need time to eat every time. Usually, your boyfriend’s girlfriend will wait for you to eat before buying a house. In this case, you can still eat before taking a rest. On the other hand, regardless of whether your girlfriend finished eating or not, she continues to buy, and then more and more food in your hands, the speed of your eating completely can not catch up with the speed of your girlfriend to buy, then this time will lead to your heavier and heavier load, and finally can not bear the explosion of mentality.
The above scenario contains several core points of concatMap and points to note:
- The source value sends a data that you then pass inside
Observable
It starts to work or it sends data, and the subscriber receives the data, which is internalObservable
It is equivalent to always waiting for the source object to send a data before the next round of work is done, and the next round of work is completed. - If the current round of work is not completed and the data sent by the source object is received, it will be stored in a queue, and then once the current round is completed, the queue will be checked to see if there are any more. If there are, the next round will be started immediately.
- If internal
Observable
Is longer than the interval between data being sent by the source object, which can lead to larger cache queues and performance problems
Popular point to understand is that actually a factory assembly line, a responsible for hair material, another is responsible for making products, hair material is the source object, making products is the internal observables, this factory output can only be in the finished product is completed, so the production of products such as subscribers to do to get one.
If you send the material faster than the person making it can make a product, you can build up the material, and over time it will build up, and the factory won’t be able to hold it.
Understanding with code:
const source = Rx.Observable.interval(3000);
const result = source.concatMap(val= > Rx.Observable.interval(1000).take(2));
result.subscribe(x= > console.log(x));
Copy the code
First of all, we create a source object that sends data every three seconds. Then we call the instance method concatMap and pass it a function that returns an Observable. Finally, we get the Observable transformed by concatMap and subscribe to it.
The running result is: First, the source sends the first data in the third second of the program, and then at this point our internal Observable, which we passed in, starts to work, and after two seconds it sends two increasing numbers, and then the subscription function prints out the two numbers step by step, and after one second which is the sixth second of the program, the source sends the second number, Repeat the process at this point.
map
Definition:
public map(project: function(value: T, index: number): R, thisArg: any): Observable<R>
If you use the map method in js array more, perhaps here basically do not need to look at, the use of exactly the same.
You just pass in a function, and the first parameter of the function is every piece of data from the data source, and the second parameter is the index value of that data. You just return a value that is returned after a calculation or other operation as the value that the subscriber actually gets.
const source = Rx.Observable.interval(1000).take(3);
const result = source.map(x= > x * 2);
result.subscribe(x= > console.log(x));
Copy the code
The take operator is basically just a limit on how many numbers you can take and no longer send data.
This is used to demonstrate that the value of each data source is multiplied by 2 and sent to the subscriber, so the printed values are 0, 2, and 4.
mapTo
Definition:
public mapTo(value: any): Observable
Ignore the data sent by the data source and only send the specified value (pass parameter).
It’s like when a guy you hate asks you to pass the word, he says a lot of confession, and then asks you to pass it on to some girl, you don’t want to help him because you hate him, so you tell her I like you, and then you live happily together.
const source = Rx.Observable.interval(1000).take(3);
const result = source.mapTo(Awesome!);
result.subscribe(x= > console.log(x));
Copy the code
As in this code, the data source sends 0, 1, 2, while the subscriber actually receives three 666s.
mergeMap
Definition:
public mergeMap(project: function(value: T, ? index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
This definition looks a little scary, but let’s not panic, we just need to understand how it’s used in most cases.
Remember in the introduction to empty, I left a hole to show you how mergeMap and Empty work together? I’m just going to fill in this hole right here.
const source = Rx.Observable.interval(1000).take(3);
const result = source.mergeMap(x= > x % 2= = =0 ? Rx.Observable.of(x) : Rx.Observable.empty());
result.subscribe(x= > console.log(x));
Copy the code
The input source is a data source that sends the numbers 0, 1, and 2. We call the mergeMap operator and pass in a function that sends the subscriber if the current value sent by the input source is an even number, and not otherwise.
When you compare it to a map, you can see that the map return value must be an Observable, whereas mergeMap returns an Observable. We can return any transform or Observable with other capabilities.
pluck
Definition:
public pluck(properties: ... string): Observable
Used to select the specified property value on each data object.
If a data source sends an object that has a name attribute, and the subscriber points to the name attribute, then the operator can be used to extract the attribute value to the user.
const source = Rx.Observable.of({name: 'Joe'}, {name: 'bill'});
const result = source.pluck('name');
result.subscribe(x= > console.log(x));
/ / zhang SAN
/ / li si
Copy the code
Without a doubt, this operator is used to extract attributes, equivalent to using the Map operator to extract the name and return it to the subscriber.
scan
Definition:
public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>
The accumulator operator, which can be used for state management, is quite useful.
For usage, we can refer to the Reduce function of js arrays.
Suppose we now have a requirement that we want to add up the data sent from the data source and return it to the subscriber. What should we do?
const source = Rx.Observable.interval(1000).take(4);
const result = source.scan((acc, cur) = > acc + cur, 0);
result.subscribe(x= > console.log(x));
Copy the code
In code, the data source sends four values: 0, 1, 2, and 3, and each time the subscriber receives the value is the sum of the previous received number and the current number, respectively: 0, 1, 3, and 6.
Looking at usage, we pass the first argument to the SCAN operator a function that accepts two values: acc (the result or initial value of the previous sum), cur (the current value), and the second, the initial value of the calculation.
switchMap
Definition:
public switchMap(project: function(value: T, ? index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
This is the combination of the switch operator and the Map operator. The Switch operator is covered in the combination operators section.
The main function is to merge multiple Observables and have the ability to interrupt. This means that one Observable sends data first, and subscribers can receive the data normally. When another Observable starts sending data, the first Observable is interrupted and only the data sent by the next Observable is sent.
In layman’s terms, when someone is talking and you start talking loudly, they are interrupted and you are the only person who can hear you.
const btn = document.createElement('button');
btn.innerText = 'I want to speak! '
document.body.appendChild(btn);
const source = Rx.Observable.fromEvent(btn, 'click');
const result = source.switchMap(x= > Rx.Observable.interval(1000).take(3));
result.subscribe(x= > console.log(x));
Copy the code
Code to achieve the function is, when a classmate click the button, then start to send numbers from 0, at this time if a student has not sent data, students two again click, then the data of a student will not be sent again, started to send students two.
Suppose that student 2 clicks the button after 1 o ‘clock, then the result will be printed: 0, 1, 0, 1, 2. Here, the data sent by student 2 starts from the second 0.
Other conversion operators
Portal: Conversion operator
bufferCount
bufferTime
bufferToggle
bufferWhen
concatMapTo
exhaustMap
expand
groupBy
mergeMapTo
mergeScan
pairwise
partition
switchMapTo
window
windowCount
windowTime
windowToggle
windowWhen
Filter operator
debounceTime
Definition:
public debounceTime(dueTime: number, scheduler: Scheduler): Observable
For those of you who have some experience in JS development, you should know the debounce function, so this time you will ask, it is not similar to Debounce, right? Yes, it does the same thing as the Debounce function, but with a slight difference.
A value is emitted from a source Observable only after a specified period of time has passed and no other source value has been emitted.
In other words, if a data source sends a number every second, and we use the debounceTime operator and set the delay time, then after the data source sends a new data, if the data source sends another data during the delay time, the new data will be cached and not sent. Wait after sending the data and wait for the end of the delay time will be sent to the subscriber, not only that, the delay time before the time and have a value in the buffer, this time and receive a new value, the buffer will throw away the old data, into the new, and then wait for the delay time and then send it again.
const source = Rx.Observable.interval(1000).take(3);
const result = source.debounceTime(2000);
result.subscribe(x= > console.log(x));
Copy the code
From the point of view of the code, let’s guess, what is the final print?
First, we created a data source that sends a number every second and only three times. Then we used debounceTime and set the delay time to 2 seconds. At this point, we looked at the data that was printed, and we saw that no data was printed for the first 3 seconds after the program started. Then there was no more printing. Why?
Answer is that the data source will in turn send three per second 0, 1, 2, because we set the delay time to 2 seconds, then that is to say, before the completion of the data we are impossible to see the data, because the sending frequency source for a second, there are two seconds delay time, send finished, that is, unless, otherwise can’t meet the new data source for two seconds to send, It takes about two seconds for the data to be printed, so no matter how many numbers we send from the data source, the subscriber will only receive the last number.
throttleTime
Definition:
public throttleTime(duration: number, scheduler: Scheduler): Observable<T>
How can the introduction of tremble forget its old buddy throttling?
The main capability of the operator is the same as that of the throttling function, that is, it will only send one data at a certain time, and the redundant data will be discarded. The only difference with the shake stabilization operator is that it does not block for the first value.
const source = Rx.Observable.interval(1000).take(6);
const result = source.throttleTime(2000);
result.subscribe(x= > console.log(x));
/ / 0
/ / 3
Copy the code
Print the results shown above, in fact, the effect also is very easy to explain, the code creates a data source sends a increase starting from 0 degrees per second, sent a total of 6 0 to 5, and use throttleTime set two seconds, the subscriber will not be blocked when receiving the first value, but of the two seconds after receiving a all take less than the value, That’s the fourth second to get the 3.
distinct
Definition:
public distinct(keySelector: function, flushes: Observable): Observable
This operator is also very easy to understand. It can be summarized in one sentence: when the operator is used, the data received by the subscriber will not be duplicate, that is, it is used to filter duplicate data.
const source = Rx.Observable.from([1.2.3.2.4.3]);
const result = source.distinct();
result.subscribe(x= > console.log(x));
Copy the code
The final result of the program is: 1, 2, 3, 4, and the number of repeats is filtered directly.
filter
Definition:
public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable
This kind of basic should not have what good introduction, and we understand the array filter method is not different, but the place is not consistent.
const source = Rx.Observable.from([1.2.3.2.4.3]);
const result = source.filter(x= >x ! = =3);
result.subscribe(x= > console.log(x));
Copy the code
The result is that all values except 3 are printed.
first
Definition:
public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>
Emit only the first value (or the first value that satisfies the condition) emitted by the source Observable.
This is also similar to the above, the basic introduction can understand, here no longer redundant.
take
Definition:
public take(count: number): Observable<T>
Emit only the N values (N = count) originally emitted by the source Observable.
This operator has been used many times before, and it’s a very common one, to control how many values to get, and in conjunction with something like interval, which sends data continuously, you can control how many values to get.
skip
Definition:
public skip(count: Number): Observable
Returns an Observable that skips the first N values (N = count) emitted by the source Observable.
For example, if the data source sends six values, you can use the SKIP operator to skip the first number.
const source = Rx.Observable.from([1.2.3.2.4.3]);
const result = source.skip(2);
result.subscribe(x= > console.log(x));
Copy the code
The printed result is 3, 2, 4, 3, skipping the first two numbers.
Other filter operators
The official operator is still a lot of, here is not an introduction, interested in can go to the official website to see: filter operator.
debounce
distinctKey
distinctUntilChanged
distinctUntilKeyChanged
elementAt
ignoreElements
audit
auditTime
last
sample
sampleTime
single
skipLast
skipUntil
skipWhile
takeLast
takeUntil
takeWhile
throttle
Combination operator
concatAll
Definition:
public concatAll(): Observable
As the name suggests, this operator is a bit like concat, which is used to combine multiple Observable’s into one, except that it is serial. That is, it combines two Observable’s, so the subscriber gets the first Observable before fetching the value. It then begins to receive the value of the latter Observable.
const source1 = Rx.Observable.of(1.2);
const source2 = source1.map(x= > Rx.Observable.interval(1000).take(3));
const result = source2.concatAll();
result.subscribe(x= > console.log(x));
Copy the code
In order for the subscriber to receive more than one Observable, the map operator returns a new Observable, and the map operator returns a new Observable. ConcatAll is used to merge the results, and the final subscriber receives the results in order: 0, 1, 2, 0, 1, 2.
mergeAll
Definition:
public mergeAll(concurrent: number): Observable
It’s almost the same as concatAll, except that it’s parallel, meaning that multiple merged Observables send data in no particular order.
combineLatest
Definition:
public combineLatest(other: ObservableInput, project: function): Observable
Combine multiple Observables to create an Observable whose value is calculated based on the latest value of each input Observable.
This operation symbol is not easy to understand from the introduction, let’s combine the example to explain.
const s1 = Rx.Observable.interval(2000).take(3);
const s2 = Rx.Observable.interval(1000).take(5);
const result = s1.combineLatest(s2, (a, b) = > a + b);
result.subscribe(x= > console.log(x));
Copy the code
The printed results are 0, 1, 2, 3, 4, 5, 6.
First of all, let’s look at how this combineLatest is used. It’s an instance operator, and what we’re showing here is that we’re combining S1 and S2 together, and the second argument needs to be passed in a callback to process the value of the combination. Since we’re only combining two arguments here, we only accept a and b. The return value of this callback function is the value obtained by the subscriber.
In fact, we can’t tell anything from the results. The main reason is that the process is as follows:
s2
Send a 0, and at this points1
If no value is sent, the callback we pass in will not be executed and the subscriber will not receive the value.s1
Send a 0, ands2
The last sent value is 0, so the callback function is called and these two arguments are passed in, and the subscriber receives thems2
Send a 1, ands1
The last time it was sent was 0, so the result is 1.s1
Send a 1, ands2
The last sent value was 1, so the result is 2.s2
Send a value of 2, ands1
The last value sent was 1, so the result is 3.s2
Send a value of 3, ands1
The last time the value was sent was 1, so the result is 4.- . Repeat the preceding steps.
One thing to note here is that at some point s1 and S2 will send data together, but this will also be sequential, so it’s up to the one who defines it first to send it first, and you can see that from the steps above.
In fact, there is a dependency between the combined sources, that is, the subscriber will not receive the message until both sources have sent at least one value, and the end signal will not be sent until both sources have sent.
zip
Definition:
public static zip(observables: *): Observable<R>
Combine multiple Observables to create an Observable whose value is calculated from the values of all the input Observables in order. If the last argument is a function, this function is used to calculate the final emitted value. Otherwise, an array containing all input values in order is returned.
In layman’s terms, multiple sources are aligned in order, slightly different from the combineLatest above.
No more words, on the code:
const s1 = Rx.Observable.interval(1000).take(3);
const s2 = Rx.Observable.interval(2000).take(5);
const result = s1.zip(s2, (a, b) = > a + b);
result.subscribe(x= > console.log(x));
Copy the code
The output is 0, 2, and 4.
How do we understand this? First of all, remember that the numbers that are being counted between multiple sources are sequentially aligned, that is, the first number of S1 is aligned with the first number of S2, and that one-to-one calculation, so the end result that the subscriber receives is that the aligned numbers are passed in and we call the last callback function of zip, This is the result that is returned to the user after the value is computed, which is optional.
When either source ends, the whole is signaled to end because there are no more numbers to align.
startWidth
Definition:
public startWith(values: ... T, scheduler: Scheduler): Observable
The returned Observable emits the item specified as a parameter and then the item emitted by the source Observable.
How to understand, in fact, a good example, for example, there is a string of tanghulu, the whole is a color, you think not beautiful, so you inserted in the front of the string of tanghulu several different colors of the tanghulu, this time the user will eat when you inserted in the front of the tanghulu.
const source = Rx.Observable.interval(1000).take(3);
const result = source.startWith(Awesome!)
result.subscribe(x= > console.log(x));
Copy the code
The output is 666, 0, 1, and 2.
That makes sense.
switch
Definition:
public switch(): Observable<T>
Convert a higher-order Observable into a first-order Observable by subscribing only to the newly issued internal Observable.
The use of this operator was mentioned earlier when we introduced the switchMap conversion operator, which is equivalent to map+switch=switchMap.
Here’s an example:
const btn = document.createElement('button');
btn.innerText = 'I want to speak! '
document.body.appendChild(btn);
const source = Rx.Observable.fromEvent(btn, 'click');
const source2 = source.map(x= > Rx.Observable.interval(1000).take(3));
const result = source2.switch();
result.subscribe(x= > console.log(x));
Copy the code
This code implements the same effect as switchMap. When the user clicks the button, the data will be sent. When the data transmission is not complete, the user clicks the button again, and a new data transmission process will be started, and the original data transmission process will be discarded.
Other combination operators
Portal: combinatorial operator
combineAll
concat
exhaust
forkJoin
merge
race
withLatestFrom
zipAll
The multicast operator
Portal: multicast operator
cache
multicast
publish
publishBehavior
publishLast
publishReplay
share
To be perfect…
Error handling operator
Portal: error handling operator
catch
retry
retryWhen
To be perfect…
Tool operator
Portal: Tool operator
do
delay
delayWhen
dematerialize
finally
let
materialize
observeOn
subscribeOn
timeInterval
timestamp
timeout
timeoutWith
toArray
toPromise
To be perfect…
Conditions and Boolean operators
Portal: Conditions and Boolean operators
defaultIfEmpty
every
find
findIndex
isEmpty
To be perfect…
Mathematical and aggregate operators
Portal: Mathematical and aggregation operators
count
max
min
reduce
To be perfect…
conclusion
In general, RxJS as a data flow form to deal with the complex data in our daily business is very maintenance friendly, and in many complex data operations, RxJS can bring us a lot of efficiency operators, but also give us a novel idea of data operation.
We can think of RxJS as a lodash library that can emit events, encapsulating a lot of complex operation logic, allowing us to use the process of data conversion and manipulation in a more elegant way.
Refer to the article
The official documentation
A Guide to Responsive Programming – easy to understand RxJS
Fisherman and Rxjs story, this will teach you the front end of Rxjs must be
Use RxJS to build responsive applications
Thoroughly understand Observable, Observer, and Subject in RxJS