Concept

Stream is one of the most important components and patterns in Node.js; the adage "Stream all the things" has long been repeated in the community.

Specifically, a stream is an ordered sequence of byte transfers with a start and an end. It is an abstract interface.

A stream is a collection of data, just like an array or string. The difference is that the data in a stream may not all be available at once, and it does not all have to fit in memory at once. This makes streams useful when manipulating large amounts of data, or when data arrives piece by piece from an external source.

Each stream object is an instance of the EventEmitter class, with the corresponding on and emit methods, as the code examples below will show.

Stream is a core Node module. It is imported as follows:

let Stream = require('stream')

Forms

Streams can be implemented in various forms in Node, such as:

1. The request (req) and response (res) objects in HTTP
2. Socket objects in TCP
3. Readable and writable streams in fs

type

The following four types are provided:

Readable stream (Readable)
let Readable = stream.Readable
Writable stream (Writable)
let Writable = stream.Writable
Duplex stream (Duplex)
let Duplex = stream.Duplex
Transform stream (Transform)
let Transform = stream.Transform

Principles

Readable

A readable stream is an object that implements the stream.Readable interface. It reads underlying data as stream data and starts emitting data once a listener for the data event is attached.

1. Creation

let rs = fs.createReadStream(path, {
    flags: 'r',        // how to open the file, defaults to 'r'
    encoding: null,    // defaults to null (raw Buffers)
    start: 0,          // the index to start reading from
    end: undefined,    // the index to stop reading at (inclusive)
    highWaterMark: 64 * 1024 // read cache threshold, defaults to 64KB
});
2. Methods and functions

// 1. Listening for the data event automatically switches the stream into flowing mode
// 2. The data is then read and emitted as quickly as possible
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log('Read complete');
});
// fired once the file is opened
rs.on('open', function () {
    console.log('open');
});
// fired when the readable stream is closed
rs.on('close', function () {
    console.log('close');
});
// set the encoding of the emitted data
rs.setEncoding('utf8');

// pausing and resuming
rs.on('data', function (data) {
    rs.pause(); // pause reading from within the data handler
    console.log(data);
});
setTimeout(function () {
    rs.resume(); // resume reading
}, 2000);
3. Modes

Readable streams operate in one of two modes: flowing mode and paused mode.

The readable stream object maintains a state object, readable._readableState, referred to below simply as state. A flag on it, state.flowing, identifies the stream's mode. It has three possible values:

true: flowing mode. false: paused mode. null: the initial state.

1) Flowing mode

In flowing mode, data is produced continuously, forming a "flow". This mode can be entered by listening for data events on the stream.

2) Paused Mode

In pause mode, read() needs to be explicitly called to trigger the data event.

In the initial state, listening for the data event switches the stream into flowing mode. However, if the stream is already in paused mode, listening for the data event does not switch it into flowing mode; to consume the stream, the read() method must be called explicitly.

3) Switching between modes

Calling readable.resume() causes the stream to go into flowing mode, and state.flowing is set to true. A call to readable.pause() causes the stream to go into pause mode, and state.flowing is set to false.

4. How it works

When creating a Readable stream, you need to inherit Readable and implement the _read method.

  1. The _read method is the logic that reads concrete data from the underlying system, that is, the logic that produces the data.

    In the _read method, data is put into the readable stream for downstream consumption by calling push(data). push(data) can be called either synchronously or asynchronously inside _read. When all data has been produced, push(null) must be called to terminate the readable stream; once the stream has ended, push(data) may no longer be called. The readable stream can then be consumed by listening for data events.

    After its data event is listened for the first time, the readable stream continuously calls _read() and outputs the data by firing data events. The first data event is emitted on the next tick, so it is safe to place logic that must run before the first output immediately after attaching the listener. The end event is emitted when all data has been consumed.

  2. Breaking it down

doRead

A cache is maintained inside the stream. When there is enough data in the cache, calling read() does not trigger a _read() call; that is, there is no need to request data from the underlying layer. doRead indicates whether read(n) needs to fetch data from the underlying layer:

