Introduction to Streams

A stream is an abstract interface for working with streaming data in Node.js. The stream module provides a basic API that makes it easy to build objects that implement the stream interface. Node.js has many kinds of streams, such as HTTP request and response streams and file streams, and all of them are built on the stream module. Streams can be readable, writable, or both, and all streams are instances of EventEmitter.

Node.js provides four fundamental stream types:

  • stream.Readable (streams from which data can be read)
  • stream.Writable (streams to which data can be written)
  • stream.Duplex (streams that are both readable and writable; TCP sockets are an example of Duplex streams)
  • stream.Transform (a Duplex stream in which data written to the writable side is transformed and then made available on the readable side)
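As a small illustration of the last type, the following sketch builds a Transform stream that upper-cases whatever is written to it and pipes an in-memory Readable through it. createUpperCaseTransform and upperCaseAll are hypothetical helper names, and Readable.from assumes Node.js 12 or later.

```javascript
const { Transform, Readable } = require('stream');

// A Transform stream that upper-cases every chunk written to it.
function createUpperCaseTransform() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // chunk arrives as a Buffer; pass the transformed text to the readable side
      callback(null, chunk.toString().toUpperCase());
    },
  });
}

// Pipe an in-memory Readable through the Transform and collect the output.
function upperCaseAll(strings) {
  return new Promise((resolve, reject) => {
    const parts = [];
    Readable.from(strings)
      .pipe(createUpperCaseTransform())
      .on('data', (chunk) => parts.push(chunk.toString()))
      .on('end', () => resolve(parts.join('')))
      .on('error', reject);
  });
}

upperCaseAll(['hello ', 'stream']).then((out) => console.log(out)); // HELLO STREAM
```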

Readable Stream

A readable stream is an abstraction for a source from which data is consumed. All readable streams implement the interface defined by the stream.Readable class. Examples include:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets
  • process.stdin

Readable streams operate in one of two modes: flowing and paused. A newly created readable stream starts in paused mode, where we must call stream.read() explicitly to pull data out of it. In flowing mode, which is entered for example by attaching a 'data' listener, data is read continuously and delivered to the application through 'data' events. Note that even in flowing mode the data does not go directly from the source to the application: there is an internal buffer behind it, whose size is set by the highWaterMark option when the stream is created, and each read from the source fetches at most that many bytes. For example, if there are nine bytes to read and highWaterMark is three bytes, the data is read in three batches. A flowing stream can be switched back to paused mode with pause() and returned to flowing mode with resume().
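The batching can be observed directly. The sketch below, assuming a hypothetical 9-byte demo file written to the system temp directory, reads it in flowing mode with a given highWaterMark and records the size of each 'data' chunk; chunkSizes is a hypothetical helper name.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Read a file in flowing mode and record the size of every 'data' chunk.
function chunkSizes(file, highWaterMark) {
  return new Promise((resolve, reject) => {
    const sizes = [];
    fs.createReadStream(file, { highWaterMark })
      .on('data', (chunk) => sizes.push(chunk.length)) // attaching 'data' switches to flowing mode
      .on('end', () => resolve(sizes))
      .on('error', reject);
  });
}

// Hypothetical demo file containing exactly 9 bytes.
const demoFile = path.join(os.tmpdir(), 'stream-demo-9-bytes.txt');
fs.writeFileSync(demoFile, '123456789');

chunkSizes(demoFile, 3).then((sizes) => console.log(sizes)); // [ 3, 3, 3 ]
```

With highWaterMark: 4 the same file arrives as chunks of 4, 4 and 1 bytes.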

The following code reads a file with a readable stream in flowing mode:

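let fs = require('fs');
let path = require('path');

// Returns a readable stream object
let rs = fs.createReadStream(path.join(__dirname, '1.txt'), {
    flags: 'r',       // open the file for reading
    encoding: 'utf8', // default is null, which means chunks are Buffers; rs.setEncoding('utf8') has the same effect
    autoClose: true,  // close the file descriptor automatically after reading or on error
    start: 0,         // read bytes 0..3 (both ends inclusive)
    end: 3
});

rs.on('open', function () {
    console.log('File open');
});

rs.on('close', function () {
    console.log('File closed');
});

rs.on('error', function (err) {
    console.log(err);
});

// Attaching a 'data' listener switches the stream from paused to flowing mode
rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // no more 'data' events until resume() is called
});

// Resume the flow every 3 seconds
let timer = setInterval(function () {
    rs.resume(); // 'data' events fire again
}, 3000);

rs.on('end', function () {
    console.log('end');
    clearInterval(timer); // otherwise the interval keeps the process alive forever
});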

Now let's look at paused mode. In paused mode we can listen for the 'readable' event, which fires when there is data available to read from the stream. Reading is not automatic, though: we have to call read() ourselves inside the listener. When a 'readable' listener is attached, the stream first fills its internal buffer (up to highWaterMark bytes), and it emits 'readable' again each time the buffer has been drained. Every call to read() checks whether the buffered amount is below highWaterMark and, if so, reads another highWaterMark-sized chunk from the source into the buffer. Note also that read(n) may request more bytes than the buffer currently holds. In that case read(n) returns null, the stream raises highWaterMark to a size that fits the request, reads more data, and then emits 'readable' again.

The following code reads the same file in paused mode:

let fs = require('fs');
let path = require('path');

let rs = fs.createReadStream(path.join(__dirname, './1.txt'), {
    flags: 'r',
    autoClose: true,
    encoding: 'utf8',
    start: 0,
    highWaterMark: 3
});

rs.on('readable', function () {
    // The buffer holds only 3 bytes but we ask for 5: read(5) returns null, the
    // stream raises its highWaterMark, reads more data and emits 'readable' again
    let result = rs.read(5);
    console.log(result);
});
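A common way to consume paused mode is to drain the stream in a loop inside the 'readable' handler, because read(size) returns null as soon as fewer than size bytes are buffered (unless the stream has ended, in which case it returns what is left). The sketch below uses an in-memory Readable instead of 1.txt; readAllPaused and makeSource are hypothetical names.

```javascript
const { Readable } = require('stream');

// Collect all data from a stream in paused mode: wait for 'readable', then
// call read(size) until it returns null (not enough data buffered yet).
function readAllPaused(stream, size) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('readable', () => {
      let chunk;
      while ((chunk = stream.read(size)) !== null) {
        chunks.push(chunk.toString());
      }
    });
    stream.on('end', () => resolve(chunks));
    stream.on('error', reject);
  });
}

// An in-memory byte source yielding "abcdefgh" in three pieces.
function makeSource() {
  return Readable.from(
    [Buffer.from('abc'), Buffer.from('def'), Buffer.from('gh')],
    { objectMode: false }
  );
}

readAllPaused(makeSource(), 5).then((chunks) => console.log(chunks)); // [ 'abcde', 'fgh' ]
```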

Readable stream implementation

Now let's implement a simple readable stream. Since streams expose an on() method, we can infer that they inherit from EventEmitter, so we create a ReadStream class that extends EventEmitter and define a constructor that stores the options:

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        // `options.autoClose || true` would always be true, so test for undefined instead
        this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.start = options.start || 0;
        this.end = options.end;
        this.encoding = options.encoding || null;
        // Buffer pool: holds the bytes of one read (at most highWaterMark bytes)
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.pos = this.start; // current read position; start stays unchanged
    }
}

module.exports = ReadStream;
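One detail in this constructor is easy to get wrong: a default written as options.autoClose || true can never be false, because false || true evaluates to true. The sketch below contrasts it with an explicit undefined check (on Node.js 14 and later, options.autoClose ?? true is equivalent); both function names are hypothetical.

```javascript
// Two ways of applying a default of `true` to a boolean option.
function autoCloseWithOr(options) {
  return options.autoClose || true; // false || true -> true: the user's false is lost
}

function autoCloseWithCheck(options) {
  // Only fall back to the default when the option was not given at all
  return options.autoClose !== undefined ? options.autoClose : true;
}

console.log(autoCloseWithOr({ autoClose: false }));    // true
console.log(autoCloseWithCheck({ autoClose: false })); // false
console.log(autoCloseWithCheck({}));                   // true
```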

Next we define an open() method that opens the file to obtain its file descriptor (fd), and call it from the constructor. We also listen for the 'newListener' event to find out when a 'data' listener is attached; at that point the stream switches to flowing mode and starts reading. Finally, we define a destroy() method, called when a file operation fails or reading has finished, which closes the file and emits 'close'.

constructor(path, options = {}) {
    // ... (options set up as above)
    this.open(); // open the file to obtain fd
    this.flowing = null; // null: paused mode until a 'data' listener is attached
    this.on('newListener', (eventName, callback) => {
        if (eventName === 'data') {
            // The user listens for 'data': switch to flowing mode and start reading
            this.flowing = true;
            this.read();
        }
    });
}

destroy() {
    // If the file was opened, close it before emitting 'close'
    if (typeof this.fd === 'number') {
        fs.close(this.fd, () => {
            this.emit('close');
        });
        return;
    }
    this.emit('close');
}

open() {
    fs.open(this.path, this.flags, (err, fd) => {
        if (err) {
            this.emit('error', err);
            if (this.autoClose) { // close automatically on failure
                this.destroy();
            }
            return;
        }
        this.fd = fd;      // save the file descriptor
        this.emit('open'); // the file is open
    });
}
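The 'newListener' hook works because EventEmitter emits 'newListener' before the new listener is added to its listener list. That is also why the first chunk has to be emitted asynchronously: anything emitted synchronously from the hook would reach no listener. A minimal sketch with a hypothetical LazyProducer class:

```javascript
const EventEmitter = require('events');

// A hypothetical emitter that starts producing only once someone listens for 'data',
// mirroring how the ReadStream above switches to flowing mode.
class LazyProducer extends EventEmitter {
  constructor() {
    super();
    this.started = false;
    this.on('newListener', (eventName) => {
      // 'newListener' fires before the listener is actually registered
      if (eventName === 'data' && !this.started) {
        this.started = true;
        // Emit on a later tick so the new listener is in place by then
        process.nextTick(() => this.emit('data', 'first chunk'));
      }
    });
  }
}

const producer = new LazyProducer();
console.log(producer.started); // false
producer.on('data', (chunk) => console.log('got', chunk)); // logs "got first chunk" on the next tick
console.log(producer.started); // true
```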

Because open() is asynchronous, read() can only call fs.read() after the open callback has fired and fd is available; until then it waits for the 'open' event. Each read fetches at most highWaterMark bytes into the buffer pool (fewer when end is close). Reading is finished when the read position passes end or when fs.read() returns zero bytes. Otherwise, if the stream is still in flowing mode, read() calls itself again to fetch the next chunk.

read() {
    // The file is not open yet: wait for 'open', at which point fd is available
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this.read());
    }
    // end is inclusive, so never read past it (end may be undefined)
    let howMuchToRead = this.end !== undefined
        ? Math.min(this.highWaterMark, this.end - this.pos + 1)
        : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
        if (err) {
            this.emit('error', err);
            if (this.autoClose) this.destroy();
            return;
        }
        if (bytesRead > 0) {
            this.pos += bytesRead; // advance the read position
            // Copy the bytes out, because this.buffer is reused by the next read
            let chunk = Buffer.from(this.buffer.slice(0, bytesRead));
            let data = this.encoding ? chunk.toString(this.encoding) : chunk;
            this.emit('data', data);
            // Past the end position: reading is complete
            if (this.end !== undefined && this.pos > this.end) {
                this.emit('end');
                this.destroy();
                return;
            }
            if (this.flowing) { // still flowing: read the next chunk
                this.read();
            }
        } else { // zero bytes read: end of file
            this.emit('end');
            this.destroy();
        }
    });
}
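The howMuchToRead expression treats end as inclusive, so bytes pos through end amount to end - pos + 1 bytes. The sketch below, with hypothetical helpers howMuchToRead and readSizes, simulates the sequence of read sizes, assuming the file is long enough that every read returns the full request. It also shows why the check should be end !== undefined rather than a truthiness test: end: 0 is a valid value that means one byte.

```javascript
// Hypothetical helper mirroring the howMuchToRead computation in read().
function howMuchToRead(highWaterMark, pos, end) {
  return end !== undefined ? Math.min(highWaterMark, end - pos + 1) : highWaterMark;
}

// Simulate the read sizes for a finite start/end range.
function readSizes(highWaterMark, start, end) {
  const sizes = [];
  let pos = start;
  while (pos <= end) {
    const n = howMuchToRead(highWaterMark, pos, end);
    sizes.push(n);
    pos += n; // assume each read returns exactly n bytes
  }
  return sizes;
}

console.log(readSizes(3, 0, 9));                    // [ 3, 3, 3, 1 ]: 10 bytes, since end is inclusive
console.log(howMuchToRead(65536, 0, undefined));    // 65536 when no end is given
```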

Next, we implement the pause() and resume() methods:

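resume() {
    if (this.flowing) return; // already flowing: don't start a second read loop
    this.flowing = true;
    this.read();
}
pause() {
    this.flowing = false; // read() stops scheduling the next chunk
}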
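Node's built-in readable streams expose the same three-state flag through readable.readableFlowing: null before any consumer is attached, true while flowing, and false after pause(). These match the null, true and false values that this.flowing takes in the ReadStream above. A quick check using an in-memory stream (flowingStates is a hypothetical name):

```javascript
const { Readable } = require('stream');

// Record readableFlowing as a built-in Readable moves between modes.
function flowingStates() {
  const rs = Readable.from(['a', 'b', 'c']);
  const states = [rs.readableFlowing]; // null: no consumer yet
  rs.on('data', () => {});             // attaching 'data' switches to flowing mode
  states.push(rs.readableFlowing);     // true
  rs.pause();
  states.push(rs.readableFlowing);     // false
  rs.resume();
  states.push(rs.readableFlowing);     // true
  return states;
}

console.log(flowingStates()); // [ null, true, false, true ]
```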

Here is the complete code for the flowing-mode implementation:

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.start = options.start || 0;
        this.end = options.end;
        this.encoding = options.encoding || null;
        this.open(); // open the file to obtain fd
        this.flowing = null; // null: paused mode until a 'data' listener is attached
        this.buffer = Buffer.alloc(this.highWaterMark); // buffer pool for one read
        this.pos = this.start; // current read position; start stays unchanged
        this.on('newListener', (eventName) => {
            if (eventName === 'data') { // the user listens for 'data'
                this.flowing = true;
                this.read();
            }
        });
    }

    read() {
        // Not open yet: retry once 'open' fires, when fd is guaranteed to exist
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read());
        }
        let howMuchToRead = this.end !== undefined
            ? Math.min(this.highWaterMark, this.end - this.pos + 1)
            : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) this.destroy();
                return;
            }
            if (bytesRead > 0) {
                this.pos += bytesRead;
                // Copy the bytes out, because this.buffer is reused by the next read
                let chunk = Buffer.from(this.buffer.slice(0, bytesRead));
                let data = this.encoding ? chunk.toString(this.encoding) : chunk;
                this.emit('data', data);
                // Past the end position: reading is complete
                if (this.end !== undefined && this.pos > this.end) {
                    this.emit('end');
                    this.destroy();
                    return;
                }
                if (this.flowing) { // still flowing: keep reading
                    this.read();
                }
            } else { // zero bytes read: end of file
                this.emit('end');
                this.destroy();
            }
        });
    }

    resume() {
        if (this.flowing) return; // already flowing
        this.flowing = true;
        this.read();
    }

    pause() {
        this.flowing = false;
    }

    destroy() {
        // If the file was opened, close it before emitting 'close'
        if (typeof this.fd === 'number') {
            fs.close(this.fd, () => {
                this.emit('close');
            });
            return;
        }
        this.emit('close');
    }

    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) { // close automatically on failure
                    this.destroy();
                }
                return;
            }
            this.fd = fd;      // save the file descriptor
            this.emit('open'); // the file is open
        });
    }
}

module.exports = ReadStream;

That roughly completes the flowing-mode implementation. Now let's implement paused mode. As in flowing mode, we start with a constructor that initializes the basic options, then call open() to open the file, and add a destroy() method for the close logic.

let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
        this.start = options.start || 0;
        this.end = options.end;
        this.flags = options.flags || 'r';
        this.buffers = [];            // the cache: Buffers read but not yet consumed
        this.pos = this.start;        // next file position to read from
        this.length = 0;              // number of bytes in the cache
        this.emittedReadable = false; // emit 'readable' after the next fill?
        this.reading = false;         // is an fs.read() in flight?
        this.open();
        this.on('newListener', (eventName) => {
            if (eventName === 'readable') {
                this.read(); // start filling the cache
            }
        });
    }

    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }

    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit('open');
        });
    }
}

Then we implement read(). As described above, if the cache holds fewer bytes than highWaterMark, we read another highWaterMark-sized chunk into the cache. The 'readable' event is emitted when the cache has been emptied and then refilled, and this repeats until all the data in the file has been read, at which point the 'end' event is emitted.

read(n) {
    // The cache is empty: emit 'readable' once it has been refilled
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    // The cache holds less than highWaterMark: read more (asynchronously)
    if (this.length < this.highWaterMark) {
        if (!this.reading) {
            this.reading = true;
            this._read();
        }
    }
}

_read() {
    // Read only once the file is open
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this._read());
    }
    // Read one highWaterMark-sized chunk into a fresh buffer
    let buffer = Buffer.alloc(this.highWaterMark);
    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
        if (err) {
            this.emit('error', err);
            if (this.autoClose) this.destroy();
            return;
        }
        if (bytesRead > 0) {
            this.buffers.push(buffer.slice(0, bytesRead)); // add to the cache
            this.pos += bytesRead;    // advance the read position
            this.length += bytesRead; // update the cached byte count
            this.reading = false;
            // Emit 'readable' if the cache had been drained
            if (this.emittedReadable) {
                this.emittedReadable = false;
                this.emit('readable');
            }
        } else { // zero bytes read: end of file
            this.emit('end');
            this.destroy();
        }
    });
}

The next step is to make read(n) actually take n bytes out of the cache and return them. This is the most involved part, because each element of the buffers array is a Buffer holding at most one highWaterMark's worth of data. We therefore copy bytes from as many Buffers as needed to satisfy the requested length, and put whatever is left of the last Buffer back at the front of the array for the next read.

read(n) {
    let buffer = null;
    let index = 0;   // write position inside the returned buffer
    let flag = true; // keep copying?
    // Serve the request from the cache only if enough bytes are cached,
    // e.g. cache [[1,2,3],[4,5,6]] and read(4) -> [1,2,3,4], leaving [[5,6]]
    if (n > 0 && n <= this.length) {
        buffer = Buffer.alloc(n); // the buffer to return
        let buf;
        while (flag && (buf = this.buffers.shift())) {
            for (let i = 0; i < buf.length; i++) {
                buffer[index++] = buf[i];
                if (index === n) { // copied enough bytes
                    flag = false;
                    this.length -= n;
                    let bufferArr = buf.slice(i + 1);
                    // Put the unread remainder back at the front of the cache
                    if (bufferArr.length > 0) {
                        this.buffers.unshift(bufferArr);
                    }
                    break;
                }
            }
        }
    }
    // The cache is empty: emit 'readable' once it has been refilled
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    // The cache holds less than highWaterMark: read more (asynchronously)
    if (this.length < this.highWaterMark && !this.reading) {
        this.reading = true;
        this._read();
    }
    return buffer;
}
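The byte-by-byte copy above is easy to follow but slow for large reads. A possible alternative, sketched here as a hypothetical takeBytes helper rather than the article's method, takes whole Buffers from the front of the list, slices only the last one, and joins the pieces with Buffer.concat:

```javascript
// Take n bytes from the front of a list of Buffers.
// Returns null (and leaves the list untouched) if fewer than n bytes are cached.
function takeBytes(buffers, n) {
  const total = buffers.reduce((sum, b) => sum + b.length, 0);
  if (n <= 0 || n > total) return null;
  const parts = [];
  let needed = n;
  while (needed > 0) {
    const buf = buffers.shift();
    if (buf.length <= needed) {
      parts.push(buf); // use the whole Buffer
      needed -= buf.length;
    } else {
      parts.push(buf.subarray(0, needed));   // use the first part...
      buffers.unshift(buf.subarray(needed)); // ...and put the rest back
      needed = 0;
    }
  }
  return Buffer.concat(parts, n);
}

const cache = [Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])];
console.log(takeBytes(cache, 4)); // <Buffer 01 02 03 04>
console.log(cache);               // [ <Buffer 05 06> ]
```

Note that subarray() shares memory with the original Buffer instead of copying, which is fine here because the cached Buffers are not modified after being read.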

Finally, read(n) must handle the case where the requested length is larger than what is currently cached. In that case read(n) returns null; we raise highWaterMark to a size that fits the request (computeNewHighWaterMark rounds n up to the next power of two), read a chunk of that size into the cache, and then emit 'readable' again.

// Round n up to the next power of two, e.g. 5 -> 8
function computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
}

read(n) {
    // Not enough cached: grow highWaterMark if the request exceeds it, and
    // emit 'readable' again once more data has been read
    if (n > this.length) {
        if (n > this.highWaterMark) {
            this.highWaterMark = computeNewHighWaterMark(n);
        }
        this.emittedReadable = true;
    }
    let buffer = null;
    let index = 0;   // write position inside the returned buffer
    let flag = true; // keep copying?
    // Serve the request from the cache only if enough bytes are cached,
    // e.g. cache [[1,2,3],[4,5,6]] and read(4) -> [1,2,3,4], leaving [[5,6]]
    if (n > 0 && n <= this.length) {
        buffer = Buffer.alloc(n); // the buffer to return
        let buf;
        while (flag && (buf = this.buffers.shift())) {
            for (let i = 0; i < buf.length; i++) {
                buffer[index++] = buf[i];
                if (index === n) { // copied enough bytes
                    flag = false;
                    this.length -= n;
                    let bufferArr = buf.slice(i + 1);
                    // Put the unread remainder back at the front of the cache
                    if (bufferArr.length > 0) {
                        this.buffers.unshift(bufferArr);
                    }
                    break;
                }
            }
        }
    }
    // The cache is empty: emit 'readable' once it has been refilled
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    // The cache holds less than highWaterMark: read more (asynchronously)
    if (this.length < this.highWaterMark && !this.reading) {
        this.reading = true;
        this._read();
    }
    return buffer;
}
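computeNewHighWaterMark rounds n up to the next power of two: n-- keeps exact powers of two unchanged, the shifts copy the highest set bit into every lower bit (giving a run of ones), and n++ turns that run into the next power of two. The sketch below compares it with a plain loop (nextPowerOfTwo is a hypothetical reference helper); the bit trick assumes 1 ≤ n ≤ 2^30 so the intermediate values stay within 32-bit integers.

```javascript
// Same bit trick as computeNewHighWaterMark above.
function computeNewHighWaterMark(n) {
  n--;
  n |= n >>> 1;
  n |= n >>> 2;
  n |= n >>> 4;
  n |= n >>> 8;
  n |= n >>> 16;
  n++;
  return n;
}

// Straightforward reference version: double until reaching n.
function nextPowerOfTwo(n) {
  let p = 1;
  while (p < n) p *= 2;
  return p;
}

for (const n of [1, 3, 5, 8, 9, 1000]) {
  console.log(n, computeNewHighWaterMark(n), nextPowerOfTwo(n));
}
// 1 1 1
// 3 4 4
// 5 8 8
// 8 8 8
// 9 16 16
// 1000 1024 1024
```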

Here is the complete code for the paused-mode implementation:

let fs = require('fs');
let EventEmitter = require('events');

// Round n up to the next power of two, e.g. 5 -> 8
function computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
}

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
        this.start = options.start || 0;
        this.end = options.end;
        this.flags = options.flags || 'r';
        this.buffers = [];            // the cache: Buffers read but not yet consumed
        this.pos = this.start;        // next file position to read from
        this.length = 0;              // number of bytes in the cache
        this.emittedReadable = false; // emit 'readable' after the next fill?
        this.reading = false;         // is an fs.read() in flight?
        this.open();
        this.on('newListener', (eventName) => {
            if (eventName === 'readable') {
                this.read(); // start filling the cache
            }
        });
    }

    read(n) {
        // Not enough cached: grow highWaterMark if the request exceeds it, and
        // emit 'readable' again once more data has been read
        if (n > this.length) {
            if (n > this.highWaterMark) {
                this.highWaterMark = computeNewHighWaterMark(n);
            }
            this.emittedReadable = true;
        }
        let buffer = null;
        let index = 0;   // write position inside the returned buffer
        let flag = true; // keep copying?
        // Serve the request from the cache only if enough bytes are cached
        if (n > 0 && n <= this.length) {
            buffer = Buffer.alloc(n); // the buffer to return
            let buf;
            while (flag && (buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if (index === n) { // copied enough bytes
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i + 1);
                        // Put the unread remainder back at the front of the cache
                        if (bufferArr.length > 0) {
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // The cache is empty: emit 'readable' once it has been refilled
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        // The cache holds less than highWaterMark: read more (asynchronously)
        if (this.length < this.highWaterMark && !this.reading) {
            this.reading = true;
            this._read();
        }
        return buffer;
    }

    // Read one highWaterMark-sized chunk from the file into the cache
    _read() {
        // Read only once the file is open
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._read());
        }
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) this.destroy();
                return;
            }
            if (bytesRead > 0) {
                this.buffers.push(buffer.slice(0, bytesRead)); // add to the cache
                this.pos += bytesRead;    // advance the read position
                this.length += bytesRead; // update the cached byte count
                this.reading = false;
                // Emit 'readable' if a consumer is waiting for data
                if (this.emittedReadable) {
                    this.emittedReadable = false;
                    this.emit('readable');
                }
            } else { // zero bytes read: end of file
                this.emit('end');
                this.destroy();
            }
        });
    }

    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }

    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit('open');
        });
    }
}

module.exports = ReadStream;

LineReader

Finally, let's put paused mode to use in an example: a line reader. We start by defining an empty LineReader class and its test code. The test creates a LineReader for the file to read and listens for the 'line' event, whose callback fires each time a full line has been read.

// LineReader: emits a 'line' event for each line of a file
let fs = require('fs');
let EventEmitter = require('events');
let path = require('path');

class LineReader extends EventEmitter {

}

let lineReader = new LineReader(path.join(__dirname, './2.txt'));
lineReader.on('line', function (data) {
    console.log(data); // abc, 123, 456, 678
});

Now we can implement the LineReader constructor. It creates a readable stream for the file, and when a 'line' listener is attached, it starts listening for the stream's 'readable' event. A temporary array, buffer, collects the bytes of the current line and is passed to the 'line' callback once the line is complete. The 'readable' callback reads the data one byte at a time and emits 'line' whenever it reaches the end of a line. Line endings differ between platforms: Windows uses \r\n, Unix-like systems (including macOS) use \n, and classic Mac OS used a lone \r. So after seeing \r we read one more byte to check whether it is \n; if it is not, that byte belongs to the next line and must be processed normally. Finally, we listen for the stream's 'end' event, which fires once all data has been read, and emit the bytes remaining in buffer as the last line.

constructor(path) {
    super();
    this.RETURN = 0x0d;      // \r
    this.LINE = 0x0a;        // \n
    this.buffer = [];        // bytes of the line being assembled
    this.skipNextLF = false; // true if a \r was the last byte buffered so far
    this._rs = fs.createReadStream(path); // default highWaterMark: 64 KiB
    this.on('newListener', (eventName) => {
        if (eventName === 'line') {
            this._rs.on('readable', () => {
                let char; // read(1) returns a one-byte Buffer, or null when nothing is buffered
                while ((char = this._rs.read(1)) !== null) {
                    let current = char[0];
                    if (this.skipNextLF) {
                        this.skipNextLF = false;
                        if (current === this.LINE) continue; // second half of a split \r\n
                    }
                    switch (current) {
                        case this.RETURN: { // \r ends the line
                            this.emitLine();
                            // Read one more byte to see whether this is \r\n
                            let c = this._rs.read(1);
                            if (c === null) {
                                this.skipNextLF = true; // the \n may arrive with the next chunk
                            } else if (c[0] !== this.LINE) {
                                this._rs.unshift(c); // a lone \r: put the byte back for the next line
                            }
                            break;
                        }
                        case this.LINE: // \n ends the line (Unix, Linux, macOS)
                            this.emitLine();
                            break;
                        default:
                            this.buffer.push(current);
                    }
                }
            });
            // All data has been read: the remaining bytes form the last line
            this._rs.on('end', () => {
                if (this.buffer.length > 0) {
                    this.emitLine();
                }
            });
        }
    });
}

emitLine() {
    this.emit('line', Buffer.from(this.buffer).toString());
    this.buffer.length = 0;
}
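For comparison, Node.js ships a line reader of its own: the readline module emits a 'line' event per line and handles both \n and \r\n endings (with crlfDelay: Infinity, a \r\n pair is always treated as a single break). The sketch below feeds it an in-memory stand-in for 2.txt; collectLines is a hypothetical helper, and this is an alternative to, not part of, the LineReader implementation above.

```javascript
const readline = require('readline');
const { Readable } = require('stream');

// Collect lines using Node's built-in readline module.
function collectLines(input) {
  return new Promise((resolve) => {
    const lines = [];
    const rl = readline.createInterface({ input, crlfDelay: Infinity });
    rl.on('line', (line) => lines.push(line));
    rl.on('close', () => resolve(lines)); // fires after the input ends
  });
}

// In-memory stand-in for 2.txt, with mixed line endings split across chunks.
const source = Readable.from(['abc\r\n12', '3\n456\n678']);
collectLines(source).then((lines) => console.log(lines)); // [ 'abc', '123', '456', '678' ]
```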