Akik: Did Today eat enough

Article author authorizes this account to publish, do not reprint without permission

ReactiveX

An API for asynchronous programming with observable streams

lodash for async

Asynchronous events

RxJS is a library for handling asynchronous events. Where do they come from?

Common Async Events

  • User interaction (mouse, keyboard, page scrolling…)
  • Network request (Http, WebSocket)
  • Timers (setTimeout, setInterval)
  • Animation (TransitionEnd, AnimationEnd…)
  • Others (Workers, iframe…)

React handles events like this:

handleClick = (event) = > {
    // To do something ...
    this.setState(...)
}

<Button onClick={handleClick}>Click</Button>
Copy the code

There are certain scenarios where asynchronous events need to be controlled, such as:

  • Event stabilization, throttling, caching…
  • Cancellation and composition of AJAX requests
  • Handling Drag and Drop events
  • Handle WebSocket, Workers, iframe communication, etc

How asynchronous events are handled

The most common method is to use the Callback function, where you do some judgment logic

fetchSomeData((error, data) = > { 
    if(! error) { dispatch({type:'HERES_THE_DATA', data}); }});Copy the code

However, too many asynchronous call layers can result in Callback Hell

The use of Promise can solve this problem well, the use of chain call code structure flat, clear logic

Characteristics of Promise

  • Guaranteed results: An execution result (success, failure) is guaranteed in the future
  • Irreversible: The system can be executed immediately after creation and cannot be cancelled. The system cannot be changed after creation
  • Only one value: After completion, only one value is returned, regardless of the number of then calls

Think about when Promise is used in a project.

In most cases there is probably only one answer: make an HTTP request

Because only the HTTP request is one-time among these asynchronous events, and the rest are repeated many times, Promise is only good for handling a subset of asynchronous events, most of which we handle with Callback.

In addition, HTTP requests may need to be cancelled in some cases:

  • Switch routes and pages
  • AutoComplete search
  • User request to cancel

These are common and easily overlooked scenarios.

Some concepts in RxJS

Observable Observable

To address the problems that Promise faced, RxJS introduced the concept of Observable

  • An Observable is a stream of events: a time-dimensional collection of zero, one, or more values
  • An Observable handles events at any time
  • Observables can be cancelled

From the perspective of time dimension, it is an event that happens successively at different times. For example, clicking the screen in sequence can be viewed as a streaming event

An Observable is a temporal collection of events.

Create an Observable

RxJS provides several ways to create observables

import { of.from, interval, ajax, webSocket, fromEvent } from 'rxjs';

of('hello') // Create from a value
from([1.2.3.4]) // Create from a series of values
interval(1000) // The timer emits one event per second
ajax('http://example.com') // Created from an Ajax request
webSocket('ws://echo.websocket.com') // create from websocket
fromEvent(button, 'click') // Created from a DOM event
// many more...
Copy the code

Observe Observable changes

Observe the stream of events using the Subscribe method of Observable

myObservable.subscribe(
    value= > console.log('next', value), // Normal event
    err= > console.error('error', err), // Exception event, optional
    () = > console.info('complete! ') // Finish the event, optional
);
Copy the code

The Subscribe method defines an observer with three methods: Next, Error, and complete, which together make up an observer. The Next method listens for events in the normal event stream, the error method handles abnormal events in the event stream, and the complete method handles completion events in the event stream. When the event stream is completed, all events are finished.

Process the stream of events

The data stream can be processed using pipes between Observables and Subscribe, which put the required operators into the pipe.

Operators Are pure functions that use the functional programming style

import { interval } from 'rxjs';
import { map, filter, scan } from 'rxjs/operators';
   
interval(1000)
 .pipe(
   filter(x= > x % 2= = =0),
   map(x= > x + x),
   scan((acc, x) = > acc + x)
 )
 .subscribe(x= > console.log(x))
Copy the code

Operators can transform Observable:

  1. Conversion: Map, Filter, reduce
  2. Can be combined with each other: concat, merge, zip
  3. Process event streams in the temporal dimension: Debounce, Throttle, Buffer, combineLatest
  4. Lazy: retry, repeat

Learn more about the use of Operators

Subject = Subject

