preface

Curious about the RxJS technology, THE author spent a few days researching this technology, and liver a bag of wolfberry to complete this article, it is not easy. But also it is through this period of learning, I found that the technology to a certain extent, can solve some pain points, and I met in daily business, and have a kind of want to immediately to your new project of desire, and indeed this to control by the concept of data streams of data in a large project brings a kind of very elegant programming experience.

If read feel have harvest, hope to give the author a praise 😘

concept

RxJS is the abbreviation of Reactive Extensions for JavaScript. Derived from Reactive Extensions, RxJS is an asynchronous programming application library based on observable data streams combined with observer and iterator patterns. RxJS is an implementation of Reactive Extensions in JavaScript.

Attention! It has nothing to do with React, and I initially thought it was an acronym for React.js.

When it comes to unfamiliar technology, the usual way to think about it is to go to Google, search, and then look at official documents or scattered blogs to find information that makes sense of the technology. But in many cases, it is difficult to really understand the context of a technology from just a few words.

This article will analyze the value of this technology from a learning perspective and the benefits it can bring to our existing projects.

background

From a developer’s point of view, for any technology, we often talk about the following:

  • Application Scenario?
  • How to land?
  • How easy is it to get started?
  • Why is it needed? What problem does it solve?

In view of the above problems, we can analyze the related concepts of RxJS from the shallow to the deep.

Application Scenario?

Suppose we have a requirement like this:

After we upload a large file, we need to monitor its progress in real time, and stop listening when the progress reaches 100.

For general practices we can adopt the way of short polling, in the packaging to the asynchronous request, if we adopt the way of Promise, so we can use common practice to write one for polling method, to obtain the return value, if the schedule does not complete a certain time delay call this method again, Errors also need to be caught and handled when they occur.

Obviously, this approach is to a certain extent, cause the development and maintenance costs for developers, because this process is more like we are observing an event, the event will be triggered many times and let I perceive, moreover also have the ability to unsubscribe, Promise way actually not friendly when dealing with this kind of thing, RxJS’s management of asynchronous data flows is more consistent with this paradigm.

To quote especially large words:

My personal preference is to use Rx where Rx is appropriate, but not for everything. A good example would be a multi-server real-time message flow, which is processed at a high level through Rx, and then the View layer is a clear Observable, but the View layer itself can still handle user events using the existing paradigm.

How to land?

For the existing project, how to combine with the actual and ensure the stability of the original project is indeed our priority. After all, if any technology cannot be put into practice, it will inevitably bring us limited benefits.

If you’re an Angular developer, you probably know that Angular is deeply integrated with Rxjs, and that whenever you use the Angular framework, you will inevitably be exposed to Rxjs.

In some cases where more precise control of events is required, for example we want to listen for a click event but stop listening after three clicks.

It is very convenient and effective to introduce RxJS for functional development at this time, so that we can save on listening to events and recording the state of clicks, as well as dealing with some of the psychological burden of canceling the listening logic.

You can also choose to introduce RxJS to your larger projects for unified management of data flows, but don’t impose it on scenarios that don’t fit the RxJS concept, where the actual effect may not be obvious.

How easy is it to get started?

If you are an experienced JavaScript developer, you may be able to apply RxJS to some simple practices in a few minutes.

Why is it needed? What problem does it solve?

If you’re a developer using JavaScript, is it easy to write code that is very inefficient, or bloated with judgment and a lot of dirty logic when dealing with lots of events and complex data parsing and conversion?

In addition, in the JavaScript world, the word “trouble” seems to be often mentioned in the context of handling asynchronous events. We can first savor the value of RxJS from the history of handling asynchronous events.

The callback era

Usage scenario:

  • Event callback
  • Ajaxrequest
  • Node API
  • setTimeout,setIntervalAnd async event callback

In the above scenario, we started by passing in a callback function at the time of the function call, which was executed after the synchronous or asynchronous event had completed. We can say that in most simple scenarios, using the callback method is undoubtedly very convenient, such as we are familiar with several higher-order functions:

  • forEach
  • map
  • filter
[1.2.3].forEach(function (item, index) {
    console.log(item, index);
})
Copy the code

The way they are used only requires us to pass in a callback function to complete batch processing of a set of data, very convenient and clear.

However, in some complex business processing, if we still adhere to the idea of not abandoning the stubborn way of using callback functions, the following situation may occur:

fs.readFile('a.txt'.'utf-8'.function(err, data) {
    fs.readFile('b.txt'.'utf-8'.function(err, data1) {
        fs.readFile('c.txt'.'utf-8'.function(err, data2) {
            / /...})})})Copy the code

Of course, as a writer, you may think this is very clear, there is nothing wrong with it. But what if it’s a little bit more complicated, what if all the functions that are called are different, what if the contents of each callback are very complex. In the short term, you may know why you wrote it and why you wrote it, but after a month, three months, a year, are you sure you’ll be able to find your original heart in a lot of business code?

You can’t wait to find submit records, this is which han Batch write, with shit…… Oh, my God, I wrote that.

At this time, in the face of a lot of developers suffering back to the region, finally someone out to benefit mankind……

Promise time

Promises were first proposed by the community (we’re not going to be able to resist callbacks for those of us who deal with weird business code every day), but they were officially added to the language standard in ES6, and the uniform specification allowed us to make new promises natively.

On the plus side, Promises offer a different coding approach than callbacks, with chained calls that throw data back one layer at a time, and the ability to consistently catch exceptions, rather than simply using callbacks and having to try and catch through multiple codes.

No more words, look at the size!


function readData(filePath) {
    return new Promise((resolve, reject) = > {
        fs.readFile(filePath, 'utf-8'.(err, data) = > {
            if(err) reject(err); resolve(data); })}); } readData('a.txt').then(res= > {
    return readData('b.txt');
}).then(res= > {
    return readData('c.txt');
}).then(res= > {
    return readData('d.txt');
}).catch(err= > {
    console.log(err);
})

Copy the code

By contrast, this writing method will not be more in line with our normal thinking logic, this order, people look very comfortable, but also more conducive to the maintenance of the code.

Advantages:

  • If you change your state, you’re not going to change it again, you’re going to get the same result all the time
  • The asynchronous event processing process, writing more convenient

Disadvantages:

  • Can’t cancel
  • Error cannot betry catch(But it can be used.catchWay)
  • When inpendingYou can’t tell what stage you’re in

Although the emergence of Promise improves the efficiency of handling asynchronous events to some extent, we often have to face some unfriendly code migration when we need to mix with some synchronous events. We need to move the code from the outer layer to the inside of the Promise to ensure that an asynchronous event completes before continuing execution.

The Generator function

