Takeaway

After reading this article, you will know:

  • How to use RxJS to manage the data flow of a Web application

  • How to combine RxJS with WebWorker

In the previously shared article “Out of Haze: Creation Practice of Spatio-temporal Visual Narrative”, we created the visual narrative work “Out of Haze” with Columbus, a spatio-temporal visual narrative creation platform developed by Rael Platform that will be opened to everyone in the future. The work involves many updates and transformations of map data, so its data flow is very complicated. Below, I show how we use RxJS in Columbus to manage the data flow of a complex Web application.

Problems encountered

Columbus takes its input from several kinds of sources, including events, Ajax, and WebWorker. Data processing includes filtering by shapes that users draw on the map, filtering by fields that users add, and filtering by the chart element that a user clicks. Different views are linked to different processing steps, so data at different stages is used for different purposes.

Data Sources and Views

The previous code relied on variables that cached data, flags that recorded whether data had been returned, combinations of Promises that fetched different data, and publish/subscribe. Each kind of data was obtained in a different way, so the code for each was written differently. The code that handled asynchrony and combined data was complex, and when reading it, it was hard to understand how the different pieces of data related to each other and how they were processed.

How does RxJS solve this problem

Think of RxJS as Lodash for events.

RxJS can unify different data sources into the same kind of data flow. Through operators, synchronous and asynchronous data are both handled as collections, which unifies the way different data sources are connected. RxJS also lets us consolidate the scattered processing logic of the data layer in one place, which improves the readability of the code.

Manage data flow using RxJS

RxJS handles both synchronous and asynchronous data through Observable sequences. By choosing suitable Observables and operators, different data sources can be handled in a uniform way.

How different data is processed in Columbus

To separate data from views, we implemented a Dataset class that gathers the data processing logic and manages the data flow of Columbus. In the figure above, the data processing at each stage is an Observable that can be published to and subscribed to, and each part of the data flow corresponds to a method of Dataset. Wherever data from a particular stage is needed, the code subscribes to the corresponding Observable. A BehaviorSubject stores the latest value it has sent to its consumers. When a new subscription arrives, the new subscriber immediately receives that latest value, so a BehaviorSubject can act as a cache at each data processing node.
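The caching behavior described above can be illustrated without RxJS. The sketch below is a stripped-down stand-in that only mimics the "replay the latest value to new subscribers" rule; it is not RxJS's implementation, and the names LatestValueSubject, filtered$, and seenByLateView are hypothetical.

```javascript
// Hypothetical stand-in for a BehaviorSubject, for illustration only.
class LatestValueSubject {
  constructor (initialValue) {
    this.value = initialValue
    this.observers = new Set()
  }

  // Publish a new value and remember it as the latest one.
  next (value) {
    this.value = value
    this.observers.forEach(observer => observer(value))
  }

  // A new subscriber receives the cached value right away, then later updates.
  subscribe (observer) {
    this.observers.add(observer)
    observer(this.value)
    return { unsubscribe: () => this.observers.delete(observer) }
  }
}

// A processing node publishes a result before any view has subscribed.
const filtered$ = new LatestValueSubject([])
filtered$.next([1, 2, 3])

// A view that subscribes late still starts from the cached result.
const seenByLateView = []
filtered$.subscribe(rows => seenByLateView.push(rows))
filtered$.next([4, 5])
```

Here the late subscriber first receives the cached [1, 2, 3] and then the new [4, 5], which is why a node of this kind does not need a separate cache variable.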

Relationship between Dataset and WorkerPool

Use WebWorker and RxJS to realize data flow optimization

All of Columbus’s data processing is done in the browser. When the data volume is very large, processing blocks the main thread and makes user interaction sluggish. A WebWorker runs code in a separate thread, so executing the data processing logic in a WebWorker reduces the load on the main JS thread.

A WebWorker can also be regarded as an asynchronous data source, and RxJS handles asynchronous data well, so combining the two elegantly makes them much more convenient to use together. RxJS can create Observables with from, fromEvent, and fromPromise (in RxJS 6 and later, from also accepts a Promise), so fromEvent can take data directly from a WebWorker:

fromEvent(worker, 'message').subscribe((data) => {
  // do something
})

However, a single WebWorker is not enough. While that WebWorker is busy processing data, new tasks still have to wait, so results cannot be returned in time. The solution is to run multiple WebWorkers: users only add tasks to a thread pool, and the pool decides how to schedule them to make the most of the available computing resources. We therefore need to implement a WebWorker Pool, and this pool should be adaptable to RxJS.

Parameter list of rxjs.fromEvent

Interface declaration of FromEventTarget

As the rxjs.fromEvent parameter list shows, target is a data source of type FromEventTarget. Any object that implements one of the sets of interfaces described by FromEventTarget can be used as a data source for fromEvent. FromEventTarget describes several publish/subscribe interfaces, so if the WebWorker Pool implements one of these interface declarations, RxJS can get data from it.
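As a rough illustration of what "implementing one of these interfaces" means, the sketch below builds a tiny event source that exposes an on/off pair, plus a helper that duck-types a target by looking for a known add/remove method pair. This is not RxJS's source code. TinyEmitter and listen are hypothetical names, and the three method pairs are the ones the fromEvent documentation lists for DOM targets, Node.js-style emitters, and jQuery-style objects.

```javascript
// Hypothetical minimal event source exposing the on/off pair.
class TinyEmitter {
  constructor () {
    this.handlers = new Map()
  }

  on (name, handler) {
    if (!this.handlers.has(name)) this.handlers.set(name, new Set())
    this.handlers.get(name).add(handler)
  }

  off (name, handler) {
    const set = this.handlers.get(name)
    if (set) set.delete(handler)
  }

  emit (name, payload) {
    const set = this.handlers.get(name)
    if (set) [...set].forEach(handler => handler(payload))
  }
}

// Hypothetical helper: attach a handler through whichever method pair the
// target offers, and return a function that detaches it again.
function listen (target, name, handler) {
  const pairs = [
    ['addEventListener', 'removeEventListener'], // DOM style
    ['addListener', 'removeListener'], // Node.js style
    ['on', 'off'] // jQuery style
  ]
  for (const [add, remove] of pairs) {
    if (typeof target[add] === 'function' && typeof target[remove] === 'function') {
      target[add](name, handler)
      return () => target[remove](name, handler)
    }
  }
  throw new TypeError('Invalid event target')
}

const source = new TinyEmitter()
const received = []
const stop = listen(source, 'message', msg => received.push(msg))
source.emit('message', 'first')
stop()
source.emit('message', 'second') // ignored: the handler was detached
```

A thread pool that offers the same on/off pair (for example by extending an event emitter) could be passed to fromEvent in place of the hypothetical listen helper.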

Implement WebWorker Pool for RxJS

Dependencies used:

  • worker-loader, which handles the WebWorker code in Webpack;

  • event-emitter-es6, a module that conforms to the FromEventTarget type; the thread pool can extend it to satisfy the interface description.

A simple WebWorker thread pool needs to implement these features:

  • Cache tasks

  • Clear tasks

  • Schedule idle workers to process tasks

  • Kill all workers when the thread pool is destroyed

First, we have a Worker that implements odd and even filtering as an example:

// worker.js
const task = {
  evenFilter (data) {
    return data.filter(val => val % 2 === 0)
  },
  oddFilter (data) {
    return data.filter(val => val % 2 !== 0)
  }
}

function onMessage (e) {
  const data = e.data
  const { action, payload } = data
  // look up the requested task by name and run it on the payload
  const result = task[action](payload.data)
  // do something
  self.postMessage({
    action,
    payload: {
      timestamp: payload.timestamp,
      data: result
    }
  })
}

self.addEventListener('message', onMessage)
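The action/payload protocol of the worker can be tried out without starting a Worker by factoring the lookup into a pure function. The sketch below is one possible way to do that; handleMessage and the guard against unknown actions are hypothetical additions, not part of the worker shown above.

```javascript
const tasks = {
  evenFilter: data => data.filter(val => val % 2 === 0),
  oddFilter: data => data.filter(val => val % 2 !== 0)
}

// Hypothetical pure helper: builds the reply that the worker would post back.
function handleMessage ({ action, payload }) {
  // Assumption: unknown actions should fail loudly instead of calling undefined.
  if (!Object.prototype.hasOwnProperty.call(tasks, action)) {
    throw new Error(`Unknown action: ${action}`)
  }
  return {
    action,
    payload: {
      timestamp: payload.timestamp, // echoed back so the caller can match replies
      data: tasks[action](payload.data)
    }
  }
}

const reply = handleMessage({
  action: 'oddFilter',
  payload: { timestamp: 1, data: [0, 1, 2, 3, 4, 5] }
})
// reply.payload.data is [1, 3, 5]
```

Inside a worker, the message listener would then reduce to posting the result of handleMessage(e.data).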

Next, wrap the Worker with a few extra properties. For example, id identifies the Worker that performs a task, and busy marks whether the Worker is currently executing a task:

// thread.js: wraps a Worker with an id and a busy flag
class Thread {
  constructor ({ threadCtor, id, onMessage }) {
    this.thread = new threadCtor() // use worker.js as an example
    this.id = id
    this.busy = false
    this.onMessage = onMessage
    this.thread.addEventListener('message', e => {
      this._onMessage(e)
    })
  }

  getId () {
    return this.id
  }

  isBusy () {
    return this.busy
  }

  postMessage (msg, transferable) {
    this.busy = true
    this.thread.postMessage(msg, transferable)
  }

  terminate () {
    this.thread.terminate()
  }

  _onMessage (e) {
    // the worker has replied, so it is idle again
    this.busy = false
    this.onMessage(e)
  }
}

Next, we need a ThreadPool to schedule threads, which can be designed as follows:

import ee from 'event-emitter-es6'

class ThreadPool extends ee {
  /**
   * @param {{ threadCtor: Worker, maxThreadsNumber: number }} param0
   */
  constructor ({ threadCtor, maxThreadsNumber }) {
    super()
    this.threadCtor = threadCtor // worker constructor
    this.maxThreadsNumber = maxThreadsNumber || window.navigator.hardwareConcurrency
    /**
     * @type {Array<Thread>}
     */
    this.threads = [] // all the worker threads
    this.tasks = [] // tasks waiting for an idle thread
    this.init()
  }

  init () {
    for (let i = 0; i < this.maxThreadsNumber; i++) {
      const thread = new Thread({
        threadCtor: this.threadCtor,
        id: i,
        onMessage: msg => {
          // the event sent to RxJS
          this.emitSync('message', msg)
          this.onMessage()
        }
      })
      this.threads.push(thread)
    }
  }

  postMessage (msg, transferable) {
    this.tasks.push({ msg, transferable })
    const thread = this.select()
    if (thread) {
      const { msg, transferable } = this.tasks.splice(0, 1)[0]
      thread.postMessage(msg, transferable)
    }
  }

  /**
   * Called when a thread finishes a task: check whether tasks is empty
   * and, if not, hand the next task to an idle thread.
   */
  onMessage () {
    if (this.tasks.length) {
      const thread = this.select()
      if (thread) {
        const { msg, transferable } = this.tasks.splice(0, 1)[0]
        thread.postMessage(msg, transferable)
      }
    }
  }

  // pick the first idle thread, if any
  select () {
    return this.threads.filter(thread => !thread.isBusy())[0]
  }

  clearTasks () {
    this.tasks = []
  }

  destroy () {
    this.tasks = []
    this.threads.forEach(thread => {
      thread.terminate()
    })
  }
}
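To look at the scheduling rule in isolation (queue a task, hand it to the first idle worker, and dispatch again whenever a worker finishes), the sketch below replaces real Workers with fake ones that finish only when told to. FakeWorker, MiniPool, and the task labels are hypothetical; this is a simplified model of the idea, not the Columbus implementation.

```javascript
// Hypothetical fake worker: holds one task and completes it when finish() is called.
class FakeWorker {
  constructor (id) {
    this.id = id
    this.busy = false
    this.current = null
  }

  run (task, done) {
    this.busy = true
    this.current = () => {
      this.busy = false
      this.current = null
      done(this.id, task)
    }
  }

  finish () {
    if (this.current) this.current()
  }
}

class MiniPool {
  constructor (size) {
    this.workers = Array.from({ length: size }, (_, i) => new FakeWorker(i))
    this.tasks = [] // tasks waiting for an idle worker
    this.log = [] // [workerId, task] pairs in completion order
  }

  post (task) {
    this.tasks.push(task)
    this.dispatch()
  }

  // Give the oldest queued task to the first idle worker, if there is one.
  dispatch () {
    const idle = this.workers.find(worker => !worker.busy)
    if (!idle || this.tasks.length === 0) return
    const task = this.tasks.shift()
    idle.run(task, (id, finished) => {
      this.log.push([id, finished])
      this.dispatch() // a worker became idle, so try the queue again
    })
  }
}

const miniPool = new MiniPool(2)
miniPool.post('evenFilter#1') // goes to worker 0
miniPool.post('oddFilter') // goes to worker 1
miniPool.post('evenFilter#2') // both workers busy, so it waits in the queue
const queuedWhileBusy = miniPool.tasks.length

miniPool.workers[1].finish() // worker 1 completes and picks up evenFilter#2
miniPool.workers[0].finish()
miniPool.workers[1].finish()
```

With two workers and three tasks, the third task waits until one worker reports back, and it then runs on whichever worker became idle first.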

Then put the worker.js from above into a ThreadPool and execute:

import DataWorker from './worker.js'

const pool = new ThreadPool({
  threadCtor: DataWorker,
  maxThreadsNumber: 2
})

Send a message to the thread pool for processing:

// Generate data
const initData = new Array(10000000).fill(0).map((val, idx) => idx);

pool.postMessage({
  action: "evenFilter",
  payload: {
    timestamp: Date.now(),
    data: initData,
  },
});

pool.postMessage({
  action: "oddFilter",
  payload: {
    timestamp: Date.now(),
    data: initData,
  },
});

pool.postMessage({
  action: "evenFilter",
  payload: {
    timestamp: Date.now(),
    data: initData,
  },
});

RxJS retrieves data from the thread pool:

const message$ = Rx.fromEvent(pool, 'message');

message$.subscribe((data) => {
  // data is the message event forwarded by the pool
  const { action, payload } = data.data;
  console.log(Date.now() - payload.timestamp);
  // ...
})

WebWorker scheduling in the odd and even filter code

We recorded the code execution in the Chrome Performance panel. As the figure above shows, the RxJS-compatible WebWorker Pool we developed schedules the different tasks as intended under the current conditions, and the scheduling process is transparent to users.

RxJS and WebWorker Pool data flow in Columbus

In Columbus, combining RxJS with the WebWorker Pool not only speeds up data computation but also organizes the code that handles different data sources in the RxJS style, which makes the data processing flow clear.

Conclusion

RxJS can unify different data sources and help us think about how to organize the logic of data flows and views.

Previous articles

  1. Creating a Cool 3D Map Visualization Product for B-end Clients
  2. Data Sources and Stored Computing
  3. Map Interaction and Pose Control
  4. Map Text Rendering
  5. Map Architecture Rendering
  6. Modeling and Output of Map Architecture
  7. Geographic Data Visualization
  8. Map Cool Effects and Principles Revealed
  9. Application of WebGL Rendering Pipeline in Web3D Map
  10. Implementation of CPU and GPU Animation in Web 3D Maps
  11. Out of the Haze: The Creative Practice of Space-time Visual Narration