A stream is an ordered sequence of bytes transferred from a starting point to an ending point. A stream does not care about the content of the file as a whole, only about whether data is read from the file and what happens to the data once it is read. A stream is an abstract interface implemented by many objects in Node; for example, the request and response objects of an HTTP server are streams.
Today we will learn how streams work by writing their source code ourselves. This article focuses on implementing a readable stream and a writable stream; it does not cover the basic API and usage of streams.
If you're not familiar with them, I suggest reading an introduction to the stream API first.
1. Prerequisites
Before we can learn about streams, we need to master the event mechanism. The source code uses the events module, but you can also write your own, as long as it provides the two APIs publish and subscribe. The publish-subscribe mechanism lets modules exchange information while keeping them decoupled. You will find it in many codebases, such as the event flow in the webpack source code; the MVVM pattern in Vue also uses a publish-subscribe mechanism.
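As the paragraph says, publish and subscribe are all a stream needs from an event system. Here is a minimal sketch of such an emitter; `MiniEmitter` is a hypothetical name, and this is not how Node's `events` module is actually implemented.

```javascript
// A minimal publish/subscribe emitter (hypothetical, for illustration only)
class MiniEmitter {
  constructor() {
    // event name -> array of listener functions
    this.listeners = {};
  }

  // Subscribe: register a listener for an event name
  on(type, listener) {
    (this.listeners[type] = this.listeners[type] || []).push(listener);
    return this;
  }

  // Publish: call every listener registered for the event, in order
  emit(type, ...args) {
    const list = this.listeners[type];
    if (!list) return false;
    list.slice().forEach((fn) => fn.apply(this, args));
    return true;
  }
}

const bus = new MiniEmitter();
const received = [];
bus.on('data', (chunk) => received.push(chunk));
bus.emit('data', 'hello');
bus.emit('data', 'world');
console.log(received); // [ 'hello', 'world' ]
```

Returning `this` from `on()` allows chaining, and `emit()` returns whether anyone was listening, mirroring the shape of the real `EventEmitter` methods.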
2. Readable streams
A readable stream is an object that implements the stream.Readable interface; it reads data from a source and delivers it as a stream. Creating one is very simple, as the following code shows:
// Import the fs module
let fs = require('fs');
// Call the API to get a readable stream rs; msg.txt is a file
let rs = fs.createReadStream('./msg.txt');
rs is a readable stream with methods and events on it, such as:
rs.pause();
rs.resume();
rs.on('data', function () {});
These are only a quick reminder of the methods and events on the rs readable stream; we will implement them ourselves in a moment.
2-1. Two modes for readable streams
2-1-1. Flowing mode
When a readable stream is in flowing mode, it automatically reads data from the underlying system and provides it to the application as quickly as possible through events from the EventEmitter interface. That is, as soon as we listen for the data event on a readable stream, the underlying interface starts reading and keeps firing data events until all the data has been read. Look at the following code:
let fs = require('fs');
let rs = fs.createReadStream('./msg.txt');
// As long as you listen for data events, the underlying interface will read the data and constantly trigger the callback function to return the data
rs.on('data', function (data) {});
So how do you switch a readable stream to flowing mode? There are three ways:
- Listen for the data event
- Call the stream.resume() method
- Call the stream.pipe() method to send the data to a Writable
Note: if a Readable switches to flowing mode and no consumer is handling the data, the data will be lost. This can happen, for example, if the readable.resume() method is called without a listener for the 'data' event, or if the 'data' event listener is removed.
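These switches can be observed through the `readableFlowing` property of Node's built-in streams, which is `null` before any consumer attaches, `true` in flowing mode and `false` once paused. In this sketch an empty `Readable` stands in for the file stream.

```javascript
const { Readable } = require('stream');

// A stream that never produces data on its own; enough to inspect its mode
const rs = new Readable({ read() {} });

const states = [];
states.push(rs.readableFlowing); // null: no consumer yet
rs.on('data', () => {});         // way 1: listening for 'data'
states.push(rs.readableFlowing); // true
rs.pause();
states.push(rs.readableFlowing); // false
rs.resume();                     // way 2: calling resume()
states.push(rs.readableFlowing); // true
console.log(states); // [ null, true, false, true ]
```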
2-1-2. Paused mode
When a readable stream is in paused mode, the rs.read() method must be called explicitly to read a piece of data from the stream. Look at the following code:
let fs = require('fs');
let rs = fs.createReadStream('./msg.txt');
rs.on('readable', function () {
  let result = rs.read(5);
});
When we listen for the readable event, the underlying interface reads data to fill the cache, then stops reading until the cached data is consumed. The line let result = rs.read(5); consumes five bytes of data.
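The same read(5) pattern can be tried without a file. This sketch assumes an in-memory Readable in place of msg.txt and keeps calling read(5) inside the readable handler until it returns null; once the stream has ended, Node's read() hands back whatever is left even if it is shorter than requested.

```javascript
const { Readable } = require('stream');

// In-memory source standing in for msg.txt; read() is a no-op because
// all the data is pushed up front
const source = new Readable({ read() {} });
source.push('hello world');
source.push(null); // signal the end of the data

const pieces = [];
source.on('readable', () => {
  let chunk;
  // Explicitly pull 5 bytes at a time until nothing more can be returned
  while ((chunk = source.read(5)) !== null) {
    pieces.push(chunk.toString());
  }
});
source.on('end', () => console.log(pieces)); // [ 'hello', ' worl', 'd' ]
```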
To switch a readable stream to paused mode, you can do one of the following:
- If there are no pipe destinations, call rs.pause().
- If there are pipe destinations, remove the data event listeners and call rs.unpipe() to remove all pipe destinations.
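To see the pipe case in practice, this sketch uses Node's built-in streams and the real `readableFlowing` property: piping switches the source to flowing mode and `unpipe()` pauses it again once no destinations remain. The no-op source and sink are placeholders.

```javascript
const { Readable, Writable } = require('stream');

// A source with no data of its own and a sink that discards what it receives
const src = new Readable({ read() {} });
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

src.pipe(sink);
const afterPipe = src.readableFlowing;   // true: pipe() switched to flowing mode
src.unpipe(sink);
const afterUnpipe = src.readableFlowing; // false: unpipe() paused the source
console.log(afterPipe, afterUnpipe);     // true false
```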
2-2. Implement a readable stream
In the source code, fs.createreadStream () is an instance of ReadStream, which inherits the Stream.readable interface. Excerpt part of the source code, easy for us to understand.
const fs = require('fs');
const util = require('util');
const { Readable, Writable } = require('stream');
function ReadStream() { }
util.inherits(ReadStream, Readable);
fs.createReadStream = function (path, options) {
  return new ReadStream(path, options);
};
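To confirm this relationship with the real fs module, this sketch checks the object returned by fs.createReadStream(). The throwaway file in the OS temp directory stands in for msg.txt.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { Readable } = require('stream');

// Create a small throwaway file to stand in for msg.txt
const file = path.join(os.tmpdir(), 'inherits-demo.txt');
fs.writeFileSync(file, 'hello');

const rs = fs.createReadStream(file);
// createReadStream() returns an fs.ReadStream, which inherits from stream.Readable
const isReadStream = rs instanceof fs.ReadStream;
const isReadable = rs instanceof Readable;
console.log(isReadStream, isReadable); // true true
rs.destroy(); // we only wanted to inspect the object, so release the file
```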
Knowing the relationship between these classes, let’s start implementing our own ReadStream class.
2-2-1. Implementing flowing mode
In this mode, as soon as the readable stream gets a data listener, it should start reading and keep firing data events to pass the data back. Here is the skeleton of the ReadStream class:
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
  constructor(path, options) {
    // A subclass constructor must call super() before using `this`
    super();
    this.path = path;
    this.flowing = false;
    // ...
  }
  read() { }
  open() { }
  end() { }
  destroy() { }
  pipe() { }
  pause() { }
  resume() { }
}
The constructor attaches many properties to this; they are listed below:
Property | Purpose |
---|---|
path | Path of the file to read |
fd | File descriptor |
flowing | Whether the stream is in flowing mode |
encoding | Encoding of the emitted data |
flag | File system flag used to open the file (e.g. 'r') |
mode | File mode, default 0o666 |
start | Position to start reading from, default 0 |
pos | Current read position |
end | Position to stop reading at (inclusive) |
highWaterMark | High-water mark: maximum bytes per read, default 64 * 1024 |
buffer | Buffer that holds the data read from the file |
autoClose | Whether to close the file automatically when done |
length | Amount of data in the buffer |
The constructor also needs these parts:
this.on('newListener', (type, listener) => {
  if (type === 'data') {
    this.flowing = true;
    this.read();
  }
});
this.on('end', () => {
  if (this.autoClose) {
    this.destroy();
  }
});
this.open();
The key part is the first listener: as soon as the user listens for the data event, we switch to flowing mode and call this.read() to start reading data.
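Putting the table and the constructor fragment together, the constructor might look like the sketch below. The defaults come from the table; the `flags` option name (borrowed from `fs.createReadStream`), treating an undefined `end` as "read to EOF", and the empty `read()`, `open()` and `destroy()` placeholders are assumptions for illustration, since the article elides the full constructor.

```javascript
const EventEmitter = require('events');

class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super(); // must run before `this` is used in a subclass
    this.path = path;
    this.fd = null;
    this.flowing = false;
    this.encoding = options.encoding || null;
    this.flag = options.flags || 'r';
    this.mode = options.mode || 0o666;
    this.start = options.start || 0;
    this.pos = this.start;
    this.end = options.end; // inclusive; undefined means "read to EOF"
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose !== false; // default true
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.length = 0;

    // Listening for 'data' switches the stream to flowing mode
    this.on('newListener', (type) => {
      if (type === 'data') {
        this.flowing = true;
        this.read();
      }
    });
    // Release the file once everything has been read
    this.on('end', () => {
      if (this.autoClose) this.destroy();
    });
    this.open();
  }

  // Placeholders for the methods developed step by step in this section
  read() {}
  open() {}
  destroy() {}
}
```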
Next, we write the main read() method, which reads data and emits data events. It relies on fs.read(); if you are unfamiliar with it, see the fs.read() entry in the Node.js fs documentation.
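A minimal sketch of calling fs.read() on its own, assuming a throwaway file in the OS temp directory in place of msg.txt; `readSlice` is a hypothetical helper, not part of the fs module. It shows the argument order `fs.read(fd, buffer, offset, length, position, callback)` and that `bytesRead` can be smaller than `length` near the end of the file, which is exactly what the read() method checks.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Throwaway file standing in for msg.txt
const file = path.join(os.tmpdir(), 'fs-read-demo.txt');
fs.writeFileSync(file, 'abcdefghij');

// Hypothetical helper: read `length` bytes starting at byte `position`
function readSlice(filePath, position, length, callback) {
  fs.open(filePath, 'r', (err, fd) => {
    if (err) return callback(err);
    const buffer = Buffer.alloc(length);
    // fs.read(fd, buffer, offset, length, position, callback)
    fs.read(fd, buffer, 0, length, position, (err, bytesRead) => {
      fs.close(fd, () => {
        if (err) return callback(err);
        // Only the first bytesRead bytes of the buffer are valid
        callback(null, buffer.slice(0, bytesRead).toString());
      });
    });
  });
}

readSlice(file, 2, 4, (err, text) => console.log(text)); // cdef
readSlice(file, 8, 4, (err, text) => console.log(text)); // ij (only 2 bytes left)
```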
read() {
  // The file descriptor is not available yet: wait for 'open', then retry
  if (typeof this.fd !== 'number') {
    return this.once('open', () => this.read());
  }
  // Handle boundary values: `end` is inclusive, so add 1
  let n = this.end !== undefined ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
  // Start reading data
  fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
    if (err) {
      if (this.autoClose) this.destroy();
      return this.emit('error', err);
    }
    if (bytesRead) {
      // Copy the bytes, because this.buffer is reused by the next fs.read call
      let data = Buffer.from(this.buffer.slice(0, bytesRead));
      data = this.encoding ? data.toString(this.encoding) : data;
      this.pos += bytesRead;
      // Emit an event that returns the read data
      this.emit('data', data);
      if (this.end !== undefined && this.pos > this.end) {
        return this.emit('end');
      }
      // In flowing mode, keep reading
      if (this.flowing) {
        this.read();
      }
    } else {
      this.emit('end');
    }
  });
}
Next, implement the open() method, which obtains the file descriptor. It is relatively simple:
open() {
  // Open the file
  fs.open(this.path, this.flag, this.mode, (err, fd) => {
    // If the file fails to open, clean up and emit an error event
    if (err) {
      if (this.autoClose) {
        this.destroy();
      }
      return this.emit('error', err);
    }
    // Save the file descriptor and announce that the file is open
    this.fd = fd;
    this.emit('open', fd);
  });
}
Next, implement the pipe() method. It works like this:
- Listen for the data event to get the data.
- Write the data to the writable stream. If write() returns false (its buffer is full), pause reading; resume when the writable stream emits drain.
- When the readable stream ends, call end() on the writable stream.
pipe(des) {
  // Listen to the data event and get the data
  this.on('data', (data) => {
    // If flag is true, the writable stream's buffer is not full and more data can be written
    let flag = des.write(data);
    if (!flag) {
      this.pause();
    }
  });
  // The drain event means the writable stream has flushed its buffer, so we can read again
  des.on('drain', () => {
    this.resume();
  });
  this.on('end', () => {
    des.end();
  });
  // Return the destination so calls can be chained, as stream.pipe() does
  return des;
}
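pipe() relies on the boolean that write() returns and on the drain event. This short sketch uses Node's built-in stream.Writable (not the article's own class) to show that contract; the 4-byte highWaterMark and the setImmediate delay are arbitrary choices that make backpressure visible.

```javascript
const { Writable } = require('stream');

// A slow sink with a tiny 4-byte high-water mark, so backpressure kicks in fast
const sink = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setImmediate(callback); // pretend the write takes a while
  },
});

const first = sink.write('hi');     // true: 2 bytes buffered, below the mark
const second = sink.write('there'); // false: 7 bytes buffered, over the mark
console.log(first, second); // true false

// 'drain' fires once the buffer empties; this is where pipe() resumes reading
sink.on('drain', () => console.log('drain'));
```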
The remaining methods are simpler to implement:
end() {
if (this.autoClose) {
this.destroy();
}
}
destroy() {
  // Nothing to close if the file never opened
  if (typeof this.fd !== 'number') {
    return this.emit('close');
  }
  fs.close(this.fd, () => {
    this.emit('close');
  });
}
pause() {
  this.flowing = false;
}
resume() {
  this.flowing = true;
  this.read();
}
At this point, we have a working readable stream in flowing mode.
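For reference, here is one way the pieces might fit together into a single runnable class. It is a sketch, not the article's exact code: it assumes an inclusive `end` option (as in Node's own `fs.createReadStream`), copies each chunk out of the shared `this.buffer`, and leaves out `pipe()` for brevity.

```javascript
const fs = require('fs');
const EventEmitter = require('events');

// Flowing-mode ReadStream sketch assembled from the methods in this section
class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.fd = null;
    this.flowing = false;
    this.encoding = options.encoding || null;
    this.flag = options.flags || 'r';
    this.mode = options.mode || 0o666;
    this.start = options.start || 0;
    this.pos = this.start;
    this.end = options.end; // inclusive; undefined means "read to EOF"
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose !== false;
    this.buffer = Buffer.alloc(this.highWaterMark);

    // Listening for 'data' switches the stream into flowing mode
    this.on('newListener', (type) => {
      if (type === 'data') {
        this.flowing = true;
        this.read();
      }
    });
    this.on('end', () => {
      if (this.autoClose) this.destroy();
    });
    this.open();
  }

  open() {
    fs.open(this.path, this.flag, this.mode, (err, fd) => {
      if (err) {
        if (this.autoClose) this.destroy();
        return this.emit('error', err);
      }
      this.fd = fd;
      this.emit('open', fd);
    });
  }

  read() {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this.read());
    }
    const n = this.end !== undefined
      ? Math.min(this.end - this.pos + 1, this.highWaterMark)
      : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
      if (err) {
        if (this.autoClose) this.destroy();
        return this.emit('error', err);
      }
      if (!bytesRead) return this.emit('end');
      // Copy, because this.buffer is reused by the next fs.read call
      let data = Buffer.from(this.buffer.slice(0, bytesRead));
      data = this.encoding ? data.toString(this.encoding) : data;
      this.pos += bytesRead;
      this.emit('data', data);
      if (this.end !== undefined && this.pos > this.end) {
        return this.emit('end');
      }
      if (this.flowing) this.read();
    });
  }

  pause() {
    this.flowing = false;
  }

  resume() {
    this.flowing = true;
    this.read();
  }

  destroy() {
    // Nothing to close if the file never opened
    if (typeof this.fd !== 'number') return this.emit('close');
    fs.close(this.fd, () => this.emit('close'));
  }
}

// Usage: const rs = new ReadStream('./msg.txt', { highWaterMark: 4, encoding: 'utf8' });
//        rs.on('data', (chunk) => console.log(chunk));
```

With a 10-byte file and `highWaterMark: 4`, the chunks come out as 4, 4 and 2 bytes; with `start: 2, end: 6` only bytes 2 through 6 are emitted.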
2-2-2. Implementing paused mode
The difference between a paused-mode readable stream and a flowing-mode one is that, in paused mode, the underlying interface does not read all of the data and deliver it in one go. It fills the cache and then stops reading, and reads again only when the cache is empty or falls below the highWaterMark threshold.
In paused mode, we focus on implementing the read() method. Instead of reading data as quickly as possible and sending it to the consumer through data events, we fill the cache when the user listens for the readable event and then stop reading. Once the consumer has drained the cache below the highWaterMark, we fill it again, and so on until all the data has been read. This lets the consumer control the reading process and read on demand.
Let’s see how the read() method is implemented.
read(n) {
  let ret = null;
  // Boundary check: only serve requests that the cache can satisfy
  if (n > 0 && n <= this.length) {
    // Create the buffer that read() will return
    ret = Buffer.alloc(n);
    let b;
    let index = 0;
    // Stop as soon as ret is full, not just at the end of one chunk
    while (index < n && (b = this.buffers.shift()) != null) {
      for (let i = 0; i < b.length; i++) {
        // Copy out the data to consume
        ret[index++] = b[i];
        if (index === n) {
          // Put the unconsumed rest of this chunk back into the cache
          b = b.slice(i + 1);
          if (b.length) this.buffers.unshift(b);
          break;
        }
      }
    }
    this.length -= n;
    // Handle encoding
    if (this.encoding) {
      ret = ret.toString(this.encoding);
    }
  } else {
    // Not enough data yet: emit 'readable' again once more data arrives
    this.needReadable = true;
  }
  // Read more when the request could not be served or the cache is below the highWaterMark
  if (ret === null || this.length < this.highWaterMark) {
    this._read();
  }
  return ret;
}
Here I've posted only the main code, to illustrate the idea. The read() method works on the cache, while the _read() method actually reads data from the file. Here is the _read() method:
_read() {
  // `reading` and `ended` are bookkeeping flags: avoid overlapping reads, stop after the end
  if (this.reading || this.ended) return;
  // Wait for the file descriptor if the file is not open yet
  if (typeof this.fd !== 'number') {
    return this.once('open', () => this._read());
  }
  this.reading = true;
  let m = this.end !== undefined ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
  fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
    this.reading = false;
    if (err) return this.emit('error', err);
    if (bytesRead > 0) {
      // Copy out of this.buffer, which the next fs.read call overwrites
      let data = Buffer.from(this.buffer.slice(0, bytesRead));
      this.pos += bytesRead;
      // Add the data to the cache, including the last chunk before `end`
      this.buffers.push(data);
      this.length += bytesRead;
    }
    let done = bytesRead === 0 || (this.end !== undefined && this.pos > this.end);
    if (done) this.ended = true;
    if (this.needReadable) {
      this.needReadable = false;
      this.emit('readable');
    }
    if (done) this.emit('end');
  });
}
At this point, the paused-mode readable stream is complete.
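The paused-mode pieces can likewise be assembled into one runnable sketch. `PausedReadStream` is a hypothetical name, and several choices are simplifying assumptions of this sketch: the `reading` and `ended` flags, the rule that a final short read returns whatever is left, and emitting `end` right after the last `readable`. It expects the consumer to drain the cache inside its `readable` handler.

```javascript
const fs = require('fs');
const EventEmitter = require('events');

// Paused-mode stream sketch: data is pulled with read(n) from an internal cache
class PausedReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.fd = null;
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.pos = options.start || 0;
    this.end = options.end; // inclusive; undefined means "read to EOF"
    this.buffer = Buffer.alloc(this.highWaterMark); // scratch space for fs.read
    this.buffers = []; // the cache: chunks read but not yet consumed
    this.length = 0;   // total bytes in the cache
    this.needReadable = false; // should the next fill emit 'readable'?
    this.reading = false;      // is an fs.read call in flight?
    this.ended = false;        // has the file been exhausted?

    // Listening for 'readable' starts filling the cache
    this.on('newListener', (type) => {
      if (type === 'readable') {
        this.needReadable = true;
        this._read();
      }
    });
    fs.open(this.path, 'r', (err, fd) => {
      if (err) return this.emit('error', err);
      this.fd = fd;
      this.emit('open', fd);
    });
  }

  read(n) {
    // After the file is exhausted, a short final read returns what is left
    if (this.ended && n > this.length) n = this.length;
    let ret = null;
    if (n > 0 && n <= this.length) {
      ret = Buffer.alloc(n);
      let index = 0;
      let b;
      while (index < n && (b = this.buffers.shift()) != null) {
        const take = Math.min(b.length, n - index);
        b.copy(ret, index, 0, take);
        index += take;
        // Put the unconsumed tail of the chunk back at the front of the cache
        if (take < b.length) this.buffers.unshift(b.slice(take));
      }
      this.length -= n;
      if (this.encoding) ret = ret.toString(this.encoding);
    } else {
      this.needReadable = true; // wake the consumer when more data arrives
    }
    // Refill when the request failed or the cache dropped below the mark
    if (ret === null || this.length < this.highWaterMark) this._read();
    return ret;
  }

  _read() {
    if (this.reading || this.ended) return;
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._read());
    }
    this.reading = true;
    const m = this.end !== undefined
      ? Math.min(this.end - this.pos + 1, this.highWaterMark)
      : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
      this.reading = false;
      if (err) return this.emit('error', err);
      if (bytesRead > 0) {
        // Copy out of the scratch buffer before caching the chunk
        this.buffers.push(Buffer.from(this.buffer.slice(0, bytesRead)));
        this.length += bytesRead;
        this.pos += bytesRead;
      }
      if (bytesRead === 0 || (this.end !== undefined && this.pos > this.end)) {
        this.ended = true;
        fs.close(this.fd, () => {});
      }
      if (this.needReadable) {
        this.needReadable = false;
        this.emit('readable');
      }
      if (this.ended) this.emit('end');
    });
  }
}
```

Usage mirrors the earlier msg.txt example: listen for readable and call read(n) inside the handler until it returns null. With a 12-byte file, `highWaterMark: 4` and read(5), the cache is refilled 4 bytes at a time while the consumer receives 5-byte pieces plus a short final piece.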
3. Writable streams
A writable stream is an object that implements the stream.Writable interface, so that data can be written to it as a stream. Writable streams are simpler than readable streams. The main methods of our writable stream are write(), _write() and clearBuffer().
3-1. Implement a writable stream
Implementation of the write() method
write(chunk, encoding, cb) {
  // Support write(chunk, cb)
  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }
  // Convert the incoming data to a Buffer, honoring the encoding argument
  chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding || this.encoding || 'utf8');
  let len = chunk.length;
  this.length += len;
  let ret = this.length < this.highWaterMark;
  // Remember that the caller was told to wait, so 'drain' is emitted later
  if (!ret) this.needDrain = true;
  if (this.writing) {
    // A write is in progress: add a new task to the task queue
    this.buffers.push({ chunk, encoding, cb });
  } else {
    // Write data now; afterwards run the caller's callback and continue with the queue
    this.writing = true;
    this._write(chunk, encoding, () => {
      cb && cb();
      this.clearBuffer();
    });
  }
  return ret;
}
Implementation of the _write() method
The main job of the _write() method is to call the underlying API to write data to the file.
_write(chunk, encoding, cb) {
  // The file descriptor is not available yet: wait for 'open'
  if (typeof this.fd !== 'number') {
    return this.once('open', () => this._write(chunk, encoding, cb));
  }
  // Write data, then execute the callback
  fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
    if (err) {
      if (this.autoClose) {
        this.destroy();
      }
      return this.emit('error', err);
    }
    this.length -= written;
    // Advance the position where the next write starts
    this.pos += written;
    // Execute the callback function
    cb && cb();
  });
}
The implementation of the clearBuffer() method
clearBuffer() {
  // Retrieve a task from the task queue
  let data = this.buffers.shift();
  // If there is a task, write its data to the file, then continue with the queue
  if (data) {
    this._write(data.chunk, data.encoding, () => {
      data.cb && data.cb();
      this.clearBuffer();
    });
  } else {
    this.writing = false;
    // Only emit 'drain' if write() previously returned false
    if (this.needDrain) {
      this.needDrain = false;
      this.emit('drain');
    }
  }
}
At this point, a writable stream is implemented.
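Here is a runnable sketch that combines write(), _write() and clearBuffer() into one class. The constructor defaults (the `'w'` flag and an arbitrary 16 KiB high-water mark), the `needDrain` flag and the `destroy()` helper are assumptions added so the example runs on its own.

```javascript
const fs = require('fs');
const EventEmitter = require('events');

// Sketch of a file-backed writable stream built from write(), _write() and clearBuffer()
class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.fd = null;
    this.flag = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.encoding = options.encoding || 'utf8';
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.autoClose = options.autoClose !== false;
    this.pos = options.start || 0;
    this.buffers = [];     // queued { chunk, encoding, cb } tasks
    this.length = 0;       // bytes accepted but not yet written
    this.writing = false;  // is an fs.write call in flight?
    this.needDrain = false;
    fs.open(this.path, this.flag, this.mode, (err, fd) => {
      if (err) return this.emit('error', err);
      this.fd = fd;
      this.emit('open', fd);
    });
  }

  write(chunk, encoding, cb) {
    if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding || this.encoding);
    this.length += chunk.length;
    const ret = this.length < this.highWaterMark;
    if (!ret) this.needDrain = true;
    if (this.writing) {
      this.buffers.push({ chunk, encoding, cb }); // wait for the current write
    } else {
      this.writing = true;
      this._write(chunk, encoding, () => {
        if (cb) cb();
        this.clearBuffer();
      });
    }
    return ret;
  }

  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._write(chunk, encoding, cb));
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
      if (err) {
        if (this.autoClose) this.destroy();
        return this.emit('error', err);
      }
      this.length -= written;
      this.pos += written;
      cb();
    });
  }

  clearBuffer() {
    const task = this.buffers.shift();
    if (task) {
      this._write(task.chunk, task.encoding, () => {
        if (task.cb) task.cb();
        this.clearBuffer();
      });
    } else {
      this.writing = false;
      if (this.needDrain) {
        this.needDrain = false;
        this.emit('drain');
      }
    }
  }

  destroy() {
    if (typeof this.fd !== 'number') return this.emit('close');
    fs.close(this.fd, () => this.emit('close'));
  }
}
```

With `highWaterMark: 3`, writing 'ab' returns true, writing 'cd' returns false, and 'drain' fires only after every queued chunk has reached the file.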
4. Duplex streams
A Duplex is a stream that implements both the Readable and Writable interfaces. With a duplex stream we can implement both interfaces on the same object, as if it inherited from both. Importantly, the readable and writable sides of a duplex stream operate completely independently of each other; a duplex simply combines the two features in one object.
const { Duplex } = require('stream');
// Counter used by the readable side
let index = 0;
const inoutStream = new Duplex({
  // Implement a write method
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
  // Implement a read method
  read(size) {
    this.push((++index) + ' ');
    if (index > 3) {
      this.push(null);
    }
  }
});
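To illustrate that the two sides really are independent, this hypothetical sketch gives the readable side its own list of words and lets the writable side record what it receives. `readableObjectMode: true` is set only so that each word arrives as its own chunk.

```javascript
const { Duplex } = require('stream');

const words = ['one', 'two', 'three']; // data for the readable side
const received = [];                    // data captured by the writable side

const duplex = new Duplex({
  readableObjectMode: true, // push whole strings as separate chunks
  // Readable side: hand out the next word, then signal the end with null
  read() {
    this.push(words.length ? words.shift() : null);
  },
  // Writable side: record what was written; it never touches `words`
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

const produced = [];
duplex.on('data', (word) => produced.push(word));
duplex.on('end', () => console.log('read side:', produced));
duplex.on('finish', () => console.log('write side:', received));

duplex.write('hello');
duplex.end('world');
```

Writing 'hello' and 'world' has no effect on the words the readable side produces, and vice versa.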
5. Transform streams
For a transform stream, we don't implement a read or write method; we implement a single transform method that combines the two. It plays the role of the write method, and we can also call this.push() inside it to push data to the readable side.
const { Transform } = require('stream');
const upperCase = new Transform({
  // Implement a transform method
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});
process.stdin.pipe(upperCase).pipe(process.stdout);
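Because the example above reads from process.stdin, here is a variant that runs without a terminal. It feeds two strings from Readable.from() (available in recent Node versions) through the same uppercase transform and collects the output; the input strings are arbitrary.

```javascript
const { Readable, Transform } = require('stream');

const upperCase = new Transform({
  // Uppercase each chunk and push it to the readable side
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

// Collect what comes out of the transform
const output = [];
upperCase.on('data', (chunk) => output.push(chunk.toString()));
upperCase.on('end', () => console.log(output.join(''))); // HELLO WORLD

Readable.from(['hello ', 'world']).pipe(upperCase);
```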