reference

observable.subscribe accepts an observer object with three callbacks: next, error, and complete.

observable.subscribe({
  next: value => console.log(`Value is ${value}`),
  error: err => console.log(err),
  complete: () => console.log(`Completed`)
})

The .subscribe function can also take three functions instead of a single object.

observable.subscribe(
  value => console.log(`Value is ${value}`),
  err => console.log(err),
  () => console.log(`Completed`)
)

We can create a new Observable by calling its constructor and passing in a subscribe function, which receives a subscriber (also called an observer). The subscriber has three methods: next, error, and complete. The subscribe function can call next with a value as many times as needed, and finish with either complete or error. Once complete or error has been called, the observable pushes no further values.

import { Observable } from 'rxjs'
const observable$ = new Observable(function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    // Emit a single value, then complete and stop the timer
    subscriber.next('hi');
    subscriber.complete();
    clearInterval(intervalId);
  }, 1000);
});
observable$.subscribe(
  value => console.log(`Value is ${value}`),
  err => console.log(err)
)

The above example prints "Value is hi" after 1000 milliseconds.
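The rule that nothing is delivered after complete or error can be illustrated without RxJS. The sketch below is a hand-rolled stand-in, not the library's implementation; createObservable and received are hypothetical names introduced only for this illustration.

```javascript
// Minimal stand-in for an observable (NOT the RxJS implementation).
// It wraps the observer so that nothing is delivered after complete() or error().
function createObservable(subscribeFn) {
  return {
    subscribe(observer) {
      let closed = false;
      const subscriber = {
        next(value) {
          if (!closed && observer.next) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(err);
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
        },
      };
      subscribeFn(subscriber);
    },
  };
}

const received = [];

createObservable((subscriber) => {
  subscriber.next('hi');
  subscriber.next('there');
  subscriber.complete();
  subscriber.next('ignored'); // dropped: the stream has already completed
}).subscribe({
  next: (value) => received.push(`Value is ${value}`),
  complete: () => received.push('Completed'),
});

console.log(received);
// [ 'Value is hi', 'Value is there', 'Completed' ]
```

The closed flag is the whole trick: the first call to complete or error flips it, and every later call becomes a no-op.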

RxJS has many functions for creating observables. Some of the most common are of, from, and ajax.

of takes a sequence of values and converts them to a stream:

import { of } from 'rxjs'
of(1, 2, 3, 'Hello', 'World').subscribe(value => console.log(value))
// 1 2 3 Hello World

from converts almost anything (an array, a promise, an iterable) to a stream:

import { from } from 'rxjs'
from([1, 2, 3]).subscribe(console.log)
// 1 2 3
from(Promise.resolve('Hello World')).subscribe(console.log)
// 'Hello World'
// fibonacciGenerator is assumed to be an iterable defined elsewhere
from(fibonacciGenerator).subscribe(console.log)
// 1 1 2 3 5 8 13 21...

ajax takes either a URL string or a request configuration object and creates an observable that makes an HTTP request. It also has a helper, ajax.getJSON, which emits only the nested response object from the AJAX call, without the other properties emitted by ajax():

import { ajax } from 'rxjs/ajax'
ajax('https://jsonplaceholder.typicode.com/todos/1').subscribe(console.log)
// {request, response: {userId, id, title, completed}, responseType, status}
ajax.getJSON('https://jsonplaceholder.typicode.com/todos/1').subscribe(console.log)
// {userId, id, title, completed}
// url, method, headers and body are placeholders for your own request settings
ajax({ url, method, headers, body }).subscribe(console.log)
// {...}

Operators

Operators are the real power of RxJS, which has operators for almost everything you need. As of RxJS 6, operators are not methods on the observable object but pure functions, applied to an observable with the .pipe method.

map takes a single-argument function and applies it as a projection to each element in the stream:

import { of } from 'rxjs'
import { map } from 'rxjs/operators'
of(1, 2, 3, 4, 5).pipe(
  map(i => i * 2)
).subscribe(console.log)
// 2, 4, 6, 8, 10

filter takes a predicate function and removes from the stream every value for which that function returns false:

import { of } from 'rxjs'
import { map, filter } from 'rxjs/operators'
of(1, 2, 3, 4, 5).pipe(
  map(i => i * i),
  filter(i => i % 2 === 0)
).subscribe(console.log)
// 4, 16
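The idea that operators are plain functions chained together by .pipe can be sketched without RxJS. In the sketch below, plain arrays stand in for streams, and pipe, map and filter are hypothetical stand-alone helpers written for this illustration, not the library's own functions.

```javascript
// Stand-alone helper (NOT the RxJS implementation): compose functions left to right.
const pipe = (...operators) => (source) =>
  operators.reduce((current, operator) => operator(current), source);

// Array-based stand-ins for operators: each takes a source and returns a new one.
const map = (project) => (source) => source.map(project);
const filter = (predicate) => (source) => source.filter(predicate);

const source = [1, 2, 3, 4, 5];
const result = pipe(
  map((i) => i * i),
  filter((i) => i % 2 === 0)
)(source);

console.log(result); // [ 4, 16 ]
console.log(source); // [ 1, 2, 3, 4, 5 ] (the source is left untouched)
```

Because each operator only takes a source and returns a new one, operators can be combined in any order and reused across streams.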

flatMap (an alias of mergeMap) takes a function that maps each item in a stream to another stream, and flattens the values of all those streams into one:

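import { of } from 'rxjs'
import { flatMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'
of(1, 2, 3).pipe(
  flatMap(page => ajax.getJSON(`https://example.com/blog?size=2&page=${page}`))
).subscribe(console.log)
// e.g. [ { blog 1 }, { blog 2 } ], then [ { blog 3 }, { blog 4 } ], then [ { blog 5 }, { blog 6 } ] (arrival order may vary)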

merge combines the items of two streams in the order they arrive:

import { merge, interval } from 'rxjs'
import { take, mapTo } from 'rxjs/operators'
merge(
  interval(150).pipe(take(5), mapTo('A')),
  interval(250).pipe(take(5), mapTo('B'))
).subscribe(console.log)
// A B A A B A A B B B

Other operators

redux-observable

All operations in Redux are synchronous. redux-observable is a Redux middleware that performs asynchronous work using observable streams, and then dispatches another action in Redux with the result of that asynchronous work.

redux-observable is built around the idea of Epics. An Epic is a function that takes a stream of actions, and optionally a stream of state, and returns a stream of actions.

function (action$: Observable<Action>, state$: StateObservable<State>): Observable<Action>;

By convention, the name of every variable that holds a stream ends with **$**.

Here is an example ping-pong Epic: it takes a PING action, sends a request to a server, and when the request completes, dispatches a PONG action back to the app.

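import { ofType } from 'redux-observable'
import { flatMap, mapTo } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'
const pingEpic = action$ => action$.pipe(
  ofType('PING'),
  flatMap(action => ajax('https://example.com/pinger')),
  mapTo({ type: 'PONG' })
)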

Now we will update the original todo store by adding Epics and fetching the user.

import { combineReducers, createStore, applyMiddleware } from 'redux'
import { ofType, combineEpics, createEpicMiddleware } from 'redux-observable'
import { map, flatMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'
// ...
/* User and Todos reducers are defined as above */
const rootReducer = combineReducers({ user, todos })
const epicMiddleware = createEpicMiddleware()
// Fetch the user and dispatch the result
const userEpic = action$ => action$.pipe(
  ofType('GET_USER'),
  flatMap(() => ajax.getJSON('https://foo.bar.com/get-user')),
  map(user => ({ type: 'GET_USER_SUCCESS', payload: user }))
)
// Post the new todo; the action is passed into flatMap so its payload can be read
const addTodoEpic = action$ => action$.pipe(
  ofType('ADD_TODO'),
  flatMap(action => ajax({
    url: 'https://foo.bar.com/add-todo', method: 'POST', body: { text: action.payload }
  })),
  map(data => data.response),
  map(todo => ({ type: 'ADD_TODO_SUCCESS', payload: todo }))
)
// Mark a todo as completed, identified by the id in the action payload
const completeTodoEpic = action$ => action$.pipe(
  ofType('COMPLETE_TODO'),
  flatMap(action => ajax({
    url: 'https://foo.bar.com/add-todo', method: 'POST', body: { id: action.payload }
  })),
  map(data => data.response),
  map(todo => ({ type: 'COMPLETE_TODO_SUCCESS', payload: todo }))
)
const rootEpic = combineEpics(userEpic, addTodoEpic, completeTodoEpic)
const store = createStore(rootReducer, applyMiddleware(epicMiddleware))
// Start the Epics only after the store has been created with the middleware
epicMiddleware.run(rootEpic)

Note: Like any observable stream in RxJS, an Epic can end in a complete or error state, after which it stops handling actions and the app stops working. Use catchError to handle errors that would otherwise terminate it. The source code for the reactive todo application is on GitHub.
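Why the position of catchError matters can be sketched without any library. Everything below is a hand-rolled imitation of the relevant behaviour, not RxJS itself: createStream, of, throwError, flatMap and catchError are simplified synchronous stand-ins, and fakeRequest, the ADD_TODO_FAILURE action type and the run helper are hypothetical names used only for this illustration.

```javascript
// Hand-rolled stand-ins that imitate the relevant RxJS behaviour (NOT the library itself).
const createStream = (producer) => ({
  subscribe(observer) {
    let closed = false; // once true, nothing more reaches the observer
    producer({
      next: (value) => { if (!closed) observer.next(value); },
      error: (err) => { if (!closed) { closed = true; observer.error(err); } },
      complete: () => { if (!closed) { closed = true; observer.complete(); } },
    });
  },
});

const of = (...values) => createStream((out) => {
  values.forEach((value) => out.next(value));
  out.complete();
});
const throwError = (err) => createStream((out) => out.error(err));

// Synchronous imitation of flatMap: inner values and inner errors flow to the output.
const flatMap = (project) => (source) => createStream((out) => {
  source.subscribe({
    next: (value) =>
      project(value).subscribe({ next: out.next, error: out.error, complete: () => {} }),
    error: out.error,
    complete: out.complete,
  });
});

// Imitation of catchError: on error, switch to the stream returned by the handler.
const catchError = (handler) => (source) => createStream((out) => {
  source.subscribe({
    next: out.next,
    error: (err) => handler(err).subscribe(out),
    complete: out.complete,
  });
});

// Hypothetical request that fails for the todo whose payload is 2.
const fakeRequest = (action) =>
  action.payload === 2
    ? throwError(new Error('request failed'))
    : of({ type: 'ADD_TODO_SUCCESS', payload: action.payload });

const actions = [1, 2, 3].map((payload) => ({ type: 'ADD_TODO', payload }));

// Feed the three actions through an epic and record what comes out.
const run = (epic) => {
  const log = [];
  epic(of(...actions)).subscribe({
    next: (action) => log.push(`${action.type} ${action.payload}`),
    error: () => log.push('EPIC TERMINATED'),
    complete: () => log.push('done'),
  });
  return log;
};

// Unprotected: the failure escapes the inner stream and ends the whole Epic.
const unsafeEpic = flatMap(fakeRequest);
// Protected: the failure is turned into an action inside the inner stream.
const safeEpic = flatMap((action) =>
  catchError(() => of({ type: 'ADD_TODO_FAILURE', payload: action.payload }))(fakeRequest(action))
);

const unsafeLog = run(unsafeEpic);
const safeLog = run(safeEpic);
console.log(unsafeLog); // [ 'ADD_TODO_SUCCESS 1', 'EPIC TERMINATED' ]
console.log(safeLog);   // [ 'ADD_TODO_SUCCESS 1', 'ADD_TODO_FAILURE 2', 'ADD_TODO_SUCCESS 3', 'done' ]
```

In the unprotected version the third todo is never reported, because the outer stream has already ended. With real RxJS the same shape is usually written by piping catchError onto the inner ajax observable inside flatMap, so that only the failed request's stream is replaced and the Epic keeps running.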