Preface

An earlier article explained the APIs provided by async_hooks and what they do. This article focuses on implementing a thread pool using async_hooks.

Prerequisites

To get the most out of this article, you should:

  • Have basic knowledge of synchronous and asynchronous JavaScript programming
  • Know how Node.js works
  • Understand the fundamentals of the worker_threads API
  • Understand the basic principles of the async_hooks API

A simple example

Let's review the final example from the worker_threads article 🤔, which uses worker_threads to spawn a worker thread that computes a Fibonacci number.

|-main.js
|-worker.js

main.js: main thread script

Spawns the worker thread, using ./worker.js as the worker thread script.

const {Worker} = require("worker_threads");
let number = 10;

const worker = new Worker("./worker.js", {workerData: {num: number}});

worker.once("message", result => {
    console.log(`${number}th Fibonacci Result: ${result}`);
});

worker.on("error", error => {
    console.log(error);
});

worker.on("exit", exitCode => {
    console.log(`It exited with code ${exitCode}`);
});

console.log("Execution in main thread");

worker.js: worker thread script

Computes the requested Fibonacci number and posts it back to the main thread.

const {parentPort, workerData} = require("worker_threads");

parentPort.postMessage(getFibonacciNumber(workerData.num))

function getFibonacciNumber(num) {
    if (num === 0) {
        return 0;
    }
    else if (num === 1) {
        return 1;
    }
    else {
        return getFibonacciNumber(num - 1) + getFibonacciNumber(num - 2);
    }
}

Run:

node main.js

Output:

Execution in main thread
10th Fibonacci Result: 55
It exited with code 0

The worker thread does not block execution in the main thread: "Execution in main thread" is printed before the Fibonacci result.

Now for the main topic 😄

Add async_hooks to main.js to trace the asynchronous resources it creates. The modified file is as follows:

The async_hooks snippet is added at the top, and the console.log calls in the hook and event callbacks are replaced with fs.writeSync. Printing to the console is an asynchronous operation, so calling console.log inside a hook callback would itself trigger the hooks and lead to infinite recursion.

const {Worker} = require("worker_threads");
/** --- --- --- here is the new content --- --- --- **/
const async_hooks = require('async_hooks')
const fs = require('fs')

async_hooks.createHook({
    init (asyncId, type, triggerAsyncId, resource) {
      // File descriptor 1 is stdout; write synchronously instead of using console.log
      fs.writeSync(1, `init ${type}(${asyncId}): trigger: ${triggerAsyncId}\n`)
    },
    destroy (asyncId) {
      fs.writeSync(1, `destroy: ${asyncId}\n`);
    }
  }).enable()
/** --- --- --- New content end --- --- --- **/

let number = 10;

const worker = new Worker("./worker.js", {workerData: {num: number}});

worker.once("message", result => {
    fs.writeSync(1, `${number}th Fibonacci Result: ${result}\n`);
});

worker.on("error", error => {
    // writeSync expects a string or buffer, so format the error first
    fs.writeSync(1, `${error.stack}\n`);
});

worker.on("exit", exitCode => {
    fs.writeSync(1, `It exited with code ${exitCode}\n`);
});

console.log("Execution in main thread");
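A small logging helper can keep the synchronous writes out of the hook bodies. The sketch below is only an illustration: formatLine and debug are hypothetical names, not part of the article's code, and the helper assumes that file descriptor 1 is stdout.

```javascript
const fs = require('fs');
const util = require('util');

// Hypothetical helper: builds one log line with printf-style formatting.
function formatLine(...args) {
  return `${util.format(...args)}\n`;
}

// Hypothetical helper: writes synchronously to stdout (file descriptor 1),
// so it can be called from inside async_hooks callbacks without
// going through console.log.
function debug(...args) {
  fs.writeSync(1, formatLine(...args));
}

debug('init %s(%d): trigger: %d', 'WORKER', 2, 1);
```

With such a helper, the init hook body could shrink to a single debug(...) call.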

Then execute:

node main.js

Output:

init WORKER(2): trigger: 1
init MESSAGEPORT(3): trigger: 1
init TTYWRAP(4): trigger: 1
init SIGNALWRAP(5): trigger: 1
init TickObject(6): trigger: 1
init TTYWRAP(7): trigger: 1
init TickObject(8): trigger: 1
init MESSAGEPORT(9): trigger: 1
init MESSAGEPORT(10): trigger: 1
Execution in main thread
destroy: 6
destroy: 8
destroy: 10
10th Fibonacci Result: 55
destroy: 9
destroy: 3
init TickObject(11): trigger: 2
init TickObject(12): trigger: 2
It exited with code 0
init TickObject(13): trigger: 11
init TickObject(14): trigger: 12
destroy: 2
destroy: 11
destroy: 12
destroy: 13
destroy: 14

Analysis of the console output:

| Trigger (execution context) | Resource ID | Resource type | Description |
| --- | --- | --- | --- |
| 1 (top level of main thread) | 2 | WORKER | Creates a worker thread |
| 1 (top level of main thread) | 3 | MESSAGEPORT | Creates a message port, one end of a two-way communication channel |
| 1 (top level of main thread) | 4 | TTYWRAP | Creates a terminal (TTY) wrapper |
| 1 (top level of main thread) | 5 | SIGNALWRAP | Creates a signal wrapper |
| 1 (top level of main thread) | 6 | TickObject | Creates an asynchronous callback object |
| 1 (top level of main thread) | 7 | TTYWRAP | Creates a terminal (TTY) wrapper |
| 1 (top level of main thread) | 8 | TickObject | Creates an asynchronous callback object |
| 1 (top level of main thread) | 9 | MESSAGEPORT | Creates a message port, one end of a two-way communication channel |
| 1 (top level of main thread) | 10 | MESSAGEPORT | Creates a message port, one end of a two-way communication channel |
| | 6 | TickObject | Destroyed |
| | 8 | TickObject | Destroyed |
| | 10 | MESSAGEPORT | Destroyed |
| | 9 | MESSAGEPORT | Destroyed |
| | 3 | MESSAGEPORT | Destroyed |
| 2 (WORKER resource) | 11 | TickObject | Creates an asynchronous callback object |
| 2 (WORKER resource) | 12 | TickObject | Creates an asynchronous callback object |
| 11 (TickObject triggered by the worker) | 13 | TickObject | Creates an asynchronous callback object |
| 12 (TickObject triggered by the worker) | 14 | TickObject | Creates an asynchronous callback object |
| | 2 | WORKER | Destroyed |
| | 11 | TickObject | Destroyed |
| | 12 | TickObject | Destroyed |
| | 13 | TickObject | Destroyed |
| | 14 | TickObject | Destroyed |

After the main thread spawns a worker thread, it first instantiates the objects (sending and receiving) used to communicate with that worker. When the worker finishes its computation, it calls parentPort.postMessage to send the result. The main thread receives the message and prints the result, after which the asynchronous callback resources are destroyed, followed by the communication resources. Finally, the worker resource itself is destroyed, along with the asynchronous resources that were created with the worker as their trigger.
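The destroy hook only receives an asyncId, which is why the resource types of destroyed resources had to be looked up by hand. A minimal sketch, assuming it is acceptable to keep a Map from asyncId to type (the types map and the DemoResource name are illustrative, not from the original code), lets the hook print the type itself:

```javascript
const { AsyncResource, createHook } = require('async_hooks');
const fs = require('fs');

// asyncId -> resource type, filled in by init and consumed by destroy.
const types = new Map();

createHook({
  init(asyncId, type) {
    types.set(asyncId, type);
  },
  destroy(asyncId) {
    const type = types.get(asyncId) ?? 'unknown';
    types.delete(asyncId); // avoid keeping entries for dead resources
    fs.writeSync(1, `destroy ${type}(${asyncId})\n`);
  },
}).enable();

// Illustrative resource: init runs synchronously in the constructor.
const demo = new AsyncResource('DemoResource');
const demoType = types.get(demo.asyncId());

// The destroy hook is invoked asynchronously, some time after this call.
demo.emitDestroy();
```

The same two hooks could replace the ones in main.js so that each destroy line names the resource type directly.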

Thread pool implementation

Now that we have seen how async_hooks and worker_threads interact, let's look at the official example that uses the AsyncResource class from the async_hooks module to implement a thread pool.

Code first. Directory structure:

|-main.js            // main thread script
|-task_processor.js  // worker thread script
|-worker_pool.js     // thread pool implementation

main.js

The main thread instantiates the thread pool with as many workers as the host has CPU cores, then submits 16 tasks:

const WorkerPool = require('./worker_pool.js');
const os = require('os');

const pool = new WorkerPool(os.cpus().length);
console.log(`cpus length:`, os.cpus().length)
let finished = 0;
for (let i = 0; i < 16; i++) {
  pool.runTask({ a: 100, b: i }, (err, result) => {
    console.log(i, err, result);
    // Note: 16 tasks are submitted, but the pool is closed after 10 results.
    if (++finished === 10)
      pool.close();
  });
}

task_processor.js

The worker thread receives the object passed by the main thread, adds its a and b properties, and sends the result back to the parent (main) thread:

const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

worker_pool.js

const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Whenever a kWorkerFreedEvent is emitted, the next task pending in the queue (if any) is dispatched.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // On success: invoke the callback passed to `runTask`, remove the `TaskInfo` associated with the Worker, and mark it as idle again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // On an uncaught exception: invoke the callback passed to `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the Worker from the list and start a new Worker to replace it.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No idle worker: queue the task until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;

Run:

node main.js

Output:

cpus length: 8
3 null 103
8 null 108
9 null 109
10 null 110
11 null 111
12 null 112
13 null 113
14 null 114
15 null 115
4 null 104
2 null 102
1 null 101
7 null 107
6 null 106

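The host has 8 CPU cores, so the pool is instantiated with 8 workers, which then run the 16 tasks and return the results. Note that main.js calls pool.close() once 10 results have arrived, so the workers are terminated before every task has reported back. That is why only 14 of the 16 results appear above (tasks 0 and 5 are missing).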
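A hedged sketch of closing the pool only after every submitted task has called back. afterAll is a hypothetical helper, and fakeRunTask is a synchronous stand-in for pool.runTask so that the snippet runs on its own; neither is part of the article's code.

```javascript
// Hypothetical helper: wraps task callbacks and runs onAllDone once
// `total` wrapped callbacks have been invoked.
function afterAll(total, onAllDone) {
  let finished = 0;
  return (callback) => (...args) => {
    callback(...args);
    if (++finished === total) onAllDone();
  };
}

// Stand-in for pool.runTask: computes the result and calls back immediately.
const fakeRunTask = (task, callback) => callback(null, task.a + task.b);

const TASKS = 16;
const results = [];
let closedAfter = null; // number of results seen when "close" was requested

const track = afterAll(TASKS, () => {
  closedAfter = results.length; // with the real pool: pool.close()
});

for (let i = 0; i < TASKS; i++) {
  fakeRunTask({ a: 100, b: i }, track((err, result) => results.push(result)));
}
```

With the real pool, the loop body would call pool.runTask(task, track(callback)) and the onAllDone function would call pool.close().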

Looking at the WorkerPool class, the constructor takes the number of threads (here, the number of CPU cores) and calls addNewWorker that many times in a for loop.

The addNewWorker function creates a Worker instance, registers its message and error handlers, and pushes it onto both the workers and freeWorkers arrays. freeWorkers holds the idle worker threads, and tasks is the queue of tasks (with their callbacks) waiting to be executed.

The runTask function takes an idle worker from freeWorkers, attaches a new WorkerPoolTaskInfo instance wrapping the callback to it, and posts the task to that worker to carry it out. If no worker is idle, the task and its callback are pushed onto the tasks queue.

Next, look at the WorkerPoolTaskInfo class, which extends the AsyncResource class from the async_hooks module. It uses the inherited runInAsyncScope method to invoke the task's callback and the inherited emitDestroy method to trigger the destroy hooks.

runInAsyncScope calls the provided function with the supplied arguments in the execution context of the async resource. This establishes the context, triggers the AsyncHooks before callbacks, calls the function, triggers the AsyncHooks after callbacks, and then restores the original execution context.

emitDestroy calls all destroy hooks. It should only ever be called once per resource.
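The sequence described above can be observed directly. The following is a minimal sketch, assuming a throwaway resource named DemoResource (an illustrative name); it records the before and after hooks around a single runInAsyncScope call and checks that the execution context is restored afterwards.

```javascript
const { AsyncResource, createHook, executionAsyncId } = require('async_hooks');

const events = [];
let demoId = -1; // asyncId of the resource under observation

const hook = createHook({
  before(asyncId) { if (asyncId === demoId) events.push('before'); },
  after(asyncId) { if (asyncId === demoId) events.push('after'); },
}).enable();

const resource = new AsyncResource('DemoResource');
demoId = resource.asyncId();

const outsideId = executionAsyncId();
let insideId = -1;

// Arguments after thisArg (null here) are passed on to the function,
// and its return value is returned by runInAsyncScope.
const sum = resource.runInAsyncScope((a, b) => {
  insideId = executionAsyncId(); // the resource's own asyncId
  events.push('callback');
  return a + b;
}, null, 100, 5);

const restoredId = executionAsyncId(); // back to the original context

hook.disable();
resource.emitDestroy(); // single-use resource, as in WorkerPoolTaskInfo.done
```

Here events ends up as before, callback, after, in that order, which mirrors what WorkerPoolTaskInfo.done does for each task callback.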

After a task finishes, the worker that ran it is returned to the freeWorkers array, and the pool checks whether the tasks queue still holds pending tasks. If so, runTask is called again, and the process repeats until all tasks have been executed.
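The dispatch cycle can be modelled without real threads. The sketch below is a simplified stand-in, not the pool itself: MiniPool and finishOne are hypothetical names, plain objects take the place of Worker instances, and finishOne simulates a worker's message event by computing a + b directly.

```javascript
class MiniPool {
  constructor(size) {
    this.freeWorkers = Array.from({ length: size }, (_, id) => ({ id }));
    this.busy = [];  // workers currently holding a task, oldest first
    this.tasks = []; // FIFO queue of tasks waiting for a free worker
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      this.tasks.push({ task, callback }); // no idle worker: queue the task
      return;
    }
    const worker = this.freeWorkers.pop();
    worker.job = { task, callback };
    this.busy.push(worker);
  }

  // Simulates the 'message' event of the longest-running busy worker.
  // Returns false when no worker is busy.
  finishOne() {
    const worker = this.busy.shift();
    if (!worker) return false;
    const { task, callback } = worker.job;
    callback(null, task.a + task.b);
    worker.job = null;
    this.freeWorkers.push(worker); // mark the worker as idle again
    if (this.tasks.length > 0) {   // same role as the kWorkerFreedEvent listener
      const next = this.tasks.shift();
      this.runTask(next.task, next.callback);
    }
    return true;
  }
}

const pool = new MiniPool(2);
const results = [];
for (let i = 0; i < 5; i++) {
  pool.runTask({ a: 100, b: i }, (err, result) => results.push(result));
}
const queuedAtStart = pool.tasks.length; // 3 tasks wait for the 2 workers

while (pool.finishOne()) { /* drain until every task has run */ }
```

Two workers serve five tasks here: three tasks wait in the queue and are dispatched one by one as workers are freed, which is the same loop that the real pool drives through its message events.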

Conclusion

A thread pool reuses worker instances and so avoids the cost of creating a new thread for every task. Combined with a task queue and a dispatch strategy, it lets an application accept more tasks than it has threads and process them concurrently as workers become free.