A stream is an ordered sequence of bytes with a start and an end. It does not care about the overall content of a file, only about reading the data and what happens to it once it has been read. A stream is an abstract interface implemented by many objects in Node; for example, the request and response objects of an HTTP server are streams.

In this article we will learn how streams work by writing simplified versions of their source code ourselves. It focuses on implementing readable and writable streams and does not cover the basic API and usage of streams.

If you are not familiar with the basic stream API, I suggest reviewing it first.

1. Prerequisites

Before learning about streams, we need to understand the event mechanism. The source code uses the events module, but you could also write your own: all it needs is the two operations publish and subscribe. A publish/subscribe mechanism lets modules exchange information while keeping them decoupled. It appears in many codebases; for example, the event flow in the webpack source code and the MVVM pattern in Vue both rely on publish/subscribe.
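A publish/subscribe helper really only needs an `on` (subscribe) and an `emit` (publish) method. The sketch below is a hypothetical `MiniEmitter` class, not Node's `events` module; it also adds `once` and `off`, because the stream code later in this article calls `once`.

```javascript
// A minimal publish/subscribe emitter (hypothetical; Node's real one is the `events` module)
class MiniEmitter {
    constructor() {
        // Maps an event name to the array of subscribed listeners
        this.listeners = {};
    }

    // Subscribe: register a listener for an event
    on(type, listener) {
        (this.listeners[type] = this.listeners[type] || []).push(listener);
        return this;
    }

    // Subscribe for a single call: the wrapper removes itself before running
    once(type, listener) {
        const wrapper = (...args) => {
            this.off(type, wrapper);
            listener.apply(this, args);
        };
        return this.on(type, wrapper);
    }

    // Unsubscribe a listener
    off(type, listener) {
        const list = this.listeners[type] || [];
        this.listeners[type] = list.filter((l) => l !== listener);
        return this;
    }

    // Publish: call every listener for the event, in subscription order
    emit(type, ...args) {
        // Copy the array so listeners that unsubscribe during emit do not disturb the loop
        const list = (this.listeners[type] || []).slice();
        list.forEach((listener) => listener.apply(this, args));
        return list.length > 0;
    }
}

// Usage: two subscribers, one of them only interested in the first message
const bus = new MiniEmitter();
const received = [];
bus.on('msg', (text) => received.push('always:' + text));
bus.once('msg', (text) => received.push('once:' + text));
bus.emit('msg', 'a');
bus.emit('msg', 'b');
console.log(received); // [ 'always:a', 'once:a', 'always:b' ]
```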

2. Readable streams

A readable stream is an object that implements the stream.Readable interface; data is read from it as a stream. Creating a readable stream is very simple; look at the following code:

// Import the fs module
let fs = require('fs');

// Call the API to get a readable stream rs; msg.txt is the file to read
let rs = fs.createReadStream('./msg.txt');

rs is a readable stream with methods and events on it, for example:

rs.pause();
rs.resume();
rs.on('data', function () {});
// ...and so on

These are the methods and events on rs that we will implement shortly; this is just a quick reminder.

2-1. Two modes for readable streams

2-1-1. flowing mode

When a readable stream is in flowing mode, it automatically reads data from the underlying system and delivers it to the application as quickly as possible through events of the EventEmitter interface. In other words, once we listen for the data event on a readable stream, the underlying interface starts reading and keeps firing data events until all the data has been read. Look at the following code:

let fs = require('fs');
let rs = fs.createReadStream('./msg.txt');
// As soon as you listen for data events, the underlying interface reads the data and keeps calling the callback with it
rs.on('data', function (data) {});
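To try flowing mode without a file on disk, the sketch below uses `Readable.from()` (available in recent Node.js versions) to build a readable stream from an in-memory array; attaching a `data` listener is enough to start the flow. The chunk strings are arbitrary examples.

```javascript
const { Readable } = require('stream');

// Build a readable stream from an in-memory array instead of msg.txt
const rs = Readable.from(['first ', 'second ', 'third']);
const chunks = [];

// Attaching a 'data' listener switches the stream into flowing mode
rs.on('data', (chunk) => {
    chunks.push(chunk);
});

// A promise that settles once every chunk has been delivered
const finished = new Promise((resolve) => {
    rs.on('end', () => resolve(chunks.join('')));
});

finished.then((text) => console.log(text)); // prints "first second third"
```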

So how do you switch a readable stream to flowing mode? There are three ways:

  • Listening for the data event
  • Calling stream.resume()
  • Calling stream.pipe() to send data to a Writable

Note: if a Readable switches to flowing mode while there is no consumer to handle the data, the data will be lost. This can happen, for example, if readable.resume() is called without a 'data' listener, or if the 'data' listener is removed.

2-1-2. paused mode

When a readable stream is in paused mode, the rs.read() method must be explicitly called to read a piece of data from the stream. Look at the following code:

let fs = require('fs');
let rs = fs.createReadStream('./msg.txt');
rs.on('readable', function () {
    // Consume 5 bytes from the cache; may return null if 5 bytes are not available yet
    let result = rs.read(5);
});

When we listen for the readable event, the underlying interface reads data into the cache and then stops reading until the data in the cache has been consumed. The call rs.read(5) consumes 5 bytes of the cached data.
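The same pattern can be tried without a file by creating a `Readable` and pushing data into it by hand (the string `helloworld` is an arbitrary example). Inside the `readable` handler, the sketch calls `read(5)` in a loop until it returns `null`, which is a common way for paused-mode consumers to drain the internal buffer.

```javascript
const { Readable } = require('stream');

// A readable stream whose data we push manually; read() has nothing extra to fetch
const rs = new Readable({ read() {} });
rs.push('helloworld');
rs.push(null); // no more data

const pieces = [];
rs.on('readable', () => {
    let chunk;
    // read(5) returns 5 bytes at a time, or null once the buffer cannot serve the request
    while ((chunk = rs.read(5)) !== null) {
        pieces.push(chunk.toString());
    }
});

const done = new Promise((resolve) => rs.on('end', () => resolve(pieces)));
done.then((result) => console.log(result)); // [ 'hello', 'world' ]
```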

To switch a readable stream to paused mode, do one of the following:

  • If there are no pipe destinations, call rs.pause().
  • If there are pipe destinations, remove the data event listeners and call rs.unpipe() to remove all pipe destinations.
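Node.js exposes the current mode through the `readable.readableFlowing` property: `null` before any consumer is attached, `true` in flowing mode and `false` once paused. The sketch below walks through the transitions described in the two lists above, using a stream that never produces data so nothing can be lost while it switches modes.

```javascript
const { Readable } = require('stream');

// A stream that never produces data, so switching modes cannot lose anything
const rs = new Readable({ read() {} });
const states = [];

// No consumer yet: the stream is in neither mode
states.push(rs.readableFlowing);

// Listening for 'data' switches to flowing mode
rs.on('data', () => {});
states.push(rs.readableFlowing);

// pause() switches to paused mode
rs.pause();
states.push(rs.readableFlowing);

// resume() switches back to flowing mode
rs.resume();
states.push(rs.readableFlowing);

console.log(states); // [ null, true, false, true ]
```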

2-2. Implement a readable stream

In the Node source code, fs.createReadStream() returns an instance of ReadStream, which inherits from stream.Readable. Here is a simplified excerpt to make the relationship easier to follow.

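const util = require('util');
const fs = require('fs');
const { Readable } = require('stream');

// Simplified: the real constructor also stores the path, options, file descriptor, etc.
function ReadStream(path, options) {
    Readable.call(this, options);
}

util.inherits(ReadStream, Readable);

fs.createReadStream = function (path, options) {
    return new ReadStream(path, options);
};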
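In current code the same inheritance is usually written with `class ... extends Readable`, overriding `_read()` and calling `this.push()`. The hypothetical `CountStream` below is a sketch of that pattern, not part of Node: it emits the numbers from 1 to a limit, then pushes `null` to signal the end.

```javascript
const { Readable } = require('stream');

// Hypothetical example class: a Readable subclass that produces "1", "2", ... up to limit
class CountStream extends Readable {
    constructor(limit, options) {
        super(options);
        this.current = 0;
        this.limit = limit;
    }

    // Node calls _read() whenever it wants more data
    _read() {
        this.current += 1;
        if (this.current > this.limit) {
            // Pushing null signals that there is no more data
            this.push(null);
        } else {
            this.push(String(this.current));
        }
    }
}

const counter = new CountStream(3);
let output = '';
counter.setEncoding('utf8');
counter.on('data', (chunk) => { output += chunk; });
const counted = new Promise((resolve) => counter.on('end', () => resolve(output)));
counted.then((text) => console.log(text)); // prints "123"
```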

Knowing the relationship between these classes, let’s start implementing our own ReadStream class.

2-2-1. Implementing flowing mode

In this mode, as soon as a listener is attached to the data event, the readable stream starts reading and keeps firing data events that carry the data. Let's sketch the skeleton of the ReadStream class first:

let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        // A subclass constructor must call super() before using this
        super();
        this.path = path;
        this.flowing = false;
        // ... the other properties listed below
    }
    read() { }
    open() { }
    end() { }
    destroy() { }
    pipe() { }
    pause() { }
    resume() { }
}

The constructor sets many properties on this; they are listed below:

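Attribute | Role
path | Path of the file to read
fd | File descriptor
flowing | Whether the stream is in flowing mode
encoding | Character encoding used to convert the data (without it, Buffers are emitted)
flag | File system flag used when opening the file
mode | File mode (permissions), default 0o666
start | Position to start reading from, default 0
pos | Current read position
end | Position to stop reading at (inclusive)
highWaterMark | High-water mark: maximum number of bytes read at a time, 64 * 1024 by default
buffer | Buffer that each read fills
autoClose | Whether to close the file automatically when reading ends or fails
length | Number of bytes currently held in the cache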

The constructor should also contain the following code:

this.on('newListener', (type, listener) => {
    if (type === 'data') {
        this.flowing = true;
        this.read();
    }
});
this.on('end', () => {
    if (this.autoClose) {
        this.destroy();
    }
});
this.open();

Note the first listener: as soon as the user subscribes to the data event, we set flowing to true and call this.read() to start reading data.
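This works because `EventEmitter` emits `newListener` before a listener is added, passing the event name and the listener. A small sketch with a plain emitter (the `started` flag and the `seen` array are illustrative) shows that the hook sees every subscription, so it can react to `data` specifically:

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const seen = [];
let started = false;

// Runs before each new listener is registered, like the ReadStream constructor hook
emitter.on('newListener', (type) => {
    seen.push(type);
    if (type === 'data') {
        // In ReadStream this is where flowing = true and this.read() would happen
        started = true;
    }
});

emitter.on('end', () => {});
emitter.on('data', () => {});

console.log(seen, started); // [ 'end', 'data' ] true
```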

Next, we write the main method, read(), which reads data and emits data events. It relies on fs.read(); if you are not familiar with it, see the fs.read() entry in the Node.js fs documentation.

read() {
    // The file descriptor is not available yet: wait for 'open', then retry
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this.read());
    }
    // Handle boundary values: end is inclusive, so at most end - pos + 1 bytes remain
    let n = this.end !== undefined
        ? Math.min(this.end - this.pos + 1, this.highWaterMark)
        : this.highWaterMark;
    // Start reading data
    fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
        if (err) return this.emit('error', err);
        if (bytesRead) {
            // Copy the bytes out, because this.buffer is reused by the next read
            let data = Buffer.from(this.buffer.slice(0, bytesRead));
            data = this.encoding ? data.toString(this.encoding) : data;
            this.pos += bytesRead;
            // Emit an event that returns the read data
            this.emit('data', data);
            if (this.end !== undefined && this.pos > this.end) {
                return this.emit('end');
            }
            // In flowing mode, keep reading
            if (this.flowing) {
                this.read();
            }
        } else {
            this.emit('end');
        }
    });
}
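The boundary calculation is easy to get wrong because `end` is inclusive, as with the `end` option of `fs.createReadStream`. A hypothetical helper `chunkSizes` that simulates successive reads shows how `Math.min(end - pos + 1, highWaterMark)` splits a byte range into chunks; the file sizes and option values are made-up examples.

```javascript
// Hypothetical helper: simulate successive reads of a file and list the chunk sizes.
// `end` is inclusive (undefined means "read to the end of the file").
function chunkSizes(fileSize, { start = 0, end, highWaterMark = 64 * 1024 } = {}) {
    const sizes = [];
    let pos = start;
    // Stop at the end of the file or just past the inclusive end position
    const last = end === undefined ? fileSize - 1 : Math.min(end, fileSize - 1);
    while (pos <= last) {
        // Same boundary rule as read(): never ask for more than remains up to `end`
        const n = end === undefined ? highWaterMark : Math.min(end - pos + 1, highWaterMark);
        // fs.read returns fewer bytes than requested near the end of the file
        const bytesRead = Math.min(n, fileSize - pos);
        sizes.push(bytesRead);
        pos += bytesRead;
    }
    return sizes;
}

console.log(chunkSizes(10, { highWaterMark: 4 })); // [ 4, 4, 2 ]
console.log(chunkSizes(10, { start: 2, end: 6, highWaterMark: 3 })); // [ 3, 2 ]
```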

Next, implement the open() method, which obtains the file descriptor. It is relatively simple:

open() {
    // Open the file
    fs.open(this.path, this.flag, this.mode, (err, fd) => {
        // If the file fails to open, clean up (when autoClose is set) and emit an error event
        if (err) {
            if (this.autoClose) {
                this.destroy();
            }
            return this.emit('error', err);
        }
        // Save the file descriptor and notify anyone waiting for it
        this.fd = fd;
        this.emit('open', fd);
    });
}

Next, implement the pipe() method, which works like this:

  • Listen for the data event to get the data.
  • Write the data to the writable stream; when write() returns false (its buffer is full), pause reading, and resume once the writable stream emits drain.
  • When reading finishes (the end event), call end() on the writable stream.
pipe(des) {
    // Listen for the data event and get the data
    this.on('data', (data) => {
        // write() returns false once the writable stream's buffer is full
        let flag = des.write(data);
        if (!flag) {
            this.pause();
        }
    });
    // The drain event means the writable stream has flushed its buffer, so reading can resume
    des.on('drain', () => {
        this.resume();
    });
    // When reading ends, end the writable stream as well
    this.on('end', () => {
        des.end();
    });
    return des;
}
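The same write/pause/drain logic can be tried against Node's built-in streams. The function `simplePipe` below is a hypothetical helper, not Node's own `pipe()`: it pauses the source whenever `write()` returns `false` and resumes on `drain`. The sink is a deliberately tiny `Writable` (`highWaterMark: 2`, an arbitrary choice) that stores chunks in an array, so backpressure actually kicks in.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper mirroring the pipe() steps above, built on Node's stream API
function simplePipe(src, dest) {
    src.on('data', (chunk) => {
        // write() returns false once dest's internal buffer reaches its highWaterMark
        if (!dest.write(chunk)) {
            src.pause();
        }
    });
    // 'drain' means dest has flushed its buffer, so the source may flow again
    dest.on('drain', () => src.resume());
    // When the source ends, end the destination too
    src.on('end', () => dest.end());
    return dest;
}

const received = [];
let pauses = 0;
const sink = new Writable({
    highWaterMark: 2, // bytes; tiny on purpose so backpressure happens
    write(chunk, encoding, callback) {
        received.push(chunk.toString());
        // Finish each write asynchronously, like a real file write
        setImmediate(callback);
    },
});

const source = Readable.from(['ab', 'cd', 'ef']);
source.on('pause', () => { pauses += 1; });

const piped = new Promise((resolve) => {
    simplePipe(source, sink).on('finish', () => resolve(received.join('')));
});
piped.then((text) => console.log(text, pauses > 0)); // prints "abcdef true"
```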

Other methods are simpler to implement.

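end() {
    if (this.autoClose) {
        this.destroy();
    }
}
destroy() {
    // Nothing to close if the file was never opened
    if (typeof this.fd !== 'number') {
        return this.emit('close');
    }
    fs.close(this.fd, () => {
        this.emit('close');
    });
}
pause() {
    this.flowing = false;
}
resume() {
    // Only restart reading if the stream was paused, so calling it while flowing does not start a second read loop
    if (!this.flowing) {
        this.flowing = true;
        this.read();
    }
}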

At this point, a flowing-mode readable stream is implemented.
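Putting the pieces together, here is a condensed, runnable sketch of the flowing-mode class. It follows the methods above but makes a few assumptions of its own: the option defaults (`flag: 'r'`, `mode: 0o666`, `highWaterMark: 64 * 1024`, inclusive `end`), a `reading` flag that prevents two `fs.read` calls from overlapping, and copying each chunk out of the shared buffer. The demo writes a throwaway file into the OS temp directory; the file name is arbitrary.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(filePath, options = {}) {
        super();
        this.path = filePath;
        this.flag = options.flag || 'r';
        this.mode = options.mode || 0o666;
        this.encoding = options.encoding || null;
        this.start = options.start || 0;
        this.end = options.end; // inclusive; undefined means "to the end of the file"
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose !== false;
        this.pos = this.start;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flowing = false;
        this.reading = false; // assumption: guards against overlapping fs.read calls
        this.fd = undefined;

        // Start flowing as soon as someone listens for 'data'
        this.on('newListener', (type) => {
            if (type === 'data') this.resume();
        });
        this.on('end', () => {
            if (this.autoClose) this.destroy();
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flag, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) this.destroy();
                return this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open', fd);
        });
    }

    read() {
        if (typeof this.fd !== 'number') return this.once('open', () => this.read());
        if (this.reading) return; // a read is already in flight
        const n = this.end !== undefined
            ? Math.min(this.end - this.pos + 1, this.highWaterMark)
            : this.highWaterMark;
        if (n <= 0) return this.emit('end');
        this.reading = true;
        fs.read(this.fd, this.buffer, 0, n, this.pos, (err, bytesRead) => {
            this.reading = false;
            if (err) return this.emit('error', err);
            if (bytesRead === 0) return this.emit('end');
            // Copy the bytes out, because this.buffer is reused by the next read
            const chunk = Buffer.from(this.buffer.subarray(0, bytesRead));
            this.pos += bytesRead;
            this.emit('data', this.encoding ? chunk.toString(this.encoding) : chunk);
            if (this.end !== undefined && this.pos > this.end) return this.emit('end');
            if (this.flowing) this.read();
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    destroy() {
        if (typeof this.fd !== 'number') return this.emit('close');
        fs.close(this.fd, () => this.emit('close'));
    }
}

// Demo: read a small temporary file in 4-byte chunks
const file = path.join(os.tmpdir(), `readstream-demo-${process.pid}.txt`);
fs.writeFileSync(file, 'hello streams');
const chunks = [];
const rs = new ReadStream(file, { encoding: 'utf8', highWaterMark: 4 });
rs.on('data', (chunk) => chunks.push(chunk));
const closed = new Promise((resolve) => rs.on('close', () => {
    fs.unlinkSync(file);
    resolve(chunks);
}));
closed.then((result) => console.log(result)); // [ 'hell', 'o st', 'ream', 's' ]
```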

2-2-2. Implementing paused mode

The difference between a paused-mode readable stream and a flowing-mode one is that, in paused mode, the underlying interface does not read all the data and hand it over in one go. Instead, it fills the cache and then stops reading, and reads again only when the cache is empty or falls below the high-water mark.

In paused mode we focus on the read() method. Instead of reading data as fast as possible and pushing it to the consumer through data events, we fill the cache when the user listens for the readable event and then stop reading. Once the consumer has drained the cache below highWaterMark, we fill it again, and so on until all the data has been read. This makes reading controllable: data is read on demand.

Let’s see how the read() method is implemented.

read(n) {
    let ret = null;
    // Boundary check: serve the request only if the cache holds at least n bytes
    if (n > 0 && n <= this.length) {
        // Create the buffer that read() returns
        ret = Buffer.alloc(n);
        let index = 0;
        let b;
        // Take bytes from the cached chunks until ret is full
        while (index < n && (b = this.buffers.shift()) != null) {
            let take = Math.min(b.length, n - index);
            b.copy(ret, index, 0, take);
            index += take;
            if (take < b.length) {
                // Put the unconsumed data back into the cache
                this.buffers.unshift(b.slice(take));
            }
        }
        this.length -= n;
        // Handle encoding
        if (this.encoding) {
            ret = ret.toString(this.encoding);
        }
    }
    // When the cache holds less than highWaterMark, read more data to refill it,
    // and ask _read to emit 'readable' once the new data arrives
    if (this.length < this.highWaterMark) {
        this.needReadable = true;
        _read();
    }
    return ret;
}
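The core of read(n) is taking exactly n bytes from a queue of cached chunks and putting any leftover back at the front. Isolated as a hypothetical pure function `takeBytes` (the name and signature are illustrative, not from the original code), that logic can be checked on its own:

```javascript
// Hypothetical helper: remove exactly n bytes from the front of a queue of Buffers.
// Returns null (and leaves the queue untouched) if fewer than n bytes are cached.
function takeBytes(buffers, n) {
    const cached = buffers.reduce((sum, b) => sum + b.length, 0);
    if (n <= 0 || n > cached) return null;

    const ret = Buffer.alloc(n);
    let index = 0;
    while (index < n) {
        const b = buffers.shift();
        // Copy as much of this chunk as still fits into ret
        const take = Math.min(b.length, n - index);
        b.copy(ret, index, 0, take);
        index += take;
        // Put the unconsumed tail back at the front of the queue
        if (take < b.length) buffers.unshift(b.subarray(take));
    }
    return ret;
}

const queue = [Buffer.from('abc'), Buffer.from('defg')];
console.log(takeBytes(queue, 5).toString()); // abcde
console.log(queue.map(String)); // [ 'fg' ]
console.log(takeBytes(queue, 3)); // null, because only 2 bytes are cached
```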

Only the main code is shown here, to illustrate the idea. The read() method mainly works on the cache, while the _read() method actually reads data from the file. Here is _read():

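let _read = () => {
    // end is inclusive, so at most end - pos + 1 bytes remain
    let m = this.end !== undefined
        ? Math.min(this.end - this.pos + 1, this.highWaterMark)
        : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
        if (err) return this.emit('error', err);
        if (bytesRead > 0) {
            // Copy the bytes, because this.buffer is reused by the next fs.read
            let data = Buffer.from(this.buffer.slice(0, bytesRead));
            this.pos += bytesRead;
            // Add the data to the cache and update the cache length
            this.buffers.push(data);
            this.length += bytesRead;
            if (this.needReadable) {
                // Reset the flag first, so a read() inside the handler can set it again
                this.needReadable = false;
                this.emit('readable');
            }
            // Reached the inclusive end position: no more data will come
            if (this.end !== undefined && this.pos > this.end) {
                this.emit('end');
            }
        } else {
            if (this.needReadable) {
                this.emit('readable');
            }
            return this.emit('end');
        }
    });
};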

At this point, the paused-mode readable stream is complete.

3. Writable streams

A writable stream is an object that implements the stream.Writable interface; stream data is written to it. Writable streams are simpler than readable streams. Our implementation revolves around three methods: write(), _write(), and clearBuffer().

3-1. Implement a writable stream

Implementation of the write() method

write(chunk, encoding, cb) {
    // Normalize the arguments: write(chunk, cb) is also allowed
    if (typeof encoding === 'function') {
        cb = encoding;
        encoding = null;
    }
    // Convert the incoming data to a Buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding || this.encoding || 'utf8');
    let len = chunk.length;
    this.length += len;
    // false tells the caller to stop writing until 'drain'
    let ret = this.length < this.highWaterMark;
    if (this.writing) {
        // A write is in progress: add a new task to the queue
        this.buffers.push({
            chunk,
            encoding,
            cb
        });
    } else {
        // Nothing in progress: write now, then run the user callback and flush the queue
        this.writing = true;
        this._write(chunk, encoding, () => {
            cb && cb();
            this.clearBuffer();
        });
    }
    return ret;
}
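Node's built-in `Writable` follows the same rule for its return value: `write()` returns `false` once the buffered bytes, including the chunk just written, reach `highWaterMark`. A sketch with a `Writable` whose write implementation never calls its callback, so every chunk stays buffered (the `highWaterMark` of 4 bytes is an arbitrary choice):

```javascript
const { Writable } = require('stream');

// A writable whose writes never complete, so every chunk stays in its buffer
const stuck = new Writable({
    highWaterMark: 4, // bytes
    write(chunk, encoding, callback) {
        // callback is intentionally never called
    },
});

const results = [];
results.push(stuck.write('ab')); // 2 bytes buffered: still below the mark
results.push(stuck.write('c')); // 3 bytes
results.push(stuck.write('d')); // 4 bytes: the mark is reached
console.log(results, stuck.writableLength); // [ true, true, false ] 4
```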

Implementation of the _write() method

The main job of the _write() method is to call the underlying API to write data to the file.

_write(chunk, encoding, cb) {
    // The file descriptor is not available yet: wait for 'open', then retry
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this._write(chunk, encoding, cb));
    }
    // Write the data, then execute the callback
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
        if (err) {
            if (this.autoClose) {
                this.destroy();
            }
            return this.emit('error', err);
        }
        this.length -= written;
        // Advance the position where the next write starts
        this.pos += written;
        // Execute the callback function
        cb && cb();
    });
}

The implementation of the clearBuffer() method

clearBuffer() {
    // Take the next task from the queue
    let data = this.buffers.shift();
    if (data) {
        // Write it, then run its callback and continue with the rest of the queue
        this._write(data.chunk, data.encoding, () => {
            data.cb && data.cb();
            this.clearBuffer();
        });
    } else {
        // The queue is empty: writing can start again
        this.writing = false;
        this.emit('drain');
    }
}

At this point, a writable stream is implemented.
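To see write(), _write() and clearBuffer() cooperate without touching the file system, here is a hypothetical in-memory variant (`MemoryWriter` is an illustrative name). Its `_write` simulates an asynchronous file write with `setImmediate`; the queue logic mirrors the methods above, including running each caller's callback.

```javascript
const EventEmitter = require('events');

// Hypothetical in-memory writer that reuses the write/_write/clearBuffer structure
class MemoryWriter extends EventEmitter {
    constructor(highWaterMark = 16) {
        super();
        this.highWaterMark = highWaterMark;
        this.length = 0; // bytes accepted but not yet "written"
        this.writing = false; // is a _write in progress?
        this.buffers = []; // queued tasks
        this.output = []; // where "written" data ends up
    }

    write(chunk, encoding, cb) {
        if (typeof encoding === 'function') {
            cb = encoding;
            encoding = null;
        }
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding || 'utf8');
        this.length += chunk.length;
        const ret = this.length < this.highWaterMark;
        if (this.writing) {
            this.buffers.push({ chunk, encoding, cb });
        } else {
            this.writing = true;
            this._write(chunk, encoding, () => {
                if (cb) cb();
                this.clearBuffer();
            });
        }
        return ret;
    }

    // Simulates an asynchronous write to the underlying resource
    _write(chunk, encoding, cb) {
        setImmediate(() => {
            this.output.push(chunk.toString());
            this.length -= chunk.length;
            cb();
        });
    }

    clearBuffer() {
        const task = this.buffers.shift();
        if (task) {
            this._write(task.chunk, task.encoding, () => {
                if (task.cb) task.cb();
                this.clearBuffer();
            });
        } else {
            this.writing = false;
            this.emit('drain');
        }
    }
}

const writer = new MemoryWriter(4);
const callbacks = [];
const returns = [
    writer.write('ab', () => callbacks.push(1)),
    writer.write('cd', () => callbacks.push(2)),
    writer.write('e', () => callbacks.push(3)),
];
const drained = new Promise((resolve) => writer.once('drain', resolve));
// prints: [ true, false, false ] [ 'ab', 'cd', 'e' ] [ 1, 2, 3 ]
drained.then(() => console.log(returns, writer.output, callbacks));
```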

4. Duplex streams

A Duplex stream implements both the Readable and Writable interfaces. With a duplex stream, a single object is both readable and writable, as if it inherited from both interfaces. Importantly, the readable and writable sides of a duplex stream operate completely independently of each other; the stream simply combines the two features in one object.

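const { Duplex } = require('stream');

const inoutStream = new Duplex({
    // Implement the writable side: log whatever is written
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    // Implement the readable side: push "1 2 3 4 " and then end
    read(size) {
        this.push((++this.index) + ' ');
        if (this.index > 3) {
            this.push(null);
        }
    }
});
// Counter used by read(); it must start at a number, otherwise ++ yields NaN
inoutStream.index = 0;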
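Because the two sides are independent, what you write to a duplex stream is not what you read back from it. A self-contained sketch of the same idea (the `log` array stands in for `console.log`, and the counter lives in a closure variable instead of on `this`):

```javascript
const { Duplex } = require('stream');

const log = []; // what the writable side received
let index = 0; // counter for the readable side

const inoutStream = new Duplex({
    // Writable side: record every chunk that is written
    write(chunk, encoding, callback) {
        log.push(chunk.toString());
        callback();
    },
    // Readable side: produce "1 2 3 4 " and then end, regardless of what was written
    read() {
        this.push(++index + ' ');
        if (index > 3) this.push(null);
    },
});

inoutStream.write('A');
inoutStream.end('B');

let readBack = '';
inoutStream.setEncoding('utf8');
inoutStream.on('data', (chunk) => { readBack += chunk; });
const bothDone = new Promise((resolve) => {
    inoutStream.on('end', () => resolve({ log, readBack }));
});
bothDone.then((r) => console.log(r)); // { log: [ 'A', 'B' ], readBack: '1 2 3 4 ' }
```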

5. Transform streams

For a Transform stream, we don't implement separate read and write methods; we implement a single transform() method that combines the two. It plays the role of the write method, and we can also call this.push() inside it to produce readable data.

const { Transform } = require('stream');

const upperCase = new Transform({
    // Implement the transform method: upper-case each chunk and push it to the readable side
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});

process.stdin.pipe(upperCase).pipe(process.stdout);
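The pipeline above reads from standard input, which is awkward to test. A variant of the same transform, fed from `Readable.from()` and collected into a string (the input words are arbitrary), shows the effect:

```javascript
const { Readable, Transform } = require('stream');

// Same transform as above: upper-case each chunk and push it to the readable side
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    },
});

let result = '';
upperCase.setEncoding('utf8');
upperCase.on('data', (chunk) => { result += chunk; });
const transformed = new Promise((resolve) => upperCase.on('end', () => resolve(result)));

Readable.from(['hello ', 'streams']).pipe(upperCase);
transformed.then((text) => console.log(text)); // HELLO STREAMS
```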