Creating Observableof()from()fromEvent()fromPromise()interval() Ajax () operator () mapFilterShare Other error handling Observable problem record for retry failures FR OmPromise no longer exported in Master (v6) References

Introduction to the

RxJS is the JavaScript version of the ReactiveX programming concept. ReactiveX, from Microsoft, is programming for asynchronous data streams. Simply put, it streams all data, whether HTTP requests, DOM events, or normal data, and then streams it with powerful and rich operators that allow you to programmatically process asynchronous data asynchronously and combine different operators to easily and elegantly implement what you need.

RxJS provides an implementation of the Observable type, which is necessary until an Observable is part of the JavaScript language and supported by browsers. The library also provides utility functions for creating and working with observables. These utility functions can be used to:

  • Transform existing asynchronous code into observables
  • Iterate over each value in the flow
  • Map these values to other types
  • Filtration by convection
  • Combine multiple streams

Primary core concepts

  • Observable: A producer of a series of values
  • Observer: It is the consumer of observable
  • Operator: An Operator that can convert values while in transit

Observables, as observed, is a stream of values or events; The Observer acts as an Observer, according to the Observables.

Observables and observers subscribe to publish (Observer mode) as follows:

  • Subscribe: The Observer subscribes to an Observable via the Subscribe () method provided by the Observable.
  • Publish: Observable publishes events to the Observer by calling back to the Next method.

Create observables

RxJS provides many operators for creating observables. Common operators are as follows:

  • Of (), which turns normal JavaScript data into an Observable
  • From (), converts an array or iterable object into an Observable
  • Create () returns an Observable that calls methods in the Observer.
  • FromEvent (), which converts events into Observables.
  • FromPromise () converts promises into Observables.
  • Ajax (), which creates an Observable from Ajax

of()

import { Component, OnInit } from '@angular/core';

import { Observable,of } from 'rxjs';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Use of to create observables </h2>

    <div>

      <button (click)="getData()">Click here</button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  getData() {

    // Create simple observable that emits three values

    const myObservable = of(1.2.3);



    // Create observer object

    const myObserver = {

      nextx= > console.log('Observer got a next value: ' + x),

      errorerr= > console.error('Observer got an error: ' + err),

      complete(a)= > console.log('Observer got a complete notification'),

    };



    // Execute with the observer object

    myObservable.subscribe(myObserver);

  }



}

Copy the code

Start the project, open the page, and click the button.

from()

import { Component, OnInit } from '@angular/core';

import { Observable,from } from 'rxjs';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Create an observable using the from function </h2>

    <div>

<button (click)="fromData()"> </button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  fromData() {

    let persons = [

      { name'Dave'.age34.salary2000 },

      { name'Nick'.age37.salary32000 },

      { name'Howie'.age40.salary26000 },

      { name'Brian'.age40.salary30000 },

      { name'Kevin'.age47.salary24000 },

    ];



    const myObservable = from(persons);

    myObservable.subscribe(person= > console.log(person));

  }



}

Copy the code

Page test results are:

fromEvent()

import { Component, OnInit } from '@angular/core';

import { Observable,fromEvent } from 'rxjs';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Create an observable using the fromEvent function </h2>

    <div>

      <p id="content">Hello Hresh</p>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

      this.fromEvent();

  }



  fromEvent() {

    const el = document.getElementById('content');



    const mouseMoves = fromEvent(el, 'click');



    const subscription = mouseMoves.subscribe((a)= > {

      el.style.color = 'red';

    });

  }

}

Copy the code

Run the project, click on the text, and the text will turn red.

There is a typical application of fromEvent in real production: type-ahead suggestions.

Observables simplify the implementation of input suggestions. Typical input prompts to complete a series of separate tasks:

  • Listen for data from input.
  • Remove whitespace before and after the input value and make sure it reaches the minimum length.
  • Anti-shake (this prevents API requests from being made each time the key is pressed continuously, and should be made only when the key pauses)
  • If the input value does not change, do not initiate a request (such as pressing a character and then quickly pressing backspace).
  • If the result of an AJAX request that has been made becomes invalid because of subsequent modifications, cancel it.

Implementing this functionality entirely in traditional JavaScript may require a lot of work. With observables, you can use a simple sequence of RxJS operators like this:

import { fromEvent } from 'rxjs';

import { ajax } from 'rxjs/ajax';

import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators';





const searchBox = document.getElementById('search-box');



const typeahead = fromEvent(searchBox, 'input').pipe(

  map((e: KeyboardEvent) = > (e.target as HTMLInputElement).value),

  filter(text= > text.length > 2),        // Check whether the input length is greater than 2

  debounceTime(500),    // Wait until the user stops typing (in this case, 1/2 second)

  distinctUntilChanged(),    // Wait for the search text to change.

  switchMap((a)= > ajax('/api/endpoint'))        // Send the search request to the service.

);