var doRead = state.needReadable

if (state.length === 0 || state.length - n < state.highWaterMark) {
  doRead = true
}

if (state.ended || state.reading) {
  doRead = false
}

if (doRead) {
  state.reading = true
  state.sync = true
  if (state.length === 0) {
    state.needReadable = true
  }
  this._read(state.highWaterMark)
  state.sync = false
}


When the cache length is 0, or the amount of cached data is below the state.highWaterMark threshold, _read() is called to read data from the underlying layer. state.reading indicates whether a fetch from the underlying layer is still in progress; when push is called it is set to false, indicating that the _read() call has finished.

push

The consumer calls read(n) to force the stream to output data, and the stream in turn calls _read(), which causes the underlying layer to call push to pass data into the stream. If the cache is empty when push is called, the current data is exactly the next data needed. That data may be added to the cache first, or it may be output directly. When the read method executes, if data can be retrieved from the cache after _read is called, it is output as a data event.

So, if the cache is empty when _read calls push asynchronously, the current data is exactly the next data needed and will not be output by the read method; it should instead be output immediately from within the push method as a data event.

The condition for this immediate output is:

state.flowing && state.length === 0 && !state.sync
end

Because the stream requests data from the underlying layer in batches, the underlying layer must explicitly tell the stream when the data has run out. So when push(null) is called during a fetch (_read()), it means the underlying data is exhausted; the stream then sets state.ended.

state.length indicates the amount of data currently in the cache. When state.length is 0 and state.ended is true, all data has been consumed. Once this condition is detected while read(n) executes, the end event is emitted; naturally, this event fires only once.

readable

After _read() is called, read(n) tries to fetch data from the cache. If _read() calls push asynchronously, the amount of data in the cache has not yet increased, so it is easy to come up empty.

If read(n) returns null, the required amount of data could not be fetched from the cache this time. The consumer must then wait for new data to arrive before calling the read method again.

After data arrives, the stream notifies the consumer through the readable event. If the push method outputs the data immediately, the consumer simply listens for data events; otherwise the data is added to the cache and a readable event must be emitted. The consumer listens for that event and then calls the read method to fetch the data.

5. A handwritten implementation

Flowing mode

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        // initialize the parameters
        this.path = path;
        this.flags = options.flags || 'r'; // how to open the file, defaults to 'r'
        this.mode = options.mode || 0o666;
        this.pos = this.start = options.start || 0; // the read offset, defaults to the start
        this.end = options.end;
        this.encoding = options.encoding;
        this.highWaterMark = options.highWaterMark || 64 * 1024; // cache threshold, defaults to 64KB
        this.flowing = null; // null: initial state, false: paused mode, true: flowing mode
        this.autoClose = options.autoClose !== false;
        this.buffer = Buffer.alloc(this.highWaterMark); // the cache
        this.open(); // open the file, ready for reading
        // 'newListener' fires whenever a listener is added to the instance
        this.on('newListener', (type) => {
            if (type === 'data') {
                // listening for the data event switches the initial state to flowing mode
                this.flowing = true;
                this.read();
            }
        });
    }

    read() {
        // if the file descriptor is not a number yet, the file has not been opened
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read());
        }
        // how many bytes to read this time
        let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;

        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytes) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            if (bytes) {
                let data = this.buffer.slice(0, bytes);
                data = this.encoding ? data.toString(this.encoding) : data;
                this.emit('data', data);

                this.pos += bytes;

                if (this.end && this.pos > this.end) {
                    return this.endFn();
                } else if (this.flowing) {
                    this.read();
                }
            } else {
                return this.endFn();
            }
        });
    }

    endFn() {
        this.emit('end');
        this.destroy();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        });
    }

    pipe(dest) {
        this.on('data', data => {
            let flag = dest.write(data);
            if (!flag) {
                this.pause();
            }
        });
        dest.on('drain', () => {
            this.resume();
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }
}

module.exports = ReadStream;
The test code:

let ReadStream = require('./ReadStream');
let rs = new ReadStream('1.txt', {
    highWaterMark: 3,
    encoding: 'utf8'
});
// The real createReadStream enters paused mode as soon as the stream is created
// and pre-fills the cache; rs.length reflects the cache size
rs.on('readable', function () {
    console.log(rs.length); // 3
    // after one byte is consumed, the cache holds 2 bytes
    let char = rs.read(1);
    console.log(char);
    console.log(rs.length); // 2
    // once the number of bytes in the cache falls below highWaterMark,
    // another highWaterMark's worth is read to refill it
    setTimeout(() => {
        console.log(rs.length); // 5
    }, 500);
});

Note that this test exercises the paused-mode features (read(n), length, the readable event), which the implementation below provides.

Paused mode

let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
     
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose !== false;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing = null; // the initial state
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }
    
    read(n) {
        // wait until the file has been opened
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read(n));
        }
        n = parseInt(n, 10);
        if (n != n) { // NaN: default to consuming the whole cache
            n = this.length;
        }
        if (this.length == 0) {
            this.needReadable = true;
        }
        let ret;
        if (0 < n && n <= this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            ret = ret.toString(this.encoding);
        }
        
        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }
                        
                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }
                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            });
        };
        if (this.length == 0 || this.length < this.highWaterMark) {
            _read();
        }
        return ret;
    }
    
    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }
    
    pause() {
        this.flowing = false;
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}

