By Yin Ronghui @Tencent

This article is also available on my GitHub. If you find it useful, please give it a star.

A word up front: stick with this story to the end and you will understand RxJS. Don't give up just because the plot isn't as gripping as The Tomb Raiders; turning a dry programming topic into a fairly interesting (and simple) story is the best I could manage.

The story goes like this

The RxJS story has several protagonists, shown in the picture above. We will introduce them one by one; you need to know every one of them.

(1) Rx.Observable is a river. It’s an environment that people live in. From this, a community of people began to make a living around the river.

(2) source is the bamboo tube on a fishing boat in the river (think of stuffing all the data obtained from the server into this tube to form the source of a data stream). Fish (data) swim into the bamboo tube (source) one by one.

var source = Rx.Observable.create(subscriber);

(3) subscriber is the fisherman, a kind-hearted person whose main task is to throw the captured fish (data) to the hungry people on the shore. The fisherman decides how the server-side data is handed out.

var subscriber = function(observer) {
    // Simplified for the story: fetch() really returns a promise, so real code would wait for it
    var fishes = fetch('http://www.oa.com/api'); // the fish (data) are caught
    observer.next(fishes.fish1); // throw the first fish caught to the hungry people on the shore
    observer.next(fishes.fish2); // throw the second fish caught to the hungry people on the shore
}

(4) observer is the hungry people on the shore: the end consumer who receives the data from the server and decides what to do with it on the user's page, much as a hungry person receives a fish and decides how to cook it. Because they come from all over and speak different dialects, they describe what to do with the fish in different syntax, but the essence is the same: what to do when a fish arrives (value => {}), what to do when something goes wrong and there is no fish (error => {}), and what to do when all the fish have been thrown (complete => {}).

Method 1:

// Pass the three callbacks directly, in the order next, error, complete
source.subscribe(
    value => { console.log(value); },
    error => { console.log('Error: ', error); },
    () => { console.log('complete'); }
);

Method 2:

observer = function(value) { console.log(value); }
source.subscribe(observer);
// Many hungry people can subscribe to this bamboo tube (source), and new hungry people can join at any time.
// Of course, a hungry person who does not subscribe gets no fish from the fisherman.

Method 3:

observer = {
    next: function(value) {
        console.log(value);
    },
    error: function(error) {
        console.log('Error: ', error)
    },
    complete: function() {
        console.log('complete')
    }
}
source.subscribe(observer);

subscribe: through source.subscribe(observer) the river learns which people along its banks need relief. Whoever subscribes to the fish thrown by the fisherman will receive them.

(5) subscription is the list that records which bamboo tube feeds which hungry people. An entry can be crossed off the list, after which that hungry person no longer receives the fish thrown by the fisherman.

subscription = source.subscribe(observer1);
subscription.unsubscribe(); // cross observer1's subscription off the list, because observer1 is no longer hungry and does not need relief

Linking the five roles above together gives the execution flow of RxJS. We first walk through it in easy-to-understand pseudocode, and then in the corresponding real RxJS syntax.

// Pseudocode: the names below follow the story and are not real RxJS identifiers
var fisherman = function(hungryPeople) {
    var fishes = fetch('server/api'); // the fish are caught
    hungryPeople.next(fishes.fish1); // throw fish1 to the hungry people
    hungryPeople.next(fishes.fish2); // throw fish2 to the hungry people
}

// So that the hungry people can cope with every situation and do not starve when no fish are caught
var hungryPeople1 = {
    next: function(fish) {
        // A fish is thrown over: cook it and eat it.
    },
    error: function(error) {
        // The fish is poisonous and inedible, so fill your stomach another way, such as eating wild vegetables.
    },
    complete: function() {
        // All the fish of the day have been thrown, so you can go home.
    }
}

var bambooTube = river.create(fisherman); // when a fisherman comes to the river, he must put his bamboo tube down in it
var list = bambooTube.subscribe(hungryPeople1); // once hungry people 1 follow the bamboo tube, they can receive the fish thrown by the fisherman

setTimeout(() => {
    list.unsubscribe(); // a year later the hungry people are out of trouble and no longer need relief, so they give up the bamboo tube and leave the opportunity to others
}, oneYear); // oneYear stands for "1 year" in this pseudocode

Now let's do it again in real RxJS syntax.

var subscriber = function(observer) { // create a fisherman
    observer.next('fish1');
    observer.next('fish2');
    observer.complete();
}

var observer1 = { // create hungry people 1
    next: function(value) {
        console.log(`I received ${value}, so I won't starve.`);
    },
    error: function(error) {
        console.log(`Sadly, the fish can't be eaten because of ${error}.`)
    },
    complete: function() {
        console.log('All the fish for today have been thrown.')
    }
}

var source = Rx.Observable.create(subscriber); // a fisherman comes to the river and puts his bamboo tube down in it
var subscription = source.subscribe(observer1); // once hungry people 1 follow the bamboo tube, they can receive the fish thrown by the fisherman

setTimeout(() => {
    subscription.unsubscribe(); // after 3 seconds the hungry people unsubscribe from the bamboo tube and give other hungry people a chance
}, 3000);

// The output is as follows:
// "I received fish1, so I won't starve."
// "I received fish2, so I won't starve."
// "All the fish for today have been thrown."
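
To see how the five roles fit together without the library, here is a minimal sketch in plain JavaScript. createObservable is a hypothetical helper written for this story, not the real RxJS implementation; it only assumes that a bamboo tube is an object whose subscribe method runs the fisherman and hands back a list entry with unsubscribe.

```javascript
// Hypothetical stand-in for Rx.Observable.create; NOT the real RxJS implementation.
function createObservable(subscriber) {
    return {
        subscribe: function (observer) {
            var active = true;
            // Wrap the observer so nothing is delivered after complete, error or unsubscribe.
            var safeObserver = {
                next: function (value) {
                    if (active && observer.next) observer.next(value);
                },
                error: function (err) {
                    if (active && observer.error) observer.error(err);
                    active = false;
                },
                complete: function () {
                    if (active && observer.complete) observer.complete();
                    active = false;
                }
            };
            subscriber(safeObserver); // the fisherman starts throwing fish
            return { unsubscribe: function () { active = false; } }; // the entry on the list
        }
    };
}

var received = [];
var source = createObservable(function (observer) {
    observer.next('fish1');
    observer.next('fish2');
    observer.complete();
    observer.next('fish3'); // ignored: the day is already over
});

var subscription = source.subscribe({
    next: function (value) { received.push(value); },
    complete: function () { received.push('complete'); }
});
subscription.unsubscribe();

console.log(received.join(', ')); // fish1, fish2, complete
```

The wrapper around the observer is the design choice worth noticing: it is what lets a hungry person stop receiving fish without the fisherman having to know who is still on the list.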

That is the end of the RxJS story. If you do not understand it yet, read the story again. Still not clear? Then read it a few more times, ha ha.

You can see the result in JS Bin.

The rest of the article covers the three stages of fishing: (1) how fish get into the bamboo tube, (2) how to take the fish out of the tube, and (3) what happens to the fish as they are thrown ashore. The operators are introduced in that order.

One. How fish get into the bamboo tube

(1) create: when no fish are available in advance, use create to catch them fresh from the water

var source = Rx.Observable
    .create(function(observer) {
        // Simplified: waitForFishes_ajax_fetch stands for an AJAX call whose result is already available
        var fishes = waitForFishes_ajax_fetch(api);
        observer.next(fishes.fish1);
        observer.next(fishes.fish2);
        observer.complete();
    });

(2) of(arg1,arg2)

When the fish are already on hand but loose, such as a few left on the boat from yesterday, put them into the bamboo tube with of.

var source = Rx.Observable.of(fish1, fish2);

(3) from([arg1,arg2,arg3])

When the fish are already on hand and strung together in a row (an array structure), load them into the tube with the from method.

var fishes = [fish1, fish2];
var source = Rx.Observable.from(fishes);

// Note: from also accepts a string, which is emitted one character at a time
var source = Rx.Observable.from('Ironman');
// I
// r
// o
// n
// m
// a
// n
// complete!

(4) fromEvent(document.body, 'click')

In addition to throwing fish ashore, fishermen sometimes use bamboo tubes as trumpets to tell hungry people ashore that things are not going well today.

var source = Rx.Observable.fromEvent(document.body, 'click');

(5) empty,never,throw

var source = Rx.Observable.empty();
// When no fish are caught, the observer's complete runs straight away. The result is:
// complete!

var source = Rx.Observable.never();
// The fisherman is too tired to tell the hungry people on the shore whether he has caught any fish. The result is:
// complete never fires

var source = Rx.Observable.throw('ill');
// When the fisherman is sick or has to visit an old friend, he shouts through the bamboo tube to the hungry people on the shore, so that they can find another way (the error method is triggered) to solve the day's food problem.

(6) interval('interval time'), timer

Rx.Observable.interval(1000);
// The fisherman gets bored fishing every day and wants to play a game with the hungry people on the bank: he throws them a fish every second and marks each fish with a number (0, 1, 2, 3, ...), so the hungry people receive the numbers along with the fish.

// timer('wait time before the first fish is thrown', 'interval between the fish after the first')
Rx.Observable.timer(1000, 5000);
// The rules of the game change a bit: the fisherman tells the hungry people that he will throw the first fish ashore after 1000 milliseconds, and another one every 5000 milliseconds after that.

Two. There are fish in the bamboo tube: how to take them out

2.1 Single bamboo tube fishing

(1) take

The fisherman decides to take only the first three fish from the tube, because he is afraid of draining the pond.

var source = Rx.Observable.interval(1000);
var example = source.take(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete

(2) first

Take only the first fish.

var source = Rx.Observable.interval(1000);
var example = source.first();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// complete

(3) takeUntil

With takeUntil, the fisherman keeps taking fish from the bamboo tube and stops as soon as a special fish (such as a golden arowana) shows up; in RxJS terms, the special fish is the notifier observable passed to takeUntil. Taking any more would be unlucky and would offend the Dragon King (see Chapter XX of Journey to the West).
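
The original gives no code for takeUntil, so here is a small plain-JavaScript sketch of the idea. makeSubject and this takeUntil function are hypothetical helpers written for illustration, not the RxJS API; the only assumption carried over from RxJS is that takeUntil stops forwarding values the first time a second stream (the notifier) emits.

```javascript
// Tiny hand-rolled subject: values are pushed to listeners by hand.
function makeSubject() {
    var listeners = [];
    return {
        subscribe: function (fn) {
            listeners.push(fn);
            return function unsubscribe() {
                listeners = listeners.filter(function (l) { return l !== fn; });
            };
        },
        next: function (value) {
            listeners.slice().forEach(function (l) { l(value); });
        }
    };
}

// takeUntil sketch: forward values from `source` until `notifier` emits once.
function takeUntil(source, notifier, onValue, onComplete) {
    var stopSource = source.subscribe(onValue);
    var stopNotifier = notifier.subscribe(function () {
        stopSource();
        stopNotifier();
        onComplete();
    });
}

var tube = makeSubject();          // ordinary fish
var goldenArowana = makeSubject(); // the special fish (the notifier)
var taken = [];

takeUntil(tube, goldenArowana,
    function (fish) { taken.push(fish); },
    function () { taken.push('complete'); });

tube.next('carp');
tube.next('perch');
goldenArowana.next('golden arowana'); // the fisherman stops here
tube.next('catfish');                 // never taken

console.log(taken.join(', ')); // carp, perch, complete
```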

(4) concatAll()

Join the fish of two bamboo tubes in series so that they form a single tube of fish, then take them out.

(5) skip

var source = Rx.Observable.interval(1000);
var example = source.skip(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...

(6) takeLast()

var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 4
// 5
// complete

(7) last()

var source = Rx.Observable.interval(1000).take(6);
var example = source.last();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 5
// complete

(8) concat(observable1,observable2,….)

Again, string all the bamboo tubes together and take the fish out

var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3)
var source3 = Rx.Observable.of(4, 5, 6);
var example = source.concat(source2, source3);
// Unlike concatAll(), which flattens a stream whose values are themselves observables, concat takes the observables as separate arguments: concat(observable1, observable2, ...)

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete

(9) startWith()

There may not have been enough fish caught that day to feed the hungry people on the shore. So the fisherman secretly slips a few fish into the front of the bamboo tube, pretending he caught a lot today, and then takes them out as usual.

var source = Rx.Observable.interval(1000);
var example = source.startWith(0);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...

(10) scan

When the fisherman needs a running statistic over all the fish caught, such as their total weight, he scans the fish one by one: he weighs each fish and adds its weight to the total so far, accumulating as he goes.

var source = Rx.Observable.from('hello')
             .zip(Rx.Observable.interval(600), (x, y) => x);

var example = source.scan((origin, next) => origin + next, ''); // the seed is an empty string

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// h
// he
// hel
// hell
// hello
// complete
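
The weighing described above can be tried on a plain array. This is only a sketch: the scan function below is a hypothetical array helper, not the RxJS operator, and the weights are made-up numbers. It shows the one thing that separates scan from reduce, namely that every intermediate total is kept.

```javascript
// scan sketch over an array: like reduce, but keeps every intermediate result.
function scan(values, accumulator, seed) {
    var acc = seed;
    return values.map(function (value) {
        acc = accumulator(acc, value);
        return acc;
    });
}

var weights = [2, 3, 5]; // made-up fish weights, in jin
var runningTotals = scan(weights, function (total, w) { return total + w; }, 0);
console.log(runningTotals.join(' ')); // 2 5 10

// The same helper reproduces the string example, with an empty string as the seed.
var letters = scan('hello'.split(''), function (origin, next) { return origin + next; }, '');
console.log(letters.join(' ')); // h he hel hell hello
```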

(11) buffer, bufferCount, bufferTime

The fisherman gets tired of throwing every single fish ashore the moment he catches it, so he decides to save them up and take them out of the tube in batches: either every time a certain number has been collected (bufferCount(3)) or at regular intervals (bufferTime(1000)). With var example = source.buffer(source2), every time source2 emits he takes all the fish saved so far and throws them ashore together.

var source = Rx.Observable.interval(300);
var source2 = Rx.Observable.interval(1000);
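// Three alternatives: keep only one of the following lines (as written, the last assignment wins)
var example = source.buffer(source2);  // throw a batch whenever source2 emits
var example = source.bufferTime(1000); // throw a batch every 1000 milliseconds
var example = source.bufferCount(3);   // throw a batch every 3 fish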


example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
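
The batching idea behind bufferCount can be sketched on a plain array. The function below is a hypothetical array helper, not the RxJS operator; it assumes, as RxJS does, that a final incomplete batch is still handed over when the fish run out.

```javascript
// bufferCount sketch: group values into batches of `size`; the last batch may be smaller.
function bufferCount(values, size) {
    var batches = [];
    for (var i = 0; i < values.length; i += size) {
        batches.push(values.slice(i, i + size));
    }
    return batches;
}

var batches = bufferCount([0, 1, 2, 3, 4, 5, 6], 3);
console.log(JSON.stringify(batches)); // [[0,1,2],[3,4,5],[6]]
```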

(12) delay()

After catching a string of fish, the fisherman decides to smoke a cigarette before he begins to take out the fish.

var source = Rx.Observable.interval(300).take(5);
var example = source.delay(500); // the fisherman takes 500 milliseconds to smoke his cigarette before he starts throwing the fish

source : --0--1--2--3--4|
        delay(500)
example: -------0--1--2--3--4|

// delayWhen('certain conditions'): the callback must return an observable for every fish
var example = source.delayWhen((x) => Rx.Observable.timer(x === 3 ? 500 : 0));
// When fish number 3 comes up, the fisherman stops to smoke for 500 milliseconds before throwing it; the other fish are thrown without delay

(13) debounceTime

Sometimes the fish bite too quickly and the fisherman is too old to take them out one by one. So while fish keep biting in quick succession (there is no time), he takes nothing out and waits until the gap after a bite is long enough. Only when the time since the last bite is longer than debounceTime does he take out the most recent fish and throw it ashore; the fish that bit earlier in the burst are dropped.

--1-2-3-----------------5--
// The gap between 3 and 5 is longer than debounceTime, so 3 (the latest fish of the burst) is taken out and thrown ashore
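
To make the waiting rule concrete, here is a sketch that replays a list of bites with explicit timestamps instead of real timers. debounceTimeline and the timestamps are assumptions made for illustration, not RxJS code. It follows the RxJS behaviour that debounceTime lets through only the most recent value once the quiet period has passed, and flushes the last pending value when the source completes.

```javascript
// Each bite is { at: time in ms, value }. Returns the values a debounce of `wait` ms lets through.
function debounceTimeline(events, wait) {
    var out = [];
    events.forEach(function (event, i) {
        var next = events[i + 1];
        // A value survives if no newer value arrives within `wait` ms of it.
        // The final value is flushed when the source completes.
        if (!next || next.at - event.at >= wait) {
            out.push(event.value);
        }
    });
    return out;
}

var bites = [
    { at: 100, value: 1 },
    { at: 200, value: 2 },
    { at: 300, value: 3 },
    { at: 2000, value: 5 }
];
var thrown = debounceTimeline(bites, 1000);
console.log(thrown.join(' ')); // 3 5
```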

(14) throttle

In (13) the gaps between bites are sometimes long and sometimes short, so the fisherman can rest for a while and then take out the fish left from the last burst. But in the summer fishing season the fish never stop biting, and with the debounce strategy the fisherman would never get a break (he would be worked to death). So he has another idea: with throttleTime(5000) he takes out a fish that has just been hooked and throws it, then ignores every fish hooked during the next 5 seconds. After the 5 seconds he takes the next fish that bites (waiting for one if none is there), throws it, ignores everything for another 5 seconds, and so on. There are enough fish to eat anyway.

Note: for details on the differences between debounce and throttle, refer to the article that analyzes debouncing and throttling through examples.

var source = Rx.Observable.interval(300).take(20);
var example = source.throttleTime(1000);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 4
// 8
// 12
// 16
// complete
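
The output above can be checked by replaying the same bites with explicit timestamps rather than real timers. throttleTimeline is a hypothetical helper written for this sketch, not the RxJS operator; it assumes the default behaviour of throttleTime, where a value is let through and everything in the following window is ignored.

```javascript
// Each bite is { at: time in ms, value }. After a value passes, ignore the next `duration` ms.
function throttleTimeline(events, duration) {
    var out = [];
    var silentUntil = -Infinity;
    events.forEach(function (event) {
        if (event.at >= silentUntil) {
            out.push(event.value);
            silentUntil = event.at + duration;
        }
    });
    return out;
}

// Same shape as interval(300).take(20): value i arrives at 300 * (i + 1) ms.
var bites = [];
for (var i = 0; i < 20; i++) {
    bites.push({ at: 300 * (i + 1), value: i });
}

var kept = throttleTimeline(bites, 1000);
console.log(kept.join(' ')); // 0 4 8 12 16
```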

(15) distinct

During festivals the fisherman wants to give people something special, so each time he takes out only a kind of fish he has not handed out before, letting them indulge a craving for Japanese food. Fish of a species that has already appeared are discarded.

var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'b'])
            .zip(Rx.Observable.interval(300), (x, y) => x);
var example = source.distinct()

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete

source : --a--b--c--a--b|
            distinct()
example: --a--b--c------|
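
How the fisherman remembers which species he has already handed out can be sketched with a Set. This distinct function is a hypothetical array helper for illustration, not the RxJS operator.

```javascript
// distinct sketch: keep a value only the first time it appears.
function distinct(values) {
    var seen = new Set();
    return values.filter(function (value) {
        if (seen.has(value)) return false; // this species was already thrown ashore
        seen.add(value);
        return true;
    });
}

var unique = distinct(['a', 'b', 'c', 'a', 'b']);
console.log(unique.join(' ')); // a b c
```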

2.2 Multi-bamboo-tube fishing: how to take the fish out

Multiple streams exist, such as the following

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3)) // only at this point does multi-tube fishing come into play; before it, the multi-tube operators need not be considered
                .concatAll();

(1) concatAll()

When several bamboo tubes are fishing, join the tubes that hold fish in series, one after another, and then take the fish out.

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 0
// 1
// 2

(2) zip

(From the two bamboo tubes, the first fish hooked in each are tied together and taken out, then the second fish hooked in each are tied together and taken out, and so on.)

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.zip(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete

source : ----0----1----2|
newest : --0--1--2--3--4--5|
    zip(newest, (x, y) => x + y)
example: ----0----2----4|
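
The pairing rule of zip can be sketched on two plain arrays that hold the fish of each tube in the order they were hooked. This zip function is a hypothetical array helper, not the RxJS operator; like the diagram above, it stops once the shorter tube has run out.

```javascript
// zip sketch: pair the i-th fish of each tube and combine each pair.
function zip(first, second, combine) {
    var length = Math.min(first.length, second.length);
    var out = [];
    for (var i = 0; i < length; i++) {
        out.push(combine(first[i], second[i]));
    }
    return out;
}

var zipped = zip([0, 1, 2], [0, 1, 2, 3, 4, 5], function (x, y) { return x + y; });
console.log(zipped.join(' ')); // 0 2 4
```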

(3) switch

switch, as the name says, switches from one tube to another. When tubes A, B and C are fishing and A catches a fish, the fisherman keeps his eyes on A and takes its fish until another tube catches one. When tube B then catches a fish, the fisherman switches his gaze to B and takes fish only from B, until yet another tube catches one. And so on.

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.switch();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

click  : ---------c-c------------------c--..
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: -----------------0----1----2--------0----1--...

(4) merge(observable2)

The fisherman watches two bamboo tubes at all times: when one has a fish he takes it, and when both have fish at the same time he takes the fish out of both tubes at once.

var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 1
// 3
// 2
// 4
// 5
// complete
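
The order of the merged output can be worked out by hand, or with a small sketch that lays both tubes on one timeline. intervalTimeline and mergeTimelines are hypothetical helpers that replay timestamps instead of running timers; they are not RxJS code. The sketch assumes that when both tubes hook a fish at the same instant, the fish of the first tube comes out first.

```javascript
// Timeline of interval(period).take(count): value i arrives at period * (i + 1) ms.
function intervalTimeline(period, count) {
    var events = [];
    for (var i = 0; i < count; i++) {
        events.push({ at: period * (i + 1), value: i });
    }
    return events;
}

// merge sketch: interleave both timelines by arrival time.
// Array.prototype.sort is stable (ES2019+), so on a tie the first tube's fish stays in front.
function mergeTimelines(first, second) {
    return first.concat(second).sort(function (a, b) { return a.at - b.at; });
}

var merged = mergeTimelines(intervalTimeline(500, 3), intervalTimeline(300, 6));
var mergedValues = merged.map(function (event) { return event.value; });
console.log(mergedValues.join(' ')); // 0 0 1 2 1 3 2 4 5
```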

(5) mergeAll

In (4) above the fisherman merges two bamboo tubes and takes a fish out as soon as either of them has one. With mergeAll he watches any number of bamboo tubes at the same time: when one of them has a fish he takes it out, and when several have fish at the same moment he takes all of those fish out together.

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.mergeAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

click  : ---------c-c------------------c--..
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     mergeAll()
example: ----------------00---11---22---33---(04)4--...

(6) combineLatest()

Whenever either bamboo tube hooks a new fish, take it out together with the latest fish from the other tube.

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete

source : ----0----1----2|
newest : --0--1--2--3--4--5|

    combineLatest(newest, (x, y) => x + y);

example: ----01--23-4--(56)--7|
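
The eight values in the diagram can be reproduced by replaying both tubes on one timeline. The helpers below are hypothetical and use timestamps instead of timers; they are not RxJS code. The sketch assumes that nothing is emitted until both tubes have hooked at least one fish, and that on a tie the first tube is handled first.

```javascript
// Timeline of interval(period).take(count): value i arrives at period * (i + 1) ms.
function intervalTimeline(period, count) {
    var events = [];
    for (var i = 0; i < count; i++) {
        events.push({ at: period * (i + 1), value: i });
    }
    return events;
}

// combineLatest sketch: on every new fish, combine it with the latest fish of the other tube.
function combineLatestTimeline(first, second, combine) {
    var tag = function (side) {
        return function (e) { return { at: e.at, value: e.value, side: side }; };
    };
    // Stable sort (ES2019+): on a tie the first tube's fish is handled first.
    var all = first.map(tag(0)).concat(second.map(tag(1)))
        .sort(function (a, b) { return a.at - b.at; });
    var latest = [];
    var seen = [false, false];
    var out = [];
    all.forEach(function (e) {
        latest[e.side] = e.value;
        seen[e.side] = true;
        if (seen[0] && seen[1]) out.push(combine(latest[0], latest[1]));
    });
    return out;
}

var combined = combineLatestTimeline(
    intervalTimeline(500, 3), // source
    intervalTimeline(300, 6), // newest
    function (x, y) { return x + y; }
);
console.log(combined.join(' ')); // 0 1 2 3 4 5 6 7
```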

2.3 Appendix: shortcuts for multi-bamboo-tube fishing

As the multi-tube operations above show, when fishing with several bamboo tubes the operators concatAll, switch and mergeAll usually have to be combined with the map operator. So the fisherman decided to replace each pair of operators with a single operator, to speed up taking out the fish. Details are as follows:

(1) concatMap

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3))
                .concatAll();

// Simplified as follows:
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .concatMap(
                    e => Rx.Observable.interval(1000).take(3)
                );

(2) switchMap

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3))
                .switch();

// Simplified as follows:
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .switchMap(
                    e => Rx.Observable.interval(1000).take(3)
                );

(3) mergeMap

var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .map(e => Rx.Observable.interval(1000).take(3))
                .mergeAll();

// Simplified as follows:
var source = Rx.Observable.fromEvent(document.body, 'click');

var example = source
                .mergeMap(
                    e => Rx.Observable.interval(1000).take(3)
                );
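
Replacing map plus a flattening step with a single operator has a familiar analogy in plain arrays, sketched below. This is only an analogy and not RxJS code: arrays have no notion of time, so the sketch corresponds most closely to concatMap, and the click names and the project function are made up for illustration.

```javascript
var clicks = ['click1', 'click2']; // made-up events
var project = function (click) {
    // Each click opens an inner "tube" with three fish.
    return [click + ':0', click + ':1', click + ':2'];
};

// Two steps: map every click to an inner array, then flatten in order.
var twoSteps = clicks.map(project).reduce(function (all, inner) {
    return all.concat(inner);
}, []);

// One step: flatMap does the mapping and the flattening together.
var oneStep = clicks.flatMap(project);

console.log(JSON.stringify(twoSteps) === JSON.stringify(oneStep)); // true
console.log(oneStep.join(' ')); // click1:0 click1:1 click1:2 click2:0 click2:1 click2:2
```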

Three. What happens to the fish as it is thrown ashore after being taken out

(1) map(callback)

var source = Rx.Observable.interval(1000);
var newest = source.map(x => x + 1);
// When the fisherman throws a fish toward the shore, it flies through an area irradiated by map rays and mutates: its weight automatically increases by one jin. By the time it reaches the hungry people, the fish is one jin heavier than when the fisherman threw it.
newest.subscribe(console.log);
// The result is:
// 1
// 2
// 3
// 4
// 5..

(2) mapTo()

var source = Rx.Observable.interval(1000);
var newest = source.mapTo(2);
// When the fisherman throws a fish, it flies through the mapTo radiation area on its way to the shore and changes: whether fat or thin, its weight becomes 2, so every fish the hungry people get weighs 2 jin.
newest.subscribe(console.log);
// 2
// 2
// 2
// 2...

(3) filter()

var source = Rx.Observable.interval(1000);
var newest = source.filter(x => x % 2 === 0);
// When the fisherman throws a fish, it flies through the filter ray area on its way to the shore. The filter rays act as a wall that keeps out fish of the wrong weight, so the hungry people only get fish that pass the test (here, fish whose weight is an even number).
newest.subscribe(console.log);
// 0
// 2
// 4
// 6..

(4) catch()

When a fish is flying through the air and being mutated by an operator, an accident may happen (the fish is mutated to death, or burnt). The people on the shore need a contingency plan, such as eating wild fruit; otherwise the mutation fails and the people on the shore starve.

var source = Rx.Observable.from(['a', 'b', 'c', 'd', 2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .catch(error => Rx.Observable.of('h'));
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

source : ----a----b----c----d----2|
        map(x => x.toUpperCase())
         ----A----B----C----D----X|
        catch(error => Rx.Observable.of('h'))
example: ----A----B----C----D----h|
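
The contingency plan can be sketched in plain JavaScript with try/catch. mutateAll is a hypothetical helper for illustration, not the RxJS catch operator; it assumes what the diagram shows, namely that after the first failure the original stream is over and only the fallback value follows.

```javascript
// catch sketch: mutate each fish; on the first failure emit the fallback and stop.
function mutateAll(fishes, mutate, fallback) {
    var out = [];
    for (var i = 0; i < fishes.length; i++) {
        try {
            out.push(mutate(fishes[i]));
        } catch (error) {
            out.push(fallback(error)); // switch to the contingency plan
            break;                     // the original stream has ended
        }
    }
    return out;
}

var result = mutateAll(
    ['a', 'b', 'c', 'd', 2],
    function (x) { return x.toUpperCase(); }, // throws a TypeError for the number 2
    function () { return 'h'; }
);
console.log(result.join(' ')); // A B C D h
```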

(5) retry()

When the fish flies through a mutation operator and the mutation may fail (for example, while doubling the fish's weight), retry() makes the fisherman throw again. retry(5) retries up to five times (the number of retries is configurable).

var source = Rx.Observable.from(['a', 'b', 'c', 'd', 2])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source
                .map(x => x.toUpperCase())
                .retry();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

(6) repeat

repeat is similar to retry. retry lets the fisherman throw again when the mutation in the air goes wrong. If the mutation succeeds (the fish is successfully mutated in the air from 1 jin to 2 jin), you may still want the fisherman to throw again. In that case use repeat, not retry, to ask for another round; otherwise the fisherman will think the mutation experiment failed.

var source = Rx.Observable.from(['a', 'b', 'c'])
            .zip(Rx.Observable.interval(500), (x,y) => x);

var example = source.repeat(2); // the count is the total number of rounds, so 2 means the fish are thrown twice

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// a
// b
// c
// a
// b
// c
// complete
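
The difference between the two can be sketched in plain JavaScript: retry runs the task again only after a failure, repeat runs it again after a success. Both functions below are hypothetical helpers, not the RxJS operators; in particular, real retry re-subscribes to the source, so fish thrown before the failure are thrown again, which this simplified sketch does not model.

```javascript
// retry sketch: run the task again only after a failure, at most maxRetries extra times.
function retry(task, maxRetries) {
    var attempts = 0;
    while (true) {
        try {
            return task(attempts);
        } catch (error) {
            if (attempts >= maxRetries) throw error; // out of retries: give up
            attempts++;
        }
    }
}

// repeat sketch: run the task `count` times in total, even though every run succeeds.
function repeat(task, count) {
    var results = [];
    for (var i = 0; i < count; i++) {
        results = results.concat(task());
    }
    return results;
}

// A made-up mutation that fails on the first two attempts and then succeeds.
var failsTwice = function (attempt) {
    if (attempt < 2) throw new Error('mutation failed');
    return 'fish thrown on attempt ' + (attempt + 1);
};

var retried = retry(failsTwice, 5);
console.log(retried); // fish thrown on attempt 3

var repeated = repeat(function () { return ['a', 'b', 'c']; }, 2);
console.log(repeated.join(' ')); // a b c a b c
```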

References:

Rxjs official documentation

Master RxJS in 30 days