While learning Node.js recently, I found the readable event of streams confusing. When is readable triggered, when does reading continue, and when does it stop?

A readable stream offers two events for reading its data: data and readable.

  • data: Reading through the data event is like opening a faucet all the way: data flows out in chunks of highWaterMark bytes until the entire file has been read. The faucet can be turned off midway (readable.pause()) and turned back on afterwards (readable.resume()). A small example:

    • Create a 1.txt file in the current directory with the content: 1234567890
    • Then create a JS file that reads 1.txt through the data event of a readable stream. The code is as follows:
    let fs = require('fs');
    let path = require('path');
    
    let rs = fs.createReadStream(path.join(__dirname, '1.txt'),{
        highWaterMark: 3
    });
    rs.on('data', function(data){
        console.log(data);
    })

    With highWaterMark set to 3, the file is read three bytes at a time until its entire contents have been read. The result is as follows:

    <Buffer 31 32 33>
    <Buffer 34 35 36>
    <Buffer 37 38 39>
    <Buffer 30>
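To make the faucet analogy concrete, here is a minimal sketch of pause() and resume() on the same kind of stream. The temporary file, the 100 ms delay, and the names chunks and finished are my own assumptions for illustration, not part of the example above.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical scratch file standing in for 1.txt.
const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'rs-')), '1.txt');
fs.writeFileSync(file, '1234567890');

const chunks = [];
// Resolves with the collected chunks once the stream has ended.
const finished = new Promise((resolve, reject) => {
    const rs = fs.createReadStream(file, { highWaterMark: 3 });
    rs.on('data', (data) => {
        chunks.push(data.toString());
        console.log(data);
        rs.pause();                          // turn the faucet off
        setTimeout(() => rs.resume(), 100);  // and back on 100 ms later
    });
    rs.on('end', () => resolve(chunks));
    rs.on('error', reject);
});
```

The chunks arrive in the same order and size as before, only spaced out by the delay, because no data events are delivered while the stream is paused.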
  • readable: With the readable event, data can be read in chunks of any size, not just highWaterMark. It is like drawing water from a pool, which raises two questions: when is the pool refilled, and when is water drawn? readable produces the same output as data in two cases: when rs.read() is called without a size, and when the size passed equals highWaterMark. The reason the results match follows from the read rules described in the next section.

    Run the same example as for data:

    let fs = require('fs');
    let path = require('path');
    
    let rs = fs.createReadStream(path.join(__dirname, '1.txt'),{
        highWaterMark: 3
    });
    rs.on('readable', function(){
        console.log(rs.read());
    })

    Execution Result:

    <Buffer 31 32 33>
    <Buffer 34 35 36>
    <Buffer 37 38 39>
    <Buffer 30>
    null

    Why is there a null at the end? And if rs.read() is called with a size, where will null be printed? Let's keep these questions in mind and move on.
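As a short preview of the answer: rs.read() returns null whenever the internal cache has nothing (more) to hand out. A common way to use that is to drain the cache in a loop inside the readable handler. The sketch below assumes a temporary file in place of 1.txt, and the names drained and parts are made up for the illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical scratch file standing in for 1.txt.
const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'rs-')), '1.txt');
fs.writeFileSync(file, '1234567890');

// Resolves with the full file contents once 'end' has fired.
const drained = new Promise((resolve, reject) => {
    const rs = fs.createReadStream(file, { highWaterMark: 3 });
    const parts = [];
    rs.on('readable', () => {
        let chunk;
        // null means "nothing left in the cache right now",
        // so it doubles as the loop's stop signal.
        while ((chunk = rs.read()) !== null) {
            parts.push(chunk);
        }
    });
    rs.on('end', () => resolve(Buffer.concat(parts).toString()));
    rs.on('error', reject);
});

drained.then((text) => console.log(text)); // prints 1234567890
```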

When readable reads

  • When is data read from the file into the readable stream's internal cache?

    1. At the start, highWaterMark bytes (64 KB by default, configurable) are read into the internal cache.
    2. After a readable handler runs, if the internal cache holds fewer than highWaterMark bytes, another highWaterMark bytes are read into the cache.
    3. After a readable handler runs, if the internal cache is empty, highWaterMark bytes are read into the cache. This is a special case of rule 2.
    4. When a readable handler asks for more data than the cache holds, the requested size is rounded up to the nearest power of 2 and that many bytes are read into the cache. For example, if the handler asks for 5 bytes but the cache holds fewer than 5, then 2^3 = 8 bytes are read into the cache.
  • When is readable emitted so that data can be fetched from the internal cache?

    1. A readable event is emitted when data arrives and the internal cache held zero bytes, either because execution has just started or because an earlier read emptied the cache.
    2. If a readable handler asks for more data than the cache currently holds, readable is emitted again once more data has been read from the file into the cache (rule 4 of the file-reading rules above).
    3. When the end of the stream is reached, that is, the whole file has been read into the cache and readable has already fired once for it, readable is emitted one more time. (This final event lets the handler drain the cache, and it is why the earlier example ended by printing null.)
  • A few simple examples. Example 1: the second case in which readable behaves like data, where the size passed to rs.read() equals highWaterMark:

    let fs = require('fs');
    let path = require('path');
    
    let rs = fs.createReadStream(path.join(__dirname, '1.txt'),{
        highWaterMark: 3
    });
    rs.on('readable', function(){
        console.log(rs.read(3));
        // The number of bytes left in the internal cache after each read
        console.log(rs._readableState.length);
    });

    Output result:

    <Buffer 31 32 33>
    0
    <Buffer 34 35 36>
    0
    <Buffer 37 38 39>
    0
    null
    1
    <Buffer 30>
    0

    After the first output of <Buffer 31 32 33>, the cache is empty. The stream reads from the file again (file-reading rule 3), and once that read finishes a readable event is emitted (readable trigger condition 1). The second and third outputs, <Buffer 34 35 36> and <Buffer 37 38 39>, come about the same way. After the third output the stream reads from the file again and gets the last byte. readable fires, but the cache now holds 1 byte, fewer than the 3 requested (file-reading rule 4), so rs.read(3) returns null and the file is read once more. readable is then emitted again (readable trigger condition 2), which produces the last output, <Buffer 30>.



    Example 2: a readable event is emitted automatically when the end of the stream is reached:

    let fs = require('fs');
    let path = require('path');
    // let ReadStream = require('./chat');
    let rs = fs.createReadStream(path.join(__dirname, '1.txt'),{
        highWaterMark: 7
    });
    rs.on('readable', function(){
        // The number of bytes in the internal cache before each read
        console.log(rs._readableState.length);
        console.log(rs.read(8));
        // The number of bytes left in the internal cache after each read
        console.log(rs._readableState.length);
    });

    Output result:

    7
    null
    7
    10
    <Buffer 31 32 33 34 35 36 37 38>
    2
    2
    <Buffer 39 30>
    0

    Analysis of the results: when the program starts, highWaterMark (7) bytes are read into the internal cache (file-reading rule 1), and readable is emitted once that read completes (readable trigger condition 1). rs.read(8) asks for more than the 7 cached bytes, so it returns null and 2^3 = 8 more bytes are requested from the file (file-reading rule 4). Only 3 bytes remain in the file, so after this read the cache holds 10 bytes and the whole file has been read. readable is emitted again (readable trigger condition 2) and the output is <Buffer 31 32 33 34 35 36 37 38>, leaving 2 bytes in the cache. Because the end of the stream has now been reached (readable trigger condition 3), readable is emitted once more, the rest of the cache is read, the output is <Buffer 39 30>, and the cache length drops to zero.
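The examples above peek at the private rs._readableState. As a hedged alternative, the same observation can be made with the public readableLength and readableHighWaterMark properties. This is only a sketch: the temporary file and the names observed and parts are assumptions of mine, and I am deliberately not listing an expected log, since the exact sequence can differ between Node versions.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical scratch file standing in for 1.txt.
const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), 'rs-')), '1.txt');
fs.writeFileSync(file, '1234567890');

// Resolves with everything that rs.read(8) handed out, joined together.
const observed = new Promise((resolve, reject) => {
    const rs = fs.createReadStream(file, { highWaterMark: 7 });
    const parts = [];
    rs.on('readable', () => {
        const before = rs.readableLength;   // bytes cached before the read
        const chunk = rs.read(8);           // may be null if too little is cached
        console.log({
            before,
            chunk,
            after: rs.readableLength,       // bytes cached after the read
            highWaterMark: rs.readableHighWaterMark
        });
        if (chunk !== null) {
            parts.push(chunk);
        }
    });
    rs.on('end', () => resolve(Buffer.concat(parts).toString()));
    rs.on('error', reject);
});
```

Logging highWaterMark next to the cache length makes it easy to see whether asking for more than the current highWaterMark changes it.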

Emulating readable

The examples above may still feel a little abstract, so here is a simple implementation of readable built on fs.read.

let fs = require('fs');
let EventEmitter = require('events');

// Round n up to the nearest power of 2
function computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
}

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        // Defaults to true; `options.autoClose || true` could never be false
        this.autoClose = options.autoClose !== false;
        this.encoding = options.encoding || 'utf8';
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.start = options.start || 0;
        // Number of bytes currently in the internal cache
        this.readableLength = 0;
        // The internal cache
        this.buffers = [];
        // Whether a file read is in progress; if so, do not start another one
        this.reading = false;
        // Whether the readable event needs to be emitted
        this.emittedReadable = false;
        // Current read position in the file
        this.pos = this.start;
        // Total size of the file
        this.fileSize = fs.statSync(path).size;
        // Open the file before reading; note that this is asynchronous
        this.open();
        // Once a readable listener is added, start reading the file,
        // reading highWaterMark bytes the first time
        this.on('newListener', (type) => {
            if (type === 'readable') {
                this.read();
            }
        });
    }

    // Equivalent to rs.read([size]); n is the number of bytes to read
    read(n) {
        if (this.fileSize === this.pos && this.isLast) {
            // The file has been read to the end:
            // return what is left in the internal cache directly
            return this.buffers.shift();
        }
        // If more is requested than the cache holds, raise highWaterMark to
        // the nearest power of 2 and let readable be emitted again
        if (n > this.readableLength) {
            this.highWaterMark = computeNewHighWaterMark(n);
            this.emittedReadable = true;
            // Make sure no read is already in progress
            if (!this.reading) {
                this.reading = true;
                this._read();
            }
        }
        // If n is given, the result of this read is collected in curReadBuf
        let curReadBuf;
        // If the cache holds enough data, take it out and put it into curReadBuf
        if (n > 0 && n <= this.readableLength) {
            curReadBuf = Buffer.alloc(n);
            let buf;         // the buffer taken from the cache on each iteration
            let index = 0;   // next write position in curReadBuf
            let flag = true; // lets the inner for loop break out of the while loop
            // The cache looks like [buffer<5,6,7,8>, buffer<9,10,11,12>]
            while (flag && (buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    curReadBuf[index++] = buf[i];
                    if (index === n) {
                        // n bytes collected: leave both loops
                        // and update the cache length
                        flag = false;
                        this.readableLength -= n;
                        // Put the unconsumed remainder back into the cache
                        let r = buf.slice(i + 1);
                        if (r.length) {
                            this.buffers.unshift(r);
                        }
                        break;
                    }
                }
            }
        }
        // If the cache is empty, readable must be emitted after the next file read
        if (this.readableLength === 0) {
            this.emittedReadable = true;
        }
        // If the cache holds less than highWaterMark and no read is in progress,
        // read from the file
        if ((this.readableLength < this.highWaterMark) && !this.reading) {
            this.reading = true;
            this._read();
        }
        // If the end of the file has been reached, allow readable to be emitted
        if (this.fileSize === this.pos) {
            this.emittedReadable = true;
        }
        return curReadBuf || null;
    }

    _read() {
        // If the file is not open yet, wait for it to open before reading
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._read());
        }
        // Read up to highWaterMark bytes
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, byteRead) => {
            this.reading = false;
            if (err) {
                this.emit('error', err);
                return;
            }
            if (byteRead > 0) {
                // Data was read: update the counters and store it in the cache
                this.readableLength += byteRead;
                this.pos += byteRead;
                // Only keep the bytes that were actually read
                this.buffers.push(buffer.slice(0, byteRead));
                if (this.emittedReadable) {
                    // By default, readable is not emitted again next time
                    this.emittedReadable = false;
                    this.emit('readable');
                }
            } else {
                // End of the stream: readable is emitted one last time
                this.isLast = true;
                if (this.emittedReadable) {
                    this.emittedReadable = false;
                    this.emit('readable');
                }
                // Emit the end event
                this.emit('end');
            }
        });
    }

    destroy() {
        // Nothing to close if the file never opened
        if (typeof this.fd !== 'number') {
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }

    // Open the file to read
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit('open', this.fd);
        });
    }
}
module.exports = ReadStream;
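The bit shifting in computeNewHighWaterMark can look cryptic, so here is a small worked sketch of the same trick. The function name roundUpToPowerOfTwo and the sample inputs are mine, chosen only for illustration; the comments trace n = 5.

```javascript
// Same bit trick as computeNewHighWaterMark, traced for n = 5.
function roundUpToPowerOfTwo(n) {
    n--;            // 5 -> 4 (0b100); this keeps exact powers of 2 unchanged
    n |= n >>> 1;   // 0b100 | 0b010 = 0b110
    n |= n >>> 2;   // 0b110 | 0b001 = 0b111
    n |= n >>> 4;   // no change for small values; these steps
    n |= n >>> 8;   // fill in the lower bits of larger numbers
    n |= n >>> 16;
    n++;            // 0b111 + 1 = 0b1000 = 8
    return n;
}

const results = [1, 3, 5, 8, 9].map(roundUpToPowerOfTwo);
console.log(results); // [ 1, 4, 8, 8, 16 ]
```

The OR steps copy the highest set bit into every lower position, so adding 1 at the end always lands on the next power of 2.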

How do you use this custom ReadStream? Just create a new instance. Here is an example:

let path = require('path');
let ReadStream = require('./chat');
let rs = new ReadStream(path.join(__dirname, '1.txt'),{
  highWaterMark: 7
});
rs.on('readable', function(){
  // The number of bytes in the internal cache before each read
  console.log(rs.readableLength);
  console.log(rs.read(8));
  // The number of bytes left in the internal cache after each read
  console.log(rs.readableLength);
});