In practice, we often need to customize when the implementation needs to emit an event. In this case, we need an Observable that can manually trigger the event. The Subject itself is an Observable, which exposes the Next method for external calls to generate an event. Sending a message of the same type through a topic is equivalent to EventEmitter. And is the only way to multiroute a value or event to multiple observers.

import { Subject } from 'rxjs';

const someSubject = new Subject();

someSubject.subscribe(value= >{
    console.log(value);
});

someSubject.next('hello');
// output 'hello'
Copy the code

Consider: What changes after an asynchronous event occurs?

Use RxJS for state management

All states change with asynchronous events, and processing of asynchronous events is ultimately about changing state

Within RxJS you can use BehaviorSubject, which has a concept of “current value”. It holds the most recent value sent to the observer. And when a new observer subscrires, the BehaviorSubject immediately receives a “current value” from the BehaviorSubject. When a BehaviorSubject is updated, each of the subscribed components is re-rendered with the new value.

use

1. Event transfer between components

The sample link

import { BehaviorSubject } from 'rxjs';

// Controls the BehaviorSubject of show and hide. The parameters are in the initial state
const visibleSubject$ = new BehaviorSubject(false);

function Show(){
  return <button onClick={()= >visibleSubject$.next(true)}>show</button>;
}

function Hide(){
  return <button onClick={()= >visibleSubject$.next(false)}>hide</button>;
}

function Tips(){
  const [visible, setVisible] = useState(false);
  useEffect(() = > {
    // The subscription status changes
    const subscription = visibleSubject$.subscribe(setVisible);
    // Unsubscribe when the component is destroyed
    return () = >subscription.unsubscribe(); } []);return <div> {visible && 'Tips'} </div>;
}
Copy the code

2. Hooks

Isolate the generic parts to define a hooks example link

function useObservable(observable$){
  const [state, setState] = useState();
  useEffect(() = > {
    const subscription = observable$.subscribe(setState);
    return () = >subscription.unsubscribe(); } []);return state;
}
Copy the code

Tips components can be simpler:

function Tips(){
  const visible = useObservable(visibleSubject$);
  return <div> {visible && 'Tips'} </div>;
}
Copy the code

3. Search, filter, and table linkage

The sample link

This is a very common scenario where we can think of the search box and filter as two different event sources that work together to change the table state. Here are some details to consider:

  • An HTTP request is required when any value changes
  • The input box needs to be shaken
  • Cancel outstanding requests before making the next one
  • Setting the Loading state

CombineLatest: Combine multiple Observables into one Observable. The value of the new Observable contains the latest value of each input Observable.

Use a Hook when using Subject in a functional component, or if you use Subject directly in a Class component

function useSubject(fn, initalValue){
  const subject$ = useRef(new BehaviorSubject(initalValue)).current;
  constobservable$ = useRef(fn? fn(subject$):subject$).current;const onNext = useCallback((value) = > subject$.next(value), [fn]);
  return [onNext, observable$];
}
Copy the code

The complete code is as follows

function SearchList(){
  const [laoding, setLoading] = useState(false);
  // Search for events
  const [onSearch, search$] = useSubject((event$) = >event$.pipe(
    map(event= >event? .target? .value),// Take the value of event
  ),' ');
  // Filter events
  const [onSelect, select$] = useSubject(null.1);
  // Use combineLatest to combine two events into a single event stream
  const data = useObservable(() = >combineLatest([search$,select$]).pipe(
    debounceTime(300), / / image stabilization
    tap(() = >setLoading(true)), // Set the loading state
    // Initiate a request
    switchMap(([text,times]) = >from(getSearchResult(text, times)).pipe(
      catchError(error= >of([]))
    )),
    tap(() = >setLoading(false))), []);return <div>
    <Input onChange={onSearch}/>
    <Select onSelect={onSelect}>
      <Select.Option value={1}>1</Select.Option>
      <Select.Option value={2}>2</Select.Option>
      <Select.Option value={3}>3</Select.Option>
    </Select>
    <Spin spinning={laoding}>
      <List
        bordered
        dataSource={data}
        renderItem={item= > <List.Item>{item}</List.Item>} / ></Spin>
  </div>;
}
Copy the code

Tips: Encapsulated rxJs-hooks can also be used

RxJS pipe principle

After v6.0, RXJS started using Pipe instead of chaining the operator for Tree Shaking

