Usage example

To let Node applications make use of multi-core servers, Node provides the Cluster API, which creates multiple worker processes that can handle requests in parallel.

// master.js
// Primary process: fork one cluster worker per CPU core, each running work.js.
const cluster = require('cluster');
const cpusLen = require('os').cpus().length;
const path = require('path');

console.log(`Main process:${process.pid}`);
// setupMaster (alias of setupPrimary) changes the file the workers execute;
// by default they would re-run this entry file.
cluster.setupMaster({
  exec: path.resolve(__dirname, './work.js'),
});
for (let i = 0; i < cpusLen; i++) {
  cluster.fork();
}

// work.js
// Worker process: an HTTP server that answers every request with "hello".
const http = require('http');

console.log(`Working process:${process.pid}`);
// Under cluster, every worker can call listen(8080): the port is actually
// listened on by the primary, which hands connections to the workers.
http.createServer((req, res) => {
  res.end('hello');
}).listen(8080);
Copy the code

In the example above, cluster creates multiple worker processes that all share port 8080. When we request localhost:8080, one of the worker processes handles the request and sends the response itself.

The port-in-use problem

A listening socket is identified by a file descriptor. Each process that calls listen creates its own socket with its own file descriptor, so independent processes cannot all listen on the same port. The following example shows this:

// master.js
// Forks one plain child process per CPU core, each running work.js.
const fork = require('child_process').fork;
const cpusLen = require('os').cpus().length;
const path = require('path');

console.log(`Main process:${process.pid}`);
// These are ordinary child processes (not cluster workers), so each one will
// try to bind port 8080 by itself.
for (let i = 0; i < cpusLen; i++) {
  fork(path.resolve(__dirname, './work.js'));
}

// work.js
// Child process: an HTTP server that answers every request with "hello".
const http = require('http');

console.log(`Working process:${process.pid}`);
// Each child binds port 8080 on its own; only the first bind succeeds,
// the others fail with EADDRINUSE.
http.createServer((req, res) => {
  res.end('hello');
}).listen(8080);
Copy the code

Running master.js fails with `Error: listen EADDRINUSE: address already in use :::8080`. The first child process binds port 8080, and the listen call of every other child is rejected.

To fix this, we let only the master process listen on the port. The master sends the listening server handle to the worker processes, which accept the incoming connections and handle the business logic.

// master.js
// Only the master listens on port 8080; it then shares the listening server
// handle with every worker, and the workers accept the connections.
const fork = require('child_process').fork;
const cpusLen = require('os').cpus().length;
const path = require('path');
const net = require('net');
const server = net.createServer();

console.log(`Main process:${process.pid}`);
const works = [];
for (let i = 0; i < cpusLen; i++) {
  works.push(fork(path.resolve(__dirname, './work.js')));
}

// The 'listening' callback runs only once, so a per-call round-robin counter
// would hand the handle to a single worker. Send the same server handle to
// every worker instead, then close the master's own copy so that only the
// workers accept connections.
server.listen(8080, () => {
  for (const work of works) {
    work.send('server', server);
  }
  server.close();
});

// work.js
// Worker: receives the listening server handle from the master and feeds each
// accepted connection into a local HTTP server that is not itself listening.
const http = require('http');

const server = http.createServer((req, res) => {
  res.end('hello');
});

console.log(`Working process:${process.pid}`);
process.on('message', (type, tcp) => {
  if (type === 'server') {
    // `tcp` is the server object received from the master; hand every
    // connection it accepts to the HTTP server.
    tcp.on('connection', (socket) => {
      server.emit('connection', socket);
    });
  }
});

In fact, worker processes created by cluster do not really listen on the port. In a worker, net.Server's listen function is hacked, so calling listen has no real effect. Listening on the port is delegated to the master (primary) process, and the worker is bound to the primary's server for that port. When a request comes in, the primary passes the request's socket to one of the bound workers, which then handles the request.

Let's look at how the Cluster API is implemented, and in particular how it achieves the following:

  • Primary process: listens on the ports that the workers ask for
  • Worker process:
    • Registers itself with the primary. If the primary is listening on this port for the first time, it creates a TCP server and binds the current worker to that server.
    • Has its listen function hacked, so that the worker itself never listens on the port

Reading the source code

This article is based on the Node.js v16 source code.

// lib/cluster.js
'use strict';

// Workers started by cluster.fork() inherit NODE_UNIQUE_ID in their
// environment; the primary process does not have it.
const isWorkerProcess = 'NODE_UNIQUE_ID' in process.env;
const implementation = isWorkerProcess ? 'child' : 'primary';

module.exports = require(`internal/cluster/${implementation}`);
Copy the code