module.exports = ReadStream;
Writable
  1. Creation
let ws = fs.createWriteStream(path, {
    flags: 'w',       // how to open the file, defaults to 'w'
    encoding: null,   // defaults to null
    highWaterMark: 16 * 1024 // write cache threshold, defaults to 16KB
});
  2. Methods and functions
ws.write(chunk, [encoding], [callback]);
// chunk: the data to write; callback: called once the write succeeds.
// The return value is a boolean: false when the system cache is full, true when it is not.

ws.end(chunk, [encoding], [callback]);
// The optional chunk and encoding arguments allow one more piece of data to be written
// to the Writable before closing the stream. The optional callback is attached as a
// listener for the 'finish' event.

// While a stream is not drained, calls to write() cache the chunk and return false.
// Once all currently cached chunks have been flushed (accepted by the operating
// system for output), the 'drain' event fires. It is recommended that once write()
// returns false, no chunks be written until 'drain' has fired.
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
  flags:'w',
  encoding:'utf8',
  highWaterMark:3
});
let i = 10;
function write() {
  let flag = true;
 while(i&&flag){
      flag = ws.write("1");
      i--;
     console.log(flag);
 }
}
write();
ws.on('drain',()=>{
  console.log("drain");
  write();
});

// After stream.end() has been called and all buffered data has been flushed to the
// underlying system, the 'finish' event fires.
var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
  writer.write(`hello, ${i}!\n`);
}
writer.end('end\n');
writer.on('finish', () => {
  console.error('All writes are complete!');
});

// pipe limits the amount of data buffered in memory to an acceptable level,
// so that sources and destinations of different speeds do not overwhelm the
// available memory: readStream.pipe(writeStream)
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);

// the equivalent written by hand:
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if (!flag) rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});

3. How it works

const Writable = require('stream').Writable;
const writable = Writable();

// Implement the _write method:
// this is the logic that writes data to the underlying system
writable._write = function (data, enc, next) {
  // write the data from the stream to the underlying layer
  process.stdout.write(data.toString().toUpperCase());
  // tell the stream the next chunk may be processed
  process.nextTick(next);
};

// all data has been written to the underlying system
writable.on('finish', () => process.stdout.write('DONE'));

// write data into the stream
writable.write('a' + '\n');
writable.write('b' + '\n');
writable.write('c' + '\n');

// call end() when there is no more data to write
writable.end();
Upstream writes data into a writable stream by calling writable.write(data). write() calls _write() to deliver the data to the underlying layer. Inside _write, once the data has been successfully written, next(err) must be called to tell the stream it may process the next chunk; next can be called either synchronously or asynchronously. Upstream must call writable.end(data) to terminate the writable stream (data is optional); after that, write may no longer be called to add data. Once end has been called and all underlying writes have completed, the finish event is emitted.