The introduction of Generator functions in ES6 provides a solution for asynchronous programming by allowing the yield keyword to suspend the execution flow of functions. Formally, it is also an ordinary function, but has several notable features:

  • functionThere is an asterisk “*” between the keyword and the function name (next to each other is recommendedfunctionKeywords)
  • Function body useYield · expressions that define different internal states (there can be more than one)Yield `)
  • Direct callGeneratorThe function does not execute and does not return the result. Instead, it returns a traverser object (Iterator Object)
  • Which, in turn, calls the traverser objectsnextMethod, traversalGeneratorEvery state inside the function
function* read(){
    let a= yield '666';
    console.log(a);
    let b = yield 'ass';
    console.log(b);
    return 2
}
let it = read();
console.log(it.next()); // { value:'666',done:false }
console.log(it.next()); // { value:'ass',done:false }
console.log(it.next()); // { value:2,done:true }
console.log(it.next()); // { value: undefined, done: true }
Copy the code

In this mode, we can freely control the execution mechanism of functions and let the functions be executed when needed. However, for daily projects, this writing method is not friendly enough to give users the most intuitive feelings.

async / await

I believe that after a lot of interview questions, you more or less should also know that this thing is actually a syntax sugar, which is the combination of Generator function and automatic actuator CO, so that we can write asynchronous code in a synchronous way, very carefree.

One way or another, it works so well that if it weren’t for compatibility, you’d want to use it on a large scale.

Let’s take a look at how happy the code is:


async readFileData() {
    const data = await Promise.all([
        'Asynchronous Event 1'.'Asynchronous Event 2'.'Asynchronous event three'
    ]);
    console.log(data);
}

Copy the code

Just write it as if it were synchronous, and don’t worry about copying and pasting a bunch of code inside an other asynchronous function.

RxJS

It is similar to Promise in the way of use, but it is much more powerful than Promise in the ability. It can not only control data in the form of stream, but also has many built-in tools and methods so that we can easily deal with various data level operations, making our code as smooth as silk.

Advantage:

  • Significant reduction in code volume
  • Improved code readability
  • Good for handling asynchrony
  • Event management, scheduling engine
  • Very rich operator
  • Declarative programming style
function readData(filePath) {
    return new Observable((observer) = > {
        fs.readFile(filePath, 'utf-8'.(err, data) = > {
            if(err) observer.error(err); observer.next(data); })}); } Rx.Observable .forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt'))
.subscribe(data= > console.log(data));

Copy the code

This is just the tip of the iceberg of the energy that RxJS can express. There are many ways to approach this scene. RxJS is good at handling asynchronous data streams and is rich in library functions. For RxJS, it can convert arbitrary Dom events, or promises, into Observables.

Advance knowledge

Before entering the world of RxJS, we first need to clarify and understand several concepts:

  • Responsive programming (Reactive Programming)
  • Flow (Stream)
  • Observer mode
  • Iterator pattern

Reactive Programming

Reactive Programming, which is an event-based model. In the asynchronous programming pattern above, we described two ways to get the results of the previous task execution. One is Proactive rotation, which we call Proactive. The other one is Reactive. We call it Reactive. In Reactive mode, the feedback of the result of the previous task is an event whose arrival will trigger the execution of the next task.

The idea behind reactive programming is this: you can create a Data stream (also known as a “stream”, more on this in a later section) from anything including Click and Hover events. Streams are cheap and common. Anything can be a Stream: variables, user input, properties, Cache, data structures, and so on. For example, imagine that your Twitter feed is like a Data Stream like Click Events. You can listen to it and respond accordingly.

Combined with the actual, if you use a Vue, inevitably can immediately think of, the design concept of Vue Fan Shime, not also is a kind of reactive programming in the process of writing code, we only need to pay attention to the change of the data, don’t need to manual operation to view change, the Dom layer will change with the change of the relevant data and automatically change and to render.

Flow (Stream)

Flow as a concept should be language independent. The concept of a stream can be seen in file I/O streams, standard INPUT/output streams for Unix systems, standard error streams (STdin, STdout, stderr), as well as TCP streams mentioned earlier, as well as in the abstraction of HTTP request/response flows by some Web backend technologies such as Nodejs.

At the heart of reactive programming, the essence of a flow is a chronologically sequential set of ongoing events.

For first-class or multiple streams, we can transform them, merge them, etc., to create a new stream. During this process, the stream is immutable, that is, only a new stream is returned.

Observer mode

Among many design patterns, the observer pattern can be said to have obvious effects in many scenarios.

The Observer pattern is a behavioral design pattern that allows you to define a subscription mechanism to notify multiple other objects that “observe” an object when an event occurs on that object.

With the actual examples to understand, just like you booked a bank card balance SMS notification service change, so this time, every time you transfer or buy goods after using this piece of bank card consumption, the banking system will give you push a text message, inform you of how much consumption how many money, this is actually a kind of observer pattern.

In this process, the bank card balance is the object being observed, and the user is the observer.

Advantages:

  • The coupling relationship between the object and the observer is abstract.
  • Conforms to the principle of dependence inversion.
  • A triggering mechanism is established between the target and the observer.
  • Support for broadcast communications

Inadequate:

  • The dependency between the target and the observer is not completely broken, and circular references are possible.
  • When there are many observer objects, the release of notifications takes a lot of time and affects the efficiency of the program.

Iterator pattern

The Iterator pattern is also called the Sursor pattern. In object-oriented programming, the Iterator pattern is a design pattern. It is one of the simplest and most common design patterns. The iterator pattern separates the process of iteration from the business logic by allowing users to traverse each element of a container through a specific interface without knowing the underlying implementation.

const iterable = [1.2.3];
const iterator = iterable[Symbol.iterator]();
iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}
iterator.next(); // => { value: "3", done: false}
iterator.next(); // => { value: undefined, done: true}
Copy the code

As front-end developers, the most common data structures we encounter with an iterator interface are: Map, Set, Array, array-like, etc. When we use them, we can all use the same interface to access each element, which is the iterator pattern.

The Iterator function:

  • Provide a unified and simple access interface for various data structures;
  • Enable the members of a data structure to be arranged in a certain order;
  • For new traversal syntaxfor... ofImplement loop traversal

In many articles, one would like to mix iterator and eraser for conceptual resolution, when the meaning is the same, or so to speak (iterator equals eraser).

Observable

Represents a concept that is a callable collection of future values or events. It can be subscribed to by multiple observers, each of which is independent of the other.

Here’s an example:

Suppose you subscribe to a blog or a service number that pushes articles (wechat official account, etc.), and then whenever the official account updates new content, the official account will push new articles to you. In this relationship, the official account is an Observable, which is used to generate data data source.

Now that you have a good idea of what an Observable is, let’s look at how to create an Observable in RxJS.

const Rx = require('rxjs/Rx')

const myObservable = Rx.Observable.create(observer= > {
  observer.next('foo');
  setTimeout(() = > observer.next('bar'), 1000);
});
Copy the code

We can create an Observable by calling the Observable.create method. This method takes a function, called the producer function, that generates the Observable value. This function takes an observer, and calls observer.next() inside the function generate an Observable with a series of values.

Regardless of what an observer is, creating an Observable is simply a matter of calling an API, and a simple Observable is created.

Observer

A collection of callback functions that know how to listen for the value provided by an Observable. An Observer plays the role of a sentinel in a signal flow. It is responsible for observing the state of task execution and transmitting signals into the flow.

Here we simply implement the internal structure:

const observer = {
	next: function(value) {
		console.log(value);
	},
	error: function(error) {
		console.log(error)
	},
	complete: function() {
		console.log('complete')}}Copy the code

In RxJS, the Observer is optional. In the absence of parts of the next, Error, and complete processing logic, the Observable still works, and the processing logic for the particular notification type contained is automatically ignored.

For example, we can define it like this:

const observer = {
	next: function(value) {
		console.log(value);
	},
	error: function(error) {
		console.log(error)
	}
}
Copy the code

It still works.

So how does it fit in with what we use in actual combat:

const myObservable = Rx.Observable.create((observer) = > {
    observer.next('111')
    setTimeout(() = > {
        observer.next('777')},3000)
})

myObservable.subscribe((text) = > console.log(text));
Copy the code

Here we use the subscribe method to make an Observer subscribe to an Observable. We can take a look at the subscribe function definition to see how it works:

subscribe(next? :(value: T) = > void, error? :(error: any) = > void, complete? :() = > void): Subscription;
Copy the code

Source code is written in TS, the code is the document, very clear, here the author to you to interpret, we from the view of the input, from left to right is next, error, complete, and is optional, we can choose to pass in the relevant callback, This is where we see that next, Error, and complete can still work without part of the logic, because they are all optional.

The Subscription and the Subject

Subscription

Subscription is an Observable execution that can be cleaned up. The most common method of this object is the unsubscribe method, which does not take any arguments and is used to clear the resources occupied by the Subscription. It also has an add method that allows us to unsubscribe from multiple subscriptions.

const myObservable = Rx.Observable.create(observer= > {
  observer.next('foo');
  setTimeout(() = > observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x= > console.log(x));
/ / later:
// This will cancel the Observable execution in progress
Observable execution is initiated by calling the SUBSCRIBE method using the observer
subscription.unsubscribe();
Copy the code

Subject = Subject

It is a proxy object that is both an Observable and an Observer. It can receive data from both an Observable and an Observer that subscribes to it. At the same time, Subject will multicast the internal Observers list.

Subjects are the only way to share any Observable execution with multiple observers

At this point eagle-eyed readers will notice that a new concept has come into being — multicasting.

  • What about multicast?
  • Is there unicast with multicasting?
  • What’s the difference?

Let me give you a good analysis of these two concepts.

unicast

Ordinary Observables are unicast, so what is unicast?

Unicast means that each ordinary Observables instance can only be subscribed to by one observer, and when it is subscribed to by another observer, a new instance is created. In other words, when an ordinary Observables is subscribed to by different observers, there will be multiple instances. No matter when the observer started subscribing, each instance sends the value to the corresponding observer from the beginning.

const Rx = require('rxjs/Rx')

const source = Rx.Observable.interval(1000).take(3);

source.subscribe((value) = > console.log('A ' + value))

setTimeout(() = > {
    source.subscribe((value) = > console.log('B ' + value))
}, 1000)

// A 0
// A 1
// B 0
// A 2
// B 1
// B 2
Copy the code

The source is an Observable that sends an integer increment from 0 every second and only three times. (The take operator is an Observable that stops sending data when it takes an integer.) .

From this we can see that two different observers subscribe to the same source, one directly subscribing, and the other subscribing after a delay of one second.

From the print result, A prints an increasing number every second from 0, while B delays by one second and then prints again from 0. Thus, the execution of A and B is completely separate, that is, each subscription creates A new instance.

In many scenarios, we might want B to be able to accept not the data initially, but the data that is currently being sent at the moment of subscription, where multicast capability is needed.

multicast

What about the ability to multicast, where we only receive real-time data whenever we subscribe.

Maybe at this time, A partner will jump out, directly to A middleman to subscribe to the source, and then forward the data to A and B?


const source = Rx.Observable.interval(1000).take(3);

const subject = {
	observers: [].subscribe(target) {
		this.observers.push(target);
	},
	next: function(value) {
		this.observers.forEach((next) = > next(value))
	}
}

source.subscribe(subject);

subject.subscribe((value) = > console.log('A ' + value))

setTimeout(() = > {
	subject.subscribe((value) = > console.log('B ' + value))
}, 1000)

// A 0
// A 1
// B 1
// A 2
// B 2

Copy the code

The only change is that the object that A and B subscribe to is changed from source to subject. Then look at what the subject contains. This simplifies things A bit by removing the error and complete functions and keeping only next. It then contains a built-in Observer array, which includes all subscribers, and exposes a subscribe for observers to subscribe to.

In use, having the middleman subject subscribe to the Source ensures unified management and real-time data, since there is essentially only one subscriber to the source.

The middle man can be directly replaced by the RxJS Subject class instance. The effect is the same

const source = Rx.Observable.interval(1000).take(3);

const subject = new Rx.Subject();

source.subscribe(subject);

subject.subscribe((value) = > console.log('A ' + value))

setTimeout(() = > {
	subject.subscribe((value) = > console.log('B ' + value))
}, 1000)

Copy the code

Similarly, let’s first see whether the printed result meets the expectation. First, the printed result of A does not change, and the number printed for the first time of B now starts from 1, which is the data currently being transmitted. This satisfies our need to obtain real-time data.

Unlike unicast subscribers who always need to get data from scratch, multicast mode ensures real-time data.

In addition to the above, RxJS provides three variants of Subject:

  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

BehaviorSubject

The BehaviorSubject is a Subject that emits an extra value at the last emitted value when a new subscription is available.

]

Also we combine reality to understand, suppose you have we need to use it to maintain a state, after it changes to all subscription can send a current state of the data, it’s like we’re going to implement a computational attributes, we are only concerned with the calculation of properties of the final state, and don’t care about the process of the number of change, and what to do then?

We know that the normal Subject will only send the current data when there is new data, but will not send the sent data after the sending, so at this point we can introduce the BehaviorSubject for terminal maintenance. Because the observer who subscribes to the object receives the last value sent by the object while subscribing, this satisfies our requirement above.

We then analyze this Subject application scenario in combination with the code:

const subject = new Rx.Subject();

subject.subscribe((value) = > console.log('A:' + value))

subject.next(1);
/ / A: 1
subject.next(2);
/ / A: 2

setTimeout(() = > {
	subject.subscribe((value) = > console.log('B:' + value)); // Failed to receive the value after 1s
}, 1000)
Copy the code

First demo is using common Subject to as subscription object, then the observer is A Subject in the instance objects call next subscription before send A new value, then the observer is A second delay after subscription, so A normal receiving data, so this time due to B at the time of sending the data haven’t subscribe, so it didn’t receive the data.

So let’s look at the BehaviorSubject implementation again:

const subject = new Rx.BehaviorSubject(0); // You need to pass in the initial value

subject.subscribe((value: number) = > console.log('A:' + value))
/ / A: 0
subject.next(1);
/ / A: 1
subject.next(2);
/ / A: 2

setTimeout(() = > {
	subject.subscribe((value: number) = > console.log('B:' + value))
	/ / B: 2
}, 1000)
Copy the code

Also from the printed results, the difference between the source object and the ordinary Subject is that the source object sends the most recently changed value (or the initial value if there is no change) while subscribing. At this time, our B also gets the latest state as expected.

Here we need to pass in an initial value when instantiating the BehaviorSubject.

ReplaySubject

Once you understand the BehaviorSubject, it’s easy to understand the ReplaySubject. ReplaySubject saves all the values and then plays them back to the new subscriber, and it provides input parameters to control the number of replay values (default replay is all).

]

What? Don’t get it? See the code:

const subject = new Rx.ReplaySubject(2);

subject.next(0);
subject.next(1);
subject.next(2);

subject.subscribe((value: number) = > console.log('A:' + value))
/ / A: 1
/ / A: 2

subject.next(3);
/ / A: 3
subject.next(4);
/ / A: 4

setTimeout(() = > {
	subject.subscribe((value: number) = > console.log('B:' + value))
	/ / B: 3
	/ / B: 4
}, 1000)

// The overall print order:
/ / A: 1
/ / A: 2
/ / A: 3
/ / A: 4
/ / B: 3
/ / B: 4
Copy the code

From the constructor argument, the BehaviorSubject and ReplaySubject both need to pass a parameter, which for the BehaviorSubject is the initial value, and for the ReplaySubject is the number of previous replays. If we don’t pass the number of replays, Then it will replay all the values that have been emitted.

As a result, if you do not pass in a definite number of replays, the effect is almost the same as the unicast effect described earlier.

So we can reanalyze the code to see how many times observers receive the value before the source object is sent at the moment of subscription.

AsyncSubject

The AsyncSubject only sends the last value of the Observable execution to the Observer when it completes (executing complete()). If the AsyncSubject terminates due to an exception, it does not release any data, but it does pass an exception notification to the Observer.

]

AsyncSubject is generally used less, but more often than not, the first three are used.

const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res= > {
	console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res= > {
	console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res= > {
	console.log('C:' + res);
});
subject.complete();
subject.next(4);

// Print the whole result:
// A:3
// B:3
// C:3
Copy the code

The source object will return the last data to all observers only after all data has been sent, i.e. after calling the complete method.

This is like the novel often some, when you want to put skills, first to play a set of hand style, after the game will release your big move.

Cold – Observables and Hot – Observables

Cold Observables

Cold Observables start producing values only when they are obsered by Observers. It is unicast and generates as many subscription instances as there are subscriptions. Each subscription receives values starting with the first generated value, so each subscription receives the same value.

If you want to refer to the code related to the Cold Observables, just look at the previous unicast example.

As with the ability of unicast description, the source object sends all the numbers from the initial value to the observer whenever the observer starts to subscribe.

Hot Observables

Hot Observables generates a value whether or not it is subscribed. Is multicast, in which multiple subscriptions share the same instance and receive values at the beginning of the subscription. Each subscription receives different values depending on when the subscription was started.

Here are several scenarios that we can analyze one by one to help understand:

“Heat”

You can first ignore unfamiliar functions in your code, which will be explained later.

const source = Rx.Observable.of(1.2).publish();
source.connect();
source.subscribe((value) = > console.log('A:' + value));
setTimeout(() = > {
	source.subscribe((value) = > console.log('B:' + value));
}, 1000);
Copy the code

An Observable is created using the Rx operator of, followed by a publish function. Connect is called to start sending data.

Final code execution result is without any print data, analyze the reasons and are better understood, because haven’t subscribe to the open data, and it is A Hot Observables, it won’t notice if you have any subscription, open after can send data directly, so A and B are not to receive data.

Of course, if you put the connect method last, the final result is that A receives it, but B still can’t, because A subscribed before opening the data, and B has to wait one second.

A more intuitive scenario

As described above in multicast, in fact, what we want to see more is that both observers A and B can receive the data, and then observe the difference of the data, so that it is easy to understand.

Here we directly change to another source:

const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) = > console.log('A:' + value));
setTimeout(() = > {
	source.subscribe((value: number) = > console.log('B:' + value));
}, 3000);
source.connect();

/ / A: 0
/ / A: 1
/ / A: 2
/ / B: 2
Copy the code

In this case, we use interval together with the take operator to emit an increasing number every second, up to three, and then the print result is clearer. A normally receives three numbers, and B receives only the last number, 2, after three seconds. This method is the same as described above in multicast.

Both comparisons

  • Cold ObservablesFor example, if we go to website B to watch an episode with new updates, whenever we go to watch it, we can watch the whole episode from the beginning, regardless of whether other people watch it or not.
  • Hot Observables: This is just like when we go to B station to watch a live broadcast, it starts to play directly after the live broadcast, regardless of whether or not there are subscribers. That is to say, if you don’t subscribe to it at the beginning, then you don’t know the content of the previous live broadcast when you go back to watch it after a while.

Operator parsing that appears in the above code

When creating the Hot Observables we used a combination of the publish and connect functions. The result of calling the Publish operator is a ConnectableObservable, The connect method is then provided on the object to allow us to control when the data is sent.

  • Publish: This operator converts normal Cold Observables into ConnectableObservables.

  • ConnectableObservable: A multicasting shared Observable that is available for concurrent subscription by multiple Observers, ConnectableObservable is a Hot Observables. ConnectableObservable is the middleman between the subscriber and the actual source Observables (the interval in the example above, which sends a value every second, which is the source Observables), The ConnectableObservable receives the value from the source Observables and forwards it to the subscriber.

  • Connect () : A ConnectableObservable does not actively send values. It has a connect method. By calling connect, you can start a shared ConnectableObservable to send values. When we call ConnectableObservable. Prototype. The connect method, regardless of being subscription, will send the value. Subscribers share the same instance, and the value that subscribers receive depends on when they start their subscription.

In fact, this manual control is quite troublesome, is there a more convenient way to operate, such as listening to a subscriber to start sending data, once all subscribers have cancelled, stop sending data? Well, there is. Let’s look at the reference count:

Reference counting

Publish and refCount are used to achieve an automatic effect.

const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() = > {
	source.subscribe(data= > { console.log("A:" + data) });
	setTimeout(() = > {
		source.subscribe(data= > { console.log("B:" + data) });
	}, 1000);
}, 2000);

/ / A: 0
/ / A: 1
/ / B: 1
/ / A: 2
/ / B: 2
Copy the code

By looking at the essence of the results, we can easily find that only when A subscribes, it starts to send data (the data A gets starts from 0), and when B subscribes, it can only get the data currently sent, but not the previous data.

What’s more, the automatic transmission stops sending data when all subscribers unsubscribe.

Schedulers

Used to control concurrency and is a centralized dispatcher, allowing us to coordinate when calculations occur, such as setTimeout or requestAnimationFrame or others.

  • A scheduler is a data structure. It knows how to store and sort tasks by priority or other criteria.
  • The scheduler is the execution context. It indicates when and where the task is executed (for example, immediately, or another callback function mechanism (for examplesetTimeout 或 process.nextTick), or animation frames).
  • The scheduler has a (virtual) clock. The scheduler functions through itsgettermethodsnow()Provides the concept of “time”. Tasks scheduled on a specific scheduler will follow exactly the time indicated by that clock.

Learn this believe that you have more or less to RxJS have a certain understanding, I do not know that you have found a question, the code examples shown in the previous have synchronous and asynchronous, and the author did not show the control of their execution, their execution mechanism in the end is what?

In fact, their internal scheduling relies on Schedulers to control when data is sent. Many operators preset different Schedulers, so we don’t need to handle them in a special way to run both synchronously and asynchronously.

const source = Rx.Observable.create(function (observer: any) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
});

console.log('Before subscription');
source.observeOn(Rx.Scheduler.async) / / set to async
.subscribe({
    next: (value) = > { console.log(value); },
    error: (err) = > { console.log('Error: ' + err); },
    complete: () = > { console.log('complete'); }});console.log('After subscription');

/ / subscription
/ / subscription
/ / 1
/ / 2
/ / 3
// complete
Copy the code

From the point of view of the print results, the data transmission time has indeed changed from synchronous to asynchronous. If the scheduling mode is not changed, the “subscribed” printing should be performed after the data is sent.

After looking at the example, let’s look at what kinds of scheduling this scheduler can do:

  • queue
  • asap
  • async
  • animationFrame

queue

Place each next task in the queue instead of executing it immediately

Queue latency behaves the same as the async scheduler when it uses the scheduler.

When no delay is used, it synchronously schedules a given task – executes as soon as the task is scheduled. However, when called recursively (that is, within a scheduled task), the queue scheduler is used to schedule another task, and instead of executing immediately, the task is put on the queue and waits for the current task to complete.

This means that when you execute a task using the Queue scheduler, you are certain that it will end before any other tasks scheduled by that scheduler begin.

This synchronization and our common understanding of synchronization may not be the same, the author was also confused for a while.

Let’s use an official example to explain how this type of scheduling is understood:

import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() = > {
  queueScheduler.schedule(() = > console.log('second'));

  console.log('first');
});
Copy the code

Instead of focusing on unfamiliar function calls, we’ll focus on the differences between this type of scheduling and normal synchronous scheduling.

First we call queueScheduler’s schedule method to start execution, and then we call it again inside the function in the same way (again, this could be a recursion, but it might be better to use this example here) and pass in a function that prints second.

Then look at the following statement, a normal console.log(‘first’), and let’s look at the print:

// first
// second
Copy the code

If you don’t see why, you can go back and see how the queue handles recursion. If it is called recursively, it maintains an internal queue and waits for the first queued task to complete (i.e. Console.log (‘first’) above before executing console.log(‘second’)). Because console.log(‘second’) is a task that was added to the queue later.

asap

The internal promise-based implementation (process.nextTick on the Node side) uses the fastest asynchronous transport mechanism available, NextTick or the Web Worker’s MessageChannel may also call setTimeout if the Promise or process.nextTick is not supported.

async

Much like ASAP, except that setInterval is used internally for scheduling, mostly for time-based operators.

animationFrame

From name actually believe that everyone has to know a little, internal to achieve scheduling based on the requestAnimationFrame, so the timing of the execution with the window. The requestAnimationFrame consistent, suitable for frequently apply colours to a drawing or operating animation scene.

Operators

Operator concept

Use pure functions in a functional programming style, using operators such as map, filter, concat, flatMap, and so on to process collections. Because of its pure function definition, we know that calling any operator does not change the existing Observable instance, but instead returns a new Observable.

Although RxJS is based on An Observable, its operators are the most useful. Operators are the basic units of code that allow complex asynchronous code to be easily combined in a declarative manner.

Implementing an Operator

Assuming we don’t use the filter operators provided by RxJS, what if we let you implement them yourself?

function filter(source, callback) {
    return Rx.Observable.create(((observer) = > {
        source.subscribe(
            (v) = > callback(v) && observer.next(v),
            (err) = > observer.error(err),
            (complete) = >observer.complete(complete) ); }}))const source = Rx.Observable.interval(1000).take(3);
filter(source, (value) = > value < 2).subscribe((value) = > console.log(value));

/ / 0
/ / 1
Copy the code

This implements a simple filter operator. The main idea is to return a new Observable based on the incoming Observable.

Code first creates a observables, with a new observer then subscribe to the incoming source, and call the callback function to judge whether the value need to continue, and if it is false, skip, according to our incoming source and filter function, the source object will eventually sent three Numbers 0, 1, 2, print the result of 0, 1, 2 is filtered.

Of course we can also place it on rX.Observable. Prototype so that we can get the source using this:

Rx.Observable.prototype.filter = function (callback) {
    return Rx.Observable.create(((observer) = > {
        this.subscribe(
            (v) = > callback(v) && observer.next(v),
            (err) = > observer.error(err),
            (complete) = > observer.complete(complete)
        );
    }))
}
Rx.Observable.interval(1000).take(3).filter((value) = > value < 2).subscribe((value) = > console.log(value));

/ / 0
/ / 1
Copy the code

This would not be more concise, just as we used the filter method of the native array.

The difference between the two approaches is easy to understand: one is in Prototype and can be called directly by the instantiated object, while the other defines a new function that can be exported to the caller (or mounted directly to the static property of the Observable).

Readers may have guessed what I’m going to talk about next.

Instance operators – Static operators

  • Instance operator: Usually an operator that can be called directly by an instantiated object. We tend to use the instance operators a little bit more, for examplefilter,map,concatAnd so on. The example operator can be used more happilythis, while eliminating one argument and maintaining the chained call.
  • Static operators:ObservableIs aclassClass, we can mount the operator directly onto its static property. The advantage is that it can be called without instantiation, but the disadvantage is that it can no longer be usedthisInstead, you need to pass in the target object.

If you add an instantiation property, we already have an example above, so I won’t go into that here.

Modify the filter example above by mounting it to a static property:

Rx.Observable.filter = (source, callback) = > {
    return Rx.Observable.create(((observer) = > {
        source.subscribe(
            (v) = > callback(v) && observer.next(v),
            (err) = > observer.error(err),
            (complete) = >observer.complete(complete) ); }}))Copy the code

Creational Operators

The first thing we need to focus on when dealing with or using any data is where it came from, how it came from, and how we get it.

create

Definition:

  • public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable

After the baptism of the previous code, I believe you are not unfamiliar with the operator.

Create converts the onSubscription function to an actual Observable. The onSubscription function takes an Observer instance as the only parameter to execute every time someone subscribing to the Observable. OnSubscription should call the next, Error, and complete methods of the observer object.

When someone subscribes to an Observable, it passes a set of values by calling the subscriber’s own methods.

The figure above is not directly related to the demo code.

const source = Rx.Observable.create(((observer: any) = > {
    observer.next(1);
    observer.next(2);
    setTimeout(() = > {
        observer.next(3);
    }, 1000)}))/ / way
source.subscribe(
    {
        next(val) {
            console.log('A:'+ val); }});2 / / way
source.subscribe((val) = > console.log('B:' + val));

/ / A: 1
/ / A: 2
/ / B: 1
/ / B: 2
After the / / - 1 s:
/ / A: 3
/ / B: 3
Copy the code

The printed result is needless to mention, first A and B will be printed, 1, 2, and 3 after 1 second.

Note that we can use either of these methods to call subscribe, either as an object with the next, error, and complete methods (all optional), or as a function. The parameters are next, error, and complete respectively.

empty

Definition:

  • public static empty(scheduler: Scheduler): Observable

As the name implies, this operator creates an operator that emits no data, but simply a completion notification.

Some readers here may ask what this thing is for.

In fact, it works well with some operators, such as mergeMap, which will be explained later. If you can’t wait, skip to mergeMap.

from

Definition:

  • public static from(ish: ObservableInput<T>, scheduler: Scheduler): Observable<T>

Create an Observable from an array, an array-like object, a Promise, an iterator object, or an Observable like object.

This method is a bit like the Array.from method in JS (you can create a new Array from a class Array or an iterable), but in RxJS it turns into an Observable for the consumer to use.

const source = Rx.Observable.from([10.20.30]);
source.subscribe(v= > console.log(v));

/ / 10
/ / 20
/ / 30
Copy the code

From the point of view of the example code, this is actually quite simple to use, if you want to use RxJS to manage some data (such as arrays or arrays of classes) for an existing project, then the from operation is a good choice.

fromEvent

Definition:

  • public static fromEvent(target: EventTargetLike, eventName: string, options: EventListenerOptions, selector: SelectorMethodSignature<T>): Observable<T>

Creates an Observable that emits an event of the specified type from the given event object. This can be used for Dom events in a browser environment or EventEmitter events in a Node environment.

Suppose we have a requirement that listens for button click events and prints them out:

const click = Rx.Observable.fromEvent(document.getElementById('btn'), 'click');
click.subscribe(x= > console.log(x));
Copy the code

This is a smoother way of writing than if we were using addEventListener.

fromPromise

Definition:

  • public static fromPromise(promise: PromiseLike<T>, scheduler: Scheduler): Observable<T>

It’s pretty obvious from the name, which is to turn a Promise into an Observable so that we don’t have to write chained calls like. Then,. Catch.

If the Promise is at three similarities, the output Observable issues that value and finishes. If the Promise is rejected, the output Observable issues an error.

const source = Rx.Observable.fromPromise(fetch('http://localhost:3000'));
source.subscribe(x= > console.log(x), e= > console.error(e));
Copy the code

Here to demonstrate the effect, the local up a service for testing, self-testing can use another.

So we can easily get the return value of the request.

interval

Definition:

  • public static interval(period: number, scheduler: Scheduler): Observable

An Observable created with this operator can emit consecutive numbers over a specified period of time, similar to the pattern we use with setInterval. We can use this operator when we need to retrieve a sequence of numbers, or when we need to do something on a regular basis.

const source = Rx.Observable.interval(1000);
source.subscribe(v= > console.log(v));

Copy the code

By default, it starts at 0. The time set here is 1 second, and it continues to emit data at specified intervals. In general, we can use the take operator to limit the amount of data emitted.

of

Definition:

  • public static of(values: ... T, scheduler: Scheduler): Observable<T>

It is not much different from from, but it is called by passing a parameter, similar to the CONCat method in JS. It also returns an Observable that, in turn, merges the parameters you pass in and emits the data synchronously.

const source = Rx.Observable.of(1.2.3);
source.subscribe(v= > console.log(v));

/ / 1
/ / 2
/ / 3
Copy the code

Print 1, 2, 3 in sequence.

repeat

Definition:

  • public repeat(count: number): Observable

Repeat the data source n times for the numeric type parameter you passed in.

const source = Rx.Observable.of(1.2.3).repeat(3);
source.subscribe(v= > console.log(v));
Copy the code

With the “of” operator, the result is to print 1, 2, 3, 1, 2, 3, 1, 2, 3, instead of printing 1, 2, 3 once.

range

Definition:

  • public static range(start: number, count: number, scheduler: Scheduler): Observable

Creates an Observable that emits a sequence of numbers in a specified range.

Those of you who have studied Python have a sense of deja vu.

const source = Rx.Observable.range(1.4);
source.subscribe(v= > console.log(v));
Copy the code

Print results: 1, 2, 3, 4.

Is it easy?

Conversion operator

So what is conversion operators, as is known to all, in the daily business, we always need to deal with all kinds of data, most of the time we are not to directly transfer to come over to use the data, but do some transformation of its own, let him become more fit our demand shape, that is the role of the conversion operators.

buffer

Definition:

  • public buffer(closingNotifier: Observable<any>): Observable<T[]>

Collect past values into an array and emit the array only when another Observable emits a notification. It’s like having a buffer, collecting data, waiting for a signal, and then releasing it.

;

The change operator is a bit like a big dam, sometimes we will choose to store water, wait until the specific time, then the leader ordered to open the dam, let the water out.

Here’s an example:

Suppose that we have such a demand, we have an interface is designed to retrieve specific data, but the interface one-time return only a data, it makes us very upset, because the product of data to a specific value to control the operation, also is his one click a button, to render the data comes out, that what should I do?

This is where our buffer operator comes in:

const btn = document.createElement('button');
btn.innerText = 'You point at me! '
document.body.appendChild(btn);
const click = Rx.Observable.fromEvent(btn, 'click');
const interval = Rx.Observable.interval(1000);
const source = interval.buffer(click);
source.subscribe(x= > console.log(x));
Copy the code

Here we directly use Interval to demonstrate that the interface gets data, and then implement the function with buffer.

Here we wait four seconds and then click the button. The printed value is [0, 1, 2, 3]. Then wait another eight seconds and click the button: [4, 5, 6, 7, 8, 9, 10, 11].

From the phenomenon, it is not difficult for us to see that we have realized the transmission of data through button control. At the same time, we can find another phenomenon, the sent data will be directly emptied in the buffer, and then collect new data.

It’s not hard to understand. Let’s take the example of a dam. We open the dam and release water for a period of time, and then we close it and continue to store water.

concatMap

Definition:

  • public concatMap(project: function(value: T, ? index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

This operator is interesting. Let’s look at the description on the website:

Project the source value into an Observable that is merged into the output Observable, waiting serially for the previous one to complete before merging the next Observable.

I don’t know if you readers feel “a little bit” is not easy to understand, but the author will be able to easily understand a small example:

Suppose you encounter such a scene, you and your girlfriend are shopping in the snack street, but her girlfriend has a bad habit, she always likes to buy a bite and then leave for you to eat, and then another shop to buy a little bite and then leave for you to eat, and you need time to eat every time. Usually, your boyfriend’s girlfriend will wait for you to eat before buying a house. In this case, you can still eat before taking a rest. On the other hand, regardless of whether your girlfriend finished eating or not, she continues to buy, and then more and more food in your hands, the speed of your eating completely can not catch up with the speed of your girlfriend to buy, then this time will lead to your heavier and heavier load, and finally can not bear the explosion of mentality.

The above scenario contains several core points of concatMap and points to note:

  1. The source value sends a data that you then pass insideObservableIt starts to work or it sends data, and the subscriber receives the data, which is internalObservableIt is equivalent to always waiting for the source object to send a data before the next round of work is done, and the next round of work is completed.
  2. If the current round of work is not completed and the data sent by the source object is received, it will be stored in a queue, and then once the current round is completed, the queue will be checked to see if there are any more. If there are, the next round will be started immediately.
  3. If internalObservableIs longer than the interval between data being sent by the source object, which can lead to larger cache queues and performance problems

Popular point to understand is that actually a factory assembly line, a responsible for hair material, another is responsible for making products, hair material is the source object, making products is the internal observables, this factory output can only be in the finished product is completed, so the production of products such as subscribers to do to get one.

If you send the material faster than the person making it can make a product, you can build up the material, and over time it will build up, and the factory won’t be able to hold it.

Understanding with code:

const source = Rx.Observable.interval(3000);
const result = source.concatMap(val= > Rx.Observable.interval(1000).take(2));
result.subscribe(x= > console.log(x));
Copy the code

First of all, we create a source object that sends data every three seconds. Then we call the instance method concatMap and pass it a function that returns an Observable. Finally, we get the Observable transformed by concatMap and subscribe to it.

The running result is: First, the source sends the first data in the third second of the program, and then at this point our internal Observable, which we passed in, starts to work, and after two seconds it sends two increasing numbers, and then the subscription function prints out the two numbers step by step, and after one second which is the sixth second of the program, the source sends the second number, Repeat the process at this point.

map

Definition:

  • public map(project: function(value: T, index: number): R, thisArg: any): Observable<R>

If you use the map method in js array more, perhaps here basically do not need to look at, the use of exactly the same.

You just pass in a function, and the first parameter of the function is every piece of data from the data source, and the second parameter is the index value of that data. You just return a value that is returned after a calculation or other operation as the value that the subscriber actually gets.

const source = Rx.Observable.interval(1000).take(3);
const result = source.map(x= > x * 2);
result.subscribe(x= > console.log(x));
Copy the code

The take operator is basically just a limit on how many numbers you can take and no longer send data.

This is used to demonstrate that the value of each data source is multiplied by 2 and sent to the subscriber, so the printed values are 0, 2, and 4.

mapTo

Definition:

  • public mapTo(value: any): Observable

Ignore the data sent by the data source and only send the specified value (pass parameter).

It’s like when a guy you hate asks you to pass the word, he says a lot of confession, and then asks you to pass it on to some girl, you don’t want to help him because you hate him, so you tell her I like you, and then you live happily together.

const source = Rx.Observable.interval(1000).take(3);
const result = source.mapTo(Awesome!);
result.subscribe(x= > console.log(x));
Copy the code

As in this code, the data source sends 0, 1, 2, while the subscriber actually receives three 666s.

mergeMap

Definition:

  • public mergeMap(project: function(value: T, ? index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

This definition looks a little scary, but let’s not panic, we just need to understand how it’s used in most cases.

Remember in the introduction to empty, I left a hole to show you how mergeMap and Empty work together? I’m just going to fill in this hole right here.

const source = Rx.Observable.interval(1000).take(3);
const result = source.mergeMap(x= > x % 2= = =0 ? Rx.Observable.of(x) : Rx.Observable.empty());
result.subscribe(x= > console.log(x));
Copy the code

The input source is a data source that sends the numbers 0, 1, and 2. We call the mergeMap operator and pass in a function that sends the subscriber if the current value sent by the input source is an even number, and not otherwise.

When you compare it to a map, you can see that the map return value must be an Observable, whereas mergeMap returns an Observable. We can return any transform or Observable with other capabilities.

pluck

Definition:

  • public pluck(properties: ... string): Observable

Used to select the specified property value on each data object.

If a data source sends an object that has a name attribute, and the subscriber points to the name attribute, then the operator can be used to extract the attribute value to the user.

const source = Rx.Observable.of({name: 'Joe'}, {name: 'bill'});
const result = source.pluck('name');
result.subscribe(x= > console.log(x));

/ / zhang SAN
/ / li si
Copy the code

Without a doubt, this operator is used to extract attributes, equivalent to using the Map operator to extract the name and return it to the subscriber.

scan

Definition:

  • public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>

The accumulator operator, which can be used for state management, is quite useful.

For usage, we can refer to the Reduce function of js arrays.

Suppose we now have a requirement that we want to add up the data sent from the data source and return it to the subscriber. What should we do?

const source = Rx.Observable.interval(1000).take(4);
const result = source.scan((acc, cur) = > acc + cur, 0);
result.subscribe(x= > console.log(x));
Copy the code

In code, the data source sends four values: 0, 1, 2, and 3, and each time the subscriber receives the value is the sum of the previous received number and the current number, respectively: 0, 1, 3, and 6.

Looking at usage, we pass the first argument to the SCAN operator a function that accepts two values: acc (the result or initial value of the previous sum), cur (the current value), and the second, the initial value of the calculation.

switchMap

Definition:

  • public switchMap(project: function(value: T, ? index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable

This is the combination of the switch operator and the Map operator. The Switch operator is covered in the combination operators section.

The main function is to merge multiple Observables and have the ability to interrupt. This means that one Observable sends data first, and subscribers can receive the data normally. When another Observable starts sending data, the first Observable is interrupted and only the data sent by the next Observable is sent.

In layman’s terms, when someone is talking and you start talking loudly, they are interrupted and you are the only person who can hear you.

const btn = document.createElement('button');
btn.innerText = 'I want to speak! '
document.body.appendChild(btn);
const source = Rx.Observable.fromEvent(btn, 'click');
const result = source.switchMap(x= > Rx.Observable.interval(1000).take(3));
result.subscribe(x= > console.log(x));
Copy the code

Code to achieve the function is, when a classmate click the button, then start to send numbers from 0, at this time if a student has not sent data, students two again click, then the data of a student will not be sent again, started to send students two.

Suppose that student 2 clicks the button after 1 o ‘clock, then the result will be printed: 0, 1, 0, 1, 2. Here, the data sent by student 2 starts from the second 0.

Other conversion operators

Portal: Conversion operator

  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen
  • concatMapTo
  • exhaustMap
  • expand
  • groupBy
  • mergeMapTo
  • mergeScan
  • pairwise
  • partition
  • switchMapTo
  • window
  • windowCount
  • windowTime
  • windowToggle
  • windowWhen

Filter operator

debounceTime

Definition:

  • public debounceTime(dueTime: number, scheduler: Scheduler): Observable

For those of you who have some experience in JS development, you should know the debounce function, so this time you will ask, it is not similar to Debounce, right? Yes, it does the same thing as the Debounce function, but with a slight difference.

A value is emitted from a source Observable only after a specified period of time has passed and no other source value has been emitted.

In other words, if a data source sends a number every second, and we use the debounceTime operator and set the delay time, then after the data source sends a new data, if the data source sends another data during the delay time, the new data will be cached and not sent. Wait after sending the data and wait for the end of the delay time will be sent to the subscriber, not only that, the delay time before the time and have a value in the buffer, this time and receive a new value, the buffer will throw away the old data, into the new, and then wait for the delay time and then send it again.

const source = Rx.Observable.interval(1000).take(3);
const result = source.debounceTime(2000);
result.subscribe(x= > console.log(x));
Copy the code

From the point of view of the code, let’s guess, what is the final print?

First, we created a data source that sends a number every second and only three times. Then we used debounceTime and set the delay time to 2 seconds. At this point, we looked at the data that was printed, and we saw that no data was printed for the first 3 seconds after the program started. Then there was no more printing. Why?

Answer is that the data source will in turn send three per second 0, 1, 2, because we set the delay time to 2 seconds, then that is to say, before the completion of the data we are impossible to see the data, because the sending frequency source for a second, there are two seconds delay time, send finished, that is, unless, otherwise can’t meet the new data source for two seconds to send, It takes about two seconds for the data to be printed, so no matter how many numbers we send from the data source, the subscriber will only receive the last number.

throttleTime

Definition:

  • public throttleTime(duration: number, scheduler: Scheduler): Observable<T>

How can the introduction of tremble forget its old buddy throttling?

The main capability of the operator is the same as that of the throttling function, that is, it will only send one data at a certain time, and the redundant data will be discarded. The only difference with the shake stabilization operator is that it does not block for the first value.

const source = Rx.Observable.interval(1000).take(6);
const result = source.throttleTime(2000);
result.subscribe(x= > console.log(x));

/ / 0
/ / 3
Copy the code

Print the results shown above, in fact, the effect also is very easy to explain, the code creates a data source sends a increase starting from 0 degrees per second, sent a total of 6 0 to 5, and use throttleTime set two seconds, the subscriber will not be blocked when receiving the first value, but of the two seconds after receiving a all take less than the value, That’s the fourth second to get the 3.

distinct

Definition:

  • public distinct(keySelector: function, flushes: Observable): Observable

This operator is also very easy to understand. It can be summarized in one sentence: when the operator is used, the data received by the subscriber will not be duplicate, that is, it is used to filter duplicate data.

const source = Rx.Observable.from([1.2.3.2.4.3]);
const result = source.distinct();
result.subscribe(x= > console.log(x));
Copy the code

The final result of the program is: 1, 2, 3, 4, and the number of repeats is filtered directly.

filter

Definition:

  • public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable

This kind of basic should not have what good introduction, and we understand the array filter method is not different, but the place is not consistent.

const source = Rx.Observable.from([1.2.3.2.4.3]);
const result = source.filter(x= >x ! = =3);
result.subscribe(x= > console.log(x));
Copy the code

The result is that all values except 3 are printed.

first

Definition:

  • public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>

Emit only the first value (or the first value that satisfies the condition) emitted by the source Observable.

This is also similar to the above, the basic introduction can understand, here no longer redundant.

take

Definition:

  • public take(count: number): Observable<T>

Emit only the N values (N = count) originally emitted by the source Observable.

This operator has been used many times before, and it’s a very common one, to control how many values to get, and in conjunction with something like interval, which sends data continuously, you can control how many values to get.

skip

Definition:

  • public skip(count: Number): Observable

Returns an Observable that skips the first N values (N = count) emitted by the source Observable.

For example, if the data source sends six values, you can use the SKIP operator to skip the first number.

const source = Rx.Observable.from([1.2.3.2.4.3]);
const result = source.skip(2);
result.subscribe(x= > console.log(x));
Copy the code

The printed result is 3, 2, 4, 3, skipping the first two numbers.

Other filter operators

The official operator is still a lot of, here is not an introduction, interested in can go to the official website to see: filter operator.

  • debounce
  • distinctKey
  • distinctUntilChanged
  • distinctUntilKeyChanged
  • elementAt
  • ignoreElements
  • audit
  • auditTime
  • last
  • sample
  • sampleTime
  • single
  • skipLast
  • skipUntil
  • skipWhile
  • takeLast
  • takeUntil
  • takeWhile
  • throttle

Combination operator

concatAll

Definition:

  • public concatAll(): Observable

As the name suggests, this operator is a bit like concat, which is used to combine multiple Observable’s into one, except that it is serial. That is, it combines two Observable’s, so the subscriber gets the first Observable before fetching the value. It then begins to receive the value of the latter Observable.

const source1 = Rx.Observable.of(1.2);
const source2 = source1.map(x= > Rx.Observable.interval(1000).take(3));
const result = source2.concatAll();
result.subscribe(x= > console.log(x));
Copy the code

In order for the subscriber to receive more than one Observable, the map operator returns a new Observable, and the map operator returns a new Observable. ConcatAll is used to merge the results, and the final subscriber receives the results in order: 0, 1, 2, 0, 1, 2.

mergeAll

Definition:

  • public mergeAll(concurrent: number): Observable

It’s almost the same as concatAll, except that it’s parallel, meaning that multiple merged Observables send data in no particular order.

combineLatest

Definition:

  • public combineLatest(other: ObservableInput, project: function): Observable

Combine multiple Observables to create an Observable whose value is calculated based on the latest value of each input Observable.

This operation symbol is not easy to understand from the introduction, let’s combine the example to explain.

const s1 = Rx.Observable.interval(2000).take(3);
const s2 = Rx.Observable.interval(1000).take(5);
const result = s1.combineLatest(s2, (a, b) = > a + b);
result.subscribe(x= > console.log(x));
Copy the code

The printed results are 0, 1, 2, 3, 4, 5, 6.

First of all, let’s look at how this combineLatest is used. It’s an instance operator, and what we’re showing here is that we’re combining S1 and S2 together, and the second argument needs to be passed in a callback to process the value of the combination. Since we’re only combining two arguments here, we only accept a and b. The return value of this callback function is the value obtained by the subscriber.

In fact, we can’t tell anything from the results. The main reason is that the process is as follows:

  1. s2Send a 0, and at this points1If no value is sent, the callback we pass in will not be executed and the subscriber will not receive the value.
  2. s1Send a 0, ands2The last sent value is 0, so the callback function is called and these two arguments are passed in, and the subscriber receives them
  3. s2Send a 1, ands1The last time it was sent was 0, so the result is 1.
  4. s1Send a 1, ands2The last sent value was 1, so the result is 2.
  5. s2Send a value of 2, ands1The last value sent was 1, so the result is 3.
  6. s2Send a value of 3, ands1The last time the value was sent was 1, so the result is 4.
  7. . Repeat the preceding steps.

One thing to note here is that at some point s1 and S2 will send data together, but this will also be sequential, so it’s up to the one who defines it first to send it first, and you can see that from the steps above.

In fact, there is a dependency between the combined sources, that is, the subscriber will not receive the message until both sources have sent at least one value, and the end signal will not be sent until both sources have sent.

zip

Definition:

  • public static zip(observables: *): Observable<R>

Combine multiple Observables to create an Observable whose value is calculated from the values of all the input Observables in order. If the last argument is a function, this function is used to calculate the final emitted value. Otherwise, an array containing all input values in order is returned.

In layman’s terms, multiple sources are aligned in order, slightly different from the combineLatest above.

No more words, on the code:

const s1 = Rx.Observable.interval(1000).take(3);
const s2 = Rx.Observable.interval(2000).take(5);
const result = s1.zip(s2, (a, b) = > a + b);
result.subscribe(x= > console.log(x));
Copy the code

The output is 0, 2, and 4.

How do we understand this? First of all, remember that the numbers that are being counted between multiple sources are sequentially aligned, that is, the first number of S1 is aligned with the first number of S2, and that one-to-one calculation, so the end result that the subscriber receives is that the aligned numbers are passed in and we call the last callback function of zip, This is the result that is returned to the user after the value is computed, which is optional.

When either source ends, the whole is signaled to end because there are no more numbers to align.

startWidth

Definition:

  • public startWith(values: ... T, scheduler: Scheduler): Observable

The returned Observable emits the item specified as a parameter and then the item emitted by the source Observable.

How to understand, in fact, a good example, for example, there is a string of tanghulu, the whole is a color, you think not beautiful, so you inserted in the front of the string of tanghulu several different colors of the tanghulu, this time the user will eat when you inserted in the front of the tanghulu.

const source = Rx.Observable.interval(1000).take(3);
const result = source.startWith(Awesome!)
result.subscribe(x= > console.log(x));
Copy the code

The output is 666, 0, 1, and 2.

That makes sense.

switch

Definition:

  • public switch(): Observable<T>

Convert a higher-order Observable into a first-order Observable by subscribing only to the newly issued internal Observable.

The use of this operator was mentioned earlier when we introduced the switchMap conversion operator, which is equivalent to map+switch=switchMap.

Here’s an example:

const btn = document.createElement('button');
btn.innerText = 'I want to speak! '
document.body.appendChild(btn);
const source = Rx.Observable.fromEvent(btn, 'click');
const source2 = source.map(x= > Rx.Observable.interval(1000).take(3));
const result = source2.switch();
result.subscribe(x= > console.log(x));
Copy the code

This code implements the same effect as switchMap. When the user clicks the button, the data will be sent. When the data transmission is not complete, the user clicks the button again, and a new data transmission process will be started, and the original data transmission process will be discarded.

Other combination operators

Portal: combinatorial operator

  • combineAll
  • concat
  • exhaust
  • forkJoin
  • merge
  • race
  • withLatestFrom
  • zipAll

The multicast operator

Portal: multicast operator

  • cache
  • multicast
  • publish
  • publishBehavior
  • publishLast
  • publishReplay
  • share

To be perfect…

Error handling operator

Portal: error handling operator

  • catch
  • retry
  • retryWhen

To be perfect…

Tool operator

Portal: Tool operator

  • do
  • delay
  • delayWhen
  • dematerialize
  • finally
  • let
  • materialize
  • observeOn
  • subscribeOn
  • timeInterval
  • timestamp
  • timeout
  • timeoutWith
  • toArray
  • toPromise

To be perfect…

Conditions and Boolean operators

Portal: Conditions and Boolean operators

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

To be perfect…

Mathematical and aggregate operators

Portal: Mathematical and aggregation operators

  • count
  • max
  • min
  • reduce

To be perfect…

conclusion

In general, RxJS as a data flow form to deal with the complex data in our daily business is very maintenance friendly, and in many complex data operations, RxJS can bring us a lot of efficiency operators, but also give us a novel idea of data operation.

We can think of RxJS as a lodash library that can emit events, encapsulating a lot of complex operation logic, allowing us to use the process of data conversion and manipulation in a more elegant way.

Refer to the article

The official documentation

A Guide to Responsive Programming – easy to understand RxJS

Fisherman and Rxjs story, this will teach you the front end of Rxjs must be

Use RxJS to build responsive applications

Thoroughly understand Observable, Observer, and Subject in RxJS