typeahead.subscribe(data= > {

 // Handle the data from the API

});

Copy the code

fromPromise()

import { Component, OnInit } from '@angular/core';

import { from, Observable } from 'rxjs';

import { fromPromise } from 'rxjs/internal/observable/fromPromise';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Create an observable using the from function </h2>

    <div>

<button (click)="fromPromiseData()"> Create Observable</button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  fromPromiseData() {

    const myObservable = fromPromise(new Promise((resolve, reject) = > {

      setTimeout((a)= > {

        // tslint:disable-next-line:prefer-const

        let username = 'hresh----Promise';

        resolve(username);

      }, 2000);

    }));

    myObservable.subscribe({

      next(data) { console.log(data); },

      error(err) { console.error('Error' + err); },

      complete() { console.log('completed'); }

    });

  }



}

Copy the code

After Rxjs6, from can be used instead of fromPromise, so changing from() is ok. Page test results are:

To extend this, the official documentation has a case where the fetch method returns a Promise object. I have made some changes here:

import { Component, OnInit } from '@angular/core';

import { from, Observable } from 'rxjs';

import { fromPromise } from 'rxjs/internal/observable/fromPromise';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Create an observable using the from function </h2>

    <div>

<button (click)="fromPromiseData()"> Create Observable</button>

      <br>

<button (click)="fromData2()"> </button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  fromPromiseData() {

    const myObservable = from(fetch('http://a.itying.com/api/productlist'));

    myObservable.subscribe({

      next(data) { console.log(data); },

      error(err) { console.error('Error' + err); },

      complete() { console.log('completed'); }

    });

  }



  fromData2() {

    // Fetch () returns a Promise that fetches the contents of the Promise object, the array

    let arrayData = [];

    fetch('http://a.itying.com/api/productlist').then(response= > response.json()).then((data= > {

      arrayData = data.result;

    }));



    let myObservable = null;

    setTimeout((a)= > {

      myObservable = from(arrayData);

      myObservable.subscribe(data= > console.log(data));

    }, 2000);

  }



}

Copy the code

Page test results are:

interval()

import { Component, OnInit } from '@angular/core';

import { interval, Observable } from 'rxjs';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Create an observable using the interval function </h2>

    <div>

<button (click)=" Interval2 ()"> </button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  interval2() {

    const secondsCounter = interval(1000);

    // Subscribe to begin publishing values

    const oData = secondsCounter.subscribe(n= >

      console.log(`It's been ${n}seconds since subscribing! `));

    setTimeout((a)= > {

      console.log('Cancel count operation');

      oData.unsubscribe();  /* Cancel data display after 5s */

    }, 5000);

  }



}

Copy the code

The interval operator supports a numeric parameter that represents the timing interval. The code above shows that every 1s, an increasing value is printed, starting at 0. Page test results are:

ajax()

import { Component, OnInit } from '@angular/core';

import {ajax} from 'rxjs/ajax';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Create observables using Ajax functions </h2>

    <div>

<button (click)=" Ajax2 ()"> Create Observable</button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  ajax2(){

    const apiData = ajax('http://a.itying.com/api/productlist');

    // Subscribe to create the request

    apiData.subscribe(res= > console.log(res.status, res.response));

  }



}

Copy the code

Page test results:

Operator (Operators)

The method we describe for creating an Observable is the Rxjs operator, which is a set of Observable functions that perform complex operations on a collection. RxJS also defines operators such as map(), filter(), concat(), and flatMap().

The operator takes configuration items and returns a function that takes the source observable. When the returned function is executed, the operator looks at the values emitted from the source observable, converts them, and returns a new observable consisting of the converted values.

map

import { map } from 'rxjs/operators';



const nums = of(1.2.3);



const squareValues = map((val: number) = > val * val);

const squaredNums = squareValues(nums);



squaredNums.subscribe(x= > console.log(x));



// Logs

/ / 1

/ / 4

/ / 9

Copy the code

As you can see, the map takes a function that squares each element for conversion.

filter

You can use pipes to link these operators together. Pipes allow you to combine multiple functions returned by operators into one. The pipe() function takes the functions you want to combine and returns a new function that, when executed, executes the combined functions sequentially.

A set of operators applied to an observable is like a processing flow — that is, a set of steps that process the values you are interested in. The process itself doesn’t do anything. You need to call SUBSCRIBE () to generate a result through the process.

import { filter, map } from 'rxjs/operators';



const nums = of(1.2.3.4.5);



// Create a function that accepts an Observable.

const squareOddVals = pipe(

  filter((n: number) = > n % 2! = =0),

  map(n= > n * n)

);



// Create an Observable that will run the filter and map functions

const squareOdd = squareOddVals(nums);