4. A handwritten implementation

WriteStream

let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.pos = this.start;
        this.encoding = options.encoding || 'utf8';
        this.autoClose = options.autoClose;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.buffers = []; // the cache
        this.writing = false; // whether a write to the underlying layer is in progress
        this.length = 0; // the number of bytes in the cache
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open');
        });
    }
    
    
    write(chunk, encoding, cb) {
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding);
        let len = chunk.length;
        
        this.length += len;
        
        let ret = this.length < this.highWaterMark;
                if (this.writing) {
            this.buffers.push({
                chunk,
                encoding,
                cb
            });
        } else {
            this.writing = true;
            this._write(chunk, encoding, () => this.clearBuffer());
        }
        return ret;
    }
    
    clearBuffer() {
        let data = this.buffers.shift();
        if (data) {
            this._write(data.chunk, data.encoding, () => this.clearBuffer())
        } else {
        
            this.writing = false;
            this.emit('drain');
        }
    }
    
    _write(chunk, encoding, cb) {
        if(typeof this.fd ! = ='number') {
            return this.once('open', () => this._write(chunk, encoding, cb));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.pos += bytesWritten;
            this.length -= bytesWritten;
            cb && cb();
        });
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }
}

module.exports = WriteStream;
Duplex

Duplex is essentially a stream that inherits both Readable and Writable. A Duplex object can therefore be used as a readable stream (which requires implementing the _read method) or as a writable stream (which requires implementing the _write method).

var Duplex = require('stream').Duplex;
var duplex = Duplex();

// the underlying read logic of the readable side
duplex._read = function () {
  this._readNum = this._readNum || 0;
  if (this._readNum > 1) {
    this.push(null);
  } else {
    this.push('' + (this._readNum++));
  }
};

// the underlying write logic of the writable side
duplex._write = function (buf, enc, next) {
  // a, b
  process.stdout.write('_write ' + buf.toString() + '\n');
  next();
};

// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))

duplex.write('a')
duplex.write('b')

duplex.end()

Because the _read method is implemented in the code above, data events can be listened for to consume the data the Duplex produces. Because the _write method is also implemented, the stream can act as a downstream consumer of data at the same time.

Because it is both readable and writable, a Duplex is said to have two ends: a writable end and a readable end. The writable interface is the same as Writable's and is used when the stream acts as a downstream consumer; the readable interface is the same as Readable's and is used when it acts as an upstream producer.

Transform

In the example above, the data on the readable side (0, 1) is unrelated to the data written to the writable side ('a', 'b'). With a Transform, however, data written to the writable side is automatically transformed and then added to the readable side. Transform inherits from Duplex and already implements the _read and _write methods; the user implements a _transform method instead.

'use strict'

const Transform = require('stream').Transform;

class Rotate extends Transform {
  constructor(n) {
    super();
    // rotate each letter `n` positions (ROT-n, defaulting to ROT13)
    this.offset = (n || 13) % 26;
  }
  // transform the written data and push the result to the readable side
  _transform(buf, enc, next) {
    var res = buf.toString().split('').map(c => {
      var code = c.charCodeAt(0);
      if (c >= 'a' && c <= 'z') {
        code += this.offset;
        if (code > 'z'.charCodeAt(0)) {
          code -= 26;
        }
      } else if (c >= 'A' && c <= 'Z') {
        code += this.offset;
        if (code > 'Z'.charCodeAt(0)) {
          code -= 26;
        }
      }
      return String.fromCharCode(code);
    }).join('');
    this.push(res);
    next();
  }
}

var transform = new Rotate(3);
transform.on('data', data => process.stdout.write(data));
transform.write('hello, ')
transform.write('world! ')
transform.end()

// khoor, zruog!

References

Analyzing the implementation of pipe in Node.js through its source code