A stream is an abstract interface for processing streaming data in Node.js. The Stream module provides some basic apis for building objects that implement the stream interface.

Node.js provides a variety of stream objects. For example, a request sent to an HTTP server and process.stdout are both instances of streams.

Streams can be readable, writable, or read-write. All streams are instances of EventEmitter.

The type of flow

There are four basic stream types in Node.js (we’ll focus on the first two) :

  • Writable – streams that can be written to (for example, fs.createWritestream ())
  • Readable – a stream that can read data (for example, fs.createreadStream ())
  • Duplex – Streams that are both readable and writable (e.g. Net.socket)
  • Transform – Duplex stream that can modify or Transform data during read/write (for example, zlib.createDeflate())

The buffer

A fairly important concept in streams is that both read and write streams are implemented through caching. Both writable and readable streams store data in an internal buffer and can be fetched using writable. WritableBuffer or readable.readableBuffer, respectively. The amount of data that can be buffered depends on the highWaterMark option passed to the stream constructor. By default, highWaterMark 64*1024 bytes reads and writes data to the buffer, and then reads or writes data to a file.

Several important underlying methods

  1. Writable.write (chunk[, encoding][, callback]) The writable.write() method writes data to the stream and calls callback when the data is processed. If an error occurs, the callback is not necessarily called with the error as the first argument. To ensure that write errors are reliably detected, listen for the ‘error’ event. After confirming the chunk, the function returns true if the size of the internal buffer is less than the highWaterMark threshold set when the stream was created. If the return value is false, writing data to the stream should be stopped until the ‘drain’ event is emitted. When a stream is not in the drain state, calls to write() cache blocks and return false. Once all currently cached blocks have been emptied (accepted by the operating system for output), the ‘drain’ event is emitted.
  2. readable.read([size])

Let me give you a little example to help you understand

// pipe
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:1
})
let ws = fs.createWriteStream('./5.txt',{
    highWaterMark:2
})
let index = 1;
rs.on('data', (data) => {
    console.log(index++)
    letflag = ws.write(data); // The call to writable.write() returns when the total size of the internal writable buffer is less than the threshold set by highWaterMarktrue. Returns once the size of the internal buffer reaches or exceeds highWaterMarkfalse.if(! Flag) {// The internal buffer exceeds highWaterMark rs.pause()}})let wsIndex = 1;
ws.on('drain', () => {
    console.log('ws'+wsIndex++)
    rs.resume()
})
// 1 2 ws1 3 4 ws2 5 6 ws3
Copy the code

Several important event listeners

As mentioned above, all streams are instances of EventEmitter, so they can be on, emit, and so on

  1. Rs.on (‘data’,()) // Read buffer
  2. Ws.on (‘drain’,()) // The write buffer is cleared. In the example above, when the write buffer is larger than highWaterMark, we pause the read, wait for the drain event to be heard, and then restart rs.resume()

Pipe Rs.pipe (ws) can be used to write a readable stream to a writable stream

Self – implemented readable stream

let EventEmitter = require('events'); // All streams are instances of EventEmitter, and streams inherit EventEmitterlet fs = require('fs');
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.autoClose = options.autoClose || true;
    this.flags = options.flags || 'r'; this.encoding = options.encoding || null; this.start = options.start || 0; this.end = options.end || null; this.highWaterMark = options.highWaterMark || 64 * 1024; // There should be a variable reading position (variable position) this.pos = this.start; // Controls whether the flowing mode is current this.flowing = null; This.buffer = buffer.alloc (this.highwatermark); // To open the file when creating a readable stream this.open(); // Execute this.on(asynchronously)'newListener', (type) = > {if(type= = ='data'){// The user listens to the data event and starts reading this.flowing =true; this.read(); // Start reading the file}}); }read(){// Wait for the file to open before reading itif(typeof this.fd ! = ='number'){// Wait for the file to open and call againreadmethodsreturn this.once('open',()=>this.read()); } // Start to read // file may have 10 strings // start 0 end 4 // read three at a time 3 // 0-2 // 34lethowMuchToRead = this.end ? Math. Min (enclosing highWaterMark, enclosing the end - this. Pos + 1) : this. HighWaterMark / / file descriptors which buffer read read into the buffer in which position / / Several read into the buffer, read the location of the fs. Read (this fd, enclosing buffer, 0, howMuchToRead, enclosing pos, (err, bytesRead) = > {if(bytesRead>0){this.pos += bytesRead; // Keep what is usefulletr = this.buffer.slice(0, bytesRead); r = this.encoding ? r.toString(this.encoding) : r; // First time to read this.emit('data', r);
        if(this.flowing) { this.read(); }}else{
        this.end = true;
        this.emit('end'); this.destroy(); }}); }destroy() {// check whether the file is open (close the file)if (typeof this.fd === 'number') {
      fs.close(this.fd, () => {
        this.emit('close');
      });
      return;
    }
    this.emit('close');
  }
  openFs.open (this.path, this.flags, (err, fd) => {if (err) {
        this.emit('error', err);
        if(this.autoClose) { this.destroy(); // Destroy the closed file (triggers the close event)}return;
      }
      this.fd = fd;
      this.emit('open'); // Trigger the file open event}); }pause(){
    this.flowing = false;
  }
  resume(){
    this.flowing = true; this.read(); }} module. Exports = ReadStream;Copy the code

Writable stream implemented by yourself

let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter{
  constructor(path,options ={}){
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.highWaterMark = options.highWaterMark || 16*1024;
    this.start = options.start || 0;
    this.autoClose = options.autoClose|| true;
    this.encoding = options.encoding || 'utf8'; NeedDrain = this. NeedDrain =false; // Is writing this. Writing =false; This.buffer = []; this.buffer = []; This.len = 0; this.len = 0; This.pos = this.start; this.open(); } // 0 [1 2] write(chunk, encoding = this.encoding,callback){ chunk = Buffer.isBuffer(chunk)? chunk:Buffer.from(chunk); this.len += chunk.length; NeedDrain = this.highwatermark <= this.len; // this.fdif(this.writing){
      this.buffer.push({chunk,encoding,callback});
    }else{// Clear the cache when the file is written this.writing =true; This._write (chunk,encoding,()=>this.clearBuffer()); }return! this.needDrain; // The return value of write must betrue / falseWhen this.len >= this.higwatermark returns a fasle, the example suspends reading. Wait for the write to complete} _write(chunk,encoding,callback){if(typeof this.fd ! = ='number') {
      return this.once('open', () => this._write(chunk, encoding, callback)); } // chunk is the location and length of the data 0 to be written, this.pos offset fs.write(this.fd, chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{ this.pos += bytesWritten; this.len -= bytesWritten; // The length of the write will reduce callback(); }); }clearBuffer() {let buf = this.buffer.shift();
    if(buf){
      this._write(buf.chunk, buf.encoding, () => this.clearBuffer());
    }else{
      this.writing = false;
      this.needDrain = false; // Trigger once drain and set backfalseNext time to judge this. Emit ('drain'); }}destroy() {if(typeof this.fd === 'number'){
      fs.close(this.fd,()=>{
        this.emit('close');
      });
      return 
    }
    this.emit('close');
  }
  open(){
    fs.open(this.path,this.flags,this.mode,(err,fd)=>{
      if(err){
        this.emit('error');
        this.destroy();
        return 
      }
      this.fd = fd;
      this.emit('open');
    });
  }
}
module.exports = WriteStream;
Copy the code

Above is some basic knowledge of stream, stream simple application and their own implementation of readable stream writable stream. Of course, there are a lot of shortcomings, I hope friends to point out. I also hope to learn and share with you friends!