import { interval } from 'rxjs';
import { map, filter, scan } from 'rxjs/operators';
   
interval(1000)
 .pipe(
   filter(x= > x % 2= = =0),
   map(x= > x + x),
   scan((acc, x) = > acc + x)
 )
 .subscribe(x= > console.log(x))
Copy the code

Pipe is a method of Observable, as defined in the source code:

class Observable<T> implements Subscribable<T> {

  ...

  pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
    return operations.length ? pipeFromArray(operations)(this) : this; }... }Copy the code

The PIPE method takes the argument list as a collection of operators and returns an Observable, so an Observable can still call the pipe function after pipe processing.

Pipe returns the current object if the pipe argument list is empty, and executes pipeFromArray if the pipe argument list is not empty. PipeFromArray looks like this:

function pipeFromArray<T.R> (fns: Array<UnaryFunction<T, R>>)
    : UnaryFunction<T.R> {
  if (fns.length === 0) { // returns an operator function that does nothing when FNS is null
    return (x= > x) as UnaryFunction<any, any>;
  }
  if (fns.length === 1) {
    return fns[0]; // Return the operator if there is only one
  }
  return function piped(input: T) :R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) = > fn(prev), 
                      input as any);
};
Copy the code

Returns the piped function when there are more than one operator.

  • parameterinputforreduceThe initial value of the function, in this case the callpipeThe object of observables
pipeFromArray(operations)(this) // This points to an Observable
Copy the code
  • Operations inreduceFunction is executed in the order in which they are in the array
  • Current operation The execution result of the previous operation is returned as parameterfn(prev)The value after execution is passed to the next operator

Is the function in the operator executed when the pipe is called?

Take the map operator as an example:

export function map<T.R> (project: (value: T, index: number) => R, thisArg? : any) :OperatorFunction<T.R> {
  return operate((source, subscriber) = > {
    // Index of the source value
    let index = 0;
    // Subscribe to the event source to send all events and errors to the next consumer
    source.subscribe(
      new OperatorSubscriber(subscriber, (value: T) = > {
        // value is the event value that calls the function in the operator, passing the result of execution to the next consumersubscriber.next(project.call(thisArg, value, index++)); })); }); }Copy the code

The operator returns a Operate function that connects the event source to the next consumer by subscribe, but no function in the operator is executed. The pipe connects the source to the observer, and the pipe finally passes the event to the observer by executing the operator functions in turn when the source calls the observer’s next method.

Summary: A pipe connection is established when a SUBSCRIBE is called. Pipes that are not subscribed have no effect, and the operator only processes the event when it occurs

conclusion

What benefits can RxJS bring?

  • RxJS is perfect for handling complex user events and asynchronous queries, such as handling form linkage logic, using RxJS Pipes, where it’s easy to write logically clear code.
  • You can use RxJS to handle and limit user events to update the state of your application.
  • React is a great place to communicate between components
  • It is easy to create and manipulate data streams, and there are many data stream operators and ways to create data streams, so there is a lot of scope (e.g., Websocket).
  • RxJS is recommended for scenarios involving the concept of time, or when you need to deal with historical values of events (not just the latest ones), e.g. : can be used insteadcomponentWillReceiveProps

What are the problems with using RxJS?

  • The learning curve is steep and it takes a long time for newcomers to get started
  • For some projects without complex operating scenarios, RxJS may not be necessary
  • Too many operators can be troublesome in debugging

Q&A

1. What about Redux and MobX?

Redux, MobX, and RxJS are all capable of state management, and RxJS is better at processing asynchronous events. Usually, asynchronous events are accompanied by state changes. They do not conflict and can be used together

2. Is this a new wheel?

Rx is a mature library. If you haven’t worked with Rx before, this is a new programming idea, and it has corresponding implementations in many languages. See some of the differences between Apple’s Reactive framework Combain, released on WWDC19, which brings responsiveness to SwiftUI.

RxSwift to Apple’s Combine “Cheat Sheet”

Refer to the link

  1. ReactiveX
  2. The difference between the observer model and the subscription publishing model
  3. Combine vs RxSwift: Introduction to Combine and Differences
  4. ReactiveX usage
  5. reactive.how
  6. A Primer on Responsive Programming – Easy to understand RxJS
  7. React and Rx.js – The Power Of Observable