reference
Observable. subscribe has three objects next, Error, and complete
observable.subscribe({
next: value= > console.log(`Value is ${value}`),
error: err= > console.log(err),
complete: () = > console.log(`Completed`})),Copy the code
The.subscribe function can also have three functions instead of one object.
observable.subscribe(
value= > console.log(`Value is ${value}`),
err= > console.log(err),
() = > console.log(`Completed`))Copy the code
We can create a new Observable by creating an object, passing in a subscriber (also called observer) function. The user has three methods: Next, Error, and Complete. The subscriber can call the next one with a value as many times as needed, complete or error at the end. Call complete or post error, and the observable will not push any values down.
import { Observable } from 'rxjs'
const observable$ = new Observable(function subscribe(subscriber) {
const intervalId = setInterval(() = > {
subscriber.next('hi');
subscriber.complete()
clearInterval(intervalId);
}, 1000);
});
observable$.subscribe(
value= > console.log(`Value is ${value}`),
err= > console.log(err)
)
Copy the code
The above example prints Value is HI after 1000 milliseconds.
RxJS has many features for creating observable objects. Some of the most common ones are of, from, and Ajax
“Of” takes a series of values and converts them to a stream
of(1.2.3.'Hello'.'World').subscribe(value= > console.log(value))
// 1 2 3 Hello World
Copy the code
From converts everything to a stream
import { from } from 'rxjs'
from([1.2.3]).subscribe(console.log)
/ / 1 2 3
from(new Promise.resolve('Hello World')).subscribe(console.log)
// 'Hello World'
from(fibonacciGenerator).subscribe(console.log)
// 1 1 2 3 5 8 13 21...
Copy the code
Ajax takes a string URL or creates an observable that makes an HTTP request. Ajax has a function ajax.getjson that returns only nested response objects from ajax calls, without any other properties returned by Ajax () :
ajax('https://jsonplaceholder.typicode.com/todos/1').subscribe(console.log)
// {request, response: {userId, id, title, completed}, responseType, status}
ajax.getJSON('https://jsonplaceholder.typicode.com/todos/1').subscribe(console.log)
// {userId, id, title, completed}
ajax({ url, method, headers, body }).subscribe(console.log)
/ / {... }
Copy the code
Operators
Operators are the real power of RxJS, which has Operators for almost everything you need. As of RxJS 6, operators are not methods on observable objects, but pure functions applied to observable objects using the.pipe method.
Map takes a single parameter function and applies a projection to each element in the flow:
of(1.2.3.4.5).pipe(
map(i= > i * 2)
).subscribe(console.log)
// 2, 4, 6, 8, 10
Copy the code
Filter takes an argument and removes from the stream the value that returns false for the given function
of(1.2.3.4.5).pipe(
map(i= > i * i),
filter(i= > i % 2= = =0)
).subscribe(console.log)
/ / 4, 16
Copy the code
FlatMap takes a function that maps each item in a stream (Steam) to another stream and flattens out all the values of those streams:
of(1.2.3).pipe(
flatMap(page= > ajax.toJSON(`https://example.com/blog?size=2&page=${page}`)),
).subscribe(console.log)
// [ { blog 1 }, { blog 2 }, { blog 3 }, { blog 4 }, { blog 5 }, { blog 6 } ]
Copy the code
Merge Merges items in two streams in the order they arrive
merge(
interval(150).pipe(take(5), mapTo('A')),
interval(250).pipe(take(5), mapTo('B'))
).subscribe(console.log)
// A B A A B A A B B B
Copy the code
Other operators
redux-observable
All operations in REdux are synchronous. A Redux-Observable is redux’s middleware that performs asynchronous work using observable flows and dispatches another action in redux using the results of that asynchronous work.
A Redux-Observable is based on the idea of Epics. An Epic is a function that accepts a series of actions, optionally generates a stream of state, and returns a series of actions.
function (action$:Observable,state$:StateObservable):Observable;
By convention, each variable that is steam ends with **$**.
Here is a pong Epic example: carry a ping request service, and when the request is complete, receive a Pong into the app.
const pingEpic =action$=>action$.pipe(
ofType('PING'),
flatMap(action= >ajax('https://example.com/pinger')),
mapTo({type:'PONG'}))Copy the code
Now we will update the original Todo Store by adding EPics and retrieving users.
import { combineReducers, createStore } from 'redux'
import { ofType, combineEpics, createEpicMiddleware } from 'redux-observable';
import { map, flatMap } from 'rxjs/operators'
import { ajax } from 'rxjs/ajax'
/ /...
/* User and Todos reducers are defined as above */
const rootReducer = combineReducers({ user, todos })
const epicMiddleware = createEpicMiddleware();
const userEpic=action$=>action$.pipe(
ofType('GET_USER'),
flatMap(() = >ajax.getJSON('https://foo.bar.com/get-user')),
map(user= >({type:'GET_USER_SUCCESS'.payload:user}))
)
const addTodoEpic=action$=>action$.pipe(
ofType('ADD_TODO'),
flatMap(() = >ajax({
url:'https://foo.bar.com/add-todo'.method:'POST'.body: {text:action.payload}
})),
map(data= >data.response),
map(todo= >({type:'ADD_TODO_SUCCESS'.payload:todo}))
)
const completeTodoEpic=action$=>action$.pipe(
ofType('COMPLETE_TODO'),
flatMap(() = >ajax({
url:'https://foo.bar.com/add-todo'.method:'POST'.body: {id:action.payload}
})),
map(data= >data.response),
map(todo= >({type:'COMPLETE_TODO_SUCCESS'.payload:todo}))
)
const rootEpic = combinEpic (userEpic,addTodoEpic,completeTodoEpic)
const store = createStore(rootReducer,applyMiddleware(epicMiddleware))
epicMiddleware.run(rootEpic);
Copy the code
Note: Epic, like any Observable Streams in RxJS, will end up in a complete or error state, and then Epic and app will stop working. So you can use catchError to catch potential errors in the reactive TODO application – Github source code