Before we talk about RPC, let’s talk about communication protocol design. What is a communication protocol? Simply put, it is an agreement between the communicating parties on how data transmission is controlled. Communication is not just about passing information along, but about making sure it arrives accurately and reliably.

In the AMP protocol, the first byte stores the protocol version in its high four bits and the number of pieces of data in its low four bits, so the header takes exactly one byte. Each subsequent piece of data takes the form <length> / <data>: the length is expressed in 4 bytes, and the data itself is of variable length.

All multi-byte integers are big endian. The version and argc integers are stored in the first byte, followed by a sequence of zero or more <length> / <data> pairs, where length is a 32-bit unsigned integer.

      0        1 2 3 4     <length>    ...
+------------+----------+------------+
| <ver/argc> | <length> | <data>     | additional arguments
+------------+----------+------------+
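To make the layout concrete, here is a minimal sketch that assembles one message by hand using only Node's Buffer API. The version number 1 and the payload "hi" are example values chosen for illustration.

```javascript
// Hand-assemble one AMP message: version 1, a single argument "hi".
// Layout: [ver/argc] [4-byte big-endian length] [data]
var meta = Buffer.from([(1 << 4) | 1]); // 0x11: version 1 in the high nibble, argc 1 in the low nibble
var length = Buffer.alloc(4);
length.writeUInt32BE(2, 0);             // "hi" is 2 bytes long
var data = Buffer.from('hi');

var message = Buffer.concat([meta, length, data]);
console.log(message.toString('hex'));   // 11000000026869
```

Reading the hex output left to right gives the header byte 11, the length 00000002, and the two data bytes 68 69.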

Note that the high and low order of the bits in a value and high and low memory addresses are two different things, which is why we have to decide whether data is stored in big-endian or little-endian order. For network protocol design, we use big endian.

An Int8 fits in one byte, whereas Short, Int32, and Double do not, so they must be represented by multiple bytes. This introduces the concept of “byte order”, the order in which those bytes are stored. Storing the low-order byte of a value at the low address is called little endian; storing the high-order byte at the low address is called big endian. Each has its own advantages and disadvantages, and CPU vendors do not agree on one, but for network communication everyone needs a common standard, otherwise communication is impossible. RFC 1700 specifies big endian as the network byte order for the TCP/IP protocols. Therefore, when developing network communication protocols, we should use the big-endian (BE) APIs for Buffer operations.
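The difference between the two byte orders is easy to see with Node's Buffer API. The sketch below writes the same example value, 0x01020304, in both orders and prints the resulting bytes.

```javascript
var be = Buffer.alloc(4);
var le = Buffer.alloc(4);

be.writeUInt32BE(0x01020304, 0); // high-order byte at the lowest offset
le.writeUInt32LE(0x01020304, 0); // low-order byte at the lowest offset

console.log(be.toString('hex')); // 01020304
console.log(le.toString('hex')); // 04030201

// Reading with the matching API recovers the original value in both cases.
console.log(be.readUInt32BE(0) === le.readUInt32LE(0)); // true
```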

Amp is very simple to use: encode encodes the data and decode decodes it.

var amp = require('amp');

var bin = amp.encode([Buffer.from('hello'), Buffer.from('world')]);
var msg = amp.decode(bin);
console.log(msg);

Let’s look at how encode is implemented. First we need to determine how many bytes are required in total. As described above, one byte stores the protocol version and the number of pieces of data, and each piece of data needs 4 bytes for its length plus however many bytes the data itself occupies. The first byte is built with version << 4 | argc, which places the version in the high four bits and the number of pieces of data in the low four bits.
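The two calculations in this step can be sketched in isolation. The helpers packMeta and encodedLength below are hypothetical names used only for illustration and are not part of amp; the range check is an added assumption that follows from each field having only four bits.

```javascript
// Hypothetical helper, not part of amp: pack version and argc into one byte,
// rejecting values that do not fit in four bits.
function packMeta(version, argc) {
  if (version < 0 || version > 15 || argc < 0 || argc > 15) {
    throw new RangeError('version and argc must each fit in 4 bits (0-15)');
  }
  return (version << 4) | argc;
}

// Hypothetical helper: 1 header byte, plus 4 length bytes and the payload
// for every argument.
function encodedLength(args) {
  return args.reduce(function (len, arg) {
    return len + 4 + arg.length;
  }, 1);
}

var args = [Buffer.from('hello'), Buffer.from('world')];
console.log(packMeta(1, args.length).toString(2)); // 10010
console.log(encodedLength(args));                  // 19
```

Because argc occupies four bits, a single message in this format can carry at most 15 pieces of data.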

For each piece of data, we first write its length as a 32-bit unsigned big-endian integer and then write the data itself. When the loop ends, all the data has been written and the buffer is returned.

/** * Protocol version. */

var version = 1;

/**
 * Encode `msg` and `args`.
 *
 * @param {Array} args
 * @return {Buffer}
 * @api public
 */

module.exports = function(args){
  var argc = args.length;
  var len = 1;
  var off = 0;

  // data length
  for (var i = 0; i < argc; i++) {
    len += 4 + args[i].length;
  }

  // buffer
  var buf = Buffer.allocUnsafe(len);

  // pack meta
  buf[off++] = version << 4 | argc;

  // pack args
  for (var i = 0; i < argc; i++) {
    var arg = args[i];

    buf.writeUInt32BE(arg.length, off);
    off += 4;

    arg.copy(buf, off);
    off += arg.length;
  }

  return buf;
};

Next, let’s look at how decode works. Whether encoding or decoding, the thing to keep in mind is the structure of the protocol described above.

First we read the first byte from buf. meta >> 4 extracts the version number from the high four bits, and meta & 0xf masks out the low four bits to get the number of pieces of data, argv.
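As a quick illustration of the shift and mask, take an example header byte of 0x12 (a value chosen here for demonstration):

```javascript
var meta = 0x12;         // example header byte: binary 0001 0010
var version = meta >> 4; // shift the high four bits down
var argc = meta & 0xf;   // keep only the low four bits

console.log(version, argc); // 1 2
```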

Then, for each piece of data, we read its 4-byte length and call buf.slice(off, off += len) to get the data. Finally, the array of data is returned.

/**
 * Decode the given `buf`.
 *
 * @param {Buffer} buf
 * @return {Array}
 * @api public
 */

module.exports = function(buf){
  var off = 0;

  // unpack meta
  var meta = buf[off++];
  var version = meta >> 4;
  var argv = meta & 0xf;
  var args = new Array(argv);

  // unpack args
  for (var i = 0; i < argv; i++) {
    var len = buf.readUInt32BE(off);
    off += 4;

    var arg = buf.slice(off, off += len);
    args[i] = arg;
  }

  return args;
};
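The decode function above trusts its input. If the buffer might be truncated or come from an untrusted peer, a bounds-checked variant is one option. The sketch below is a hypothetical safeDecode, not part of amp, that follows the same layout but throws when the buffer ends early.

```javascript
// Hypothetical helper, not part of amp: decode with bounds checks.
function safeDecode(buf) {
  if (buf.length < 1) throw new RangeError('missing header byte');
  var off = 0;
  var argc = buf[off++] & 0xf;
  var args = [];

  for (var i = 0; i < argc; i++) {
    if (off + 4 > buf.length) throw new RangeError('truncated length field');
    var len = buf.readUInt32BE(off);
    off += 4;

    if (off + len > buf.length) throw new RangeError('truncated data');
    args.push(buf.subarray(off, off + len));
    off += len;
  }
  return args;
}

// Version 1, one argument "hi".
var ok = Buffer.from('11000000026869', 'hex');
var decoded = safeDecode(ok).map(function (b) { return b.toString(); });
console.log(decoded); // [ 'hi' ]
```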

In practice, however, data is transmitted as a stream, so the encoded messages have to be extracted from the stream. For example, we pipe the data sent by the client into a parser, which processes it and emits each complete encoded message.

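var net = require('net');
var amp = require('amp');

var server = net.createServer(function(sock){
  var parser = new amp.Stream;

  parser.on('data', function(chunk){
    // chunk is one complete encoded message, so decode it first
    var args = amp.decode(chunk).map(function(c){
      return c.toString();
    });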

    var meth = args.shift();
    console.log('.%s(%s)', meth, args.join(', '));
  });

  sock.pipe(parser);
});
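The reason a parser is needed at all is that a stream delivers bytes, not messages: one chunk may contain only part of a message. The sketch below uses a hypothetical helper, completeMessageLength, which is not part of amp, to check whether a buffer already holds a whole message.

```javascript
// Hypothetical helper, not part of amp: return the byte length of the first
// complete message in `buf`, or -1 if more bytes are needed.
function completeMessageLength(buf) {
  if (buf.length < 1) return -1;
  var argc = buf[0] & 0xf;
  var off = 1;

  for (var i = 0; i < argc; i++) {
    if (off + 4 > buf.length) return -1; // length field not fully received
    off += 4 + buf.readUInt32BE(off);
    if (off > buf.length) return -1;     // data not fully received
  }
  return off;
}

var whole = Buffer.from('11000000026869', 'hex'); // version 1, one argument "hi"
var firstChunk = whole.subarray(0, 3);            // simulate a split delivery

console.log(completeMessageLength(firstChunk)); // -1
console.log(completeMessageLength(whole));      // 7
```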

So, let’s see how Stream is implemented in amp. Stream inherits from the Writable stream and maintains two properties: state indicates how the current byte of the chunk should be handled, and _lenbuf holds the four length bytes of the piece of data currently being read.

state takes one of three values: message means the start of a message is being parsed, from which we extract the protocol version and the number of pieces of data; arglen means the length of the current piece of data is being read; and arg means the current piece of data itself is being read.

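In the message state, three counters are initialized. _nargs counts how many pieces of data have been fully read. _leni counts how many bytes of the current length field have been read, because the length is stored in four bytes. _bufs temporarily holds the buffers that make up the current message. The state is then set to arglen, since in the AMP protocol the header byte is followed by the length of the first piece of data.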

In the arglen state, we read the length of the data one byte at a time. The length is stored as four big-endian bytes, and some of those bytes may arrive in the next chunk, so we cannot assume all four are available at once. When _leni reaches 4, all four bytes have been collected: the big-endian value in _lenbuf is read and assigned to _arglen, _argcur is reset to 0 to track how many bytes of the current piece of data have been read, and the state is set to arg, meaning the next stage is to read the data.
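The arglen step can be sketched on its own. The feed function and the plain variables below are illustrative stand-ins for the parser's _lenbuf, _leni, and _arglen fields, and the two-byte chunks are example input.

```javascript
var lenbuf = Buffer.alloc(4); // stand-in for this._lenbuf
var leni = 0;                 // stand-in for this._leni
var arglen = null;            // stand-in for this._arglen

// Copy length bytes one at a time until four have been collected.
function feed(chunk) {
  for (var i = 0; i < chunk.length && leni < 4; i++) {
    lenbuf[leni++] = chunk[i];
  }
  if (leni === 4) arglen = lenbuf.readUInt32BE(0);
}

feed(Buffer.from([0x00, 0x00])); // first two length bytes arrive
console.log(arglen);             // null

feed(Buffer.from([0x01, 0x00])); // remaining two arrive in the next chunk
console.log(arglen);             // 256
```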

If the chunk still has data, we enter the arg branch and first compute the number of bytes still missing from the current piece of data, this._arglen - this._argcur. As with arglen, the rest of the data may be in the next chunk, so the slice ends at Math.min(rem + i, chunk.length), and the slice is pushed onto the _bufs array. If the current piece of data has been fully read, this._argcur == this._arglen holds, done is true, and _nargs is incremented. If _nargs equals argv, the number of pieces of data given in the header byte, the state is set back to message so that the next AMP message can be read, and the data event is emitted to pass the current message out. Otherwise, if done is true, the length counter _leni is reset and the state is set to arglen to read the length of the next piece of data. If done is false, the rest of the current piece of data is in the next chunk, and the state stays at arg so the remaining bytes are read there.
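The slicing arithmetic in the arg branch is easiest to follow with numbers. The values below are made up for illustration: an argument of 10 bytes, 3 of which were collected from earlier chunks, and a 6-byte chunk currently being read from position 2.

```javascript
var chunk = Buffer.from('abcdef');
var i = 2;       // current position in the chunk
var arglen = 10; // declared length of the argument
var argcur = 3;  // bytes already collected from earlier chunks

var rem = arglen - argcur;                 // 7 bytes still missing
var pos = Math.min(rem + i, chunk.length); // min(9, 6) = 6: the chunk ends first
var part = chunk.slice(i, pos);            // the bytes "cdef"

argcur += pos - i;                         // 3 + 4 = 7
var done = argcur == arglen;               // false: 3 bytes are still to come

console.log(part.toString(), argcur, done); // cdef 7 false
```

Here the chunk runs out before the argument does, so the parser would stay in the arg state and pick up the remaining 3 bytes from the next chunk.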

Finally, when the whole chunk has been processed, the fn callback is called.

/** * Module dependencies. */

var Stream = require('stream').Writable;
var encode = require('./encode');

/** * Expose parser. */

module.exports = Parser;

/**
 * Initialize parser.
 *
 * @param {Options} [opts]
 * @api public
 */

function Parser(opts) {
  Stream.call(this, opts);
  this.state = 'message';
  this._lenbuf = Buffer.allocUnsafe(4);
}

/** * Inherit from `Stream.prototype`. */

Parser.prototype.__proto__ = Stream.prototype;

/** * Write implementation. */

Parser.prototype._write = function(chunk, encoding, fn){
  for (var i = 0; i < chunk.length; i++) {
    switch (this.state) {
      case 'message':
        var meta = chunk[i];
        this.version = meta >> 4;
        this.argv = meta & 0xf;
        this.state = 'arglen';
        this._bufs = [Buffer.from([meta])];
        this._nargs = 0;
        this._leni = 0;
        break;

      case 'arglen':
        this._lenbuf[this._leni++] = chunk[i];

        // done
        if (4 == this._leni) {
          this._arglen = this._lenbuf.readUInt32BE(0);
          var buf = Buffer.allocUnsafe(4);
          buf[0] = this._lenbuf[0];
          buf[1] = this._lenbuf[1];
          buf[2] = this._lenbuf[2];
          buf[3] = this._lenbuf[3];
          this._bufs.push(buf);
          this._argcur = 0;
          this.state = 'arg';
        }
        break;

      case 'arg':
        // bytes remaining in the argument
        var rem = this._arglen - this._argcur;

        // consume the chunk we need to complete
        // the argument, or the remainder of the
        // chunk if it's not mixed-boundary
        var pos = Math.min(rem + i, chunk.length);

        // slice arg chunk
        var part = chunk.slice(i, pos);
        this._bufs.push(part);

        // check if we have the complete arg
        this._argcur += pos - i;
        var done = this._argcur == this._arglen;
        i = pos - 1;

        if (done) this._nargs++;

        // no more args
        if (this._nargs == this.argv) {
          this.state = 'message';
          this.emit('data', Buffer.concat(this._bufs));
          break;
        }

        if (done) {
          this.state = 'arglen';
          this._leni = 0;
        }
        break;
    }
  }


  fn();
};

That covers the details of the amp protocol design. Hopefully it gives you a better understanding of how such a protocol is put together.