This is the entry point of the cluster API. When cluster is required, it checks whether the NODE_UNIQUE_ID environment variable exists, to decide whether the code is running in the primary process or in a worker process. NODE_UNIQUE_ID is an incrementing number that serves as the worker's ID. It is set by the code that creates worker processes, which we will see shortly.

As the code above shows, when cluster is required in the primary process, the exported module is internal/cluster/primary.js. Let's look at part of its implementation.

// internal/cluster/primary.js
// ...
const EventEmitter = require('events');
const cluster = new EventEmitter();
// These three flags are read by other Node internals (for example
// listenInCluster in lib/net.js, shown later) to tell primary from worker.
cluster.isWorker = false; // Whether this is a worker process
cluster.isMaster = true; // Whether this is the primary process (older name of isPrimary)
cluster.isPrimary = true; // Whether this is the primary process

module.exports = cluster;

cluster.setupPrimary = function(options) {
  // Defaults first, then previously stored settings, then the new options:
  // later spreads override earlier keys.
  const settings = {
    args: ArrayPrototypeSlice(process.argv, 2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options,
  };
  cluster.settings = settings;
  // ...
};

cluster.setupMaster = cluster.setupPrimary;

cluster.fork = function(env) {
  cluster.setupPrimary();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  // ...
};

const { fork } = require('child_process');
function createWorkerProcess(id, env) {
  // NODE_UNIQUE_ID is what lib/cluster.js checks to decide that the new
  // process is a worker; its value is the worker id.
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    env: workerEnv,
    // ...
  });
}
Copy the code

cluster.fork creates a new worker process with the fork function from child_process. By default, the new process runs the entry file that was executed on the command line (process.argv[1]). To run a different file in the workers, call cluster.setupPrimary (or its alias cluster.setupMaster) with the exec option.

Now let's briefly look at internal/cluster/child.js, the module that is loaded in a worker process:

// internal/cluster/child.js
const EventEmitter = require('events');

// In a worker process, the cluster module is an EventEmitter whose role flags
// all say "not the primary"; lib/net.js checks cluster.isPrimary.
const cluster = new EventEmitter();
module.exports = cluster;

Object.assign(cluster, {
  isWorker: true,
  isMaster: false,
  isPrimary: false,
});

// Called from listenInCluster in lib/net.js when a worker listens on a port.
cluster._getServer = function(obj, options, cb) {
  // ...
};
// ...
Copy the code

The main thing to remember here is that the cluster object in a worker process has a _getServer function. We will look at its code in detail when execution reaches it.

net.Server listen

// lib/net.js
Server.prototype.listen = function(...args) {
  // ...
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    // If listen was called with only a port (no host), the else branch runs;
    // that is the branch we follow here.
    if (options.host) {
      // ...
    } else {
      // listen(8080, () => { ... }) takes this branch
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  // ...
};

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  // ...
  // cluster.isPrimary is true in the primary process and false in a worker.
  // The primary (or any process listening with exclusive: true) calls
  // server._listen2 directly; a worker reaches server._listen2 only after
  // cluster._getServer has finished.
  if (cluster.isPrimary || exclusive) {
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  // Only worker processes get here.
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Calls cluster._getServer from internal/cluster/child.js, passing the
  // listenOnPrimaryHandle callback. _getServer asks the primary to listen on
  // the port and bind this worker to the primary's TCP server. The callback
  // then installs a stub handle, which is what hacks the worker's listen.
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);

  function listenOnPrimaryHandle(err, handle) {
    // ...
    server._handle = handle;
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

// The worker process reaches this function through listenOnPrimaryHandle.
Server.prototype._listen2 = setupListenHandle;
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  // ...
}
Copy the code

As the code above shows, net.Server's listen ends up in setupListenHandle in both the primary and the worker processes. The primary calls setupListenHandle directly. A worker first calls cluster._getServer, which asks the primary to listen on the port and hacks the worker's listen function, and only then calls setupListenHandle. Let's look at how cluster._getServer is implemented.

// lib/internal/cluster/child.js
cluster._getServer = function(obj, options, cb) {
  // ...
  // This is the first internal message sent by the worker process.
  // Note that act is 'queryServer'.
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
  };
  // ...
  // send() uses the IPC channel to send this internal message to the primary
  // process. When the primary created this worker with cluster.fork, it
  // registered an internalMessage listener for it (shown below).
  // The callback passed to send is stored in the callbacks map in
  // lib/internal/cluster/utils.js and is looked up again when the reply arrives.
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, index, cb);
    else
      // rr defines a stub listen function, which is how net.Server's listen
      // is hacked in the worker
      rr(reply, indexesKey, index, cb);
  });
  // ...
};

