What is a stream?
What are the common application scenarios for streams?
How are streams implemented?
What is a stream?
What is a stream? I drew a simplified sewage-treatment process to illustrate my understanding of streams (forgive my limited imagination and drawing skills).
This was my mental model of how gulp works when I first used it: sewage comes in through the inlet, goes through a series of treatments inside the processor, and clean water eventually flows out of the outlet.
Back to code: with gulp, I feed in a Less file, gulp runs it through Less compilation, autoprefixing, minification, and so on, and out comes the CSS file I want. Isn't that a one-stop service?
While learning about streams, I also came across an interesting model: the producer/consumer model. Once you understand this model, understanding streams takes little effort.
Let's take an example from everyday life to help us understand the producer/consumer model.
The producer is like a factory, constantly turning out instant noodles.
Consumers are like ordinary people, who need to buy instant noodles to get on with their lives.
If I went directly to the factory to buy instant noodles, the price would be low, but the factory loses money on retail, so it only sells wholesale, and a wholesale order is enough to wipe out my small savings.
As for the factory, if it faced buyers directly, it would produce instant noodles for one day and then halt production for a month, because it would need the next month just to sell them. The factory would soon go bankrupt.
Since the factory cannot connect with consumers directly, a third party, the supermarket, can be introduced as the factory's agent. The factory can then focus on producing instant noodles and sell them in bulk to supermarkets, and consumers no longer need to buy wholesale from the factory. It is a little more expensive, but much more convenient: whenever you want a pack, you just go to the supermarket and buy one.
With these two real-life examples, we now have a macro-level picture of what a stream is.
Three roles: producer, consumer, and third-party intermediary.
The third-party intermediary below the dotted line is the same as the one above it. It is drawn separately to highlight how the intermediary decouples consumers from producers.
Some application scenarios for streams
Node.js has four basic stream types:
- Readable – a readable stream
- Writable – a writable stream
- Duplex – a stream that is both readable and writable
- Transform – a special Duplex stream that modifies or transforms data as it is written and read
Many of the built-in/core modules in Node.js are implemented based on streams:
This figure shows the position of streams in Node.js. It is fair to say that understanding streams will greatly help us understand and use the built-in modules listed above.
The implementation mechanism of streams
The best way to understand the internals of streams is to read the Node.js source code alongside the official Node.js documentation:
- _stream_readable.js
- _stream_writable.js
- _stream_duplex.js
- _stream_transform.js
As the source code shows, these four basic stream types are all abstractions meant to be extended by developers, which is why the source can feel rather abstract at first. Next, I will take a different approach and implement the main logic of the fs Readable Stream without relying on these four basic stream types, to gain a better understanding of streams.
fs Readable Stream
A Readable stream has two modes:
- Flowing: data is fetched from the underlying source and emitted as quickly as possible. If no event listener is attached and no pipe() directs the flow of data, the data can be lost.
- Paused: the default mode. In this mode, `stream.read()` must be called manually to get data.
Switching to flowing mode can be done in any of the following ways:
- Add a `'data'` event listener.
- Call the `stream.resume()` method.
- Call the `stream.pipe()` method to send the data to a Writable consumer.
You can switch back to paused mode in the following ways:
- If `stream.pipe(..)` has not been called, calling `stream.pause()` is enough.
- If `stream.pipe(..)` has been called, you need to remove all pipes with `stream.unpipe(..)`.
The following points need special attention:
- The readable stream only produces data when a consumer is there to consume it, for example when a `'data'` event listener is added.
- Removing the `'data'` event listener does not automatically stop the stream.
- If `stream.pipe(..)` has been called, calling `stream.pause()` will not stop the stream.
- If the readable stream is switched to flowing mode but no `'data'` listener is attached, the data will be lost. This happens, for example, when `stream.resume()` is called without a `'data'` listener being added, or when the `'data'` listener is removed.
- Choose one way of consuming the data a readable stream produces. `stream.pipe(..)` is recommended; alternatively, use the more controllable event mechanism together with the `readable.pause()`/`readable.resume()` APIs.
- If `'readable'` and `'data'` are used at the same time, the `'readable'` event takes priority over the `'data'` event. In that case, `stream.read(..)` must be called explicitly inside the `'readable'` handler to read the data.
See stream_readable_streams in the official documentation for more API usage.
Flowing mode
Let's try to implement the flowing mode of a file readable stream and take a look at its internal mechanism.
```javascript
// Source: https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// Use the built-in util module so that the file readable stream
// inherits the event methods of EventEmitter:
// streams are implemented on top of the event mechanism
util.inherits(FsReadableStream, EventEmitter);

// Declare a file readable stream constructor and initialize the options
function FsReadableStream(path, options) {
    const self = this; // Keep a reference to this for use inside callbacks
    // To keep the implementation clear, parameter validation is omitted
    self.path = path;
    // Options for opening the file,
    // see https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o666;
    // Start and end positions for reading the file contents
    self.start = options.start || 0;
    self.end = options.end;
    // The high water mark, i.e. the maximum number of bytes cached per read
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // Whether to close the file automatically once its contents are read
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // The encoding used to decode the emitted data
    self.encoding = options.encoding || 'utf8';
    // A valid file descriptor
    self.fd = null;
    // The current read position: it starts at self.start and advances
    // by the number of bytes read each time
    self.pos = self.start;
    // Allocate highWaterMark bytes as the read buffer
    self.buffer = Buffer.alloc(self.highWaterMark);
    // Open the file in preparation for the next step
    self.open();
    // The stream starts in paused mode
    self.flowing = null;
    // As soon as a 'data' listener is attached,
    // switch to flowing mode and start reading data
    self.on('newListener', function (eventName) {
        if (eventName === 'data') {
            self.flowing = true;
            self.read();
        }
    });
}

// Before operating on a file, open it to obtain a valid file descriptor
FsReadableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// Read the contents of the file
FsReadableStream.prototype.read = function () {
    const self = this;
    // self.open() is asynchronous, so check whether the file is open yet
    if (typeof self.fd !== 'number') {
        // The file is not open yet; wait for the 'open' event
        self.once('open', self.read);
        return;
    }
    // Work out how many bytes to read this time
    const howMuchToRead = self.end ? Math.min(self.highWaterMark, self.end - self.pos + 1) : self.highWaterMark;
    fs.read(self.fd, self.buffer, 0, howMuchToRead, self.pos, function (err, bytesRead) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        if (bytesRead > 0) {
            // Update the read position
            self.pos = self.pos + bytesRead;
            // The data read may be smaller than the buffer, so slice it to
            // the bytes actually read to avoid emitting garbled leftovers
            const data = self.encoding ? self.buffer.slice(0, bytesRead).toString(self.encoding) : self.buffer.slice(0, bytesRead);
            self.emit('data', data);
            // If the next read position is past the end position, reading is complete
            if (self.pos > self.end) {
                self.emit('end');
                if (self.autoClose) {
                    self.destroy();
                }
                return;
            }
            // If still in flowing mode, keep reading
            if (self.flowing) {
                self.read();
            }
        } else {
            // The whole file has been read
            self.emit('end');
            if (self.autoClose) {
                self.destroy();
            }
        }
    });
};

// Switch from flowing mode to paused mode
FsReadableStream.prototype.pause = function () {
    if (this.flowing !== false) {
        this.flowing = false;
    }
};

// Switch from paused mode to flowing mode
FsReadableStream.prototype.resume = function () {
    // If resume is called without a 'data' listener attached,
    // the data will be lost
    if (!this.flowing) {
        this.flowing = true;
        this.read();
    }
};

// Close the file
FsReadableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
        return;
    }
    this.emit('close');
};
```
We can see that flowing mode produces data in a continuous stream until the data source is exhausted. You can also use stream.pause()/stream.resume() to control the flow. Control in this mode lies with the developer, who must be familiar with how it operates and use it carefully; otherwise it is easy to overwhelm the consumer or lose data along the way.
Paused mode
Because flowing mode and paused mode are mutually exclusive, the two modes of the readable stream are implemented separately here. In paused mode, we need to listen for another event, 'readable', and explicitly call stream.read(n) to read the data.
```javascript
// Source: https://github.com/nodejs/node/blob/master/lib/fs.js
// Paused mode for a file readable stream
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// Use the built-in util module so that the file readable stream
// inherits the event methods of EventEmitter:
// streams are implemented on top of the event mechanism
util.inherits(FsReadableStream, EventEmitter);

// Declare a file readable stream constructor and initialize the options
function FsReadableStream(path, options) {
    const self = this; // Keep a reference to this for use inside callbacks
    // To keep the implementation clear, parameter validation is omitted
    self.path = path;
    // Options for opening the file,
    // see https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o666;
    // Start and end positions for reading the file contents
    self.start = options.start || 0;
    self.end = options.end;
    // The high water mark, i.e. the maximum number of bytes cached per read
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // Whether to close the file automatically once its contents are read
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // The encoding used to decode the emitted data
    self.encoding = options.encoding || 'utf8';
    // A valid file descriptor
    self.fd = null;
    // The current read position: it starts at self.start and advances
    // by the number of bytes read each time
    self.pos = self.start;
    // The cache is an array of Buffers: [Buffer, Buffer, Buffer...]
    self.buffers = [];
    // self.buffers.length only tells us how many elements the array holds;
    // self.length is the sum of the byte lengths of all cached Buffers
    self.length = 0;
    // Reading a file is asynchronous, so a flag is needed;
    // while a read is in progress, the data ends up in the cache
    self.reading = false;
    // Whether the conditions for emitting a 'readable' event are met.
    // The 'readable' event means the stream has news: either new data is
    // available, or the end of the stream has been reached.
    // In the former case stream.read() returns data; in the latter, null
    self.emittedReadable = false;
    // Open the file in preparation for the next step
    self.open();
    // As soon as a 'readable' listener is attached, start producing data
    self.on('newListener', function (eventName) {
        if (eventName === 'readable') {
            self.read();
        }
    });
}

FsReadableStream.prototype.read = function (n) {
    const self = this;
    let buffer = null;
    // When the requested length is greater than the cache size
    if (n > self.length) {
        // The demand exceeds the cache, so raise the water mark
        // to accommodate it. computeNewHighWaterMark is the method
        // the Node.js source uses to raise the water mark
        self.highWaterMark = computeNewHighWaterMark(n);
        self.emittedReadable = true;
        self._read();
    }
    // When the requested length is greater than 0 and no larger than the cache
    if (n > 0 && n <= self.length) {
        // Allocate a Buffer for the result first
        buffer = Buffer.alloc(n);
        let index = 0; // Number of bytes copied so far
        let flag = true; // Controls the while loop
        let b;
        while (flag && (b = self.buffers.shift())) {
            for (let i = 0; i < b.length; i++) {
                buffer[index++] = b[i]; // Copy byte by byte
                if (n === index) {
                    const arr = b.slice(i + 1);
                    if (arr.length) {
                        // Put the unconsumed remainder back into the cache
                        self.buffers.unshift(arr);
                    }
                    self.length = self.length - n;
                    flag = false;
                    break;
                }
            }
        }
    }
    // If the cache is now empty
    if (self.length === 0) {
        self.emittedReadable = true;
    }
    // When the cache drops below the water mark, produce more data into it
    if (self.length < self.highWaterMark) {
        if (!self.reading) {
            self.reading = true;
            self._read();
        }
    }
    // Return the data that was read
    return buffer;
};

FsReadableStream.prototype._read = function () {
    const self = this;
    if (typeof self.fd !== 'number') {
        return self.once('open', self._read);
    }
    const buffer = Buffer.alloc(self.highWaterMark);
    fs.read(self.fd, buffer, 0, self.highWaterMark, self.pos, function (err, bytesRead) {
        if (bytesRead > 0) {
            // What was read goes into the cache by default
            self.buffers.push(buffer.slice(0, bytesRead));
            self.pos = self.pos + bytesRead; // Maintain the read position
            self.length = self.length + bytesRead; // Maintain the cache size
            self.reading = false; // The read is complete
            // Check whether a 'readable' event should be emitted
            if (self.emittedReadable) {
                self.emittedReadable = false; // Not emitted again by default
                self.emit('readable');
            }
        } else {
            self.emit('end');
            if (self.autoClose) {
                self.destroy();
            }
        }
    });
};

// Before operating on a file, open it to obtain a valid file descriptor
FsReadableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// Close the file
FsReadableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
        return;
    }
    this.emit('close');
};
```
In this mode, the 'readable' event tells us when data can be retrieved. If you call stream.read(n) directly, the asynchronous fs.read(..) operation may not yet have read the data into the cache, so the result is null.
As long as consumption brings the cache below the water mark, the cup is refilled automatically: another water-mark's worth of data is produced and placed in the cache.
When is the 'readable' event emitted? It is triggered when the cache is empty and water-mark-sized data has just been produced into it. The next time the cache is used up and refilled, it fires again.
stream.read(n) takes the amount of data you want. stream.length can be used to check the length of the currently cached data and decide how much to request. In real scenarios, use stream._readableState.length.
fs Writable Stream
The fs Writable Stream has a mechanism similar to the readable stream's. Without further ado, here is the code:
```javascript
// Source: https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// Use the built-in util module so that the file writable stream
// inherits the event methods of EventEmitter:
// streams are implemented on top of the event mechanism
util.inherits(FsWritableStream, EventEmitter);

// Declare a file writable stream constructor and initialize the options
function FsWritableStream(path, options) {
    const self = this; // Keep a reference to this for use inside callbacks
    // To keep the implementation clear, parameter validation is omitted
    self.path = path;
    // Options for opening the file,
    // see https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'w';
    self.mode = options.mode || 0o666;
    // The position at which writing starts
    self.start = options.start || 0;
    // The high water mark, i.e. the maximum amount cached per write
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // Whether to close the file automatically after the content is written
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // Tells the program how the written data is encoded
    self.encoding = options.encoding || 'utf8';
    // A valid file descriptor
    self.fd = null;
    // The current write position: it starts at self.start and advances
    // by the number of bytes written each time
    self.pos = self.start;
    // The write cache: [{ chunk, encoding, callback }, ...]
    self.buffers = [];
    // self.buffers.length only tells us how many elements the array holds;
    // self.length is the total byte length of all cached chunks
    self.length = 0;
    // Writing to a file is asynchronous, so a flag is needed;
    // while a write is in progress, incoming data is stored in the cache
    self.writing = false;
    // Controls whether to notify 'drain' listeners once the cache
    // goes from full back to empty
    self.needDrain = false;
    // Open the file in preparation for the next step
    self.open();
}

FsWritableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

FsWritableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
    } else {
        self.emit('close');
    }
};

// The entry point; we assume all three parameters are passed
FsWritableStream.prototype.write = function (chunk, encoding, callback) {
    const self = this;
    const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    self.length = self.length + bufferChunk.length;
    // If the current cache length is below the water mark, the cache is not full
    const ret = self.length < self.highWaterMark;
    // Once the cache is full, arrange for the 'drain' event to be emitted
    self.needDrain = !ret;
    if (self.writing) {
        // A write is in progress, so store the data in the cache first
        self.buffers.push({
            chunk,
            encoding,
            callback,
        });
    } else {
        // No write in progress, so consume this chunk directly
        self.writing = true;
        self._write(chunk, encoding, function () {
            callback();
            // Drain the cache once this write completes
            self.clearBuffer();
        });
    }
    // Each call to write returns whether the cache still has room
    return ret;
};

FsWritableStream.prototype._write = function (chunk, encoding, callback) {
    const self = this;
    if (typeof self.fd !== 'number') {
        self.once('open', function () {
            self._write(chunk, encoding, callback);
        });
        return;
    }
    fs.write(self.fd, chunk, 0, chunk.length, self.pos, function (err, writtenBytes) {
        if (err) {
            self.emit('error', err);
            self.writing = false;
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        // Subtract the length of the data successfully consumed
        self.length = self.length - writtenBytes;
        // Update the starting position of the next write
        self.pos = self.pos + writtenBytes;
        callback();
    });
};

FsWritableStream.prototype.clearBuffer = function () {
    const self = this;
    const buffer = self.buffers.shift();
    if (buffer) {
        // There is still data in the cache, so keep consuming it
        self._write(buffer.chunk, buffer.encoding, function () {
            buffer.callback();
            self.clearBuffer();
        });
    } else {
        // The cache is empty, so reset the writing flag
        self.writing = false;
        if (self.needDrain) {
            // Emit 'drain' to signal that the cache is exhausted
            // and ready for the next wave of writes
            self.needDrain = false;
            self.emit('drain');
        }
    }
};
```
We can see that when the data stream comes in, the writable stream consumes the data directly. When consumption (writing to the file) is too slow, the incoming data is held in the cache.
"Back pressure" occurs when the producer sends data faster than fs.write(..) can consume it and the cache fills up; write() then returns false. Once the cache is drained, the writable stream emits a 'drain' event to tell the producer it may resume production.
Conclusion
The fs Readable Stream and fs Writable Stream above are upper-layer APIs over the basic stream types Readable Stream and Writable Stream. In the actual fs source code, they inherit from these two basic stream types and add the fs file operations, extending them into file streams.
So a stream is a producer/consumer model built on events and state machines.
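To tie the two halves together, the producer/consumer handshake that pipe() performs can be sketched in a few lines. This is a simplified illustration of the idea, not Node's actual pipe implementation (which also handles errors and unpiping):

```javascript
// A minimal sketch of what stream.pipe() does: the readable produces,
// the writable consumes, and pause()/resume() react to back pressure
function miniPipe(readable, writable) {
  readable.on('data', function (chunk) {
    // If the consumer's cache is full, pause the producer
    if (!writable.write(chunk)) {
      readable.pause();
    }
  });
  // Once the consumer drains its cache, resume the producer
  writable.on('drain', function () {
    readable.resume();
  });
  readable.on('end', function () {
    writable.end();
  });
  return writable;
}
```

Wiring it up as miniPipe(fs.createReadStream(src), fs.createWriteStream(dest)) would copy a file while pausing the producer whenever the consumer's cache fills up.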
For more information on the use of streams, see the official documentation.
References
- An in-depth understanding of the internal mechanics of Node.js Stream
- Node.js streams: Everything you need to know
- The Node of the stream
- Node.js official website
- Node.js source code