What is a stream?

What are the common application scenarios for streams?

What is the implementation mechanism for streams?

What is a stream?

What is a stream? I drew a simplified sewage-treatment process to illustrate my understanding of streams (forgive my poor imagination and drawing skills).

This is how I understood gulp when I first used it. The sewage comes in through the inlet, goes through a series of treatments in the sewage processor, and the water eventually flows out of the outlet.

Back in code: with gulp, I feed in a Less file; gulp compiles the Less, adds vendor prefixes, compresses, and so on, and outputs the CSS file I want. A one-stop service, isn't it?

While learning about streams, I also came across an interesting model: the producer/consumer model. Once you understand this model, understanding streams takes much less effort.

Let's take an example from everyday life to help us understand the producer/consumer model.

The producer is like a factory, constantly turning out instant noodles.

The consumers are like ordinary people, who need to buy instant noodles to get on with their lives.

If I went directly to the factory to buy instant noodles, they would be very cheap, but the factory loses money on retail, so it will only sell to me wholesale, and a wholesale order is enough to wipe out my small savings.

As for the factory, if it faced buyers directly, it would produce instant noodles for one day and then halt production for a month, because it would need the next month just to sell that day's output. The factory would soon go bankrupt.

Since the factory cannot connect with consumers directly, a third party, the supermarket, can be introduced as the factory's agent. The factory can then focus on producing instant noodles and sell them straight to supermarkets, and consumers no longer have to buy wholesale from the factory. The noodles cost a little more, but the convenience is worth it: whenever you want a pack, you just go to the supermarket and buy one.

These two real-life examples give us a macro-level picture of a stream.

Three roles: the producer, the consumer, and the third-party intermediary.

The third-party intermediary on the dotted line at the bottom is the same as the one at the top; they are drawn separately to highlight how the third-party intermediary decouples consumers from producers.

Some application scenarios for streams

Node.js has four basic stream types:

  • Readable – a readable stream
  • Writable – a writable stream
  • Duplex – a stream that is both readable and writable, also known as a duplex stream
  • Transform – a special Duplex stream that modifies or transforms the data as it is written and read

Many of the built-in/core modules in Node.js are implemented based on streams:

This figure shows the position of streams in Node.js. It is fair to say that understanding streams will greatly help us understand and use the built-in modules listed above.

The implementation mechanism for streams

The best way to understand the internals of streams is to read the Node.js source code together with the official Node.js documentation:

  • _stream_readable.js
  • _stream_writable.js
  • _stream_duplex.js
  • _stream_transform.js

As the source code shows, these four basic stream types are all abstractions of streams meant to be extended by developers, which is why the source can look somewhat abstract on a first reading. Next, I will take a different approach and implement the main logic of fs Readable Stream without relying on these four basic stream types, to build a better understanding of streams.

fs Readable Stream

Readable Stream has two modes:

  • Flowing: in this mode, data is fetched and emitted as quickly as possible. If no event listeners are attached and no `pipe()` directs the data flow, the data can be lost.
  • Paused: the default mode. In this mode, you need to call `stream.read(..)` manually to get data.

Switching to flowing mode can be done in the following ways:

  • add a `'data'` event listener
  • call the `stream.resume(..)` method
  • call `stream.pipe()` to send data to a consumer (a `Writable`)

You can switch to Paused mode in the following ways:

  • if `stream.pipe(..)` has not been called, calling `stream.pause(..)` is enough
  • if `stream.pipe(..)` has been called, you need to remove all pipes with `stream.unpipe(..)`

The following points need special attention:

  1. A readable stream only produces data once a consumer is attached, for example by adding a `'data'` event listener.
  2. Removing the `'data'` event listener does not automatically stop the stream.
  3. If `stream.pipe(..)` has been called, calling `stream.pause()` will not stop the stream.
  4. If the readable stream is switched to flowing mode but no `'data'` event listener is attached, the data will be lost: for example, calling `stream.resume()` without adding a `'data'` listener, or after the `'data'` listener has been removed.
  5. Pick one way to consume the data a readable stream produces. `stream.pipe(..)` is recommended; you can also use the more controllable event mechanism together with the `readable.pause()`/`readable.resume()` APIs.
  6. If `'readable'` and `'data'` are used at the same time, the `'readable'` event takes priority over the `'data'` event. In that case you must call `stream.read(..)` explicitly inside the `'readable'` handler to read the data.

See stream_readable_streams in the official documentation for more API usage.

Flowing mode

Let's try implementing the flowing mode of a file readable stream and take a look at its internal mechanism.