function send(message, cb) {
  // In a worker, `process` is the IPC endpoint connected to the primary.
  return sendHelper(process, message, null, cb);
}
Copy the code
// lib/internal/cluster/utils.js
// ...
const callbacks = new SafeMap();
let seq = 0;

// Sends an internal cluster message through proc.send, optionally remembering
// a callback keyed by the message's sequence number.
function sendHelper(proc, message, handle, cb) {
  message = { cmd: 'NODE_CLUSTER', ...message, seq };

  if (typeof cb === 'function')
    // Remember the callback. The key is the incrementing sequence number,
    // which the other side echoes back as message.ack in its reply.
    callbacks.set(seq, cb);

  seq += 1;
  // Send the internal message over proc's IPC channel (when called from a
  // worker, proc is `process`, i.e. the channel to the primary).
  return proc.send(message, handle);
}
// ...
Copy the code

cluster._getServer runs in the worker process. It stores a callback and sends an internal message to the primary process over the IPC channel. When the primary created this worker with cluster.fork, it registered an internalMessage listener on it. Let's look at the code in cluster.fork that registers that listener.

// internal/cluster/primary.js
cluster.fork = function(env) {
  // ...
  // internal() returns a listener that receives the message object.
  // See internal() in lib/internal/cluster/utils.js for how it works.
  worker.process.on('internalMessage', internal(worker, onmessage));
  // ...
};

const methodMessageMapping = {
  close,
  exitedAfterDisconnect,
  listening,
  online,
  queryServer,
};

// The first internalMessage from a worker is handled by this function;
// for that message, message.act is 'queryServer'.
function onmessage(message, handle) {
  // internal() calls this function with `this` bound to the worker.
  const worker = this;

  // message.act was set by the worker process
  const fn = methodMessageMapping[message.act];

  if (typeof fn === 'function')
    fn(worker, message);
}

function queryServer(worker, message) {
  // ...
}
Copy the code
// lib/internal/cluster/utils.js
// ...
const callbacks = new SafeMap();

// Wraps `cb` in an internalMessage listener. Replies that carry an `ack`
// are routed to the callback stored by sendHelper under that sequence
// number; every other message goes to `cb`.
function internal(worker, cb) {
  return function onInternalMessage(message, handle) {
    let fn = cb;

    // The first internal message from a worker has no ack, so no stored
    // callback is looked up and fn stays the cb passed to internal():
    // onmessage, which only dispatches on message.act and ends up calling
    // queryServer.
    // When queryServer later replies to the worker, it sets
    // message.ack = message.seq so the worker can find its stored callback.
    if (message.ack !== undefined) {
      const callback = callbacks.get(message.ack);

      if (callback !== undefined) {
        fn = callback;
        callbacks.delete(message.ack);
      }
    }

    ReflectApply(fn, worker, arguments);
  };
}
Copy the code

When a worker sends its first internal message, message.ack is undefined, so no stored callback is found. Instead, the primary runs (via onmessage) the queryServer function in internal/cluster/primary.js. Next, let's look at the internal logic of queryServer.

// internal/cluster/primary.js
// TCP servers are stored in `handles`.
// Before listening on a port on behalf of a worker, the primary checks whether
// a matching server already exists. If it does, the primary reuses it and binds
// the worker to it. Otherwise it creates a new TCP server and binds the worker
// to the new one.
function queryServer(worker, message) {
  // key uniquely identifies the server
  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  // Look for an existing server for this key
  let handle = handles.get(key);
  // If none exists, create one
  if (handle === undefined) {
    // ...
    // The RoundRobinHandle constructor creates a new TCP server
    let constructor = RoundRobinHandle;
    handle = new constructor(key, address, message);
    // Store the server
    handles.set(key, handle);
  }

  if (!handle.data)
    handle.data = message.data;

  // Bind the worker to the server (see RoundRobinHandle.prototype.add)
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);

    // Send the second internal message, this time to the worker.
    // worker, message and handle are passed; no callback is passed.
    send(worker, {
      errno,
      key,
      ack: message.seq, // Note: the ack property is added here
      data,
      ...reply,
    }, handle);
  });
}

function send(worker, message, handle, cb) {
  return sendHelper(worker.process, message, handle, cb);
}
Copy the code
// internal/cluster/round_robin_handle.js
// Creates the primary's TCP server for one key and, once it is listening,
// keeps only its raw handle so incoming connections can be dispatched.
function RoundRobinHandle(key, address, { port, fd, flags }) {
  // ...
  this.server = net.createServer(assert.fail);
  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.

  // Runs once the server is listening.
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    // Incoming connections are passed to this.distribute()
    // (the round-robin dispatch to workers).
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    // Note: once listening succeeds, the net.Server wrapper is dropped;
    // only the raw handle is kept.
    this.server = null;
  });
}

