Preface
I recently built a project on top of Egg and became curious about how it manages its multi-process model, so I dug into it and wrote down what I learned. If you spot any mistakes in the article, please go easy on me.
Why do you need multiple processes?
Almost every server today has a multi-core CPU. Node, however, runs your JavaScript in a single process on a single thread (single-threaded from the developer's point of view; internally it does use other threads). The operating system schedules CPU work per thread, so a single Node process can only keep one core busy at a time. That wastes most of the machine, and it is also unacceptable for fault tolerance: one uncaught error can crash the entire program. This is why Node provides the cluster module, which helps us make full use of the server.
How Cluster works
For a detailed explanation of how cluster works I recommend this article; here is a short summary:
- In the child processes, port listening is hacked: a worker does not really bind the port itself. Listening is handled centrally by the master's internal TCP server, so several child processes "listening" on the same port do not cause an error.
- Every request first passes through the master's internal TCP server. Its request-handling logic selects a worker process and sends it an internal newconn message, with the client handle attached to the message.
- Node has two ways of distributing connections. The first (round-robin) is the default on all platforms except Windows: the master process listens on the port, accepts new connections, and hands them out to the worker processes in turn, using some built-in tricks to avoid overloading any single worker. In the second, the master creates the listening socket and sends it to the interested worker processes, which then accept connections directly.
- When a worker process receives a handle, it creates a client instance (net.Socket), runs the business logic, and then returns the response.
As shown in the figure:
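The round-robin hand-off summarized above can be sketched as a small in-memory simulation. This is a hypothetical, simplified model loosely based on the idea behind Node's internal round-robin handle, not its actual code: workers and connections are plain strings, and the class and method names (`RoundRobinSketch`, `distribute`, `handoff`, `done`) are illustrative. The one "trick" it shows is that a worker only receives its next connection after it has acknowledged the previous one, so a slow worker does not pile up work.

```javascript
// Hypothetical, simplified model of round-robin distribution in the master.
// Connections and workers are plain strings; nothing touches real sockets.
class RoundRobinSketch {
  constructor(workerIds) {
    this.free = [...workerIds]; // idle workers, in the order they became idle
    this.handles = [];          // accepted connections not yet handed off
    this.log = [];              // "connection->worker" hand-off record
  }

  // The master's listening socket accepted a new connection.
  distribute(conn) {
    this.handles.push(conn);
    const worker = this.free.shift();
    if (worker !== undefined) this.handoff(worker);
  }

  // Give the oldest waiting connection to `worker`, or park it as idle.
  handoff(worker) {
    const conn = this.handles.shift();
    if (conn === undefined) {
      this.free.push(worker);
      return;
    }
    // Real Node sends a 'newconn' message carrying the client handle here.
    this.log.push(`${conn}->${worker}`);
  }

  // The worker acknowledged its connection, so it may take the next one.
  done(worker) {
    this.handoff(worker);
  }
}

const rr = new RoundRobinSketch(['w1', 'w2']);
rr.distribute('c1'); // w1 takes c1
rr.distribute('c2'); // w2 takes c2
rr.distribute('c3'); // both busy: c3 waits in the queue
rr.done('w1');       // w1 acknowledged c1 and picks up c3
rr.done('w2');       // nothing queued, so w2 goes back to the idle list
console.log(rr.log); // [ 'c1->w1', 'c2->w2', 'c3->w1' ]
```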
Multiprocess model
Here is the process model from the official Egg documentation:
+--------+ +-------+
| Master |<-------->| Agent |
+--------+ +-------+
^ ^ ^
/ | \
/ | \
/ | \
v v v
+----------+ +----------+ +----------+
| Worker 1 | | Worker 2 | | Worker 3 |
+----------+ +----------+ +----------+
Type | Number of processes | Role | Stability | Runs business code |
---|---|---|---|---|
Master | 1 | Process management, message forwarding between processes | Very high | No |
Agent | 1 | Background work (long-lived connection clients) | High | A little |
Worker | Usually the number of CPU cores | Executes business code | Average | Yes |
In short, the Master acts as the main process. It starts the Agent as a "secretary" process that handles shared chores for the Workers (such as logging), and it starts the Worker processes that run the actual business code.
Multi-process implementation
Process-related code
Let’s start with the Master, which is considered the top-level process for now (there’s actually a parent process, but more on that later).
/**
* start egg app
* @method Egg#startCluster
* @param {Object} options {@link Master}
* @param {Function} callback start success callback
*/
exports.startCluster = function(options, callback) {
new Master(options).ready(callback);
};
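`new Master(options).ready(callback)` works because the master gets a ready() method from the get-ready mixin: a callback registered early is queued and only runs once the master later calls ready(true). The class below is a hypothetical, minimal stand-in written to illustrate that pattern; it is not the get-ready source, and its handling of ready(err) is an assumption.

```javascript
// Hypothetical minimal stand-in for the get-ready mixin (not its real source).
class ReadySketch {
  constructor() {
    this.readyState = undefined; // set by ready(true) or ready(err)
    this.callbacks = [];         // callbacks waiting for readiness
  }

  ready(arg) {
    if (typeof arg === 'function') {
      if (this.readyState === undefined) {
        this.callbacks.push(arg); // not ready yet: queue it
      } else {
        // Already ready: run it right away with the stored outcome.
        arg(this.readyState instanceof Error ? this.readyState : undefined);
      }
      return;
    }
    // ready(true) marks success; ready(err) passes the error to callbacks.
    if (arg !== true && !(arg instanceof Error)) {
      throw new TypeError('ready() expects a callback, true, or an Error');
    }
    this.readyState = arg;
    const err = arg instanceof Error ? arg : undefined;
    for (const cb of this.callbacks.splice(0)) cb(err);
  }
}

// Usage, mirroring startCluster(options, callback):
const events = [];
const master = new ReadySketch();
master.ready((err) => events.push(err ? 'start failed' : 'started')); // startCluster's callback
master.ready(true); // what the master does once everything is up
master.ready(() => events.push('late callback runs immediately'));
console.log(events); // [ 'started', 'late callback runs immediately' ]
```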
Let's start with the Master constructor:
constructor(options) {
  super();
  // Initialize the options
  this.options = parseOptions(options);
  this.workerManager = new Manager();
  this.messenger = new Messenger(this);
  // Set up the ready mechanism, see the get-ready npm package
  ready.mixin(this);
  this.isProduction = isProduction();
  this.agentWorkerIndex = 0;
  // Whether the master has been closed
  this.closed = false;
  // ...

  this.ready(() => {
    // Mark the master as started
    this.isStarted = true;
    const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
    this.logger.info('[master] %s started on %s (%sms)%s',
      frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);

    // Send egg-ready to each process and trigger the related events
    const action = 'egg-ready';
    this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
    this.messenger.send({ action, to: 'app', data: this.options });
    this.messenger.send({ action, to: 'agent', data: this.options });

    // start check agent and worker status
    this.workerManager.startCheck();
  });

  // Register various events
  this.on('agent-exit', this.onAgentExit.bind(this));
  this.on('agent-start', this.onAgentStart.bind(this));
  // ...

  detectPort((err, port) => {
    // ...
    this.forkAgentWorker();
  });
}
As you can see, the Master constructor initializes its state and registers event listeners; finally, once detectPort has found a port, it calls forkAgentWorker, which contains:
const agentWorkerFile = path.join(__dirname, 'agent_worker.js');
// Use child_process to fork the Agent process
const agentWorker = childprocess.fork(agentWorkerFile, args, opt);
Moving on to agent_worker.js: it instantiates an Agent object and contains one key piece of code:
agent.ready(() => {
  // Remove the startup error listener
  agent.removeListener('error', startErrorHandler);
  // Send an agent-start action to the master
  process.send({ action: 'agent-start', to: 'master' });
});
So agent_worker.js sends the master a message whose action is agent-start. Back in the master, two listeners are registered for this event: onAgentStart with on, and forkAppWorkers with once:
this.on('agent-start', this.onAgentStart.bind(this));
this.once('agent-start', this.forkAppWorkers.bind(this));
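The difference between on and once is what lets the agent be restarted safely. The snippet below is a hypothetical illustration with a bare EventEmitter standing in for the Master (only the event and function names come from the code above): onAgentStart runs every time an agent starts, while forkAppWorkers runs only on the first agent-start, so a restarted agent does not cause a second set of workers to be forked.

```javascript
const { EventEmitter } = require('events');

// A bare EventEmitter stands in for the Master; only the listeners matter here.
const master = new EventEmitter();
const calls = [];

// Runs on every agent start, including after an agent restart.
master.on('agent-start', () => calls.push('onAgentStart'));
// Runs only on the first agent start, so app workers are forked exactly once.
master.once('agent-start', () => calls.push('forkAppWorkers'));

master.emit('agent-start'); // first boot
master.emit('agent-start'); // the agent died and was forked again
console.log(calls); // [ 'onAgentStart', 'forkAppWorkers', 'onAgentStart' ]
```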
Let's look at onAgentStart first. It is fairly simple and mostly just sends messages to the other processes:
onAgentStart() {
this.agentWorker.status = 'started';
// Send egg-ready when agent is started after launched
if (this.isAllAppWorkerStarted) {
this.messenger.send({ action: 'egg-ready', to: 'agent', data: this.options });
}
this.messenger.send({ action: 'egg-pids', to: 'app', data: [ this.agentWorker.pid ] });
// should send current worker pids when agent restart
if (this.isStarted) {
this.messenger.send({ action: 'egg-pids', to: 'agent', data: this.workerManager.getListeningWorkerIds() });
}
this.messenger.send({ action: 'agent-start', to: 'app' });
this.logger.info('[master] agent_worker#%s:%s started (%sms)',
this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
}
Then forkAppWorkers runs. It uses the cfork package to fork the worker processes and registers a series of related event listeners:
// ...
cfork({
  exec: this.getAppWorkerFile(),
  args,
  silent: false,
  count: this.options.workers,
  // don't refork in local env
  refork: this.isProduction,
});
// ...
// trigger app-start event
cluster.on('listening', (worker, address) => {
  this.messenger.send({
    action: 'app-start',
    data: { workerPid: worker.process.pid, address },
    to: 'master',
    from: 'app',
  });
});
As you can see, whenever a worker emits the cluster listening event, forkAppWorkers sends an app-start message to the master, which handles it like this:
this.on('app-start', this.onAppStart.bind(this));
// ...
// The master ready callback is triggered
if (this.options.sticky) {
  this.startMasterSocketServer(err => {
    if (err) return this.ready(err);
    this.ready(true);
  });
} else {
  this.ready(true);
}

// The ready callback sends egg-ready to each process
const action = 'egg-ready';
this.messenger.send({ action, to: 'parent', data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: 'app', data: this.options });
this.messenger.send({ action, to: 'agent', data: this.options });
// start check agent and worker status
if (this.isProduction) {
  this.workerManager.startCheck();
}
Summary:
- Master constructor: runs first; among other things it calls detectPort.
- detectPort: once a port is found, calls forkAgentWorker().
- forkAgentWorker: forks the Agent process; when the agent is ready, it sends agent-start to the master.
- agent-start: runs onAgentStart (on) and forkAppWorkers (once).
- onAgentStart sends various messages; forkAppWorkers forks the workers, and each listening worker sends app-start to the master.
- app-start: triggers the onAppStart() method.
- onAppStart: calls ready(true), which runs the ready callback.
- Ready callback: sends egg-ready to every process, which triggers their related events, and calls startCheck().
+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |     worker ready    |                    |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
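The startup sequence above can be replayed as a single-process simulation. Everything here is hypothetical: a bare EventEmitter stands in for the Master, "forking" just appends to a timeline, and a `ready` event stands in for ready(true). It does capture the two ordering rules the summary relies on: workers are forked only after the agent has started, and egg-ready goes out only after every worker has reported app-start.

```javascript
const { EventEmitter } = require('events');

// Hypothetical replay of the boot order; no real processes are forked.
function simulateBoot(workerCount) {
  const master = new EventEmitter(); // stands in for the Master
  const timeline = [];
  let startedWorkers = 0;

  const forkAppWorkers = () => {
    for (let id = 1; id <= workerCount; id++) {
      timeline.push(`fork worker ${id}`);
      master.emit('app-start', id); // cluster 'listening' -> app-start
    }
  };

  master.on('agent-start', () => timeline.push('onAgentStart'));
  master.once('agent-start', forkAppWorkers);
  master.on('app-start', (id) => {
    startedWorkers++;
    timeline.push(`worker ${id} listening`);
    // Stand-in for onAppStart: become ready only when every worker is up.
    if (startedWorkers === workerCount) master.emit('ready');
  });
  master.on('ready', () => timeline.push('egg-ready -> parent, app, agent'));

  // Stand-in for detectPort -> forkAgentWorker -> agent ready.
  timeline.push('fork agent');
  master.emit('agent-start');
  return timeline;
}

const timeline = simulateBoot(2);
console.log(timeline.join('\n'));
```

In the real master the workers start concurrently and their listening events arrive asynchronously, so the fork and listening lines would interleave differently; only the agent, then workers, then egg-ready ordering carries over.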
Process guarding
According to the official documentation, process guarding relies mainly on the graceful and egg-cluster libraries.
Uncaught exception
- Close all TCP servers of the abnormal Worker process (quickly disconnect the existing connection and stop receiving new connections), disconnect the IPC channel with the Master and stop accepting new user requests.
- The Master immediately forks a new Worker process, keeping the total number of “workers” online unchanged.
- The abnormal Worker waits for a period of time, processes the accepted requests and exits.
+---------+                 +---------+
|  Worker |                 |  Master |
+---------+                 +----+----+
     | uncaughtException         |
     +------------+              |
     |            |              |                   +---------+
     | <----------+              |                   |  Worker |
     |                           |                   +----+----+
     |        disconnect         |   fork a new worker    |
     +-------------------------> + ---------------------> |
     |         wait...           |                        |
     |          exit             |                        |
     +-------------------------> |                        |
     |                           |                        |
    die                          |                        |
                                 |                        |
                                 |                        |
Looking at the file the app worker runs, the app is built on the Application class, which calls graceful() in its onServer method:
onServer(server) {
  // ...
  graceful({
    server: [ server ],
    error: (err, throwErrorCount) => {
      // ...
    },
  });
  // ...
}
graceful listens for process.on('uncaughtException'). In the handler it closes the TCP servers, disconnects the IPC channel to the master, and schedules the process to exit:
process.on('uncaughtException', function (err) {
  // ...
  // Set the Connection: close response header
  servers.forEach(function (server) {
    if (server instanceof http.Server) {
      server.on('request', function (req, res) {
        // Let http server set `Connection: close` header, and close the current request socket.
        req.shouldKeepAlive = false;
        res.shouldKeepAlive = false;
        if (!res._header) {
          res.setHeader('Connection', 'close');
        }
      });
    }
  });

  // make sure we close down within `killTimeout` seconds
  var killtimer = setTimeout(function () {
    console.error('[%s] [graceful:worker:%s] kill timeout, exit now.', Date(), process.pid);
    if (process.env.NODE_ENV !== 'test') {
      // kill children by SIGKILL before exit
      killChildren(function () {
        // exit the process
        process.exit(1);
      });
    }
  }, killTimeout);

  // But don't keep the process open just for that!
  // If there is no more io waiting, just let process exit normally.
  if (typeof killtimer.unref === 'function') {
    // only worked on node 0.10+
    killtimer.unref();
  }

  var worker = options.worker || cluster.worker;

  // cluster mode
  if (worker) {
    try {
      // stop accepting new connections on every server
      for (var i = 0; i < servers.length; i++) {
        var server = servers[i];
        server.close();
      }
    } catch (er1) {
      // ...
    }

    try {
      // close the IPC channel
      worker.disconnect();
    } catch (er2) {
      // ...
    }
  }
});
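The steps graceful performs can be condensed into a small, testable function. This is a hypothetical sketch, not graceful's API: the option names (`servers`, `worker`, `killTimeout`, `onKill`) are illustrative, `worker` only needs a disconnect() method, and it checks the public `res.headersSent` instead of the private fields used above.

```javascript
const http = require('http');

// Hypothetical condensed version of graceful's uncaughtException handling.
// Option names are illustrative; `worker` only needs a disconnect() method.
function createUncaughtHandler({ servers, worker, killTimeout, onKill }) {
  return function onUncaughtException(err) {
    console.error('[graceful sketch] uncaught exception:', err.message);

    // 1. Make later responses carry `Connection: close` so keep-alive
    //    clients stop reusing this worker's sockets.
    for (const server of servers) {
      if (server instanceof http.Server) {
        server.on('request', (req, res) => {
          if (!res.headersSent) res.setHeader('Connection', 'close');
        });
      }
    }

    // 2. Force an exit after killTimeout ms, without letting this timer
    //    alone keep the process alive.
    const killTimer = setTimeout(onKill, killTimeout);
    killTimer.unref();

    // 3. Stop accepting new connections, then cut the IPC channel so the
    //    master sees the disconnect and can fork a replacement.
    for (const server of servers) server.close();
    if (worker) worker.disconnect();
  };
}

// In a cluster worker this would be wired up roughly as:
// process.on('uncaughtException', createUncaughtHandler({
//   servers: [server], worker: require('cluster').worker,
//   killTimeout: 30000, onKill: () => process.exit(1),
// }));
```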
Once the IPC channel is closed, the cfork package (the one used above to fork the workers) takes over. It listens for the child process's disconnect event and decides, based on certain conditions, whether to fork a new child process:
cluster.on('disconnect', function (worker) {
  // ...
  disconnects[worker.process.pid] = utility.logDate();
  if (allow()) {
    // fork a new child process
    newWorker = forkWorker(worker._clusterSettings);
    newWorker._clusterSettings = worker._clusterSettings;
  } else {
    // ...
  }
});
Meanwhile, the old worker generally keeps running for a while, until the kill timer mentioned above fires and the process exits.
Some system-level exceptions are not caught in the child process at all, so they can only be handled in the master, that is, in the cfork package's exit listener:
cluster.on('exit', function (worker, code, signal) {
  // After an uncaughtException the worker disconnected first and a new
  // child was already forked on 'disconnect', so this exit is expected
  var isExpected = !!disconnects[worker.process.pid];
  if (isExpected) {
    delete disconnects[worker.process.pid];
    // worker disconnect first, exit expected
    return;
  }
  // The child process was killed by the master on purpose, no fork required
  if (worker.disableRefork) {
    // worker is killed by master
    return;
  }
  if (allow()) {
    newWorker = forkWorker(worker._clusterSettings);
    newWorker._clusterSettings = worker._clusterSettings;
  } else {
    // ...
  }
  cluster.emit('unexpectedExit', worker, code, signal);
});
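The two cfork listeners above boil down to one refork decision. The sketch below is hypothetical: `createReforkPolicy`, its `limit`/`duration` defaults, the injectable clock and the sliding-window logic inside allow() are illustrative choices rather than cfork's actual implementation, and workers are plain objects with a `pid` and an optional `disableRefork` flag.

```javascript
// Hypothetical sketch of the refork bookkeeping: `disconnects` remembers
// workers that disconnected on purpose, and allow() caps how many reforks
// may happen inside a sliding time window (defaults are illustrative).
function createReforkPolicy({ limit = 60, duration = 60000, now = Date.now } = {}) {
  const disconnects = {}; // pid -> time the worker disconnected
  const reforks = [];     // timestamps of recent refork attempts

  function allow() {
    const t = now();
    reforks.push(t);
    // Forget attempts that fall outside the window.
    while (reforks.length > 0 && t - reforks[0] > duration) reforks.shift();
    return reforks.length <= limit;
  }

  return {
    // cluster 'disconnect': graceful shut the worker down, refork right away.
    onDisconnect(worker) {
      disconnects[worker.pid] = now();
      return allow();
    },
    // cluster 'exit': does this exit still need a refork?
    onExit(worker) {
      if (worker.pid in disconnects) {
        delete disconnects[worker.pid];
        return false; // already replaced on 'disconnect'
      }
      if (worker.disableRefork) return false; // killed by the master on purpose
      return allow();
    },
  };
}

let clock = 0;
const policy = createReforkPolicy({ limit: 2, duration: 1000, now: () => clock });
const decisions = [
  policy.onDisconnect({ pid: 1 }),               // graceful disconnect: refork
  policy.onExit({ pid: 1 }),                      // expected exit: no second refork
  policy.onExit({ pid: 2, disableRefork: true }), // killed by the master
  policy.onExit({ pid: 3 }),                      // crash: 2nd refork in the window
  policy.onExit({ pid: 4 }),                      // crash: over the limit
];
clock = 5000;
decisions.push(policy.onExit({ pid: 5 }));       // window has passed: allowed again
console.log(decisions); // [ true, false, false, true, false, true ]
```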
Interprocess Communication (IPC)
From the above, you may have noticed that cluster's IPC channels only exist between the Master and each Worker/Agent; there is no channel between a Worker and the Agent, or between two Workers. So how do they communicate with each other? Through the Master.
Broadcast message: agent => all workers
                  +--------+          +-------+
                  | Master |<---------| Agent |
                  +--------+          +-------+
                 /    |     \
                /     |      \
               /      |       \
              /       |        \
             v        v         v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+

Designated receiver: one worker => another worker
                  +--------+          +-------+
                  | Master |----------| Agent |
                  +--------+          +-------+
                 ^    |
     send to    /     |
    worker 2   /      |
              /       |
             /        v
  +----------+   +----------+   +----------+
  | Worker 1 |   | Worker 2 |   | Worker 3 |
  +----------+   +----------+   +----------+
In the master, once the agent and the app workers are forked, it listens for their messages and normalizes each one into an object:
agentWorker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'agent';
this.messenger.send(msg);
});
worker.on('message', msg => {
if (typeof msg === 'string') msg = { action: msg, data: msg };
msg.from = 'app';
this.messenger.send(msg);
});
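The normalization step in those two listeners can be isolated into a small pure function. `normalizeMessage` is a hypothetical helper, not an egg-cluster function, and 'refresh-cache' is a made-up action name; unlike the original, which mutates `msg` in place, it returns a copy so the effect is easy to check.

```javascript
// Hypothetical helper mirroring the two 'message' listeners above.
function normalizeMessage(msg, from) {
  // A bare string becomes { action, data } with the string in both fields.
  const normalized = typeof msg === 'string' ? { action: msg, data: msg } : { ...msg };
  // Stamp the sender so the messenger knows where the message came from.
  normalized.from = from;
  return normalized;
}

const fromAgent = normalizeMessage('refresh-cache', 'agent');
// fromAgent is { action: 'refresh-cache', data: 'refresh-cache', from: 'agent' }
const fromApp = normalizeMessage({ action: 'ping', to: 'agent', data: 1 }, 'app');
// fromApp is { action: 'ping', to: 'agent', data: 1, from: 'app' }
```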
Both listeners end up calling messenger.send, which decides where to deliver the message based on its from and to fields:
send(data) {
  if (!data.from) {
    data.from = 'master';
  }
  // ...

  // app -> master
  // agent -> master
  if (data.to === 'master') {
    debug('%s -> master, data: %j', data.from, data);
    // app/agent to master
    this.sendToMaster(data);
    return;
  }

  // master -> parent
  // app -> parent
  // agent -> parent
  if (data.to === 'parent') {
    debug('%s -> parent, data: %j', data.from, data);
    this.sendToParent(data);
    return;
  }

  // parent -> master -> app
  // agent -> master -> app
  if (data.to === 'app') {
    debug('%s -> %s, data: %j', data.from, data.to, data);
    this.sendToAppWorker(data);
    return;
  }

  // parent -> master -> agent
  // app -> master -> agent
  if (data.to === 'agent') {
    debug('%s -> %s, data: %j', data.from, data.to, data);
    this.sendToAgentWorker(data);
    return;
  }
}
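The routing in send() can be reduced to a switch on `data.to`. The class below is a hypothetical, stripped-down router for illustration (`MiniMessenger` is not an egg-cluster class): the master is a plain EventEmitter, the parent, agent and workers are fake objects with a send() method, and the optional `receiverPid` filter, used to address a single worker as in the second diagram, is an assumption about how one worker is targeted.

```javascript
const { EventEmitter } = require('events');

// Hypothetical, stripped-down router in the spirit of Messenger#send.
class MiniMessenger {
  constructor({ master, parent, agent, appWorkers }) {
    this.master = master;         // EventEmitter standing in for the Master
    this.parent = parent;         // { send(msg) } for the parent process
    this.agent = agent;           // { send(msg) } for the agent process
    this.appWorkers = appWorkers; // array of { pid, send(msg) }
  }

  send(data) {
    if (!data.from) data.from = 'master';
    switch (data.to) {
      case 'master': // app/agent -> master: emit an event named after the action
        this.master.emit(data.action, data.data);
        break;
      case 'parent': // master/app/agent -> parent
        this.parent.send(data);
        break;
      case 'app': // parent/agent -> master -> app workers
        for (const worker of this.appWorkers) {
          // Assumed filter for addressing a single worker by pid.
          if (data.receiverPid && data.receiverPid !== String(worker.pid)) continue;
          worker.send(data);
        }
        break;
      case 'agent': // parent/app -> master -> agent
        this.agent.send(data);
        break;
      default:
        throw new Error(`unknown destination: ${data.to}`);
    }
  }
}

// Usage with fake processes that just record what they receive:
const received = [];
const fakeProcess = (name, pid) => ({ pid, send: (msg) => received.push(`${name}:${msg.action}`) });
const master = new EventEmitter();
master.on('app-start', (data) => received.push(`master:app-start:${data.workerPid}`));

const messenger = new MiniMessenger({
  master,
  parent: fakeProcess('parent', 1),
  agent: fakeProcess('agent', 2),
  appWorkers: [fakeProcess('worker1', 101), fakeProcess('worker2', 102)],
});
messenger.send({ action: 'app-start', to: 'master', from: 'app', data: { workerPid: 101 } });
messenger.send({ action: 'egg-ready', to: 'app' }); // broadcast to every worker
messenger.send({ action: 'ping', to: 'app', from: 'agent', receiverPid: '102' }); // worker 2 only
console.log(received);
```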
For messages addressed to the master, sendToMaster simply emits an event named after the message's action on the master itself:
sendToMaster(data) {
this.master.emit(data.action, data.data);
}
Messages for the agent and the workers, on the other hand, go through the sendmessage package, which essentially ends up calling the child's own send() method:
worker.send(data)
Finally, on the receiving side, EggApplication (the base class that both Agent and Application inherit from) uses the Messenger class, whose constructor contains:
constructor() {
  super();
  // ...
  this._onMessage = this._onMessage.bind(this);
  process.on('message', this._onMessage);
}

_onMessage(message) {
  if (message && is.string(message.action)) {
    // Emit an event named after the message's action
    this.emit(message.action, message.data);
  }
}
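The receiving side follows a simple pattern: listen on the process's 'message' event and re-emit each well-formed message as an event named after its action. The sketch below is hypothetical (`MessageReceiver` is not an egg class, and port 7001 is just an example value); it accepts any EventEmitter as the message source so it can be exercised without a real child process. It also keeps the bound `_onMessage` around, which is one practical reason to bind once in the constructor: the same function reference can later be passed to removeListener.

```javascript
const { EventEmitter } = require('events');

// Hypothetical receiver: re-emits IPC messages as events named after `action`.
class MessageReceiver extends EventEmitter {
  constructor(source = process) {
    super();
    this.source = source;
    // Bind once so the exact same function can be passed to removeListener.
    this._onMessage = this._onMessage.bind(this);
    source.on('message', this._onMessage);
  }

  _onMessage(message) {
    // Ignore anything that is not an object with a string action.
    if (message && typeof message.action === 'string') {
      this.emit(message.action, message.data);
    }
  }

  close() {
    this.source.removeListener('message', this._onMessage);
  }
}

// Usage with a fake process object instead of a real IPC channel:
const fakeProcess = new EventEmitter();
const receiver = new MessageReceiver(fakeProcess);
const seenPorts = [];
receiver.on('egg-ready', (data) => seenPorts.push(data.port));

fakeProcess.emit('message', { action: 'egg-ready', data: { port: 7001 } });
fakeProcess.emit('message', 'plain string'); // ignored: no string action
receiver.close();
fakeProcess.emit('message', { action: 'egg-ready', data: { port: 7002 } }); // ignored: closed
console.log(seenPorts); // [ 7001 ]
```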
To sum up: interprocess communication is built on the event mechanism plus IPC channels, with the Master relaying messages between the other processes.
Other
While studying this code I came across timeout.unref(); for an explanation of it, I recommend the sixth reply to this question.
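To see what unref() does for graceful's kill timer, here is a minimal illustration (`killTimer` is just an example name): once a timer is unref'd it no longer keeps the event loop alive, so if nothing else is pending, the process exits without waiting for it. `Timeout#hasRef()` (Node.js 11+) reports the current state.

```javascript
// A 10-second "kill timer", like the one graceful schedules.
const killTimer = setTimeout(() => {
  console.log('kill timeout reached'); // not reached here: the script exits first
  process.exit(1);
}, 10000);

// Without this line Node would keep running for the full 10 seconds.
killTimer.unref();

console.log(killTimer.hasRef()); // false
```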
Conclusion
Coming from the front end, moving to the back end was a bit of a struggle for me. Egg's process management is impressively designed, and I spent a lot of time working through its APIs and ideas.
References
- Multi-process model and inter-process communication (Egg documentation)
- Egg source code analysis: egg-cluster