```javascript
// Source: https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// Use Node.js's util module so the file readable stream inherits EventEmitter's methods.
// Streams are implemented on top of the event mechanism.
util.inherits(FsReadableStream, EventEmitter);

// Declare a file readable stream constructor and initialize its options
function FsReadableStream(path, options) {
    const self = this; // keep a stable reference to `this` inside callbacks

    // To keep the walkthrough focused, parameter validation is omitted.
    self.path = path;

    // Options for opening the file,
    // see https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o666;

    // Start and end positions for reading the file contents
    self.start = options.start || 0;
    self.end = options.end;
    // High-water mark: the maximum number of bytes buffered per read
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // Whether to close the file automatically after reading finishes
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // Encoding used to decode the output data
    self.encoding = options.encoding || 'utf8';

    // A valid file descriptor
    self.fd = null;

    // Current read position: starts at `start`, then advances by the bytes read each time
    self.pos = self.start;

    // Allocate a buffer of high-water-mark size
    self.buffer = Buffer.alloc(self.highWaterMark);

    // Open the file so it is ready for the next step
    self.open();

    // The stream starts out in paused mode
    self.flowing = null;

    // As soon as a 'data' listener is added,
    // switch to flowing mode and start reading
    self.on('newListener', function (eventName) {
        if (eventName === 'data') {
            self.flowing = true;
            self.read();
        }
    });
}

// Before operating on a file, open it to obtain a valid file descriptor
FsReadableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// Read the contents of the file
FsReadableStream.prototype.read = function () {
    const self = this;
    // self.open() is asynchronous, so check whether the file is open yet
    if (typeof self.fd !== 'number') {
        // The file is not open yet; wait for the 'open' event
        self.once('open', self.read);
        return;
    }
    // Work out how many bytes to read this time
    const howMuchToRead = self.end ? Math.min(self.highWaterMark, self.end - self.pos + 1) : self.highWaterMark;
    fs.read(self.fd, self.buffer, 0, howMuchToRead, self.pos, function (err, bytesRead) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        if (bytesRead > 0) {
            // Advance the read position
            self.pos = self.pos + bytesRead;
            // Fewer bytes than the buffer size may have been read,
            // so slice the buffer to avoid emitting garbage bytes
            const data = self.encoding ? self.buffer.slice(0, bytesRead).toString(self.encoding) : self.buffer.slice(0, bytesRead);
            self.emit('data', data);
            // If the next read position is past the end position, reading is complete
            if (self.pos > self.end) {
                self.emit('end');
                if (self.autoClose) {
                    self.destroy();
                }
            }
            // While still in flowing mode, keep reading
            if (self.flowing) {
                self.read();
            }
        } else {
            // The whole file has been read
            self.emit('end');
            if (self.autoClose) {
                self.destroy();
            }
        }
    });
};

// Switch from flowing mode to paused mode
FsReadableStream.prototype.pause = function () {
    if (this.flowing !== false) {
        this.flowing = false;
    }
};

// Switch from paused mode to flowing mode
FsReadableStream.prototype.resume = function () {
    // If resume() is called without a 'data' listener attached,
    // the data will be lost
    if (!this.flowing) {
        this.flowing = true;
        this.read();
    }
};

// Close the file
FsReadableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
        return;
    }
    this.emit('close');
};
```

We can see that flowing mode produces data in a continuous stream until the data source is exhausted. You can also use `stream.pause(..)`/`stream.resume(..)` to control it. Control in this mode rests with the developer, who must be familiar with how it works and use it carefully; otherwise it is easy to overwhelm the consumer or lose data along the way.

Paused mode

Because flowing mode and paused mode are mutually exclusive, the two modes are implemented here as separate readable streams. In paused mode, we need to listen for a different event, `'readable'`, and explicitly call `stream.read(n)` to read the data.

