This is the second article in the RxJS + Redux application state management series. It walks through the design philosophy and implementation of redux-observable. The first article is: Component autonomy using redux-observable.

Articles in this series:

  • Using redux-observable to implement component autonomy
  • How to implement your own redux-observable

Redux

Redux is derived from the Elm architecture, and its state management model is clear and unambiguous:

  1. An action is dispatched
  2. The reducer receives the action and applies different state update logic depending on the action type
  3. This process repeats for every action
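The loop above can be sketched without any library. The names `CounterAction` and `counterReducer` are hypothetical and exist only for illustration; the point is that the reducer is a pure function, so dispatching a series of actions amounts to folding over them.

```typescript
// Hypothetical action type, for illustration only.
type CounterAction = { type: 'INCREMENT' } | { type: 'DECREMENT' };

// A reducer is a pure function: (state, action) -> next state.
const counterReducer = (state: number, action: CounterAction): number => {
  switch (action.type) {
    case 'INCREMENT':
      return state + 1;
    case 'DECREMENT':
      return state - 1;
    default:
      return state;
  }
};

// Dispatching actions one after another is equivalent to a fold.
const dispatched: CounterAction[] = [
  { type: 'INCREMENT' },
  { type: 'INCREMENT' },
  { type: 'DECREMENT' },
];
const finalCount = dispatched.reduce(counterReducer, 0);
```

Because the reducer has no side effects, replaying the same actions always produces the same final state.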

This process is synchronous. To keep the reducer pure, Redux recommends against handling side effects (such as HTTP requests) in the reducer. As a result, redux-thunk, redux-saga, and other Redux middleware emerged to handle side effects.

In essence, these middleware intercept dispatched actions, handle side effects along the way, and finally dispatch a new action to the reducer, so that the reducer can remain a pure state machine.

Manage side effects with Observables

Suppose the UI layer dispatches a FETCH action to pull data. Once the request completes, either a FETCH_SUCCESS action or, if the request failed, a FETCH_ERROR action is dispatched to the reducer.

           FETCH
             |
       fetching data...
             |
            / \
           /   \
 FETCH_SUCCESS FETCH_ERROR

If we look at this process from an FRP perspective, FETCH is not an isolated event. It lives on a stream (an Observable) that emits FETCH actions, and the same holds for FETCH_SUCCESS and FETCH_ERROR:

---- FETCH ---- FETCH ---- 

---- FETCH_SUCCESS ---- FETCH_SUCCESS ----

---- FETCH_ERROR ---- FETCH_ERROR ----

If we name the FETCH stream fetch$, then both FETCH_SUCCESS and FETCH_ERROR originate from fetch$:

import { Observable, from, of } from 'rxjs'
import { switchMap, map, catchError } from 'rxjs/operators'

const fetch$: Observable<FetchAction> = // ...

fetch$.pipe(
  // api.fetch is assumed to return a Promise
  switchMap(() => from(api.fetch()).pipe(
    // Data was pulled successfully
    map(resp => ({
      type: FETCH_SUCCESS,
      payload: {
        // ...
      }
    })),
    // Failed to pull data
    catchError(error => of({
      type: FETCH_ERROR,
      payload: {
        // ...
      }
    }))
  ))
)

In addition, we can use a single stream to carry all the actions on the page:

const action$: Observable<Action>

fetch$ can then be derived from the action$ stream:

const fetch$ = action$.pipe(
  filter(({ type }) => type === FETCH)
)

So we arrive at a pattern of observable actions: every action flows through action$, and side effects derive new action streams from it.

Next, we try to integrate this pattern into Redux, letting Observables handle the application's action flow and side effects.

Building middleware

The middleware mechanism provided by Redux lets us intercept each incoming action, run some business logic, and then pass an action on to the reducer.

A middleware is composed of functions as follows:

const middleware: Middleware = store => {
  // Initialize the middleware
  return next => action => {
    // do something
  }
}

const store = createStore(
  rootReducer,
  applyMiddleware(middleware)
)
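To make the curried `store => next => action` shape concrete, here is a dependency-free sketch. `createMiniStore`, `MiniStore`, and `MiniMiddleware` are hypothetical stand-ins for Redux's `createStore`, `applyMiddleware`, and `Middleware`; they support a single middleware and a numeric state, and are not how Redux itself is implemented.

```typescript
// Hypothetical minimal types; the real ones live in the `redux` package.
type PlainAction = { type: string };
type Dispatch = (action: PlainAction) => PlainAction;
type MiniStore = { getState: () => number; dispatch: Dispatch };
type MiniMiddleware = (store: MiniStore) => (next: Dispatch) => Dispatch;

// Stand-in for createStore + applyMiddleware (one middleware only).
function createMiniStore(
  reducer: (state: number, action: PlainAction) => number,
  initialState: number,
  middleware: MiniMiddleware
): MiniStore {
  let state = initialState;
  // The innermost `next` hands the action to the reducer.
  const baseDispatch: Dispatch = action => {
    state = reducer(state, action);
    return action;
  };
  let dispatch: Dispatch = baseDispatch;
  const store: MiniStore = {
    getState: () => state,
    // Indirection so the middleware always sees the wrapped dispatch.
    dispatch: action => dispatch(action),
  };
  dispatch = middleware(store)(baseDispatch);
  return store;
}

// A middleware that records every action type before passing it on.
const seenTypes: string[] = [];
const recorder: MiniMiddleware = _store => next => action => {
  seenTypes.push(action.type);
  return next(action);
};

const miniStore = createMiniStore(
  (state, action) => (action.type === 'INCREMENT' ? state + 1 : state),
  0,
  recorder
);
miniStore.dispatch({ type: 'INCREMENT' });
miniStore.dispatch({ type: 'UNKNOWN' });
```

The outer function runs once when the store is created, which is why it is a natural place for one-time setup such as creating a stream.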

Now, when the middleware is initialized, we create action$. When a new action arrives:

  1. The action is handed over to the reducer
  2. The action is pushed into action$
  3. action$ can be transformed into another action stream

Thus, action$ is both an observer and an observable, which makes it a Subject:

const createMiddleware = (): Middleware => {
  const action$ = new Subject<Action>()
  const middleware: Middleware = store => next => action => {
    // Pass the action on to the reducer
    const result = next(action)
    // Push the action into action$ so it can flow downstream
    action$.next(action)
    return result
  }
  return middleware
}
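The two roles of a Subject can be illustrated with a hand-rolled stand-in. `MiniSubject` below is a hypothetical, heavily simplified class written for this sketch; it is not the RxJS implementation and omits completion, errors, and operators.

```typescript
// A simplified stand-in for RxJS's Subject (assumption: values only,
// no error or complete notifications).
class MiniSubject<T> {
  private listeners: Array<(value: T) => void> = [];

  // Observer side: values are pushed in with next().
  next(value: T): void {
    // Copy the list so that (un)subscribing during delivery is safe.
    this.listeners.slice().forEach(listener => listener(value));
  }

  // Observable side: consumers register with subscribe().
  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener);
    // Return an unsubscribe function.
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}

const miniAction$ = new MiniSubject<{ type: string }>();
const receivedTypes: string[] = [];
const unsubscribe = miniAction$.subscribe(action => {
  receivedTypes.push(action.type);
});

miniAction$.next({ type: 'FETCH' });   // delivered to the subscriber
unsubscribe();
miniAction$.next({ type: 'IGNORED' }); // no longer delivered
```

The middleware plays the observer role by calling `next`, while downstream consumers only see the observable side.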

Stream transformer

The middleware now creates action$, but how do we obtain streams such as fetch$ that are derived from action$? We need to tell the middleware how to generate more streams from action$. To do so, we define a transformer, which is responsible for transforming action$ and handling side effects along the way:

interface Transformer {
  (action$: Observable<Action>): Observable<Action>
}

const fetchTransformer: Transformer = (action$) => {
  return action$.pipe(
    filter(({ type }) => type === FETCH),
    switchMap(() => from(api.fetch()).pipe(
      map(resp => ({
        type: FETCH_SUCCESS,
        payload: {
          // ...
        }
      })),
      catchError(error => of({
        type: FETCH_ERROR,
        payload: {
          // ...
        }
      }))
    ))
  )
}

In an application, we may define several transformers, each producing a stream that emits different actions:

const newActionStreams: Observable<Action>[] = transformers.map(transformer => transformer(action$))

Since all of these actions share the same data structure, we can merge the streams and let the merged stream deliver actions to the reducer:

const newAction$ = merge(...newActionStreams)

So we modify our middleware implementation:

const createMiddleware = (...transformers: Transformer[]): Middleware => {
  const action$ = new Subject<Action>()
  // Run each transformer and merge the transformed streams
  const newAction$ = merge(...transformers.map(transformer => transformer(action$)))
  const middleware: Middleware = store => {
    // Subscribe to newAction$ and dispatch every action it emits
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      // Pass the action on to the reducer
      const result = next(action)
      // Push the action into action$ so it can flow downstream
      action$.next(action)
      return result
    }
  }
  return middleware
}
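The wiring can also be traced end to end without RxJS or Redux. In the sketch below a stream is modelled as nothing more than a function that registers a listener; `ActionStream`, `ActionTransformer`, `createMiniMiddleware`, and `pingToPong` are hypothetical names, and the "store" is reduced to a counter. It is meant only to show the order of calls, not to replace the implementation above.

```typescript
type Act = { type: string };
type ActionListener = (action: Act) => void;
// A stream is modelled as a function that registers a listener.
type ActionStream = (listener: ActionListener) => void;
type ActionTransformer = (action$: ActionStream) => ActionStream;

function createMiniMiddleware(...transformers: ActionTransformer[]) {
  const listeners: ActionListener[] = [];
  const action$: ActionStream = listener => {
    listeners.push(listener);
  };
  return (store: { dispatch: ActionListener }) => {
    // "merge + subscribe": every transformed stream feeds the store.
    transformers.forEach(transform =>
      transform(action$)(action => store.dispatch(action))
    );
    return (next: ActionListener): ActionListener => action => {
      next(action); // 1. the reducer handles the action first
      listeners.slice().forEach(listener => listener(action)); // 2. then action$
    };
  };
}

// A transformer that answers every PING with a PONG.
const pingToPong: ActionTransformer = action$ => listener =>
  action$(action => {
    if (action.type === 'PING') listener({ type: 'PONG' });
  });

// A stand-in "reducer" that only counts PONG actions.
let pongCount = 0;
const reduce: ActionListener = action => {
  if (action.type === 'PONG') pongCount += 1;
};

let wrapped: ActionListener = reduce;
const storeApi = { dispatch: (action: Act): void => wrapped(action) };
wrapped = createMiniMiddleware(pingToPong)(storeApi)(reduce);

storeApi.dispatch({ type: 'PING' });
```

Dispatching PING reaches the reducer first, then the transformer, whose PONG goes back through the same dispatch path.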

Optimization: ofType operator

Since we always need filter(action => action.type === SOME_TYPE) to select actions, we can wrap it in an operator:

const ofType = (type: string): OperatorFunction<Action, Action> => pipe(
  filter((action: Action) => action.type === type)
)

const fetchTransformer: Transformer = (action$) => {
  return action$.pipe(
    ofType(FETCH),
    switchMap(() => from(api.fetch())),
    // ...
  )
}

Since we may want to match more than one action type, we can extend the ofType operator as follows:

const ofType = (...types: string[]): OperatorFunction<Action, Action> => pipe(
  filter((action: Action) => types.indexOf(action.type) > -1)
)

const counterTransformer: Transformer = (action$) => {
  return action$.pipe(
    ofType(INCREMENT, DECREMENT),
    // ...
  )
}
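The matching rule of the variadic ofType can be checked in isolation. The sketch below builds a plain predicate instead of an RxJS operator, so it can be applied to an ordinary array; `matchesType` and `TypedAction` are hypothetical names used only here.

```typescript
type TypedAction = { type: string };

// Dependency-free analogue of the variadic ofType: it returns a predicate
// rather than an operator, but uses the same membership test.
const matchesType = (...types: string[]) =>
  (action: TypedAction): boolean => types.indexOf(action.type) !== -1;

const incoming: TypedAction[] = [
  { type: 'INCREMENT' },
  { type: 'FETCH' },
  { type: 'DECREMENT' },
];

// Only the counter-related actions pass the filter.
const counterActions = incoming.filter(matchesType('INCREMENT', 'DECREMENT'));
```

Wrapping the same predicate in `filter` gives the operator form shown above.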

The following test case checks whether our middleware works:

it('should transform action', () => {
  const reducer: Reducer = (state = 0, action) => {
    switch (action.type) {
      case 'PONG':
        return state + 1
      default:
        return state
    }
  }

  const transformer: Transformer = (action$) => {
    return action$.pipe(
      ofType('PING'),
      mapTo({ type: 'PONG' })
    )
  }

  const middleware = createMiddleware(transformer)
  const store = createStore(reducer, applyMiddleware(middleware))
  store.dispatch({ type: 'PING' })
  expect(store.getState()).to.be.equal(1)
})

Optimization: Get state

Application state may also be needed while an action flows through the streams. For example, fetch$ has to assemble request parameters before pulling data, and some of those parameters may come from application state. We can therefore pass the store object to each transformer so that it can read the current application state:

interface Transformer {
  (action$: Observable<Action>, store: Store): Observable<Action>
}

// ...

const createMiddleware = (...transformers: Transformer[]): Middleware => {
  const action$ = new Subject<Action>()
  const middleware: Middleware = store => {
    // Pass the store to each transformer as well
    const newAction$ = merge(...transformers.map(transformer => transformer(action$, store)))
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      const result = next(action)
      action$.next(action)
      return result
    }
  }
  return middleware
}

Now, whenever state is needed, it can be read via store.getState():

const fetchTransformer: Transformer = (action$, store) => {
  return action$.pipe(
    filter(({ type }) => type === FETCH),
    switchMap(() => {
      const { query, page, pageSize } = store.getState()
      const params = { query, page, pageSize }
      // api.fetch is assumed to accept the params and return a Promise
      return from(api.fetch(params))
    }),
    // ...
  )
}

Optimization: Observe the state

In reactive programming, every data source should be observable, yet above we pull state actively. The right approach is to observe state changes and react as they happen.

To do this, we turn state into a stream just as we did with actions: the application state becomes an observable, state$, which is passed to each transformer:

interface Transformer {
  (action$: Observable<Action>, state$: Observable<State>): Observable<Action>
}

// ...

const createMiddleware = (...transformers: Transformer[]): Middleware => {
  const action$ = new Subject<Action>()
  const state$ = new Subject<State>()
  const middleware: Middleware = store => {
    // Each transformer receives the application's action$ and state$
    const newAction$ = merge(...transformers.map(transformer => transformer(action$, state$)))
    // When a new action arrives, dispatch it back into Redux
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      // Pass the action on to the reducer
      const result = next(action)
      // Push the new state produced by the reducer into state$
      state$.next(store.getState())
      // Push the action into action$
      action$.next(action)
      return result
    }
  }
  return middleware
}

When a piece of business logic needs state, it can freely compose state$ to obtain it:

const fetchTransformer: Transformer = (action$, state$) => {
  return action$.pipe(
    filter(({ type }) => type === FETCH),
    withLatestFrom(state$),
    switchMap(([action, state]) => {
      const { query, page, pageSize } = state
      const params = { query, page, pageSize }
      return from(api.fetch(params))
    }),
    // ...
  )
}

At first glance, this seems less convenient than store.getState(): just to read the current state, we had to bring in an extra operator, withLatestFrom. Note, however, that state$ was introduced not only to obtain state and keep the pattern uniform, but more importantly to observe state.

For example, suppose we have a memo component that stores a draft every time its content changes. If we can observe state changes, then in a reactive style the draft-saving logic runs automatically whenever the state changes:

const saveDraft$: Observable<Action> = state$.pipe(
  // Select the current content
  pluck('content'),
  // Only consider storing a draft when the content changes
  distinctUntilChanged(),
  // Save at most once per second
  throttleTime(1000),
  // Call the service that stores the draft
  switchMap(content => from(api.saveDraft(content)))
  // ...
)
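The effect of selecting the content and then dropping consecutive duplicates can be illustrated on a plain array of state snapshots. This sketch leaves out the time-based throttling and the network call; `MemoState`, `changedContents`, and the sample snapshots are hypothetical and only mimic what the first two operators do.

```typescript
type MemoState = { content: string; cursor: number };

// Mimics pluck('content') followed by distinctUntilChanged():
// a content value passes only if it differs from the previous one.
function changedContents(states: MemoState[]): string[] {
  const out: string[] = [];
  let hasPrevious = false;
  let previous = '';
  for (const state of states) {
    if (!hasPrevious || state.content !== previous) {
      out.push(state.content);
    }
    previous = state.content;
    hasPrevious = true;
  }
  return out;
}

const snapshots: MemoState[] = [
  { content: 'a', cursor: 1 },
  { content: 'a', cursor: 0 },  // only the cursor moved: no draft save
  { content: 'ab', cursor: 2 },
  { content: 'ab', cursor: 2 }, // identical snapshot: no draft save
  { content: 'a', cursor: 1 },  // content changed back: saved again
];
const draftsToSave = changedContents(snapshots);
```

State changes that do not touch the content never reach the save step, which is what keeps the draft logic independent of the rest of the state.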

As the first article showed, the state$ introduced in redux-observable 1.0 lets us decouple the business relationships between components and make each component autonomous.

Optimization: Responding to the initial state

Now we can test the current middleware to see if we can observe the application state:

it('should observe state', () => {
  const reducer: Reducer = (state = { step: 10, counter: 0 }, action) => {
    switch (action.type) {
      case 'PONG':
        return {
          ...state,
          counter: action.counter
        }
      default:
        return state
    }
  }

  const transformer: Transformer = (action$, state$) => {
    return action$.pipe(
      ofType('PING'),
      withLatestFrom(state$, (action, state) => state.step + state.counter),
      map(counter => ({ type: 'PONG', counter }))
    )
  }

  const middleware = createMiddleware(transformer)
  const store = createStore(reducer, applyMiddleware(middleware))
  store.dispatch({ type: 'PING' })
  expect(store.getState().counter).to.be.equal(10)
})

Unfortunately, this test case does not pass. Debugging shows that when we dispatch the PING action, withLatestFrom has not received the latest state. Why? Redux's init action is not exposed to middleware, so the initial state of the application is never pushed into state$ and observers cannot see it.
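Two RxJS behaviours are relevant here: a plain Subject does not replay earlier values to later subscribers, and withLatestFrom emits nothing until its second input has produced at least one value. The sketch below mimics only the second rule with a hand-written helper; `createLatestCombiner` and `Snapshot` are hypothetical names, and the helper is a stand-in rather than the RxJS operator.

```typescript
type Snapshot = { step: number; counter: number };

// Stand-in for withLatestFrom: an incoming action is combined with the
// most recent state, and dropped if no state has been seen yet.
function createLatestCombiner() {
  let latest: Snapshot | undefined;
  const emitted: number[] = [];
  return {
    pushState(state: Snapshot): void {
      latest = state;
    },
    pushAction(): void {
      const current = latest;
      if (current === undefined) return; // nothing to combine with
      emitted.push(current.step + current.counter);
    },
    emitted,
  };
}

const withoutInit = createLatestCombiner();
withoutInit.pushAction(); // an action before any state: dropped

const withInit = createLatestCombiner();
withInit.pushState({ step: 10, counter: 0 }); // initial state pushed first
withInit.pushAction(); // now combined with the initial state
```

Anything that reads state$ therefore depends on the initial state having been pushed into it at some point.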

To solve this problem, after creating the store we can dispatch a meaningless action through the middleware to force the initial state into state$ first:

const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// Issue an action to get the initial state
store.dispatch({type: '@@INIT_STATE'})

This makes the test pass, but it is not very elegant: asking users to manually dispatch a meaningless action is confusing. We therefore give the middleware a separate API for tasks that must run after the store has been created:

// Keep a reference to the store
let cachedStore: Store
const createMiddleware = (...transformers: Transformer[]): Middleware => {
  const action$ = new Subject<Action>()
  const state$ = new Subject<State>()
  const newAction$ = merge(...transformers.map(transformer => transformer(action$, state$)))

  const middleware: Middleware = store => {
    cachedStore = store

    return next => action => {
      // Pass the action on to the reducer
      const result = next(action)
      // Push the new state produced by the reducer into state$
      state$.next(store.getState())
      // Push the action into action$
      action$.next(action)
      return result
    }
  }

  middleware.run = function() {
    // 1. Start the action subscription
    newAction$.subscribe(cachedStore.dispatch)
    // 2. Push the initial state into state$
    state$.next(cachedStore.getState())
  }
  return middleware
}

The middleware now exposes a run method for work that has to happen after the store is created. Once the store exists, we call run to start the middleware:

const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// Run our middleware
middleware.run()

Optimization: Correlated Transformers

Consider a more specific scenario: transformers may be related to each other, and one transformer may emit actions that another transformer reacts to:

it('should queue synchronous actions', () => {
  const reducer = (state = [], action) => state.concat(action)
  const transformer1 = (action$, state$) => action$.pipe(
    ofType('FIRST'),
    mergeMap(() => of({ type: 'SECOND' }, { type: 'THIRD' }))
  )
  const transformer2 = (action$, state$) => action$.pipe(
    ofType('SECOND'),
    mapTo({ type: 'FORTH' })
  )

  const middleware = createMiddleware(transformer1, transformer2)
  const store = createStore(reducer, applyMiddleware(middleware))
  middleware.run()
  // Dispatch the action that starts the chain
  store.dispatch({ type: 'FIRST' })

  const actions = store.getState()
  actions.shift() // remove redux init action
  expect(actions).to.deep.equal([
    { type: 'FIRST' },
    { type: 'SECOND' },
    { type: 'THIRD' },
    { type: 'FORTH' }
  ])
})

In this test case, the action sequence we expect is:

FIRST
SECOND
THIRD
FORTH

In the current implementation, however, you get:

FIRST
SECOND
FORTH
THIRD

This does not meet expectations. What went wrong? Let's walk through the execution:

  1. The FIRST action is dispatched
  2. Handling FIRST derives the observable of the SECOND and THIRD actions
  3. SECOND is dispatched, which derives the observable of the FORTH action
  4. FORTH is dispatched
  5. THIRD is dispatched

The problem clearly lies in steps 2 and 3. If we control the pace at which the observable emits values, placing the SECOND and THIRD actions that arrive together into a queue and processing them in order, we get the desired output.

Fortunately, RxJS provides an operator called observeOn that controls how a source delivers its values. Its first parameter is a scheduler, which tells the source how to schedule delivery. Here we use queueScheduler, which places each action in a queue; an action is taken off the queue and delivered only when no other action is being processed:

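// "epics" is redux-observable's name for what this article calls transformers
export const createEpicMiddleware = (...epics) => {
  // Deliver actions through the queue scheduler
  const action$ = new Subject().pipe(observeOn(queueScheduler)) as Subject<Action>

  // ...

  return middleware
}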

Now run the test case again, and the action sequence matches our expectations:

FIRST
SECOND
THIRD
FORTH

This is because:

  1. The FIRST action is dispatched
  2. FIRST is enqueued
  3. No action is waiting, so FIRST is dequeued: store.dispatch(first) derives the observable of the SECOND and THIRD actions
  4. SECOND is enqueued, then THIRD is enqueued
  5. No action is being processed, so SECOND, at the head of the queue, is dequeued: store.dispatch(second) derives the observable of the FORTH action
  6. FORTH is enqueued
  7. No action is being processed, and the head of the queue is THIRD: store.dispatch(third)
  8. FORTH is dequeued: store.dispatch(forth)
Conclusion

Our middleware now lets us manage application state with FRP patterns, and its implementation is very close to that of redux-observable. In production you should of course still use the more popular and stable redux-observable. This article aims to help you understand how to integrate RxJS into Redux for better state management, and, by optimizing the middleware step by step, to convey the design philosophy and implementation principles of redux-observable. The mini redux-observable implemented in this article is also available on my GitHub, including some test cases and a small demo.

Next, we will explore combining redux-observable and FRP patterns with a dva-style front-end architecture: the dva architecture cuts down Redux's lengthy boilerplate, while redux-observable focuses on side effects.


References

  • RxJS API documentation
  • Primer on RxJS Schedulers
  • redux-observable #493 pull request
  • Gerard Sans: Bending Time with Schedulers and RxJS 5

About this series

  • This series starts with an introduction to redux-observable 1.0 and describes my experience combining RxJS with Redux. It covers redux-observable in practice, an exploration of how redux-observable is implemented, and finally reobservable, a state management framework based on redux-observable and the dva architecture.
  • This series is not an introduction to RxJS or Redux, and it does not cover their basic concepts or core strengths. If you searched for RxJS, stumbled into this series, and became interested in RxJS and FRP, I recommend these starting points:
    • learnrxjs.io
    • Andre Staltz's series of courses on egghead.io
    • RxJS by Cheng Mo
  • This series is not intended to be a tutorial, but rather a presentation of my own ideas on how to use RxJS with Redux. Hopefully, more people will point out pitfalls and share more elegant practices.
  • I would like to sincerely thank the seniors who helped me along the way, especially QuestGuo from Tencent Cloud for his guidance. reobservable is derived from TCFF, the React framework led by QuestGuo at Tencent Cloud, and I look forward to TCFF being open-sourced in the future.
  • Thanks to Xiaoyu for the design support.