// Subscribe to run the combined functions

squareOdd.subscribe(x= > console.log(x));



// Logs

/ / 1

/ / 9

/ / 25

Copy the code

The pipe() function is also an Observable method in RxJS, so you can do the same thing with the following abbreviations:

import { filter, map } from 'rxjs/operators';



const squareOdd = of(1.2.3.4.5)

  .pipe(

    filter(n= > n % 2! = =0),

    map(n= > n * n)

  );



// Subscribe to get values

squareOdd.subscribe(x= > console.log(x));

Copy the code

share

Before we get to the Share operator, we need to look at Observables in hot and cold mode. The official definition is as follows:

Cold Observables run after being subscribed, that is, the Observables sequence pushes data only after the SUBSCRIBE function is called. Unlike Hot Observables, they start generating data before they are subscribed, such as mouse Move events.

Here’s how it works: Cold creates a new data producer internally, while Hot always uses an external data producer.

Here’s an example:

Cold: It’s like I’m watching league of Legends reruns at Station B, from the beginning.

Hot: It’s like watching league of Legends live. If you’re late, you can’t see anything in front of you.

The share() operator allows multiple subscribers to share the same Observable. That’s turning Cold into Hot. See the following example:

import { Component, OnInit } from '@angular/core';

import { interval } from 'rxjs';

import { share, take } from 'rxjs/operators';



@Component({

  selector'app-observable'.

  templateUrl'

<h2> Share operator </h2>

    <div>

      <button (click)="shareData()">Click</button>

    </div>

  '


})

export class ObservableComponent implements OnInit {

  constructor() {}



  ngOnInit(): void {

  }



  shareData() {

    const numbers = interval(1000).pipe(

      take(5),

      share()

    );



    function subscribeToNumbers(name{

      numbers.subscribe(

        x= > console.log(`${name}${x}`)

      );

    }



    subscribeToNumbers('Dave');



    const anotherSubscription = (a)= > subscribeToNumbers('Nick');



    setTimeout(anotherSubscription, 2500);

  }



}

Copy the code

Page test results are:

Take (5) takes 5 numbers (1,2,3,4,5). Then share() changes the Observable from cold to hot. Later Dave subscribed, and 2.5 seconds later Nick subscribed.

other

RxJS provides many operators, but only a few are commonly used. As shown below:

The use of these Operators is explained in detail in the third RxJS series, Operators, and the RxJS API documentation.

Error handling

In addition to providing an error() handler at subscription time, RxJS also provides the catchError operator, which allows you to handle known errors in a pipe.

Suppose you have an observable that makes an API request and then maps the response returned by the server. If the server returns an error or the value does not exist, an error is generated. If you catch this error and provide a default value, the stream will continue to process these values without an error.

import { ajax } from 'rxjs/ajax';

import { map, catchError } from 'rxjs/operators';

// Return "response" from the API. If an error happens,

// return an empty array.

const apiData = ajax('http://a.itying.com/api/productlist').pipe(

  map(res= > {

    if(! res.response) {

      throw new Error('Value expected! ');

    }

    return res.response;

  }),

  catchError(err= > of([]))

);



apiData.subscribe({

  next(x) { console.log('data: ', x); },

  error(err) { console.log('errors already caught... will not run'); }

});

Copy the code

Page test results:

Retry failed observables

CatchError provides an easy way to recover, and the Retry operator lets you try a failed request.

You can use the retry operator before catchError. It subscribes to the original source observable, and it can rerunce the sequence of actions that caused the resulting error. If it contains an HTTP request, it will re-initiate that HTTP request.

The following code changes the previous example to resend the request before catching an error:

import { ajax } from 'rxjs/ajax';

import { map, retry, catchError } from 'rxjs/operators';



const apiData = ajax('/api/data').pipe(

  retry(3), // Retry up to 3 times before failing

  map(res= > {

    if(! res.response) {

      throw new Error('Value expected! ');

    }

    return res.response;

  }),

  catchError(err= > of([]))

);



apiData.subscribe({

  next(x) { console.log('data: ', x); },

  error(err) { console.log('errors already caught... will not run'); }

});

Copy the code

Note: Do not retry login authentication requests; these requests should only be triggered by user actions. We certainly don’t want to lock a user’s account by automatically repeating login requests.

The problem record

fromPromise no longer exported in master (v6)

RxJS version: 6.0.0 – alpha. 3

Code to reproduce:

import { fromPromise } from 'rxjs';

Copy the code

Expected behavior:

should work just as it did with v5.5 (but different location)

Solution: Use from instead of fromPromise.

reference

An introduction to Angular6 version upgrades and new features in RXJS6

RxJS: Getting started

Observable RxJS series 2 – Observable

RxJS series 3 – Operators details

RxJS Crash (part 1)