```javascript
// Source: https://github.com/nodejs/node/blob/master/lib/fs.js
// Paused mode for a file readable stream
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// Use Node.js's util module so the file readable stream inherits EventEmitter's methods.
// Streams are implemented on top of the event mechanism.
util.inherits(FsReadableStream, EventEmitter);

// Declare a file readable stream constructor and initialize its options
function FsReadableStream(path, options) {
    const self = this; // keep a stable reference to `this` inside callbacks

    // To keep the walkthrough focused, parameter validation is omitted.
    self.path = path;

    // Options for opening the file,
    // see https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'r';
    self.mode = options.mode || 0o666;

    // Start and end positions for reading the file contents
    self.start = options.start || 0;
    self.end = options.end;
    // High-water mark: the maximum number of bytes buffered per read
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // Whether to close the file automatically after reading finishes
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // Encoding used to decode the output data
    self.encoding = options.encoding || 'utf8';

    // A valid file descriptor
    self.fd = null;

    // Current read position: starts at `start`, then advances by the bytes read each time
    self.pos = self.start;

    // The cache: [Buffer, Buffer, Buffer...]
    self.buffers = [];
    // The current cache length. self.buffers.length only tells us how many
    // elements the array holds; this is the sum of the lengths of the cached buffers.
    self.length = 0;

    // Reading a file is asynchronous, so a flag is needed:
    // while a read is in progress, incoming data goes into the cache
    self.reading = false;
    // Whether the conditions for emitting a 'readable' event are met.
    // 'readable' means the stream has news: either there is new data, or the end
    // of the stream has been reached. In the former case stream.read() returns
    // usable data; in the latter it returns null.
    self.emittedReadable = false;

    // Open the file so it is ready for the next step
    self.open();

    // As soon as a 'readable' listener is added, kick off the first read,
    // which fills the cache and arms the 'readable' emission
    self.on('newListener', function (eventName) {
        if (eventName === 'readable') {
            self.read();
        }
    });
}

FsReadableStream.prototype.read = function (n) {
    const self = this;
    let buffer = null;
    // The requested length is greater than what the cache holds:
    // raise the water mark to accommodate the demand.
    // computeNewHighWaterMark is the method the Node.js source uses to raise
    // the water mark (not reimplemented here).
    if (n > self.length) {
        self.highWaterMark = computeNewHighWaterMark(n);
        self.emittedReadable = true;
        self._read();
    }
    // The requested length is greater than 0 and no more than the cache length
    if (n > 0 && n <= self.length) {
        // Allocate a Buffer for the result first
        buffer = Buffer.alloc(n);
        let index = 0; // how many bytes have been copied
        let flag = true; // controls the while loop
        let b;
        while (flag && (b = self.buffers.shift())) {
            for (let i = 0; i < b.length; i++) {
                buffer[index++] = b[i]; // copy byte by byte
                if (n === index) {
                    // Put the unused remainder back at the front of the cache
                    const arr = b.slice(i + 1);
                    if (arr.length) {
                        self.buffers.unshift(arr);
                    }
                    self.length = self.length - n;
                    flag = false;
                    break;
                }
            }
        }
    }
    // The cache is now empty, so the next fill should emit 'readable'
    if (self.length === 0) {
        self.emittedReadable = true;
    }
    // When the cache falls below the water mark, produce data to refill it
    if (self.length < self.highWaterMark) {
        if (!self.reading) {
            self.reading = true;
            self._read();
        }
    }
    // Return the data read
    return buffer;
};

FsReadableStream.prototype._read = function () {
    const self = this;
    if (typeof self.fd !== 'number') {
        return self.once('open', self._read);
    }
    const buffer = Buffer.alloc(self.highWaterMark);
    fs.read(self.fd, buffer, 0, self.highWaterMark, self.pos, function (err, bytesRead) {
        if (bytesRead > 0) {
            // By default the bytes read go into the cache
            self.buffers.push(buffer.slice(0, bytesRead));
            self.pos = self.pos + bytesRead; // maintain the read position
            self.length = self.length + bytesRead; // maintain the cache size
            self.reading = false; // this read is complete
            // Does a 'readable' event need to be emitted?
            if (self.emittedReadable) {
                self.emittedReadable = false; // do not emit again by default
                self.emit('readable');
            }
        } else {
            self.emit('end');
            if (self.autoClose) {
                self.destroy();
            }
        }
    });
};

// Before operating on a file, open it to obtain a valid file descriptor
FsReadableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// Close the file
FsReadableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.fd = null;
            self.emit('close');
        });
        return;
    }
    this.emit('close');
};
```

In this mode, the `'readable'` event tells us when data can be retrieved. If you call `stream.read(n)` directly, the underlying `fs.read(..)` is an asynchronous operation, so the data may not have been read into the cache yet and the result will be null.

As soon as the contents of the cache are consumed below the water mark, the cup automatically refills: data of water-mark size is produced and placed into the cache.

When is the `'readable'` event emitted? It is triggered when the cache is empty and data of water-mark size is then produced into the cache. The next trigger happens after the cache is used up and the cup is filled again.

`stream.read(n)`: pass in as much data as you want. `stream.length` can be used to check the length of the currently cached data and decide how much to request. In real scenarios, use `stream._readableState.length`.

fs Writable Stream

fs Writable Stream works much like the readable stream. Without further ado, here is the code:

