Node.js streams: the right way to open them!
As the saying goes, “Man struggles upwards while water flows downwards”; and as another old saying goes, “The falling flowers are willing, but the flowing stream is heartless.” (Readers: Wait, what? What are you even talking about?! 📣) Ahem 🤣 — relax, everyone, there is no need to call in the heavy artillery for this. Wrong channel, let's get straight to the topic!
(1) What exactly is a stream?
The Node.js file system (the fs core module) gets used a lot in day-to-day development, but before digging into it, if someone had asked me what on earth a Node.js stream is, my expression would have been pure bewilderment.
What exactly is a stream? How am I supposed to answer without a hint? Questions like this make my head hurt! It seems the only option is Google (hmph ~ this programmer has quit Baidu).
I read through the articles written by several experts. I can't say they cleared up all of my confusion, but I still came away with plenty 🙃! Even so, streams still felt a little abstract and a little hard to grasp. After much deliberation, I decided to write the experts' explanations down in this little post and digest them slowly. (Then Mommy won't have to worry about me being asked again.)
A stream is a collection of data -- just like an array or a string. The difference is that the data in a stream may not all be available at once, and it does not all have to fit in memory at once. This makes streams very powerful when working with large amounts of data, or with data that arrives from an external source one chunk at a time.
From: Node.js Streams: Everything You Need to Know
A stream is an ordered transfer of byte data with a starting point and an ending point.
When data is exchanged and transmitted between the various objects in an application, the data contained in an object is always converted into stream data (that is, byte data) of one form or another; after the stream reaches the destination object, it is converted back into data that object can use.
From: The Definitive Guide to Node.js
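To make the "does not all have to fit in memory" point concrete, here is a minimal sketch (the file name big-file.txt is just a placeholder) comparing reading a whole file at once with reading it as a stream:

```js
let fs = require('fs');

// Without streams: the entire file is loaded into memory before we can use it
fs.readFile('big-file.txt', (err, data) => {
    if (err) throw err;
    console.log('got all', data.length, 'bytes at once');
});

// With a stream: the file arrives chunk by chunk, so memory usage stays small
fs.createReadStream('big-file.txt')
    .on('data', (chunk) => console.log('got a chunk of', chunk.length, 'bytes'))
    .on('end', () => console.log('done'));
```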
(2) The types of streams in Node.js
Now that we know what a stream is, we also need to look at the types of streams in Node.js. A stream is a very abstract interface, but it is implemented by many objects in Node.js. For example, an HTTP server's request and response objects are both streams. Node.js has four basic stream types:
- Readable: a readable stream. For example, fs.createReadStream(path, options) in the Node.js file system module (fs) returns an instance of a readable stream;
- Writable: a writable stream. For example, fs.createWriteStream(path, options) in the Node.js file system module (fs) returns an instance of a writable stream;
- Duplex: a stream that is both readable and writable. For example, the net.Socket class in the Node.js networking module (net);
- Transform: a Duplex stream, also known as a transform stream, in which the data can be modified and transformed as it is read and written. For example, zlib.createDeflate(options) in the Node.js compression module (zlib).
In Node.js, all stream implementations inherit from the EventEmitter class, so they can emit events while reading or writing data.
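As a quick sketch (the file names here are just placeholders), here is one built-in instance of each of the four types, plus a check that they all inherit from EventEmitter:

```js
let fs = require('fs');
let net = require('net');
let zlib = require('zlib');
let stream = require('stream');
let EventEmitter = require('events');

let readable = fs.createReadStream('a.txt');   // Readable
let writable = fs.createWriteStream('b.txt');  // Writable
let duplex = new net.Socket();                 // Duplex
let transform = zlib.createDeflate();          // Transform (a special kind of Duplex)

console.log(readable instanceof stream.Readable);   // true
console.log(writable instanceof stream.Writable);   // true
console.log(duplex instanceof stream.Duplex);       // true
console.log(transform instanceof stream.Transform); // true
// Every stream is also an EventEmitter:
console.log(readable instanceof EventEmitter);       // true
```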
(3) createReadStream and createWriteStream
Now that we know what streams are and what types exist, how are streams actually used in Node.js? fs.createReadStream(path, options) creates a readable stream and fs.createWriteStream(path, options) creates a writable stream, where path is the path of the file to read or write and options is a configuration object. (There are quite a few configuration options, 🤔 let's try to remember them…)
- flags: defaults to 'r' for readable streams and 'w' for writable streams;
- encoding: the encoding format. Defaults to null for readable streams and 'utf8' for writable streams;
- autoClose: whether the file is closed automatically. Defaults to true;
- mode: the read/write permission mode. Defaults to 0o666 (read-write, octal);
- highWaterMark: the high-water mark. Defaults to 64 KB for readable streams (the maximum number of bytes read at a time) and 16 KB for writable streams (the maximum number of bytes written at a time, i.e. the maximum memory occupied);
- start: the position from which reading or writing begins. Defaults to 0 (in bytes);
- fd: the file descriptor, of type Number;
- end: readable streams only; the position at which reading of the file stops, Infinity by default (in bytes).
There are actually quite a few parameters, but they are easy to understand. Note that both start and end are inclusive: if start is set to 0 and end is set to 5, six bytes will be read. (A little domineering 😎)
fs.createReadStream(path, options) and fs.createWriteStream(path, options) are the two simplest ways to create streams in Node.js. But no, no, no! We not only need to know how to use them, we also need to know how they work. To get a feel for the usage first, create a readable stream:
```js
let fs = require('fs');
// 'a.txt' stores ten digits --> 0123456789
let rs = fs.createReadStream('a.txt', {
    flags: 'r',
    encoding: 'utf8',
    autoClose: true,
    mode: 0o666,
    start: 0,
    end: 5,
    highWaterMark: 2
});
rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // Pause after every chunk
});
setInterval(() => { rs.resume(); }, 1000); // The final output is 01, 23, 45
```
Once a 'data' listener is attached, the stream switches to flowing mode and data is read as fast as possible. The pause() and resume() methods pause and resume the emission of 'data' events (i.e. reading of the file stops and then restarts). You can also listen for the 'end', 'open', 'close' and 'error' events.
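As a small sketch (the file name is just an example), here is the same kind of readable stream with listeners attached for the other events mentioned above:

```js
let fs = require('fs');
let rs = fs.createReadStream('a.txt', { encoding: 'utf8', highWaterMark: 2 });

rs.on('open', (fd) => console.log('file opened, fd =', fd)); // fd is the file descriptor
rs.on('data', (chunk) => console.log('chunk:', chunk));      // switches the stream to flowing mode
rs.on('end', () => console.log('no more data'));
rs.on('close', () => console.log('file closed'));
rs.on('error', (err) => console.error('something went wrong:', err));
```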
In contrast to readable streams, we should also know the following characteristics of writable streams:
- Writable streams really do have the concept of a cache: the first write goes straight to the file, while subsequent writes made before the previous one has finished go into the cache first;
- write() returns a Boolean; when it returns false, you should stop writing. (If you keep writing after false is returned, the data will still end up in the file, because the excess is stored in the cache first, but this may waste memory);
- the 'drain' event fires once both the current write and the cache have been fully flushed.
```js
// If 'a.txt' already has content, it will be overwritten by what is written here
let ws = fs.createWriteStream('a.txt', {
    flags: 'w',
    mode: 0o666,
    encoding: 'utf8',
    autoClose: true,
    start: 0,
    highWaterMark: 3
});
let flag = ws.write('1');
console.log(flag); // true
flag = ws.write('1');
console.log(flag); // true
flag = ws.write('1');
console.log(flag); // false
```
In the example above, the last call returns false, which is related to highWaterMark: false is returned once the total number of bytes written (including what is still in the cache) is greater than or equal to highWaterMark.
So how do we control the timing of writes so that we don’t waste memory? Take a look at the following example.
```js
// Reuse the writable stream instance from the previous example (highWaterMark: 3),
// so only three bytes of memory are used while writing
let i = 0;
function write() {
    // Write three bytes at a time, then stop and wait for 'drain' before continuing
    let flag = true;
    while (i < 9 && flag) {
        flag = ws.write(i + '');
        i++;
    }
}
ws.on('drain', function () {
    // Fired once highWaterMark was reached and the cache has been flushed
    console.log('Write succeeded');
    write();
});
write();
// a.txt file --> 012345678
```
Implement createReadStream and createWriteStream
The usage and characteristics of createReadStream and createWriteStream have been covered above, but if we can implement readable and writable streams ourselves, we will understand them much better. fs.createReadStream() simply returns an instance of the ReadStream class, and the writable stream does the same thing:
```js
fs.createReadStream = function (path, options) {
    return new ReadStream(path, options);
};
fs.createWriteStream = function (path, options) {
    return new WriteStream(path, options);
};
```
So all we really have to do is encapsulate the ReadStream and WriteStream classes, which suddenly feels a lot easier. For a better understanding, almost every line of code has a comment 😊! To save space, only the core read method and write method are posted here; please go to stream to download all the code.
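Before the read method itself, here is a rough sketch of the state the ReadStream constructor would need to set up for read() to work. This is an assumption pieced together from the properties used below (this.fd, this.pos, this.buffer, this.flowing and so on), not the full downloadable code:

```js
let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.encoding = options.encoding || null;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;       // Current read offset in the file
        this.flowing = false;        // Becomes true once a 'data' listener is attached
        this.finished = false;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.open();                 // Open the file asynchronously
        this.on('newListener', (event) => {
            if (event === 'data') {  // Switch to flowing mode and start reading
                this.flowing = true;
                this.read();
            }
        });
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                return;
            }
            this.fd = fd;
            this.emit('open', fd);
        });
    }
    // read() is implemented below...
}
```

With that state in place, the core read method looks like this: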
```js
class ReadStream extends EventEmitter {
    read() {
        // Reading has finished, stop
        if (this.finished) {
            return;
        }
        // Opening the file is asynchronous, so it may not be open yet when read() is called
        if (typeof this.fd !== 'number') {
            this.once('open', () => this.read());
            return;
        }
        // length is the number of bytes to read this time
        let length = this.end ? Math.min(this.highWaterMark, this.end - this.pos + 1) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, length, this.pos, (err, bytesRead) => {
            if (err) {
                this.emit('error', err);
                this.destroy();
                return;
            }
            if (bytesRead > 0) {
                this.pos += bytesRead;
                // The read may not fill this.buffer, so slice off only the bytes actually read
                let res = this.buffer.slice(0, bytesRead);
                res = this.encoding ? res.toString(this.encoding) : res;
                this.emit('data', res);
                if (this.flowing) {
                    // Keep calling read() while in flowing mode
                    this.read();
                }
            } else {
                this.finished = true;
                this.emit('end');
                this.destroy();
            }
        });
    }
}
```
This encapsulates the ReadStream class. The most important thing is to understand the read method; the other methods are easier to follow. The trickiest part of read is the length variable (the number of bytes to read this time): near the end of the read, the number of bytes left in the file may be smaller than the high-water mark highWaterMark, so Math.min() is used to take the smaller value. For example: if this.end = 4 and this.highWaterMark = 3, then after the first read this.pos = 3, and only two more bytes (positions 3 and 4) still need to be read.
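A quick walk-through of that calculation with the numbers from the example above:

```js
// this.end = 4, this.highWaterMark = 3, this.pos starts at 0
let length = Math.min(3, 4 - 0 + 1); // min(3, 5) = 3 --> first read: 3 bytes, pos becomes 3
length = Math.min(3, 4 - 3 + 1);     // min(3, 2) = 2 --> second read: only the 2 remaining bytes
```

That covers the readable side; now for the WriteStream class: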
```js
class WriteStream extends EventEmitter {
    // chunk: the content to write; encoding: the encoding format; callback: called after the write completes
    write(chunk, encoding = this.encoding, callback) { // The method called when writing
        // For consistency, always work with a Buffer
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
        this.len += chunk.length; // Maintain the cache length
        let ret = this.highWaterMark > this.len;
        if (!ret) {
            this.needDrain = true; // The 'drain' event will need to be emitted later
        }
        if (this.writing) {
            // A write is already in progress, so put this chunk into the cache
            this.buffer.push({ chunk, encoding, callback });
        } else {
            // First write: write straight to the file
            this.writing = true;
            this._write(chunk, encoding, () => this.clearBuffer());
        }
        return ret; // write must return true/false
    }
    _write(chunk, encoding, callback) {
        // write is called synchronously, so the fd may not be available yet
        if (typeof this.fd !== 'number') {
            // The file is not open yet; retry once it is
            return this.once('open', () => this._write(chunk, encoding, callback));
        }
        // Parameters: fd file descriptor; chunk the data; 0 the offset into the buffer;
        // chunk.length the number of bytes to write; this.pos the offset in the file to start writing at
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            this.pos += bytesWritten;
            this.len -= bytesWritten; // Free the corresponding amount of cache after each write
            callback();
        });
    }
    clearBuffer() {
        // Take the next chunk out of the cache
        let buf = this.buffer.shift();
        if (buf) {
            this._write(buf.chunk, buf.encoding, () => this.clearBuffer());
        } else {
            if (this.needDrain) {
                this.writing = false;
                this.needDrain = false; // Reset so 'drain' can be triggered again later
                this.emit('drain');
            }
        }
    }
}
```
The core of the writable stream is the write method, which relies on the _write method and the clearBuffer method. For all the code, please go to stream to download it; it will be easier to understand b（~▽~）d!
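As with ReadStream, the write method above relies on state set up in the constructor (this.fd, this.pos, this.len, this.buffer, this.writing, this.needDrain). Here is a rough sketch of what that constructor might look like; it is an assumption reconstructed from the properties used above, not the full downloadable code:

```js
let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.start = options.start || 0;
        this.pos = this.start;    // Current write offset in the file
        this.len = 0;             // Total length of data written but not yet flushed
        this.writing = false;     // Whether a write to the file is in progress
        this.needDrain = false;   // Whether 'drain' should be emitted once the cache empties
        this.buffer = [];         // The cache: chunks waiting to be written
        this.open();              // Open the file asynchronously
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                return;
            }
            this.fd = fd;
            this.emit('open', fd);
        });
    }
    // write(), _write() and clearBuffer() are implemented above...
}
```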
(4) The pipe method
Everything above was building up to the importance of the pipe method for moving data between streams. First, how the pipe method is used:
```js
let fs = require('fs');
let rs = fs.createReadStream('a.txt', {
    highWaterMark: 4
});
let ws = fs.createWriteStream('b.txt', {
    highWaterMark: 1
});
rs.pipe(ws);
```
Isn't the usage simple and direct? It is only one line of code, but that is exactly where the magic lies. (Where's the promised excitement? I've got my little stool ready, and that's all you give me?) So how do we implement a pipe method ourselves? Based on the createReadStream and createWriteStream implementations above, pipe is easy: just add a pipe method to the ReadStream class, as follows:
```js
class ReadStream extends EventEmitter {
    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) {
                this.pause(); // The destination's cache is full, pause reading
            }
        });
        dest.on('drain', () => {
            this.resume(); // The cache has been flushed, resume reading
        });
    }
}
```
Once again, for a better understanding, you can go to stream to download all the code (~▽~)~*!
- The pipe method, also known as the plumbing method, has one big advantage: it controls the rate of transfer (preventing the available memory from being flooded);
- How pipe works: the readable stream instance rs listens for the 'data' event and passes every chunk it reads to ws.write() (ws being the writable stream instance). write() returns a Boolean; if it returns false, rs.pause() is called to pause reading; once the writable stream has finished flushing and ws.on('drain') fires, reading is resumed with rs.resume().
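Since the built-in streams all share this interface, pipe calls can also be chained. A small sketch (the file names are just examples) that gzips a file on the fly using the built-in pipe:

```js
let fs = require('fs');
let zlib = require('zlib');

fs.createReadStream('a.txt')                  // Readable
    .pipe(zlib.createGzip())                  // Transform: compress each chunk as it passes through
    .pipe(fs.createWriteStream('a.txt.gz'))   // Writable
    .on('finish', () => console.log('a.txt.gz written'));
```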
Conclusion: that's the end of this look at Node.js streams. Thank you for reading! If you spot any problems, feel free to point them out so we can improve together! If the article feels a little hard to digest, you can bookmark it first and come back to it later ❤️!
Reference article:
- Node.js Streams: Everything you need to know
- The Definitive Guide to Node.js
- Node.js Chinese documentation website