The concept of streams
- A stream is an ordered sequence of byte data transfers with a starting point and an ending point
- Streams don't care about the overall content of a file, only whether data is read from it and what happens to that data once it is read
- A stream is an abstract interface implemented by many objects in Node. For example, HTTP server request and response objects are streams
- Streams are a core module of Node.js, and many built-in objects are stream instances, such as process.stdout and http.ClientRequest
The benefits of streams
- Streams are an event-based API for managing and processing data, and they are efficient
- Combined with events and non-blocking I/O, the stream module lets data be processed the moment it is available and released as soon as it is no longer needed
A stream's data comes in two modes: binary mode and object mode
- In binary mode, each chunk is a Buffer or string
- In object mode, the stream handles a sequence of ordinary JavaScript objects internally
All stream objects created with the Node.js API can only operate on strings and Buffer objects. With some third-party stream implementations, however, you can still handle other kinds of JavaScript values (besides null, which has a special meaning in stream processing). These streams are said to work in object mode. When creating a stream instance, you can switch it to object mode with the objectMode option. It is not safe to try to switch an existing stream to object mode.
There are four basic stream types in Node.js
- Readable: a readable stream (for example fs.createReadStream())
- Writable: a writable stream (for example fs.createWriteStream())
- Duplex: a stream that is both readable and writable (such as net.Socket)
- Transform: a Duplex stream that can modify and transform data as it is read and written (e.g. zlib.createDeflate())
The first type: createReadStream
Create a readable stream
```javascript
// Introduce the fs (file system) module
let fs = require('fs');
// Create a readable stream
let rs = fs.createReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 0,
    autoClose: true,
    end: 3,
    highWaterMark: 3
});
```
API: createReadStream(path, [options]);
- path is the path of the file to read
- options may include:
- flags: the mode in which to open the file; default is 'r'
- encoding: the default is null, meaning raw Buffer objects are emitted
- start: the index at which to start reading
- autoClose: whether to close the file automatically once reading is done
- end: the index at which reading stops (the end position is included)
- highWaterMark: the size of the read buffer; the default is 64KB (64 * 1024 bytes)
If encoding is 'utf8', highWaterMark should be no smaller than 3 bytes, since a single UTF-8 character (a Chinese character, for example) can occupy 3 bytes
Some listening events for a readable stream
- 'data' event
- 'end' event
- 'error' event
- 'open' event
- 'close' event
Each is written as follows:
```javascript
// Listening for 'data' switches the stream to flowing mode, and data is read as fast as possible
rs.on('data', function (data) {
    // paused mode -> flowing mode
    console.log(data);
});
rs.on('end', function () {
    console.log('Read complete');
});
rs.on('error', function (err) {
    console.log(err);
});
// Triggered once the file has been opened
rs.on('open', function () {
    console.log('File open');
});
rs.on('close', function () {
    console.log('File closed');
});
```
Set the encoding
Sets the encoding, equivalent to passing {encoding: 'utf8'} in the options:
```javascript
rs.setEncoding('utf8');
```
Pause and resume the 'data' events
Use pause() and resume():
```javascript
rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // pause the 'data' events
});
setTimeout(function () {
    rs.resume(); // resume the 'data' events
}, 2000);
```
The second type: createWriteStream
Create a writable stream
```javascript
// Introduce the fs (file system) module
let fs = require('fs');
// Create a writable stream
let ws = fs.createWriteStream('./1.txt', {
    flags: 'w',
    encoding: 'utf8',
    highWaterMark: 3
});
```
API: createWriteStream(path, [options]);
- path is the path of the file to write
- options may include:
- flags: the mode in which to open the file; default is 'w'
- encoding: the default is 'utf8'
- highWaterMark: the size of the write buffer; the default is 16KB
Some methods for writable streams
1. Write method
```javascript
ws.write(chunk, [encoding], [callback]);
```
- chunk: the data to write, a Buffer or string
- encoding: the encoding format; optional, and only relevant when chunk is a string
- callback: called after the write has completed
The return value is a Boolean: false when the internal buffer is full, true when it still has room
2. End method
```javascript
ws.end(chunk, [encoding], [callback]);
```
Signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final piece of data to be written before the stream is closed. The optional callback is invoked when the 'finish' event fires
3. The 'drain' event
```javascript
ws.on('drain', function () {
    console.log('drain');
});
```
- If a call to write() fills the internal buffer, the chunk is cached and write() returns false
- The 'drain' event is emitted once all currently buffered chunks have been flushed
- After write() returns false, no more chunks should be written until the 'drain' event fires
4. The 'finish' event
```javascript
ws.end('the end');
ws.on('finish', function () {
    console.log('finish');
});
```
- The 'finish' event is emitted after the end method has been called and the buffered data has been handed off to the underlying system
The third type: read-write streams, also known as Duplex streams
A duplex stream is both readable and writable on the same object, as if it inherited both interfaces, and the two sides operate independently without interfering with each other
```javascript
// Introduce the Duplex class from the stream module
let { Duplex } = require('stream');
let d = new Duplex({
    read() {
        this.push('hello');
        this.push(null);
    },
    write(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
d.on('data', function (data) {
    console.log(data);
});
d.write('hello');
```
The fourth type: Transform
- A transform stream's output is computed from its input
- Instead of implementing separate read and write methods, you implement a single transform method that combines the two
```javascript
// Introduce the Transform class
let { Transform } = require('stream');
// A transform stream takes the same options as a writable stream
let transform1 = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
let transform2 = new Transform({
    transform(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    }
});
process.stdin.pipe(transform1).pipe(transform2);
```
Pipe method
As you all know, to get data from a Readable into a Writable, you would otherwise have to read the data into memory manually and then write it out, repeating code like this every time data is passed along:
```javascript
readable.on('readable', () => {
    // the 'readable' listener receives no arguments; errors are reported via the 'error' event
    writable.write(readable.read());
});
```
Node.js provides the pipe() method for ease of use
```javascript
readable.pipe(writable);
```
Principle of pipe method
```javascript
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if (!flag) rs.pause(); // back pressure: pause reading when the write buffer is full
});
ws.on('drain', function () {
    rs.resume(); // the write buffer has been flushed, resume reading
});
rs.on('end', function () {
    ws.end();
});
```
Unpipe usage
- The readable.unpipe() method detaches a stream that was previously attached with the stream.pipe() method
- If no destination is passed in, every attached pipe is detached
```javascript
let fs = require('fs');
let from = fs.createReadStream('./1.txt');
let to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
    console.log('Stop writing to 2.txt');
    from.unpipe(to);
    console.log('Manually close the file stream');
    to.end();
}, 1000);
```
cork & uncork
- Calling writable.cork() forces all written data to be held in an in-memory buffer. The buffered data is not flushed until the stream.uncork() or stream.end() method is called
- writable.uncork() flushes all of the data that has been buffered since the stream.cork() method was called
```javascript
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
```
The 'readable' event
The 'readable' event is emitted only when there is data available to read from the stream. In some cases, attaching a 'readable' listener causes some data to be read into the internal buffer
```javascript
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
    // some data is readable
});
```
```javascript
let fs = require('fs');
let rs = fs.createReadStream('./1.txt', {
    start: 3,
    end: 8,
    encoding: 'utf8',
    highWaterMark: 3
});
rs.on('readable', function () {
    console.log('readable');
    console.log('rs._readableState.length', rs._readableState.length);
    let d = rs.read(1);
    console.log('rs._readableState.length', rs._readableState.length);
    console.log(d);
    setTimeout(() => {
        console.log('rs._readableState.length', rs._readableState.length);
    }, 500);
});
```
- The 'readable' event is also emitted when the stream reaches its end, firing before the 'end' event
- In effect, the 'readable' event signals that the stream has news: either there is new data, or the end of the stream has been reached. In the former case, stream.read() returns the available data; in the latter, stream.read() returns null
Two modes of a readable stream
- Readable streams work in one of two modes: flowing and paused
- In flowing mode, a readable stream automatically reads data from the underlying system and provides it to the application as quickly as possible through events on the EventEmitter interface
- In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream
- All readable streams start out in paused mode and can be switched to flowing mode in one of three ways
- Listen for the ‘data’ event
- Call the stream.resume() method
- Call the stream.pipe() method to send data to Writable
- A readable stream can be switched to paused mode by:
- If no pipe destination exists, it can be implemented by calling stream.pause().
- If there are pipe targets, remove all 'data' listeners and call the stream.unpipe() method to detach all pipe targets.
If a Readable is switched to flowing mode and no consumer is in place to handle the data, that data will be lost. This happens, for example, when readable.resume() is called without a 'data' listener attached, or when the 'data' listeners are removed from a flowing stream.
Three states of a readable stream
At any time, any readable stream should be in exactly one of the following three states:
- readable._readableState.flowing = null
- readable._readableState.flowing = false
- readable._readableState.flowing = true
- When readable._readableState.flowing is null, no mechanism for consuming the stream's data has been set up, so the stream produces no data. In this state, attaching a 'data' listener, calling readable.pipe(), or calling readable.resume() switches flowing to true, and the readable stream starts emitting events as data is produced.
- Calling readable.pause() or readable.unpipe(), or receiving back pressure, switches flowing to false. This pauses the flow of events, but not the generation of data. In this state, attaching a 'data' listener does not switch flowing back to true.
- While readable._readableState.flowing is false, data may accumulate in the stream's internal buffer.