```javascript
// Source: https://github.com/nodejs/node/blob/master/lib/fs.js
const fs = require('fs');
const EventEmitter = require('events');
const util = require('util');

// Use Node.js's util module so the file writable stream inherits EventEmitter's methods.
// Streams are implemented on top of the event mechanism.
util.inherits(FsWritableStream, EventEmitter);

// Declare a file writable stream constructor and initialize its options
function FsWritableStream(path, options) {
    const self = this; // keep a stable reference to `this` inside callbacks

    // To keep the walkthrough focused, parameter validation is omitted.
    self.path = path;

    // Options for opening the file,
    // see https://nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback
    self.flags = options.flags || 'w';
    self.mode = options.mode || 0o666;

    // The position in the file where writing starts
    self.start = options.start || 0;
    // High-water mark: the maximum number of bytes cached per write
    self.highWaterMark = options.highWaterMark || 64 * 1024;
    // Whether to close the file automatically after writing finishes
    self.autoClose = options.autoClose === undefined ? true : options.autoClose;
    // Tells the program how written strings should be encoded
    self.encoding = options.encoding || 'utf8';

    // A valid file descriptor
    self.fd = null;

    // Current write position: starts at `start`, then advances by the bytes written each time
    self.pos = self.start;

    // The cache: [Buffer, Buffer, Buffer...]
    self.buffers = [];
    // The current cache length. self.buffers.length only tells us how many
    // elements the array holds; this is the sum of the lengths of the cached buffers.
    self.length = 0;

    // Writing a file is asynchronous, so a flag is needed:
    // while a write is in progress, incoming data goes into the cache
    self.writing = false;

    // Controls whether to notify 'drain' listeners once the cache goes from full to empty
    self.needDrain = false;

    // Open the file so it is ready for the next step
    self.open();
}

// Before operating on a file, open it to obtain a valid file descriptor
FsWritableStream.prototype.open = function () {
    const self = this;
    fs.open(self.path, self.flags, self.mode, function (err, fd) {
        if (err) {
            self.emit('error', err);
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        self.fd = fd;
        self.emit('open', fd);
    });
};

// Close the file
FsWritableStream.prototype.destroy = function () {
    const self = this;
    if (typeof self.fd === 'number') {
        fs.close(self.fd, function (err) {
            if (err) {
                self.emit('error', err);
                return;
            }
            self.emit('close');
        });
    } else {
        self.emit('close');
    }
};

// The entry point. By default all three parameters are passed
FsWritableStream.prototype.write = function (chunk, encoding, callback) {
    const self = this;
    const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    self.length = self.length + bufferChunk.length;
    // If the current cache length is below the water mark, the cache is not full yet
    const ret = self.length < self.highWaterMark;
    // Once the cache is full, arm the 'drain' notification
    self.needDrain = !ret;
    if (self.writing) {
        // Data is already being written/consumed, so store this chunk in the cache first
        self.buffers.push({
            chunk,
            encoding,
            callback,
        });
    } else {
        // No write in flight: start consuming immediately
        self.writing = true;
        self._write(chunk, encoding, function () {
            callback();
            // Drain the cache once this write completes
            self.clearBuffer();
        });
    }
    // Each call to write() reports whether the cache still has room
    return ret;
};

FsWritableStream.prototype._write = function (chunk, encoding, callback) {
    const self = this;
    if (typeof self.fd !== 'number') {
        self.once('open', function () {
            self._write(chunk, encoding, callback);
        });
        return;
    }
    fs.write(self.fd, chunk, 0, chunk.length, self.pos, function (err, writtenBytes) {
        if (err) {
            self.emit('error', err);
            self.writing = false;
            if (self.autoClose) {
                self.destroy();
            }
            return;
        }
        // Subtract the bytes that have been successfully consumed
        self.length = self.length - writtenBytes;
        // Advance the position for the next write
        self.pos = self.pos + writtenBytes;
        callback();
    });
};

FsWritableStream.prototype.clearBuffer = function () {
    const self = this;
    const buffer = self.buffers.shift();
    if (buffer) {
        // There is still data in the cache, so keep consuming it
        self._write(buffer.chunk, buffer.encoding, function () {
            buffer.callback();
            self.clearBuffer();
        });
    } else {
        // The cache is empty, so reset the writing state
        self.writing = false;
        if (self.needDrain) {
            // Emit 'drain': the cache is exhausted and ready for the next wave of writes
            self.needDrain = false;
            self.emit('drain');
        }
    }
};
```

We can see that when the data stream comes in, the writable stream consumes the data directly. When consuming (writing to the file) is too slow, incoming data is held in the cache.

"Back pressure" occurs when the producer sends data faster than `fs.write(..)` can consume it and fills up the cache. Once the cache is drained, the writable stream emits a `'drain'` event to tell the producer it can resume production.

Conclusion

The fs Readable Stream and fs Writable Stream above are upper-layer APIs built on the base stream types Readable and Writable. The actual fs source code inherits from these two base stream types and adds some fs file operations, extending them into file streams.

In short, a stream is a producer/consumer model built on events and state machines.

For more information on using streams, see the official documentation.

References

  • An in-depth understanding of the internal mechanics of Node.js Stream
  • Node.js streams: Everything you need to know
  • Streams in Node
  • Node.js official documentation
  • Node.js source code