Background

Node officially provides the cluster module as a multi-process solution, so that a server can make full use of its resources (in particular, all of its CPU cores).

In general, this problem domain has three sub-problems to solve:

  • Restart mechanism
  • Load balancing
  • State sharing and the communication mechanism

Simple Node

As an enterprise-level framework, Egg adds enhancements for each of these problems through the egg-cluster module.

Communication mechanism

Why cover communication first? Because the startup process depends on it.

The main roles are Master, Agent and Worker. These three are the parties that actually send and receive messages, while Manager and Messenger are the modules that provide the communication management capability.

Before walking through the startup flow, let's look at these two basic modules.

Messenger

Messenger is the message dispatcher. Its job is to receive a message and forward it to a specific target.

So why make it a separate module?

  • Unified protocol format: both the agent and the workers have exit and message events, so their messages can share one format
  • Unified communication methods: process.send is used to talk to the parent, the sendmessage module is used to talk to workers and the agent, and EventEmitter is used to talk to the master

It consists of two parts:

  • Information collection
  • Routing and forwarding

Take cluster startup as an example.

First comes information collection. It follows a subscribe/notify pattern: the master explicitly calls Messenger whenever it has something to report.

// After the cluster has started, notify the parent process that startup succeeded
const action = 'egg-ready';
this.messenger.send({
  action,
  to: 'parent',
  data: {
    port: this[REAL_PORT],
    address: this[APP_ADDRESS],
    protocol: this[PROTOCOL],
  },
});

Next, send() performs the directed forwarding. This has two parts: identify the route, then call the method for that target.

// Route identification
if (data.to === 'parent') {
  this.sendToParent(data);
  return;
}

// Call the specified method
sendToParent(data) {
  if (!this.hasParent) {
    return;
  }
  process.send(data);
}

For more information, see this document: The Messenger module
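To make the "receive, then forward by target" idea concrete, here is a minimal router in the spirit of Messenger. It is a sketch under assumptions. The envelope shape `{ action, data, from, to }` follows the example above. The class name `MiniMessenger`, its constructor options, and the target name `'worker'` are hypothetical. egg-cluster's real Messenger supports more targets and performs more checks.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical minimal router in the spirit of Messenger: it only inspects `to`
class MiniMessenger {
  constructor({ master, sendToWorker, sendToAgent, sendToParent }) {
    this.master = master;             // EventEmitter standing in for the master
    this.sendToWorker = sendToWorker; // e.g. a wrapper around sendmessage(worker, data)
    this.sendToAgent = sendToAgent;
    // process.send only exists when this process was forked with an IPC channel
    this.sendToParent = sendToParent || (data => {
      if (typeof process.send === 'function') process.send(data);
    });
  }

  send(data) {
    if (!data.from) data.from = 'master';
    switch (data.to) {
      case 'parent':
        this.sendToParent(data);
        break;
      case 'master':
        // Same process: deliver via EventEmitter, using the action as the event name
        this.master.emit(data.action, data.data);
        break;
      case 'worker':
        this.sendToWorker(data);
        break;
      case 'agent':
        this.sendToAgent(data);
        break;
      default:
        throw new Error(`unknown target: ${data.to}`);
    }
  }
}

// Usage: transports are stubbed so the routing can be observed
const master = new EventEmitter();
const sent = [];
const messenger = new MiniMessenger({
  master,
  sendToWorker: d => sent.push(`worker:${d.action}`),
  sendToAgent: d => sent.push(`agent:${d.action}`),
  sendToParent: d => sent.push(`parent:${d.action}`),
});
let agentExitCode = null;
master.on('agent-exit', info => { agentExitCode = info.code; });

messenger.send({ action: 'agent-exit', from: 'agent', to: 'master', data: { code: 1 } });
messenger.send({ action: 'egg-ready', to: 'parent', data: { port: 7001 } });
messenger.send({ action: 'egg-ready', to: 'agent' });
```

Keeping all routing inside one send() means the master, agent and workers only need to agree on the envelope. Changing the transport for one target touches a single branch.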

Manager

The Manager module is fairly simple. It mainly provides management operations for the agent and the workers. Its liveness check code is worth a look:

// agent.status is set in the master's onAgentStart
count() {
  return {
    agent: (this.agent && this.agent.status === 'started') ? 1 : 0,
    worker: this.listWorkerIds().length,
  };
}

startCheck() {
  this.exception = 0;
  // Check every 10 seconds
  this.timer = setInterval(() => {
    const count = this.count();
    if (count.agent && count.worker) {
      this.exception = 0;
      return;
    }
    // The agent or all workers are missing. After three consecutive failed checks,
    // emit 'exception'; the master exits when it receives it
    this.exception++;
    if (this.exception >= 3) {
      this.emit('exception', count);
      clearInterval(this.timer);
    }
  }, 10000);
}

See the documentation: Manager
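Here is the same "three strikes" rule with the timer separated from the check, so the counting logic can be exercised directly. This is an illustrative sketch. `LivenessChecker`, `tick()` and the injected `count` function are hypothetical names, not part of the Manager API.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical: Manager's liveness rule with the check separated from the timer
class LivenessChecker extends EventEmitter {
  // count: () => ({ agent: 0 | 1, worker: number })
  constructor(count, { maxFailures = 3, interval = 10000 } = {}) {
    super();
    this.count = count;
    this.maxFailures = maxFailures;
    this.interval = interval;
    this.failures = 0;
    this.timer = null;
  }

  // Run one check; returns true while the cluster looks healthy
  tick() {
    const c = this.count();
    if (c.agent && c.worker) {
      this.failures = 0; // any healthy check resets the streak
      return true;
    }
    this.failures++;
    if (this.failures >= this.maxFailures) {
      this.emit('exception', c);
      this.stop();
    }
    return false;
  }

  start() {
    this.failures = 0;
    this.timer = setInterval(() => this.tick(), this.interval);
  }

  stop() {
    clearInterval(this.timer);
    this.timer = null;
  }
}

// Usage: simulate a cluster whose workers have all died (tick() is called by hand)
const state = { agent: 1, worker: 0 };
const checker = new LivenessChecker(() => state);
const events = [];
checker.on('exception', c => events.push(c));
checker.tick();
checker.tick();
checker.tick();
```

Driving tick() by hand avoids waiting 30 seconds to see the exception. In real use, start() would schedule it.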

Startup process

Starting from npm run dev

Let's trace what actually happens when you run the npm run dev command.

It executes the run method in egg-bin's lib/cmd/dev.js file:

// lib/cmd/dev.js
constructor(rawArgv) {
  // Other initialization code omitted
  this.serverBin = path.join(__dirname, '../start-cluster');
}

* run(context) {
  // Argument formatting omitted
  yield this.helper.forkNode(this.serverBin, devArgs, options);
}

// lib/start-cluster: calls the framework's startCluster
require(options.framework).startCluster(options);

If the framework is Egg, this line in Egg is what finally runs:

exports.startCluster = require('egg-cluster').startCluster;

So index.js of the egg-cluster module is executed:

exports.startCluster = function(options, callback) {
  new Master(options).ready(callback);
};

The rest of the flow is not difficult, but it is full of details; see the startup and exit analysis for a complete walkthrough. In short, it implements the following sequence:

+---------+           +---------+          +---------+
|  Master |           |  Agent  |          |  Worker |
+---------+           +----+----+          +----+----+
     |      fork agent     |                    |
     +-------------------->|                    |
     |      agent ready    |                    |
     |<--------------------+                    |
     |                     |     fork worker    |
     +----------------------------------------->|
     |                     |     worker ready   |
     |<-----------------------------------------+
     |      Egg ready      |                    |
     +-------------------->|                    |
     |      Egg ready      |                    |
     +----------------------------------------->|
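The ordering in the diagram can be sketched as a small async sequence. This sketch simulates the agent and workers with in-process EventEmitters instead of real child processes. `fakeFork`, `startCluster` and the 'ready' event name are hypothetical; only `egg-ready` comes from the code above.

```javascript
const { EventEmitter, once } = require('node:events');

// Hypothetical stand-in for a forked process: it reports 'ready' asynchronously
function fakeFork(name, log) {
  const proc = new EventEmitter();
  proc.name = name;
  proc.send = msg => log.push(`${name} <- ${msg.action}`);
  setImmediate(() => proc.emit('ready'));
  return proc;
}

async function startCluster({ workers = 2 } = {}) {
  const log = [];
  // 1. Fork the agent and wait until it reports ready
  log.push('fork agent');
  const agent = fakeFork('agent', log);
  await once(agent, 'ready');
  log.push('agent ready');

  // 2. Only then fork the workers, and wait for all of them
  const procs = [];
  for (let i = 0; i < workers; i++) {
    log.push(`fork worker ${i}`);
    procs.push(fakeFork(`worker ${i}`, log));
  }
  await Promise.all(procs.map(p => once(p, 'ready')));
  log.push('workers ready');

  // 3. Broadcast egg-ready to the agent and every worker
  for (const p of [agent, ...procs]) p.send({ action: 'egg-ready' });
  return log;
}

const started = startCluster({ workers: 2 });
started.then(log => console.log(log.join('\n')));
```

What matters is the ordering: workers are forked only after the agent is ready, presumably because workers may rely on services that the agent provides.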

Smooth restart of the Agent

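First, a callback is registered when the agent is forked. The agent's exit event is forwarded to the master as an agent-exit message, and the master subscribes to that message in its constructor: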

forkAgentWorker() {
  // Fork the agent process
  const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
  // Listen for the exit event and forward it to the master
  agentWorker.once('exit', (code, signal) => {
    this.messenger.send({
      action: 'agent-exit',
      data: {
        code,
        signal,
      },
      to: 'master',
      from: 'agent',
    });
  });
}

constructor() {
  // The master subscribes to agent-exit
  this.on('agent-exit', this.onAgentExit.bind(this));
}

The restart logic is then handled in onAgentExit:

onAgentExit(data) {
  if (this.closed) return;
  // Clean up the old agent
  const agentWorker = this.agentWorker;
  this.workerManager.deleteAgent(this.agentWorker);
  agentWorker.removeAllListeners();

  // If the cluster has already started, restart the agent automatically
  if (this.isStarted) {
    setTimeout(() => {
      this.forkAgentWorker();
    }, 1000);

    // A block that forwards the message to the parent is omitted
  } else {
    // The agent died during startup: exit the master
    process.exit(1);
  }
}
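onAgentExit boils down to three cases: shutting down, already started, and still starting. Below is a hypothetical, dependency-injected version of that decision, so it can be checked without real processes. `createAgentExitHandler` and its option names are illustrative, not egg-cluster code.

```javascript
// Hypothetical: the three branches of onAgentExit as a testable function
function createAgentExitHandler({ state, forkAgent, exit, schedule = setTimeout, delay = 1000 }) {
  return function onAgentExit() {
    if (state.closed) return 'ignored'; // the master itself is shutting down
    if (state.isStarted) {
      // The cluster was up: bring the agent back after a short delay
      schedule(() => forkAgent(), delay);
      return 'refork scheduled';
    }
    // The agent died during startup: treat it as fatal
    exit(1);
    return 'exit';
  };
}

// Usage with stubbed side effects
const calls = [];
const state = { closed: false, isStarted: false };
const onAgentExit = createAgentExitHandler({
  state,
  forkAgent: () => calls.push('fork agent'),
  exit: code => calls.push(`exit ${code}`),
  schedule: (fn, ms) => { calls.push(`wait ${ms}ms`); fn(); },
});

const results = [];
results.push(onAgentExit()); // during startup
state.isStarted = true;
results.push(onAgentExit()); // after a successful start
state.closed = true;
results.push(onAgentExit()); // during shutdown
```

The one-second delay plausibly keeps an agent that crashes right after every restart from spinning in a tight loop.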

The isStarted flag records whether the whole cluster has started successfully. It is set in the ready callback:

// ready() comes from the get-ready module. It handles registration of async tasks,
// so async tasks can be added freely before startup completes
this.ready(() => {
  this.isStarted = true;
});
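The idea behind that ready() is to register async tasks freely and fire the callbacks once all of them have finished. It can be illustrated with a tiny gate. This is a simplified stand-in written for this post, not the get-ready API. `ReadyGate`, `register()`, `onReady()` and `start()` are hypothetical names.

```javascript
// Hypothetical simplified ready gate (not the get-ready API): callbacks run once
// start() has been called and every registered task has reported done
class ReadyGate {
  constructor() {
    this.pending = new Set();
    this.callbacks = [];
    this.started = false;
    this.isReady = false;
  }

  // Register an async task; call the returned function when it finishes
  register(name) {
    if (this.isReady) throw new Error(`too late to register ${name}`);
    this.pending.add(name);
    return () => {
      this.pending.delete(name);
      this.check();
    };
  }

  onReady(fn) {
    if (this.isReady) fn();
    else this.callbacks.push(fn);
  }

  start() {
    this.started = true;
    this.check();
  }

  check() {
    if (this.isReady || !this.started || this.pending.size > 0) return;
    this.isReady = true;
    for (const fn of this.callbacks.splice(0)) fn();
  }
}

// Usage: isStarted would flip only after both tasks are done
const gate = new ReadyGate();
const log = [];
gate.onReady(() => log.push('isStarted = true'));
const doneAgent = gate.register('agent');
const doneWorkers = gate.register('workers');
gate.start();
doneAgent();
log.push(`ready after agent: ${gate.isReady}`);
doneWorkers();
```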

Smooth restart of the Worker

Worker restarts are mostly handled by the cfork module. egg-cluster's own exit listener only forwards the event.

The general idea is to listen for the cluster module's disconnect and exit events, decide whether to refork based on the disableRefork setting, and then run the restart logic:

cluster.on('disconnect', function (worker) {
  // API reference: https://nodejs.org/api/cluster.html#cluster_worker_isdead
  var isDead = worker.isDead && worker.isDead();
  if (isDead) {
    // worker has terminated before disconnect
    return;
  }
  // Refork is disabled by configuration: do nothing
  if (worker.disableRefork) {
    return;
  }

  // Record the disconnected process; the exit handler checks this map later
  disconnects[worker.process.pid] = utility.logDate();

  // Restart logic
  if (allow()) {
    newWorker = forkWorker(worker._clusterSettings);
    newWorker._clusterSettings = worker._clusterSettings;
  } else {
    // omitted
  }
});

cluster.on('exit', function (worker, code, signal) {
  // If the disconnect event has already been handled, skip the rest of the exit flow
  var isExpected = !!disconnects[worker.process.pid];
  if (isExpected) {
    delete disconnects[worker.process.pid];
    // worker disconnect first, exit expected
    return;
  }

  // disableRefork check similar to the one above, omitted

  unexpectedCount++;

  // Restart logic similar to the one above, omitted

  cluster.emit('unexpectedExit', worker, code, signal);
});
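The key trick in these two handlers is the disconnects map. A worker that disconnects first is reforked right away, and its later exit is recognized as expected, so it is not reforked a second time. The sketch below reduces both handlers to plain functions on fake worker objects. `createReforkPolicy` is a hypothetical name. The sliding-window limit used for `allow()` is also an assumption, since the body of cfork's allow() is not shown above.

```javascript
// Hypothetical reduction of cfork's 'disconnect' and 'exit' handlers to plain functions.
// `fork` stands in for forkWorker(); the allow() rate limit below is an assumption.
function createReforkPolicy({ fork, limit = 60, windowMs = 60000, now = Date.now }) {
  const disconnects = {}; // pid -> time the worker disconnected
  const reforks = [];     // timestamps of recent reforks
  let unexpectedCount = 0;

  // Assumed sliding window: at most `limit` reforks within `windowMs`
  function allow() {
    const t = now();
    while (reforks.length > 0 && t - reforks[0] > windowMs) reforks.shift();
    if (reforks.length >= limit) return false;
    reforks.push(t);
    return true;
  }

  function onDisconnect(worker) {
    if (worker.isDead()) return 'already dead'; // the exit handler will deal with it
    if (worker.disableRefork) return 'refork disabled';
    disconnects[worker.pid] = now();            // remember: its exit is expected
    return allow() ? fork() : 'refork limited';
  }

  function onExit(worker) {
    if (disconnects[worker.pid] !== undefined) {
      delete disconnects[worker.pid];
      return 'expected exit';                   // already reforked on disconnect
    }
    if (worker.disableRefork) return 'refork disabled';
    unexpectedCount++;
    return allow() ? fork() : 'refork limited';
  }

  return { onDisconnect, onExit, unexpectedCount: () => unexpectedCount };
}

// Usage with fake workers (the real handlers read worker.process.pid)
let forks = 0;
const policy = createReforkPolicy({ fork: () => { forks++; return 'reforked'; } });

const graceful = { pid: 101, isDead: () => false, disableRefork: false };
const r1 = policy.onDisconnect(graceful); // 'reforked'
const r2 = policy.onExit(graceful);       // 'expected exit', no second fork

const crashed = { pid: 102, isDead: () => true, disableRefork: false };
const r3 = policy.onDisconnect(crashed);  // 'already dead'
const r4 = policy.onExit(crashed);        // 'reforked' (counted as unexpected)
```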

Sticky Mode for load balancing

Background: originally, sessions and other state were kept in worker memory. As soon as a user's requests went to different workers, the login state was lost.

Solution: sticky mode solves this by making sure that requests from the same user always reach the same worker.

:::info According to egg-bin's README.md, sticky mode is not enabled by default; this section explores it just for fun. :::

Forwarding implementation

First, if sticky mode is enabled, the master allocates a stickyWorkerPort:

// master.js
detectPorts() {
  return GetFreePort()
    // The middle part that sets the main port is omitted
    .then(port => {
      if (this.options.sticky) {
        this.options.stickyWorkerPort = port;
      }
    });
}
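The code above calls GetFreePort() without showing how it works. A common way to get a free port in Node is to listen on port 0 and read back the port that the OS assigned. This is offered as an assumption, not as what egg-cluster uses internally, and `getFreePort` here is a hypothetical helper.

```javascript
const net = require('node:net');

// Hypothetical helper: ask the OS for an ephemeral port, then release it
function getFreePort(host = '127.0.0.1') {
  return new Promise((resolve, reject) => {
    const server = net.createServer();
    server.on('error', reject);
    // Port 0 means "any free port"; the OS picks one
    server.listen(0, host, () => {
      const { port } = server.address();
      server.close(() => resolve(port));
    });
  });
}

const freePort = getFreePort();
freePort.then(port => console.log('free port:', port));
```

There is a small race: the port is free when it is checked, but another process could grab it before it is bound again. That is usually acceptable for a dev tool, but worth knowing.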

At the same time, the master starts an internal net.Server that forwards incoming connections to the workers:

if (this.options.sticky) {
  this.startMasterSocketServer(err => {
    // omitted
  });
}

startMasterSocketServer(cb) {
  // Internal net server
  require('net').createServer({
    pauseOnConnect: true,
  }, connection => {
    // remoteAddress can be missing when the client sends an RST right after the
    // TCP handshake (see "TCP reset attack"); such connections are simply dropped
    if (!connection.remoteAddress) {
      connection.destroy();
    } else {
      // Select a worker
      const worker = this.stickyWorker(connection.remoteAddress);
      worker.send('sticky-session:connection', connection);
    }
  }).listen(this[REAL_PORT], cb);
}

:::info Why doesn't listen fail because the port is already taken? My understanding, based on the article referenced here, is that the master only starts its internal TCP service when it first passes a socket to a worker, and that happens later than startMasterSocketServer. :::

In the workers, when sticky is configured, the server listens on stickyWorkerPort (bound to 127.0.0.1) and only reacts to sticky-session:connection messages forwarded by the parent process, that is, the master:

if (options.sticky) {
  server.listen(options.stickyWorkerPort, '127.0.0.1');

  process.on('message', (message, connection) => {
    if (message !== 'sticky-session:connection') {
      return;
    }

    server.emit('connection', connection);
    connection.resume();
  });
}
// Normal (non-sticky) listening code omitted

There is one detail here: how is data kept from being lost during forwarding?

:::info Why could data be lost? A net.Socket is a Duplex stream. In flowing mode it reads data automatically, so any data that arrives before a 'data' listener is attached would be lost. :::

First, enable the pauseOnConnect option when creating the server with net.createServer.

If pauseOnConnect is set to true, then the socket associated with each incoming connection will be paused, and no data will be read from its handle.

Second, call connection.resume() in the worker once the socket has been handed to the server.
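The two halves (pause on accept, resume after the handoff) can be demonstrated in a single process. A paused net server passes each socket to an HTTP server that never calls listen() itself, using emit('connection', socket), and then resumes the socket. In egg-cluster the socket additionally crosses a process boundary through worker.send. This sketch leaves that step out, and `frontServer` and `appServer` are illustrative names.

```javascript
const net = require('node:net');
const http = require('node:http');

// Plays the worker's role: an HTTP server that never calls listen() itself
const appServer = http.createServer((req, res) => {
  res.end(`handled ${req.url}`);
});

// Plays the master's role: connections arrive paused, so no bytes are read yet
const frontServer = net.createServer({ pauseOnConnect: true }, socket => {
  // Hand over the untouched socket, then let data flow into the HTTP parser
  appServer.emit('connection', socket);
  socket.resume();
});

const result = new Promise((resolve, reject) => {
  frontServer.listen(0, '127.0.0.1', () => {
    const { port } = frontServer.address();
    // agent: false sends "Connection: close", so the socket does not linger
    http.get({ host: '127.0.0.1', port, path: '/hello', agent: false }, res => {
      let body = '';
      res.setEncoding('utf8');
      res.on('data', chunk => { body += chunk; });
      res.on('end', () => {
        frontServer.close();
        resolve(body);
      });
    }).on('error', reject);
  });
});

result.then(body => console.log(body));
```

Without the resume() call the request would hang. A socket created with pauseOnConnect stays paused even after the HTTP server attaches its listeners.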

Forwarding strategy

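Worker selection is implemented by the stickyWorker function. It strips the non-digit characters from remoteAddress, converts the rest to a number, and takes that number modulo the number of workers; the result is used as an index into the worker list. The choice is deterministic rather than random, which is exactly what makes the session sticky: the same IP always maps to the same index.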

stickyWorker(ip) {
  const workerNumbers = this.options.workers;
  // ws is the list of worker pids
  const ws = this.workerManager.listWorkerIds();

  let s = '';
  // IP processing: 127.0.0.1 -> 127001
  for (let i = 0; i < ip.length; i++) {
    // isNaN filters out letters and symbols, so both IPv4 and IPv6 addresses work
    if (!isNaN(ip[i])) {
      s += ip[i];
    }
  }
  s = Number(s);
  // Modulo
  const pid = ws[s % workerNumbers];
  return this.workerManager.getWorker(pid);
}

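To see what this hashing does to real addresses, here is the IP-to-index step pulled out as a pure function. `stickyIndex` is a hypothetical name. It mirrors the digit filtering and modulo in stickyWorker, but returns the index instead of looking up a worker.

```javascript
// Same digit-stripping idea as stickyWorker, as a pure function (name is hypothetical)
function stickyIndex(ip, workerCount) {
  let digits = '';
  for (const ch of ip) {
    // Keep only characters that coerce to a number: drops '.', ':' and hex letters
    if (!isNaN(ch)) digits += ch;
  }
  return Number(digits) % workerCount;
}

console.log(stickyIndex('127.0.0.1', 4));        // 1
console.log(stickyIndex('::ffff:127.0.0.1', 4)); // 1 (same digits as above)
console.log(stickyIndex('10.0.0.12', 4));        // 0
console.log(stickyIndex('100.0.1.2', 4));        // 0 (collides: both become 100012)
```

Two side effects are worth noting. The IPv4-mapped form ::ffff:127.0.0.1 lands on the same worker as 127.0.0.1. Unrelated addresses can also collide. In addition, stickyWorker takes the modulus of options.workers rather than ws.length, so it looks like the index could point past the end of the pid list while a worker is being reforked.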

For background, see the issue: Problems caused by Sticky Mode

Other interesting small findings

The Agent and the Workers are started differently

Looking back at the code, the agent is started with child_process.fork, while the workers are started through cluster. Local services such as a management page are usually provided by the agent, so the agent must be able to listen on a port of its own. In a cluster worker, by contrast, listen() is coordinated through the master so that the workers share the port.

How to check whether a process has really exited

Call process.kill(pid, 0) and wrap the call in a try-catch. Signal 0 delivers nothing; it only checks whether the process exists. If the call throws, the code below treats the process as exited.

Usage scenario: first ask the processes to exit with SIGTERM (like a plain kill, i.e. kill -15). If some are still alive after a certain amount of time, force them to exit with SIGKILL (like kill -9).

function getUnterminatedProcesses(pids) {
  return pids.filter(pid => {
    try {
      // Signal 0 only checks for existence; success means it's still alive
      process.kill(pid, 0);
      return true;
    } catch (err) {
      // An error means it's dead
      return false;
    }
  });
}
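The SIGTERM-then-SIGKILL scenario can be sketched around the same signal-0 check. `terminate` and `isAlive` are hypothetical names, and the timeout and polling values are arbitrary. One deliberate difference from getUnterminatedProcesses: an EPERM error is treated as "alive", because it means the process exists but we are not allowed to signal it.

```javascript
const { spawn } = require('node:child_process');

// True while a process with this pid exists. Signal 0 delivers nothing; it only
// runs the existence and permission checks.
function isAlive(pid) {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM: the process exists but we are not allowed to signal it
    return err.code === 'EPERM';
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical helper: SIGTERM first, then SIGKILL whatever survives the timeout.
// Resolves with the pids that had to be force-killed.
async function terminate(pids, { timeout = 5000, interval = 100 } = {}) {
  for (const pid of pids) {
    try {
      process.kill(pid, 'SIGTERM');
    } catch (err) {
      // already gone
    }
  }
  const deadline = Date.now() + timeout;
  let alive = pids.filter(isAlive);
  while (alive.length > 0 && Date.now() < deadline) {
    await sleep(interval);
    alive = alive.filter(isAlive);
  }
  for (const pid of alive) {
    try {
      process.kill(pid, 'SIGKILL');
    } catch (err) {
      // exited between the last check and now
    }
  }
  return alive;
}

// Usage: start a child that would otherwise run forever, then stop it
const child = spawn(process.execPath, ['-e', 'setInterval(() => {}, 1000)'], { stdio: 'ignore' });
const stopped = terminate([child.pid], { timeout: 2000 });
stopped.then(forced => console.log('force-killed:', forced));
```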

Summary

Figure: the big picture of the knowledge involved in egg-cluster