// Binds `worker` to this server and reports back through `send` once the
// server is listening.
RoundRobinHandle.prototype.add = function(worker, send) {
  const done = () => {
    if (this.handle.getsockname) {
      // ...
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }
    // ...
  };

  // If the server was already listening before add() ran, this.server is null
  if (this.server === null)
    return done();
  // Otherwise wait for 'listening'. Either way, done() ends up calling the
  // send callback passed to add().
  this.server.once('listening', done);
};
Copy the code

In this step, the primary creates or reuses a TCP server for the worker and binds the worker to it, so that incoming requests can later be distributed to it. Once the worker is bound, the primary sends the second internal message, this time to the worker. Let's go back to lib/internal/cluster/utils.js and see how internal handles it:

// lib/internal/cluster/utils.js
const callbacks = new SafeMap();

// Wraps `cb` in an internalMessage listener; replies carrying an `ack` are
// routed to the callback that sendHelper stored under that sequence number.
function internal(worker, cb) {
  // In round-robin mode the primary sends no handle with its reply.
  return function onInternalMessage(message, handle) {
    let fn = cb;

    // In the second internal message (the primary's reply to the worker),
    // message.ack holds the seq of the worker's queryServer message.
    // The stored callback found here is the one that cluster._getServer
    // (lib/internal/cluster/child.js) registered when it called send().
    if (message.ack !== undefined) {
      const callback = callbacks.get(message.ack);

      if (callback !== undefined) {
        fn = callback;
        callbacks.delete(message.ack);
      }
    }

    ReflectApply(fn, worker, arguments);
  };
}
Copy the code

When the worker receives this second internal message, the callback that cluster._getServer passed to send is executed.

// lib/internal/cluster/child.js
send(message, (reply, handle) => {
  // When no handle is sent with the reply (round-robin mode), rr() runs
  if (handle)
    shared(reply, handle, indexesKey, index, cb);
  else
    // cb is the listenOnPrimaryHandle callback that lib/net.js passed to
    // cluster._getServer
    rr(reply, indexesKey, index, cb);
});

// Builds a stub handle for the worker and passes it to cb.
function rr(message, indexesKey, index, cb) {
  let key = message.key;

  // This stub is what hacks net.Server's listen: when listen runs in the
  // worker, it just returns 0, so the worker never actually listens on the port.
  function listen(backlog) {
    return 0;
  }

  function close() { /* ... */ }
  function getsockname(out) { /* ... */ }

  const handle = { close, listen, ref: noop, unref: noop };
  handles.set(key, handle);
  // Calls the listenOnPrimaryHandle callback passed in from lib/net.js
  cb(0, handle);
}
Copy the code

The rr function builds a stub handle whose functions have the same names as those of a real server handle (listen, close, ref, unref), and passes it to the listenOnPrimaryHandle callback.

// lib/net.js
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  // ...
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle);

  // listenOnPrimaryHandle replaces the worker's server._handle with the stub
  // handle built by rr(). server._listen2 then calls server._handle.listen,
  // which is the stub, so the worker's listen does nothing. This completes
  // the listen hack for the worker process.
  function listenOnPrimaryHandle(err, handle) {
    // ...
    // handle: { listen, close, ref, unref, ... }
    server._handle = handle;
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

Now let's look at what server._listen2 does:

// server._listen2 is setupListenHandle in both the primary and the workers.
Server.prototype._listen2 = setupListenHandle;

// Ensures the server has a handle, then calls listen on that handle.
// In a worker the handle is already the stub installed by
// listenOnPrimaryHandle; otherwise a real server handle is created here.
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  // Coming from a worker, this._handle is already set (it is the stub object
  // built by rr()), so nothing is created and this branch only logs.
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    // The primary (with no handle yet) takes this path and creates a real
    // server handle here.
    // ...
    rval = createServerHandle(address, port, addressType, fd, flags);
    // ...
    this._handle = rval;
  }
  // Primary: listens on the real handle. Worker: calls the stub listen,
  // which just returns 0. backlog falls back to 511 when it is falsy.
  const err = this._handle.listen(backlog || 511);
  // ...
}
Copy the code

Having read the source code for port listening in worker processes, we can see that calling net.Server's listen in a worker does not make the worker listen on the port; listening is always done by the primary process. When the primary receives a listen request from a worker, it first checks whether a matching server already exists. If it does, the primary binds the worker to that server, so the port is never bound twice. If it does not, the primary creates a new server. When the primary receives a request, it hands the request to a worker according to the load-balancing strategy in use.