3. Crazy geek
The original:
https://blog.logrocket.com/a-…
This article first send WeChat messages public number: jingchengyideng welcome attention, every day to push you fresh front-end technology articles
Many people wonder how a single-threaded Node.js can compete with a multithreaded backend. Given its so-called single-threaded nature, it seems counterintuitive that many large companies would choose Node as a backend. To understand why, you have to understand what it really means to be single threaded.
JavaScript is well designed for doing relatively simple things on the Web, such as validating forms or creating iridescent mouse trails. In 2009, Ryan Dahl, the founder of Node.js, made it possible for developers to write back-end code in the language.
Backend languages that typically support multithreading have various mechanisms for synchronizing data between threads and other thread-oriented functions. Adding support for such functionality to JavaScript requires modifying the entire language, which is not Dahl’s goal. In order for pure JavaScript to support multithreading, he had to come up with a workaround. Let’s explore the mystery…
How does Node.js work
Node.js uses two types of threads: the main thread handled by the Event Loop and several worker threads in the worker pool.
An event loop is a mechanism that takes callbacks (functions) and registers them for execution at some point in the future. It runs in the same thread as the associated JavaScript code. The event loop is also blocked when a JavaScript operation blocks a thread.
A work pool is an execution model that spins and processes individual threads, then performs tasks synchronously and returns the results to an event loop. The event loop uses the returned result to execute the provided callback.
In short, it is responsible for asynchronous I/O operations — primarily interactions with system disks and the network. It is primarily used by modules such as FS (I/O intensive) or Crypto (CPU intensive). The working pool is implemented in libuv, which causes slight delays when Node needs to communicate internally between JavaScript and C++, but this is almost imperceptible.
Based on these two mechanisms, we can write the following code:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
if (err) {
return null;
}
console.log(content.toString());
});
The aforementioned FS module tells the working pool to use one of the threads to read the contents of the file and notifies the event loop when it is finished. The event loop then gets the provided callback function and executes it with the contents of the file.
This is an example of non-blocking code, where we don’t have to wait for something to happen synchronously. Simply tell the working pool to read the file and use the result to call the provided function. Since the working pool has its own thread, the event loop can continue to execute normally while the file is read.
This is fine when you don’t need to perform complex operations synchronously: any function that takes too long to run will block the thread. If you have a lot of this functionality in your application, you can significantly reduce the throughput of your server, or even freeze it completely. In this case, work cannot continue to be delegated to the workpool.
You can’t really use Node.js effectively when you need to perform complex computations on data (such as AI, machine learning, or big data) because the operation blocks the primary (and only) thread, rendering the server unresponsive. This was the case before the release of Node.js v10.5.0, which added support for multithreading.
Brief introduction:worker_threads
The worker_threads module allows us to create a fully functional multithreaded Node.js program.
A thread worker is a piece of code that is generated in a separate thread (usually pulled from a file).
Note that the terms thread worker, worker and thread are often used interchangeably and they all refer to the same thing.
To use the thread worker, you must import the worker_threads module. Let’s first write a function to help us generate these thread workers, and then discuss their properties.
type WorkerCallback = (err: any, result? : any) => any; export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) { const worker = new Worker(path, { workerData }); worker.on('message', cb.bind(null, null)); worker.on('error', cb); worker.on('exit', (exitCode) => { if (exitCode === 0) { return null; } return cb(new Error(`Worker has stopped with code ${exitCode}`)); }); return worker; }
To create a Worker, you must first create an instance of the Worker class. Its first argument provides the path to the file that contains the worker’s code; The second argument provides an object named WorkerData that contains a single attribute. This is the data that we want the thread to have access to when it starts running.
Note that the path should always refer to a file with a.js or.mjs extension, regardless of whether you are using JavaScript or the language you will eventually convert to JavaScript (for example, TypeScript).
I also want to point out why the callback method is used instead of returning a Promise that will be resolved when the Message event is fired. This is because the worker can send many message events instead of just one.
As you can see from the above example, the communication between threads is event-based, which means that we set up the listener that the worker calls after sending a given event.
The following are the most common events:
worker.on('error', (error) => {});
An Error event is issued whenever an uncaught exception is found in the Worker. The worker is then terminated, and the error can be used as the first parameter in the provided callback.
worker.on('exit', (exitCode) => {});
An exit event is emitted when the worker exits. If process.exit() is called in the worker, then ExitCode will be supplied to the callback. If the worker is terminated with Worker.Terminate (), the code is 1.
worker.on('online', () => {});
Online events are emitted as soon as the worker stops parsing JavaScript code and starts execution. It is not commonly used, but can be informative in certain situations.
worker.on('message', (data) => {});
A message event is emitted whenever the worker sends data to the parent thread.
Now let’s look at how to share data between threads.
Exchanging data between threads
To send data to another thread, use the port.postMessage() method. Its prototype is as follows:
port.postMessage(data[, transferList])
A Port object can be either a parentPort or an instance of MessagePort — more on that later.
The data parameter
The first argument — here called data — is an object that is copied to another thread. It can be anything that the replication algorithm supports.
The data is copied by the structured cloning algorithm. Quoted from Mozilla:
It clones the input object by recursively, while maintaining the mapping of previously accessed references to avoid an infinite traversal loop.
The algorithm does not copy functions, errors, property descriptors, or prototype chains. Also note that copying objects in this way is different from using JSON, because it can contain circular references and typed arrays, which JSON cannot.
With the ability to copy typed arrays, the algorithm can share memory between threads.
Sharing memory between threads
One could argue that modules like cluster or child_process started using threads a long time ago. That’s true and that’s not true.
The Cluster module can create multiple node instances, with a main process routing requests between them. Clustering can effectively increase the throughput of the server; But we cannot use the cluster module to generate a single thread.
People tend to use tools like PM2 to centrally manage their programs rather than executing them manually in their own code. If you’re interested, look into how to use the Cluster module.
The child_process module can generate any executable file, whether it is written in JavaScript or not. It is very similar to worker_threads, but lacks several important features of the latter.
Specifically, Thread Workers are lighter and share the same process ID as their parent thread. They can also share memory with the parent thread, which can avoid serializing large data loads and thus pass data back and forth more efficiently.
Now let’s look at how to share memory between threads. To share memory, an instance of ArrayBuffer or ShareDarrayBuffer must be sent to another thread as a data parameter.
This is a worker that shares memory with its parent thread:
import { parentPort } from 'worker_threads';
parentPort.on('message', () => {
const numberOfElements = 100;
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
const arr = new Int32Array(sharedBuffer);
for (let i = 0; i < numberOfElements; i += 1) {
arr[i] = Math.round(Math.random() * 30);
}
parentPort.postMessage({ arr });
});
First, we create a ShareDarrayBuffer with memory containing 100 32-bit integers. Next, create an instance of Int32Array, which holds its structure with a buffer, then populate the array with some random numbers and send it to the parent thread.
In the parent thread:
import path from 'path'; import { runWorker } from '.. /run-worker'; const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => { if (err) { return null; } arr[0] = 5; }); worker.postMessage({});
Changing the value of arr [0] to 5 actually modifies it in both threads.
Of course, with shared memory, we run the risk of changing a value in one thread while also changing it in another thread. But we also get a benefit in this process: the value can be used in another thread without serialization, which greatly improves efficiency. Just remember to manage the correct references to the data so that it can be garbage collected once the data is processed.
Sharing an array of integers is all very well, but what we’re really interested in is sharing objects — that’s the default way to store information. Unfortunately, there is no sharedobjectBuffer or anything like that, but we can create a similar structure ourselves.
TransferList parameters
Transferlist can only contain ArrayBuffer and MessagePort. Once they have been passed to another thread, they cannot be passed again; Because the contents of the memory have been moved to another thread.
Currently, network sockets cannot be transferred through Transferlist (you can use the child_process module).
Creating Communication Channels
Communication between threads takes place through Port, an instance of the MessagePort class, and enables event-based communication.
There are two ways to use Port to communicate between threads. The first is the default value, which is easy. In the Worker code, we import an object named parentPort from the Worker_Threads module and use the object’s.postMessage() method to send the message to the parent thread.
Here’s an example:
import { parentPort } from 'worker_threads';
const data = {
// ...
};
parentPort.postMessage(data);
ParentPort is a MessagePort instance that Node.js creates behind the scenes to communicate with the parent thread. This allows the parentPort and Worker objects to communicate between threads.
The second way to communicate between threads is to create a MessageChannel and send it to the worker. Here’s how to create a new MessagePort and share it with our worker:
import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';
const worker = new Worker(path.join(__dirname, 'worker.js'));
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => {
console.log('message from worker:', message);
});
worker.postMessage({ port: port2 }, [port2]);
After creating port1 and port2, we set the event listener on port1 and send port2 to the worker. We must include it in the transferList in order to transfer it to the worker.
Within Worker:
import { parentPort, MessagePort } from 'worker_threads'; parentPort.on('message', (data) => { const { port }: { port: MessagePort } = data; port.postMessage('heres your message! '); });
This way, we can use the port sent by the parent thread.
Using ParentPort is not necessarily the wrong approach, but it is better to create a new MessagePort with an instance of the MessageChannel and then share it with the generated worker.
Notice that in the later examples, I use parentPort for brevity.
Two ways to use Worker
Worker can be used in two ways. The first is to generate a worker, execute its code, and send the result to the parent thread. In this way, a worker must be recreated every time a new task appears.
The second method is to generate a worker and set a listener for the message event. Each time Message is fired, it completes the work and sends the result back to the parent thread, which keeps the worker active for future use.
The Node.js documentation recommends the second method because of the overhead associated with creating the virtual machine and parsing and executing the code while creating the thread worker. Therefore, this method is more efficient than constantly generating new workers.
This approach is called a workpool because we create a workpool and let them wait, scheduling Message events to complete the work as needed.
Here is an example of creating, executing, and closing a worker:
import { parentPort } from 'worker_threads';
const collection = [];
for (let i = 0; i < 10; i += 1) {
collection[i] = i;
}
parentPort.postMessage(collection);
When the collection is sent to the parent thread, it exits.
Here is an example of a worker that can wait a long time before a given task:
import { parentPort } from 'worker_threads';
parentPort.on('message', (data: any) => {
const result = doSomething(data);
parentPort.postMessage(result);
});
Important properties available in the worker_threads module
Some properties are available in the worker_threads module:
isMainThread
This property is true when not operating within a worker thread. If you feel it is necessary, you can include a simple if statement at the beginning of the worker file to ensure that it only runs as a worker.
import { isMainThread } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
workerData
Data contained in the Worker’s constructor when the thread is spawn.
const worker = new Worker(path, { workerData });
In the worker thread:
import { workerData } from 'worker_threads';
console.log(workerData.property);
parentPort
The previously mentioned MessagePort instance is used to communicate with the parent thread.
threadId
The unique identifier assigned to the worker.
Now that we know the technical details, let’s implement something and test what we’ve learned in practice.
implementationsetTimeout
SetTimeout is an infinite loop that, as the name implies, detects if the program has run out of time. It checks in the loop whether the sum of the start time and the given number of milliseconds is less than the actual date.
import { parentPort, workerData } from 'worker_threads'; const time = Date.now(); while (true) { if (time + workerData.time <= Date.now()) { parentPort.postMessage({}); break; }}
This particular implementation spawns a thread, executes its code, and finally exits when it is finished.
The next implementation uses the worker’s code. First create a state to track the generated worker:
const timeoutState: { [key: string]: Worker } = {};
Then the function responsible for creating the worker and saving it to the state:
export function setTimeout(callback: (err: any) => any, time: number) { const id = uuidv4(); const worker = runWorker( path.join(__dirname, './timeout-worker.js'), (err) => { if (! timeoutState[id]) { return null; } timeoutState[id] = null; if (err) { return callback(err); } callback(null); }, { time, }, ); timeoutState[id] = worker; return id; }
First, we use the UUID package to create a unique identifier for the worker, and then we use the previously defined function runWorker to get the worker. We also pass in a callback function to the worker, which will be fired once the worker sends the data. Finally, save the worker in the state and return the ID.
In the callback function, we must check whether the worker still exists in the state, because there is a possibility of cancelTimeout(), which will delete it. If it does exist, remove it from the state and call the callback passed to the setTimeout function.
The cancelTimeout function forces the worker to exit using the.terminate() method and removes the worker from the state:
export function cancelTimeout(id: string) {
if (timeoutState[id]) {
timeoutState[id].terminate();
timeoutState[id] = undefined;
return true;
}
return false;
}
If you’re interested, I also implemented setInterval, and the code is here, but since it does nothing for threads (we reuse the code for setTimeout), I’ve decided not to explain it here.
I’ve created a short test code to examine the differences between this approach and the native approach. You can find the code here. Here are the results:
Native setTimeout {MS: 7004, Averagecpucost: 0.1416} Worker setTimeout {MS: 7046, Averagecpucost: 0.308}
We can see that setTimeout has a little delay – about 40ms – when the worker is being created. The average CPU cost is also slightly higher, but nothing unbearable (the CPU cost is the average CPU utilization over the duration of the process).
If we can reuse the worker, we can reduce latency and CPU usage, which is why we need to implement the work pool.
Implementing the Workpool
As mentioned above, a work pool is a given number of pre-created workers that remain idle and listen for message events. Once the message event is fired, they start working and send back the result.
To better describe what we are going to do, let’s create a work pool of eight thread workers:
const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);
If you’re familiar with limiting concurrency, the logic you see here is almost the same, just a different use case.
As shown in the code snippet above, we pass the path to the worker and the number of workers to generate to the WorkerPool constructor.
export class WorkerPool<T, N> { private queue: QueueItem<T, N>[] = []; private workersById: { [key: number]: Worker } = {}; private activeWorkersById: { [key: number]: boolean } = {}; public constructor(public workerPath: string, public numberOfThreads: number) { this.init(); }}
There are other attributes, such as WorkersById and ActiveWorkersById, where we can store the ID of the existing worker and the currently running worker, respectively. There is also a queue, where we can store objects using the following structure:
type QueueCallback<N> = (err: any, result? : N) => void; interface QueueItem<T, N> { callback: QueueCallback<N>; getData: () => T; }
The callback is simply the default node callback, with the first argument being an error and the second being a possible result. GetData is the function (described below) that is passed to the workpool.run () method and is called once the project starts processing. The data returned by the getData function is passed to the worker thread.
In the.init() method, we create workers and save them in the following state:
private init() { if (this.numberOfThreads < 1) { return null; } for (let i = 0; i < this.numberOfThreads; i += 1) { const worker = new Worker(this.workerPath); this.workersById[i] = worker; this.activeWorkersById[i] = false; }}
To avoid an infinite loop, we first make sure that the thread count is > 1. The number of valid workers is then created and their indexes are stored in the WorkersById state. We save information about whether they are currently running in the ActiveWorkersById state, which is always false by default.
Now we must implement the previously mentioned.run() method to set up a Worker available task.
public run(getData: () => T) { return new Promise<N>((resolve, reject) => { const availableWorkerId = this.getInactiveWorkerId(); const queueItem: QueueItem<T, N> = { getData, callback: (error, result) => { if (error) { return reject(error); } return resolve(result); }}; if (availableWorkerId === -1) { this.queue.push(queueItem); return null; } this.runWorker(availableWorkerId, queueItem); }); }
In the Promise function, we first check to see if there are any idle workers available to process the data by calling.getInActiveWorkerId () :
private getInactiveWorkerId(): number { for (let i = 0; i < this.numberOfThreads; i += 1) { if (! this.activeWorkersById[i]) { return i; } } return -1; }
Next, we create a QueueItem to hold the getData function passed to the.run() method and the callback. In a callback, we either resolve or reject a promise, depending on whether the worker passes an error to the callback.
If the availableWorkerId value is -1, which means that no worker is currently available, we add the QueueItem to the Queue. If a worker is available, the.runWorker() method is called to execute the worker.
In the.runWorker() method, we must set the activeWorkersById of the current worker to the active state; Set event listeners for Message and Error events (and clean them up later); Finally, the data is sent to the worker.
private async runWorker(workerId: number, queueItem: QueueItem<T, N>) { const worker = this.workersById[workerId]; this.activeWorkersById[workerId] = true; const messageCallback = (result: N) => { queueItem.callback(null, result); cleanUp(); }; const errorCallback = (error: any) => { queueItem.callback(error); cleanUp(); }; const cleanUp = () => { worker.removeAllListeners('message'); worker.removeAllListeners('error'); this.activeWorkersById[workerId] = false; if (! this.queue.length) { return null; } this.runWorker(workerId, this.queue.shift()); }; worker.once('message', messageCallback); worker.once('error', errorCallback); worker.postMessage(await queueItem.getData()); }
First, we get the worker reference from the workersById by using the passed workerID. Then, in the ActiveWorkersById, set the [WorkerId] property to true so that we can know that the worker is busy and not running other tasks.
Next, MessageCallback and ErrorCallback are created to be called on message and error events, respectively, and then the functions are registered to listen for events and send data to the worker.
In the callback, we call the callback to QueueItem and then call the cleanUp function. In the cleanUp function, the event listener is removed because we will reuse the same worker multiple times. If the listener is not deleted there will be a memory leak and memory will slowly run out.
In the ActiveWorkersById state, we set the [WorkerId] property to false and check if the queue is empty. If not, the first item is removed from the queue and the worker is called again with another QueueItem.
Next create a worker that performs some calculations after receiving the data in the message event:
import { isMainThread, parentPort } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
const doCalcs = (data: any) => {
const collection = [];
for (let i = 0; i < 1000000; i += 1) {
collection[i] = Math.round(Math.random() * 100000);
}
return collection.sort((a, b) => {
if (a > b) {
return 1;
}
return -1;
});
};
parentPort.on('message', (data: any) => {
const result = doCalcs(data);
parentPort.postMessage(result);
});
Worker creates an array of 1 million random numbers and sorts them. It doesn’t matter what you do as long as it takes you a little longer to get it done.
Here is an example of a simple use of the workpool:
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);
const items = [...new Array(100)].fill(null);
Promise.all(
items.map(async (_, i) => {
await pool.run(() => ({ i }));
console.log('finished', i);
}),
).then(() => {
console.log('finished all');
});
Start by creating a work pool of eight workers. We then create an array of 100 elements, and for each element, we run a task in the work pool. Eight tasks are executed immediately after starting the run, and the rest are queued and executed one by one. By using a workpool, we don’t have to create a worker every time, which greatly improves efficiency.
conclusion
Worker_threads provides an easy way to add multithreading support to a program. By delegating heavy CPU computations to other threads, you can significantly improve the throughput of your server. With official threading support, we can expect more developers and engineers from AI, machine learning, and big data to use Node.js.
The first send WeChat messages public number: Jingchengyideng
Welcome to scan the two-dimensional code to pay attention to the public number, every day to push you fresh front-end technology articles
Read on for the other great articles in this column:
- 12 Amazing CSS Experiment Projects
- What are the front-end interview questions at the world’s top companies
- CSS Flexbox Visualization Manual
- The holidays are boring? Write a little brain game in JavaScript!
- React from a designer’s point of view
- How does CSS sticky positioning work
- A step-by-step guide to implementing animations using HTML5 SVG
- Programmer 30 years old before the monthly salary is less than 30K, which way to go
- 7 open front end questions
- React Tutorial: A Quick Start Guide