Cluster is a built-in Node.js module for multi-core processing. It simplifies writing multi-process parallel programs and makes it easy to build a load-balanced cluster.
Environment:
MacOS 10.14
Node v8.11.3
npm 6.4.0
Practice
The master is the controller process, and the workers are the processes that do the actual work; we start one worker per CPU. First, check the number of cores and threads on this machine:
sysctl machdep.cpu
machdep.cpu.core_count: 6
machdep.cpu.thread_count: 12
So this machine has 6 cores and 12 threads.
Create a new app.js:
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

// Is this the master process?
if (cluster.isMaster) {
    console.log("master start...");
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
    // Listen for workers entering the listening state;
    // the address object contains the connection information
    cluster.on('listening', function (worker, address) {
        console.log('listening: worker ' + worker.process.pid +
            ', Address: ' + address.address + ":" + address.port +
            "," + address.addressType);
        // Kill all workers after 3 seconds
        setTimeout(() => {
            worker.kill();
        }, 3000);
    });
    // Listen for worker exit; code is the exit code, signal the kill signal
    cluster.on('exit', function (worker, code, signal) {
        // console.log('worker ' + worker.process.pid + ' died');
        console.log('Worker process %d closed (%s). Restarting... ',
            worker.process.pid, signal || code);
        // cluster.fork();
    });
} else {
    console.log('createServer... ');
    http.createServer(function (req, res) {
        res.writeHead(200);
        res.end("hello world\n");
    }).listen(0);
}
We set a timer that kills each worker three seconds after it starts listening:
➜ node app.js
master start...
createServer...
createServer...
createServer...
listening: worker 29374, Address: null:61186,4
createServer...
listening: worker 29375, Address: null:61186,4
listening: worker 29376, Address: null:61186,4
createServer...
listening: worker 29377, Address: null:61186,4
createServer...
createServer...
listening: worker 29378, Address: null:61186,4
createServer...
listening: worker 29379, Address: null:61186,4
listening: worker 29380, Address: null:61186,4
createServer...
listening: worker 29381, Address: null:61186,4
createServer...
listening: worker 29382, Address: null:61186,4
listening: worker 29383, Address: null:61186,4
createServer...
listening: worker 29384, Address: null:61186,4
listening: worker 29385, Address: null:61186,4
Worker process 29374 closed (SIGTERM). Restarting...
Worker process 29375 closed (SIGTERM). Restarting...
Worker process 29376 closed (SIGTERM). Restarting...
Worker process 29377 closed (SIGTERM). Restarting...
Worker process 29378 closed (SIGTERM). Restarting...
Worker process 29379 closed (SIGTERM). Restarting...
Worker process 29380 closed (SIGTERM). Restarting...
Worker process 29381 closed (SIGTERM). Restarting...
Worker process 29382 closed (SIGTERM). Restarting...
Worker process 29383 closed (SIGTERM). Restarting...
Worker process 29384 closed (SIGTERM). Restarting...
Worker process 29385 closed (SIGTERM). Restarting...
The machine has 12 threads, so after the master starts it spawns 12 workers. When a worker is killed, the 'exit' event fires, and the worker can be restarted there with fork().
Each worker process is spawned via child_process.fork() and communicates with the master process over IPC (Inter-Process Communication).
When a worker calls server.listen(...), it serializes the arguments and passes them to the master process. If the master already has a listening server matching those arguments, it passes that server's handle to the worker; otherwise it creates one and then passes the handle. This is why all workers can listen on the same port.
Because workers run independently, they can be killed or restarted as the program requires without affecting each other. As long as at least one worker is alive, the master keeps accepting connections. Node does not automatically maintain the number of workers, so the application has to manage its own worker pool.
cluster object

Various properties and functions of the cluster object:

cluster.settings: the cluster configuration object
cluster.isMaster: whether the current process is the master
cluster.isWorker: whether the current process is a worker
Event: 'fork': emitted when a worker process is forked
Event: 'online': emitted when a worker has started successfully
Event: 'listening': emitted when a worker notifies the master that it is listening
Event: 'disconnect': emitted when a worker's IPC channel disconnects
Event: 'exit': emitted when a worker process exits
Event: 'setup': emitted when setupMaster() is called
cluster.setupMaster([settings]): sets the cluster parameters
cluster.fork([env]): creates a worker process
cluster.disconnect([callback]): disconnects all worker processes
cluster.worker: the current worker object (only available in workers)
cluster.workers: all live worker objects in the cluster (only available in the master)
worker object

Worker properties and functions, accessed via cluster.workers (in the master) or cluster.worker (in a worker):

worker.id: the worker's unique id
worker.process: the worker's ChildProcess object
worker.suicide: after disconnect(), indicates whether the worker exited deliberately
worker.send(message, [sendHandle]): sends a message from the master to this worker. Note: a worker sends messages to the master with process.send(message)
worker.kill([signal='SIGTERM']): kills this worker; alias destroy()
worker.disconnect(): disconnects this worker's IPC channel, letting the worker exit gracefully (sets suicide)
Event: 'message': emitted on messages between master and worker
Event: 'online': emitted when this worker has started successfully
Event: 'listening': emitted when this worker notifies the master that it is listening
Event: 'disconnect': emitted when this worker's IPC channel disconnects
Event: 'exit': emitted when this worker exits
Communication between master and worker
Create a new cluster.js:
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log('[master] ' + "start master...");

    // 12 workers would be too many for this demo, so start numCPUs / 4
    for (var i = 0; i < numCPUs / 4; i++) {
        var wk = cluster.fork();
        wk.send('[master] ' + 'hi worker' + wk.id);
    }

    // Emitted when a worker is forked
    cluster.on('fork', function (worker) {
        console.log('[master] fork: worker' + worker.id);
    });

    // When a new worker is spawned, it responds with an online message;
    // this event fires when the master receives it. The difference from
    // 'fork': 'fork' fires when the master spawns the worker process,
    // 'online' fires once the worker process is running.
    cluster.on('online', function (worker) {
        console.log('[master] online: worker' + worker.id);
    });

    // When a worker calls listen(), the server in the worker emits
    // 'listening', and the cluster object in the master emits
    // 'listening' as well.
    cluster.on('listening', function (worker, address) {
        console.log('[master] listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
    });

    // Fired when a worker's IPC channel disconnects. Possible causes
    // include a worker exiting gracefully, being killed, or a manual
    // disconnect (e.g. calling worker.disconnect()).
    cluster.on('disconnect', function (worker) {
        console.log('[master] disconnect: worker' + worker.id);
    });

    // The cluster module emits 'exit' when any worker shuts down.
    cluster.on('exit', function (worker, code, signal) {
        console.log('[master] exit worker' + worker.id + ' died');
    });

    function eachWorker(callback) {
        for (var id in cluster.workers) {
            callback(cluster.workers[id]);
        }
    }

    // Push a message to every worker after 3 seconds
    setTimeout(function () {
        eachWorker(function (worker) {
            worker.send('[master] ' + 'send message to worker' + worker.id);
        });
    }, 3000);

    Object.keys(cluster.workers).forEach(function (id) {
        // Fired when the master receives a message from any worker.
        cluster.workers[id].on('message', function (msg) {
            console.log('[master] ' + 'message ' + msg);
        });
    });
} else if (cluster.isWorker) {
    console.log('[worker] ' + "start worker ..." + cluster.worker.id);

    // Like the cluster 'message' event, but specific to this worker.
    process.on('message', function (msg) {
        // Messages from the master, e.g. the initial greeting
        console.log('[worker] ' + msg);
        process.send('[worker] worker' + cluster.worker.id + ' received! ');
    });

    http.createServer(function (req, res) {
        res.writeHead(200, { "content-type": "text/html" });
        res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
    }).listen(3000);
}
➜ node cluster.js
[master] start master...
[master] fork: worker1
[master] fork: worker2
[master] fork: worker3
[master] online: worker1
[master] online: worker2
[master] online: worker3
[worker] start worker ... 1
[worker] [master] hi worker1
[master] message [worker] worker1 received!
[worker] start worker ... 2
[master] listening: worker1,pid:30288, Address:null:3000
[worker] start worker ... 3
[worker] [master] hi worker2
[master] message [worker] worker2 received!
[master] listening: worker2,pid:30289, Address:null:3000
[worker] [master] hi worker3
[master] message [worker] worker3 received!
[master] listening: worker3,pid:30290, Address:null:3000
[worker] [master] send message to worker1
[worker] [master] send message to worker2
[worker] [master] send message to worker3
[master] message [worker] worker1 received!
[master] message [worker] worker2 received!
[master] message [worker] worker3 received!
Load balancing
Let's simulate some requests and see whether they are distributed across the workers automatically.
load-balance.js
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log('[master] ' + "start master...");
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('listening', function (worker, address) {
console.log('[master] ' + 'listening: worker' + worker.id + ',pid:' + worker.process.pid + ', Address:' + address.address + ":" + address.port);
});
} else if (cluster.isWorker) {
console.log('[worker] ' + "start worker ..." + cluster.worker.id);
http.createServer(function (req, res) {
console.log('worker' + cluster.worker.id);
res.end('worker' + cluster.worker.id + ',PID:' + process.pid);
}).listen(3000);
}
node load-balance.js
[master] start master...
[worker] start worker ... 1
[worker] start worker ... 3
[worker] start worker ... 2
[worker] start worker ... 4
[worker] start worker ... 6
[worker] start worker ... 7
[master] listening: worker3,pid:96592, Address:null:3000
[master] listening: worker2,pid:96591, Address:null:3000
[master] listening: worker1,pid:96590, Address:null:3000
[master] listening: worker4,pid:96593, Address:null:3000
[master] listening: worker6,pid:96595, Address:null:3000
[worker] start worker ... 5
[master] listening: worker7,pid:96596, Address:null:3000
[worker] start worker ... 8
[worker] start worker ... 9
[worker] start worker ... 10
[master] listening: worker5,pid:96594, Address:null:3000
[master] listening: worker8,pid:96597, Address:null:3000
[master] listening: worker9,pid:96598, Address:null:3000
[worker] start worker ... 11
[worker] start worker ... 12
[master] listening: worker10,pid:96599, Address:null:3000
[master] listening: worker11,pid:96600, Address:null:3000
[master] listening: worker12,pid:96601, Address:null:3000
Let's access it with curl:
➜ cluster curl http://172.16.78.185:3000/
worker1,PID:96590
➜ cluster curl http://172.16.78.185:3000/
worker3,PID:96592
➜ cluster curl http://172.16.78.185:3000/
worker2,PID:96591
➜ cluster curl http://172.16.78.185:3000/
worker4,PID:96593
➜ cluster curl http://172.16.78.185:3000/
worker6,PID:96595
The requests do land on different workers, but a handful of manual requests can't show whether the load is evenly distributed, so let's generate concurrent traffic:
➜ node load-balance.js > server.log
Then run a stress test with siege at 50 concurrent users:
siege -c 50 http://localhost:3000
HTTP/1.1 200   0.00 secs:      16 bytes ==> GET /
^C
Lifting the server siege...
Transactions:              16276 hits
Availability:              100.00 %
Elapsed time:              31.65 secs
Data transferred:          0.25 MB
Response time:
Transaction rate:          514.25 trans/sec
Throughput:                0.01 MB/sec
Concurrency:               14.72
Successful transactions:   16276
Failed transactions:       0
Longest transaction:       0.18
Shortest transaction:      0.00
Siege sent 16,276 requests in 31.65 seconds, about 514.25 requests per second.
Now let's analyze the server.log file. This requires the R language to be installed:
~ R
> df<-read.table(file="server.log",skip=9,header=FALSE)
> summary(df)
V1
worker9 :1361
worker2 :1359
worker5 :1358
worker1 :1357
worker12:1356
worker11:1354
(Other) :8122
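If R is not at hand, a plain shell pipeline produces the same per-worker tallies (this assumes the log lines look like worker1, worker2, ... as printed by load-balance.js; the grep filter replaces the skip=9 used in the R call):

```shell
# Tally requests per worker: keep only the "workerN" lines,
# then count occurrences of each, largest first.
grep -E '^worker[0-9]+$' server.log | sort | uniq -c | sort -rn
```

The counts should match R's summary(df) output.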
We can see that the requests are spread across the workers in roughly equal numbers, so the cluster module is balancing the load. (On non-Windows platforms, Node's default scheduling policy is round-robin, which explains the nearly even split.)