What is a stream?
As the name suggests, a stream is a flow of data.
Here's an example:
Suppose the water supply in your community has been cut off, but you happen to have some water stored in your rooftop tank, while your neighbors have none. If they want to borrow some water from you, you can carry it over bucket by bucket.
This is similar to a request object asking a server for a resource: the requested resource is delivered piece by piece through a stream.
Why use streams?
Generally, there are two modes for processing data: buffer mode and stream mode.
- Buffer mode: processing starts only once all the data has been collected.
- Stream mode: data is processed as it arrives.
Here’s an example:
If you want to watch a movie on your phone, buffer mode means downloading the entire movie before watching it, while stream mode means watching while it is still being buffered.
Stream mode is superior to buffer mode both in space and time:
- Space: memory holds only the chunk of data currently being processed, not the entire file.
- Time: processing can start before all the data has arrived, which saves time.
Another advantage is that streams can be chained.
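For instance, this minimal sketch (not from the original text) chains a file stream through a gzip Transform stream:

```js
const fs = require('fs');
const zlib = require('zlib');

// Each pipe() call returns its destination stream, so calls can be
// chained: read ./1.txt, gzip it on the fly, write it to ./1.txt.gz.
fs.createReadStream('./1.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./1.txt.gz'));
```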
If the speed of writing cannot keep up with the speed of reading, data may be lost. The normal flow of operation is therefore: write a chunk, then read the next one; if a chunk has not been written yet, pause the readable stream, and resume it once writing catches up. It is just like watching that movie: if your network is slow and the cache runs out, playback pauses until enough data is buffered again. To keep the speeds of the readable and writable streams consistent, we use the stream's essential pipe() method. "Pipe" means exactly what it says: a pipe connecting the two streams, like a water pipe.
Types of streams
There are four basic stream types in Node:
- Readable: a readable stream
- Writable: a writable stream
- Duplex: a stream that is both readable and writable
- Transform: a Duplex stream that can modify and transform data as it is written and read
There are two modes for the data in a stream:
- Binary mode: the data is a string or a Buffer.
- Object mode: the stream internally passes ordinary JavaScript objects, as in the sketch below.
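To illustrate object mode, here is a minimal sketch (the stream and data are invented for illustration): a Readable created with objectMode: true can push plain JavaScript objects instead of strings or Buffers:

```js
const { Readable } = require('stream');

// Hypothetical example data
const users = [{ id: 1, name: 'a' }, { id: 2, name: 'b' }];

const rs = new Readable({
  objectMode: true,
  read() {
    // push the next object, or null to signal the end of the stream
    this.push(users.length ? users.shift() : null);
  }
});

rs.on('data', (obj) => console.log(obj)); // { id: 1, name: 'a' } ...
```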
Readable streams (Readable)
Readable streams have two modes:
- Flowing mode: data is read from the underlying system automatically and provided to the application through events as quickly as possible.
- Paused mode: read() must be called explicitly to read data.

All readable streams start in paused mode. They can be switched to flowing mode by any of the following:
1. Adding a data event callback.
2. Calling resume().
3. Calling pipe().

They can be switched back to paused mode by:
1. Calling pause(), if there are no pipe destinations.
2. Removing all pipe destinations with unpipe(), if there are any.
The createReadStream method takes two arguments:
The first is the path of the file to read. The second is an options object with eight properties:
- flags: the file system flag; defaults to 'r'.
- encoding: the character encoding; defaults to null.
- fd: the file descriptor; defaults to null.
- mode: the permission bits; defaults to 0o666.
- autoClose: whether to close the file automatically; defaults to true.
- start: the position at which to start reading the file.
- end: the position at which to stop reading the file (inclusive).
- highWaterMark: the maximum number of bytes read into the buffer at a time; defaults to 64 * 1024.
Create a readable stream and listen for events:
```js
const fs = require('fs');

// Create a readable file stream
let rs = fs.createReadStream('./1.txt', {
  flags: 'r',        // file system flag
  encoding: 'utf8',  // if set, chunks are decoded strings; otherwise they
                     // are Buffers (can also be set via rs.setEncoding())
  fd: null,          // file descriptor
  mode: 0o666,       // file permission
  start: 0,          // position to start reading from
  end: Infinity,     // position to stop reading at (inclusive)
  highWaterMark: 3   // buffer size, default 64 KB
});

// Emitted when the file is opened
rs.on('open', function () {
  console.log('File open');
});

// Listening for the 'data' event switches the stream to flowing mode;
// it fires each time the stream hands a chunk to the consumer.
// Because highWaterMark is set to 3 bytes above, it fires several times.
rs.on('data', function (data) {
  console.log(data);
});

// Emitted when there is no more data in the stream for the consumer
rs.on('end', function () {
  console.log('Data read completed');
});

// Emitted on error
rs.on('error', function () {
  console.log('Read error');
});

// Emitted when the file is closed
rs.on('close', function () {
  console.log('File closed');
});
```
Note: the open and close events are not emitted by every stream.
While listening for data events, the stream reads and delivers data as quickly as possible. But sometimes we need to pause reading and do something else first. That is what pause() and resume() are for.
```js
const fs = require('fs');

// Create a readable file stream
let rs = fs.createReadStream('./1.txt', {
  highWaterMark: 3
});

rs.on('data', function (data) {
  console.log(`Read ${data.length} bytes of data: ${data.toString()}`);
  // pause() stops a flowing stream from emitting 'data' events;
  // the stream leaves flowing mode and data stays in the internal buffer.
  rs.pause();
  // After 3 seconds, resume() switches the stream back to flowing mode
  // and 'data' events start firing again.
  setTimeout(function () {
    rs.resume();
  }, 3000);
});
```
A readable stream also has a readable event, emitted when there is data in the stream available to read. Note that when you listen for the readable event, the stream stops flowing and you must call read() to consume the data. Also note: do not mix on('data'), on('readable'), and pipe(); doing so leads to ambiguous behavior.
```js
const fs = require('fs');

let rs = fs.createReadStream('./1.txt', { highWaterMark: 1 });

// Emitted when there is data in the stream available to read
rs.on('readable', function () {
  let data;
  // read(n) returns null if fewer than n bytes are buffered (except at the
  // end of the stream, where the remaining data is returned); with no
  // argument it returns all the data in the buffer.
  while ((data = rs.read()) !== null) {
    console.log(`Read ${data.length} bytes of data`);
    console.log(data.toString());
  }
});
```
Writable streams (Writable)
The createWriteStream method takes two arguments:
The first is the path of the file to write. The second is an options object with seven properties:
- flags: the file system flag; defaults to 'w'.
- encoding: the character encoding; defaults to 'utf8'.
- fd: the file descriptor; defaults to null.
- mode: the permission bits; defaults to 0o666.
- autoClose: whether to close the file automatically; defaults to true.
- start: the position at which to start writing the file.
- highWaterMark: a watermark against which the number of buffered bytes is compared; defaults to 16 * 1024.
Create a writable stream and listen for events:
```js
const fs = require('fs');

// Create a writable file stream
let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 });

// write(chunk, encoding, callback) writes data to the stream:
// the first argument is the data, the second the encoding, and the third
// a callback invoked once the write succeeds. It returns false once the
// internal buffer is full, true otherwise. Since we set the buffer size
// to 3 bytes above, the third write returns false.
console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));

function writeData() {
  let cnt = 9;
  return function () {
    let flag = true;
    while (cnt && flag) {
      flag = ws.write(`${cnt}`);
      console.log('Bytes in the write buffer:', ws.writableLength);
      cnt--;
    }
  };
}

let wd = writeData();
wd(); // stops writing once the buffer is full

// When the buffer has drained, the 'drain' event tells the producer
// it is ready for more data.
ws.on('drain', function () {
  console.log("I'm ready to write more data.");
  console.log('Bytes in the write buffer:', ws.writableLength);
  wd();
});

// Emitted when the stream or its underlying resource is closed
ws.on('close', function () {
  console.log('File closed');
});

// Emitted on error
ws.on('error', function () {
  console.log('Write data error');
});
```
The writable stream's end() method and the finish event:
```js
const fs = require('fs');

// Create a writable file stream
let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 });

console.log(ws.write('1', 'utf8'));
console.log(ws.write('2', 'utf8'));
console.log(ws.write('3', 'utf8'));
console.log(ws.write('4', 'utf8'));

// Calling end() signals that no more data will be written; here we write
// one last chunk before closing the stream. If a callback is passed,
// it is attached as a 'finish' event listener.
ws.end('Last bit of data', 'utf8');

// 'finish' is emitted after end() has been called and all buffered data
// has been flushed to the underlying system.
ws.on('finish', function () {
  console.log('Write done');
});
```
The writable stream's cork() and uncork() methods address the performance degradation that can occur when many small chunks are written to a stream in quick succession: cork() buffers them in memory, and uncork() flushes them to the destination in one go.
```js
const fs = require('fs');

let ws = fs.createWriteStream('./1.txt', { highWaterMark: 1 });

// After cork() is called, all written data is buffered in memory;
// nothing is written to the file even though the buffered data
// exceeds the highWaterMark setting.
ws.cork();
ws.write('1');
console.log(ws.writableLength);
ws.write('2');
console.log(ws.writableLength);
ws.write('3');
console.log(ws.writableLength);

// uncork() flushes the data buffered since the call to cork()
// to the destination, i.e. to the file.
ws.uncork();
```
Note that cork() must be called the same number of times as uncork().
```js
const fs = require('fs');

let ws = fs.createWriteStream('./1.txt', { highWaterMark: 1 });

// Each call to cork() must be matched by a call to uncork(), one to one.
ws.cork();
ws.write('4');
ws.write('5');
ws.cork();
ws.write('6');

process.nextTick(function () {
  ws.uncork();
  // The data is only flushed once uncork() has been called
  // as many times as cork().
  ws.uncork();
});
```
Duplex streams (Duplex)
Duplex streams are a class of streams that implement both the Readable and Writable interfaces; a Duplex object can be used as either a readable or a writable stream. To build a custom Duplex stream we need to:
1. Inherit from the Duplex class.
2. Implement _read().
3. Implement _write().

With _read() implemented, we can listen for data events to consume what the Duplex produces. With _write() implemented, upstream producers can write data into it for consumption.
The implementations of _read() and _write() will look familiar: they are exactly the same as in Readable and Writable.
```js
const Duplex = require('stream').Duplex;

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});
```
The Duplex constructor also accepts a few extra options:
- readableObjectMode: whether the readable side is in object mode; defaults to false.
- writableObjectMode: whether the writable side is in object mode; defaults to false.
- allowHalfOpen: defaults to true; if set to false, the stream automatically ends the readable side when the writable side ends, and vice versa.
Example:
```js
const Duplex = require('stream').Duplex;
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}
```
This is non-executable pseudocode, but it shows that a Duplex stream can both produce and consume data, which is what lets it sit in the middle of a data pipeline.
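Since that snippet is pseudocode, here is a minimal runnable sketch (the letters and logging are invented for illustration): the readable side emits a fixed sequence while the writable side just logs what it receives, showing that the two sides of a Duplex are independent:

```js
const { Duplex } = require('stream');

const letters = ['a', 'b', 'c'];

const duplex = new Duplex({
  read(size) {
    // push the next letter, or null to end the readable side
    this.push(letters.length ? letters.shift() : null);
  },
  write(chunk, encoding, callback) {
    console.log('Written:', chunk.toString());
    callback();
  }
});

duplex.on('data', (chunk) => console.log('Read:', chunk.toString()));
duplex.write('hello');
duplex.end();
```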
Transform streams (Transform)
In a Duplex stream, the data on the readable side is separate from the data on the writable side. A Transform stream is a special Duplex stream, inheriting from Duplex, in which data written to the writable side is transformed and then automatically pushed to the readable side.
The Transform class inherits from Duplex internally and already implements the _write() and _read() methods.
So to customize a Transform stream, we only need to:
1. Inherit from the Transform class.
2. Implement _transform().
3. Implement _flush() (if needed).
The _transform(chunk, encoding, callback) method receives input and produces output. The arguments should look familiar: as in Writable, chunk is a Buffer by default unless decodeStrings is set to false.
Inside _transform() you can call this.push(data) to produce data for the readable side; you can also choose not to call it, in which case the input produces no output.
callback(err, data) must be called once the chunk has been processed. The first argument passes an error, if any; the second argument is optional, and if provided it has the same effect as calling this.push(data).
```js
// These two implementations are equivalent:
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
};
```
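As a concrete, runnable sketch (the upper-casing example is invented for illustration), here is a Transform stream that upper-cases whatever passes through it:

```js
const { Transform } = require('stream');

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // callback(null, data) has the same effect as this.push(data)
    callback(null, chunk.toString().toUpperCase());
  }
});

// prints 'HELLO ' then 'WORLD'
upperCase.on('data', (chunk) => console.log(chunk.toString()));
upperCase.write('hello ');
upperCase.end('world');
```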
In some cases, a transform operation may need to emit a final bit of extra data at the end of the stream. For example, a zlib stream keeps some internal state so it can optimize the compressed output. For this you can use the _flush() method, which is called after all written data has been consumed, just before the 'end' event is emitted.
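Here is a minimal sketch of _flush (the byte-counting example is invented for illustration): the stream passes data through while counting bytes, then emits the total as one final chunk just before 'end':

```js
const { Transform } = require('stream');

const byteCounter = new Transform({
  transform(chunk, encoding, callback) {
    // pass the chunk through unchanged, keeping a running total
    this.total = (this.total || 0) + chunk.length;
    callback(null, chunk);
  },
  flush(callback) {
    // called after all written data has been consumed, before 'end'
    callback(null, `\ntotal: ${this.total} bytes`);
  }
});

byteCounter.on('data', (chunk) => process.stdout.write(chunk.toString()));
byteCounter.write('hello ');
byteCounter.end('world');
```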
Transform events
A Transform stream has two common events:
1. finish, from Writable.
2. end, from Readable.

finish is emitted after transform.end() has been called and every chunk has been processed by _transform(); end is emitted after _flush() has been called and all data has been output. A small demonstration follows.
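A minimal sketch using the built-in PassThrough stream (a no-op Transform), invented here for illustration:

```js
const { PassThrough } = require('stream');

const pt = new PassThrough();

pt.on('finish', () => console.log('finish: writable side is done'));
pt.on('end', () => console.log('end: readable side fully consumed'));

pt.on('data', () => {}); // consume the data so 'end' can fire
pt.end('bye');           // end the writable side
```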
The pipe() method
The pipe() method does roughly what the following code does: it connects a pipe between a readable stream and a writable stream.
```js
const fs = require('fs');

// Create a readable stream
let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });
// Create a writable stream
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});

rs.on('data', function (data) {
  let flag = ws.write(data);
  console.log(`Wrote ${data.length} bytes to the writable stream`);
  // If the write buffer is full, pause the readable stream
  if (!flag) {
    rs.pause();
    console.log('Paused readable stream');
  }
});

// When the data has been fully read, close the writable stream
rs.on('end', function () {
  console.log('Data read out');
  ws.end();
});

// When the writable stream's buffer has drained and it can accept
// more writes, resume the readable stream
ws.on('drain', function () {
  rs.resume();
  console.log('Resumed readable stream');
});
```
The pipe() method does all of the above in a single call:
```js
const fs = require('fs');

// Create a readable stream
let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });
// Create writable streams
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', { highWaterMark: 3 });

// pipe() binds a writable stream to the readable stream, automatically
// switches the readable stream to flowing mode, and pushes all of its
// data to the writable stream.
rs.pipe(ws);
rs.pipe(ws2);
```
A writable stream can also be unbound manually with unpipe().
```js
const fs = require('fs');

// Create a readable stream
let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });
// Create writable streams
let ws = fs.createWriteStream('./2.txt', {
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt', { highWaterMark: 3 });

rs.pipe(ws);
rs.pipe(ws2);

// Unbind a writable stream; called with no argument,
// unpipe() unbinds all pipes
setTimeout(function () {
  rs.unpipe(ws2);
}, 0);
```
Ok, that’s it!!