Stream is an abstract interface in Node, and many Node objects implement it. For example, the request object used to make a request to an HTTP server is a Stream, and so is process.stdout (standard output).



Why use streams

Take reading a file as an example.
If you use fs.readFileSync to read a file synchronously, the program is blocked until the read finishes, and all of the data is read into memory. If you use fs.readFile to read a file asynchronously, the program is not blocked, but all of the data is still read into memory at once.


When compressing or archiving large files, or when handling media files and large log files, memory usage becomes a problem, and this is where streams have the advantage.
Streams are designed to be asynchronous. Instead of reading the whole file into memory at once, a stream reads one chunk into a buffer, performs the desired operation on it, and writes the result to the output stream.
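The chunk-by-chunk behaviour can be observed with a minimal sketch. It assumes a hypothetical helper named countChunks and a temporary 1000-byte file created only for the demonstration; neither comes from the original article.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: read a file through a stream and report how many
// chunks arrived and how many bytes they contained in total.
function countChunks(file, highWaterMark) {
    return new Promise((resolve, reject) => {
        let chunks = 0;
        let bytes = 0;
        fs.createReadStream(file, { highWaterMark })
            .on('data', (chunk) => {
                chunks += 1;           // only this chunk needs to be in memory
                bytes += chunk.length;
            })
            .on('end', () => resolve({ chunks, bytes }))
            .on('error', reject);
    });
}

// Demo: a 1000-byte temporary file read at most 100 bytes at a time.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stream-demo-'));
const demoFile = path.join(demoDir, 'big.txt');
fs.writeFileSync(demoFile, 'x'.repeat(1000));
countChunks(demoFile, 100).then((report) => console.log(report));
```

With fs.readFile the callback would receive all 1000 bytes at once; here no chunk is larger than the highWaterMark passed in.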

          



There are four types of Stream in Node:
Readable - a stream that can be read from
Writable - a stream that can be written to
Duplex - a stream that can be both read from and written to
Transform - a stream that computes its output from the data written to it
All Stream objects are instances of EventEmitter.
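As a rough illustration of the four types, the sketch below creates one instance of each from the built-in stream module and checks that they are all EventEmitter instances. The variable names and the upper-casing Transform are made up for this example.

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream');
const EventEmitter = require('events');

// One instance of each stream type.
const readable = Readable.from(['a', 'b']);
const writable = new Writable({
    write(chunk, encoding, callback) { callback(); }
});
const duplex = new Duplex({
    read() {},
    write(chunk, encoding, callback) { callback(); }
});
const upper = new Transform({
    // The output is computed from whatever is written in.
    transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
    }
});

const allAreEmitters = [readable, writable, duplex, upper]
    .every((stream) => stream instanceof EventEmitter);
console.log(allAreEmitters); // prints true

// Write into the Transform and read the transformed result back out.
const shouted = new Promise((resolve) => {
    upper.once('data', (chunk) => resolve(chunk.toString()));
});
upper.end('happy');
shouted.then((text) => console.log(text)); // prints "HAPPY"
```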


Commonly used events are:
data – emitted when there is data to read
end – emitted when there is no more data to read
error – emitted when an error occurs while reading or writing
finish – emitted when all data has been written to the underlying system
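To see these events fire, here is a small sketch that records their order. It uses an in-memory source and a do-nothing destination, both of which are assumptions chosen to keep the example self-contained.

```javascript
const { Readable, Writable } = require('stream');

const seen = []; // names of the events, in the order they fire

const source = Readable.from(['my name ', 'is happy']);
const sink = new Writable({
    write(chunk, encoding, callback) { callback(); } // discard the data
});

source.on('data', (chunk) => {
    seen.push('data');
    sink.write(chunk);
});
source.on('end', () => {
    seen.push('end');
    sink.end(); // nothing more to write
});
source.on('error', () => seen.push('error'));
sink.on('error', () => seen.push('error'));

const finished = new Promise((resolve) => {
    sink.on('finish', () => {
        seen.push('finish');
        resolve(seen);
    });
});
finished.then((events) => console.log(events.join(' -> ')));
// prints "data -> data -> end -> finish"
```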




The following sections describe common stream operations.


Read data from the stream
Create an input.txt file with the following contents
my name is happy
Create the main.js file as follows

let fs = require("fs");
// Create a readable stream
let rs = fs.createReadStream('input.txt');
let result = "";
rs.on('data', (data) => {
    // Listening for the data event switches the stream to flowing mode,
    // and the data is delivered as fast as possible
    result += data;
});
rs.on('end', () => {
    console.log(result);
});
rs.on('error', (err) => {
    console.log(err);
});

The execution result of the above code is as follows:

my name is happy



fs.createReadStream(path,[options])
The optional parameter options is an object that can have the following properties
  • flags: how the file is opened. The default is 'r'.
  • encoding: the character encoding. The default is null.
  • start: the index at which to start reading.
  • end: the index at which to stop reading (inclusive).
  • highWaterMark: the size of the read buffer. The default is 64 KB.
Note: if utf8 encoding is specified, highWaterMark must be larger than 3 bytes.
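The options can be tried out with a short sketch. The helper name readRange and the temporary file are assumptions made for this example; the option names are the ones listed above.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: read the bytes from index `start` to index `end`
// (both inclusive) and resolve with them as a utf8 string.
function readRange(file, start, end) {
    return new Promise((resolve, reject) => {
        let result = '';
        fs.createReadStream(file, {
            flags: 'r',
            encoding: 'utf8',
            start,
            end,
            highWaterMark: 4 // read at most 4 bytes per chunk
        })
            .on('data', (chunk) => { result += chunk; })
            .on('end', () => resolve(result))
            .on('error', reject);
    });
}

// Demo file with the same contents as input.txt in the article.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'stream-options-'));
const file = path.join(dir, 'input.txt');
fs.writeFileSync(file, 'my name is happy');
readRange(file, 3, 6).then((text) => console.log(text)); // prints "name"
```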






Write data to the stream
Create the main.js file as follows:

let fs = require("fs");
let data = "My name is happy, haha";
// Create a writable stream to the file output.txt
let ws = fs.createWriteStream("output.txt");
// The data must be a Buffer or a string. write() returns a Boolean:
// false if the internal buffer is full, true otherwise
ws.write(data);
ws.end();
ws.on("finish", () => {
    // finish is emitted after end() has been called and all buffered data
    // has been passed to the underlying system
    console.log("Write complete.");
});
ws.on("error", (err) => {
    console.log(err);
});

The output.txt file now contains:
My name is happy, haha
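The Boolean returned by write() can be observed with a small sketch. It assumes a deliberately slow in-memory Writable with a 3-byte highWaterMark rather than a real file, so that the buffer fills up predictably.

```javascript
const { Writable } = require('stream');

// A slow destination whose internal buffer limit (highWaterMark) is 3 bytes.
const slow = new Writable({
    highWaterMark: 3,
    write(chunk, encoding, callback) {
        setImmediate(callback); // pretend the underlying system needs time
    }
});

const first = slow.write('a');    // 1 byte buffered, below the limit
const second = slow.write('bcd'); // 4 bytes buffered, limit reached
console.log(first, second);       // prints "true false"

// drain is emitted once the buffered data has been flushed.
const drained = new Promise((resolve) => slow.once('drain', resolve));
drained.then(() => console.log('drain: safe to write again'));
```

A false return value does not mean the data was lost; it is a hint to stop writing until drain is emitted.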


Piping streams
A pipe provides a mechanism for connecting an output stream to an input stream. Typically we use it to take data from one stream and pass it to another.

                                  

As shown in the picture above, we can compare a file to a bucket of water, where the water is the file's content. Connecting two buckets with a pipe lets the water flow from one bucket to the other, so a large file is copied gradually rather than all at once.
In the following example, we read the contents of one file and write them to another file.
Copy the input.txt created above to output.txt as follows:

let fs=require("fs");
let rs=fs.createReadStream("input.txt");
let ws=fs.createWriteStream("output.txt");
rs.pipe(ws);

The contents of input.txt are copied to output.txt.
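To know when the copy has actually completed, one option is to wait for the finish event on the destination. The sketch below assumes a hypothetical helper named copyFile and works in a temporary directory so that it does not touch real files.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: copy a file with pipe() and resolve when the
// destination has received all of the data.
function copyFile(from, to) {
    return new Promise((resolve, reject) => {
        const rs = fs.createReadStream(from);
        const ws = fs.createWriteStream(to);
        rs.on('error', reject);   // pipe() does not forward errors
        ws.on('error', reject);
        ws.on('finish', resolve); // all data handed to the file system
        rs.pipe(ws);
    });
}

const copyDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stream-pipe-'));
const source = path.join(copyDir, 'input.txt');
const target = path.join(copyDir, 'output.txt');
fs.writeFileSync(source, 'my name is happy');
copyFile(source, target).then(() => {
    console.log(fs.readFileSync(target, 'utf8')); // prints "my name is happy"
});
```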


How pipe is implemented

let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');
let rs = new ReadStream(path.join(__dirname,'./1.txt'),{
    highWaterMark:4
});

let ws = new WriteStream(path.join(__dirname,'./2.txt'),{
     highWaterMark:1
});
rs.on('data', function (chunk) {
    // chunk is the data that was just read
    let flag = ws.write(chunk);
    if (!flag) {
        rs.pause(); // pause reading
    }
});
ws.on('drain', function () {
    // drain is emitted when the buffer was full and has now been emptied
    rs.resume(); // resume reading
});
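The code above depends on the custom ./ReadStream and ./WriteStream modules. As a rough, runnable counterpart, the sketch below applies the same pause/drain/resume pattern to the built-in fs streams. The helper name copyWithBackpressure, the pause counter, and the temporary files are assumptions added for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: copy a file by hand, pausing the reader whenever
// the writer reports that its buffer is full.
function copyWithBackpressure(from, to) {
    return new Promise((resolve, reject) => {
        const rs = fs.createReadStream(from, { highWaterMark: 4 });
        const ws = fs.createWriteStream(to, { highWaterMark: 1 });
        let pauses = 0;
        rs.on('data', (chunk) => {
            if (!ws.write(chunk)) { // buffer full
                pauses += 1;
                rs.pause();
            }
        });
        ws.on('drain', () => rs.resume()); // buffer emptied, read again
        rs.on('end', () => ws.end());      // no more data: finish writing
        ws.on('finish', () => resolve(pauses));
        rs.on('error', reject);
        ws.on('error', reject);
    });
}

const workDir = fs.mkdtempSync(path.join(os.tmpdir(), 'stream-drain-'));
const inFile = path.join(workDir, '1.txt');
const outFile = path.join(workDir, '2.txt');
fs.writeFileSync(inFile, 'my name is happy');
copyWithBackpressure(inFile, outFile).then((pauses) => {
    console.log('reader was paused', pauses, 'time(s)');
});
```

Because the writer's highWaterMark is smaller than a single chunk, every write() returns false here, so the reader is paused after each chunk until drain fires.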









Chaining streams
Chaining is a mechanism that connects the output of one stream to another stream, so that several streams form a chain of operations. It is typically used for pipeline operations.
Next, we use chaining to compress and decompress files.


Create the compress.js file as follows:

let fs = require("fs");
let zlib = require("zlib");
// Compress input.txt into input.txt.gz
fs.createReadStream('input.txt')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('input.txt.gz'));
// Equivalent to the following, because pipe() returns the destination stream:
// let gzip = zlib.createGzip();
// fs.createReadStream('input.txt').pipe(gzip);
// gzip.pipe(fs.createWriteStream('input.txt.gz'));

After running the code, input.txt.gz, the compressed version of input.txt, appears in the current directory.
Next, let's decompress the file. Create decompress.js as follows:

let fs=require("fs");
let zlib=require("zlib");
fs.createReadStream('input.txt.gz')
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream('input.txt'));
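Compression and decompression can also be chained in a single pipeline to check that the round trip returns the original data. This sketch assumes a hypothetical helper named roundTrip and an in-memory source instead of files.

```javascript
const { Readable } = require('stream');
const zlib = require('zlib');

// Hypothetical helper: gzip a string, gunzip it again, and resolve with
// the text that comes out of the far end of the chain.
function roundTrip(text) {
    return new Promise((resolve, reject) => {
        const parts = [];
        Readable.from([Buffer.from(text)])
            .pipe(zlib.createGzip())
            .on('error', reject)
            .pipe(zlib.createGunzip())
            .on('error', reject)
            .on('data', (chunk) => parts.push(chunk))
            .on('end', () => resolve(Buffer.concat(parts).toString()));
    });
}

roundTrip('my name is happy').then((text) => console.log(text));
// prints "my name is happy"
```

Each pipe() call returns its destination, which is what allows the next pipe() or on() to be attached directly.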





A simple readable stream implementation

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        // "options.autoClose || true" would always be true, so test for undefined
        this.autoClose = options.autoClose !== undefined ? options.autoClose : true;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.start = options.start || 0;
        this.end = options.end;
        this.encoding = options.encoding || null;
        this.open(); // open the file and obtain fd
        this.flowing = null; // null until a data listener is attached
        this.finished = false; // set once end has been emitted
        this.buffer = Buffer.alloc(this.highWaterMark); // reused for every read
        this.pos = this.start; // current read position; start itself never changes
        this.on('newListener', (eventName) => {
            if (eventName === 'data') {
                // The user is listening for data: switch to flowing mode and start reading
                this.flowing = true;
                this.read();
            }
        });
    }

    read() {
        if (this.finished) return;
        // The file may not be open yet; retry when the open event fires,
        // at which point fd is guaranteed to be available
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read());
        }
        let howMuchToRead = this.end !== undefined
            ? Math.min(this.highWaterMark, this.end - this.pos + 1)
            : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, bytesRead) => {
            if (err) {
                this.emit('error', err);
                return this.destroy();
            }
            if (bytesRead > 0) {
                this.pos += bytesRead; // accumulate the number of bytes read
                // Copy the bytes just read, because this.buffer is overwritten by the next read
                let chunk = Buffer.from(this.buffer.subarray(0, bytesRead));
                this.emit('data', this.encoding ? chunk.toString(this.encoding) : chunk);
            }
            let done = bytesRead === 0 || (this.end !== undefined && this.pos > this.end);
            if (done) {
                this.finished = true;
                this.emit('end');
                return this.destroy();
            }
            if (this.flowing) {
                this.read(); // flowing mode keeps reading
            }
        });
    }

    pipe(ws) {
        this.on('data', (chunk) => {
            let flag = ws.write(chunk);
            if (!flag) {
                this.pause();
            }
        });
        ws.on('drain', () => {
            this.resume();
        });
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pause() {
        this.flowing = false;
    }

    destroy() {
        // If the file was opened, close it first and then emit close
        if (typeof this.fd === 'number') {
            fs.close(this.fd, () => {
                this.emit('close');
            });
            return;
        }
        this.emit('close');
    }

    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) { // destroy automatically on failure
                    this.destroy();
                }
                return;
            }
            this.fd = fd; // save the file descriptor
            this.emit('open'); // the file is open
        });
    }
}

module.exports = ReadStream;