The Stream module is one of the core modules of Node.js. Other modules, such as fs and http, are built on instances of the Stream module.

Many front-end beginners learning Node.js find the concept and use of streams hard to grasp, because front-end work rarely involves "stream" processing.

1. What is a stream?

The word "stream" naturally evokes images of flowing water, a current, and so on.

Official definition: a stream is an abstract interface for working with streaming data in Node.js.

From the official definition, we can see:

  • A stream is a data processing tool provided by Node
  • A stream is an abstract interface in Node

A stream, to be precise, is a data stream: a means of transmitting data. In an application, a stream is an ordered sequence of data with a beginning and an end.

The main reason streams are hard to understand is that they are an abstract concept.

2. Typical application scenarios for streams

To get a clear picture of the Stream module, let's first look at some concrete scenarios where it is applied.

In Node.js, streams are mainly used for processing large amounts of data: fs reading and writing large files, HTTP request and response handling, file compression, data encryption/decryption, and so on.

The picture above illustrates how a stream works: the bucket can be understood as the data source, the pool as the data target, and the pipe connecting them as the data flow pipeline, through which data flows from the data source to the data target.
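
To make the pipeline idea concrete, here is a minimal sketch that compresses a file by piping it through zlib (the file names are placeholders):

import fs from 'fs';
import zlib from 'zlib';

// Data flows source -> transform -> destination, chunk by chunk,
// without loading the whole file into memory.
fs.createReadStream('text.txt')               // data source (the "bucket")
  .pipe(zlib.createGzip())                    // transform pipeline (the "pipe")
  .pipe(fs.createWriteStream('text.txt.gz')); // data target (the "pool")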

3. Stream classification

In Node.js, streams are divided into four types: Readable, Writable, Duplex, and Transform.

  • Writable: a stream to which data can be written
  • Readable: a stream from which data can be read
  • Duplex: a stream that is both Readable and Writable
  • Transform: a Duplex stream that can modify or transform the data as it is written and read

All streams are instances of EventEmitter. That is, we can listen for changes in the data flow through the event mechanism.
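
A quick way to verify this, as a minimal sketch (text.txt is a placeholder file name):

import fs from 'fs';
import { Readable } from 'stream';
import { EventEmitter } from 'events';

const readable = fs.createReadStream('text.txt'); // placeholder file

console.log(readable instanceof Readable);     // true
console.log(readable instanceof EventEmitter); // true: every stream is an EventEmitter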

4. Data modes and buffering

Before diving into the use of the four stream types, we need to understand two concepts, data modes and the internal buffer, which will help us make better sense of streams later on.

4.1 Data modes

All streams created by the Node.js API operate only on strings and Buffer (or Uint8Array) objects. Stream implementations can, however, work with other JavaScript values by operating in object mode.

4.2 Buffering

Both Writable and Readable streams store data in internal buffers.

The amount of buffered data depends on the highWaterMark option passed to the stream's constructor: for ordinary streams it specifies a total number of bytes; for streams operating in object mode, it specifies a total number of objects.

The highWaterMark option is a threshold, not a limit: it specifies the amount of data buffered by the stream before it stops requesting more data.
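
For example, a sketch of setting highWaterMark in both byte mode and object mode (the values are chosen arbitrarily for illustration):

import fs from 'fs';
import { Readable } from 'stream';

// Byte mode: highWaterMark counts bytes (fs streams default to 64 KiB)
const fileStream = fs.createReadStream('text.txt', { highWaterMark: 1024 });

// Object mode: highWaterMark counts objects instead of bytes
const objectStream = new Readable({
  objectMode: true,
  highWaterMark: 16,
  read() {} // no-op; data would be supplied via push()
});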

When the implementation calls stream.push(chunk), the data is buffered in the Readable stream. If the consumer of the stream does not call stream.read(), the data stays in the internal queue until it is consumed.

Once the total size of the internal read buffer reaches the threshold specified by highWaterMark, the stream temporarily stops reading data from the underlying resource until the currently buffered data can be consumed.
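
A minimal sketch of this queueing behavior, using a hand-rolled Readable whose read() implementation pushes a couple of chunks synchronously:

import { Readable } from 'stream';

const readable = new Readable({
  read() {
    // push() places the chunks in the internal buffer
    this.push('hello ');
    this.push('world');
    this.push(null); // signal end of stream
  }
});

// The pushed data sits in the internal queue until the consumer reads it
console.log(readable.read().toString()); // 'hello world'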

When the writable.write(chunk) method is called repeatedly, data is buffered in the Writable stream.
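
Similarly, a sketch showing how write() reports buffering against a deliberately tiny highWaterMark (a hypothetical slow Writable, for illustration only):

import { Writable } from 'stream';

const slow = new Writable({
  highWaterMark: 4, // tiny buffer so the threshold is hit quickly
  write(chunk, encoding, callback) {
    setTimeout(callback, 100); // simulate a slow destination
  }
});

// write() returns true while the buffered data stays below highWaterMark...
console.log(slow.write('abc'));   // true  (3 bytes buffered)
// ...and false once the buffered data reaches or exceeds it
console.log(slow.write('defgh')); // false (8 bytes buffered)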

5. Readable streams

5.1 Flowing and paused modes of readable streams

A Readable stream operates in one of two modes: flowing and paused.

  • Flowing mode: data is read from the underlying system and push()ed into the internal buffer automatically; 'data' events are emitted so that consumers receive the data. When the buffer reaches highWaterMark, push() returns false and the resource temporarily stops flowing into the buffer.

  • Paused mode: all Readable streams start in paused mode, and the stream.read() method must be called explicitly to read data from the stream. A 'readable' event is emitted every time data reaches the buffer, that is, every push() triggers 'readable' (see the paused-mode sketch after this list).

  • How to switch from paused mode to flowing mode:

    • Add a 'data' event handler
    • Call the stream.resume() method
    • Call the stream.pipe() method to send the data to a Writable
  • How to switch from flowing mode to paused mode:

    • If there are no pipe destinations, call stream.pause().
    • If there are pipe destinations, remove all of them; multiple pipe destinations can be removed by calling the stream.unpipe() method.
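
To see paused mode in action, here is a minimal sketch that pulls data with read() instead of attaching a 'data' handler (text.txt is a placeholder file):

import fs from 'fs';

const readable = fs.createReadStream('text.txt');
readable.setEncoding('utf8');

// No 'data' handler is attached, so the stream stays in paused mode.
// 'readable' fires whenever data arrives in the internal buffer (and once more at the end).
readable.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is drained
  while ((chunk = readable.read()) !== null) {
    console.log(`Read ${chunk.length} characters`);
  }
});

readable.on('end', () => {
  console.log('File read finished');
});
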
5.2 Common Examples of Readable Streams
import path from 'path';
import fs from 'fs';

const filePath = path.join(path.resolve(), 'files', 'text.txt');

const readable = fs.createReadStream(filePath);

// If a default encoding is set with readable.setEncoding(), listener callbacks receive chunks as strings; otherwise the data arrives as Buffers.
readable.setEncoding('utf8');

let str = '';

readable.on('open', (fd) => {
  console.log('Start reading file');
});

// The 'data' event is emitted whenever the stream transfers ownership of a chunk of data to the consumer.
readable.on('data', (data) => {
  str += data;
  console.log('Read data');
});

// pause() stops a stream in flowing mode from emitting 'data' events, switching it to paused mode. Any available data remains in the internal buffer.
readable.pause();

// resume() switches an explicitly paused stream back to flowing mode, so it resumes emitting 'data' events.
readable.resume();

// The 'pause' event is emitted when stream.pause() is called and readableFlowing is not false.
readable.on('pause', () => {
  console.log('Reading paused');
});

// The 'resume' event is emitted when stream.resume() is called and readableFlowing is not true.
readable.on('resume', () => {
  console.log('Flowing again');
});

// The 'end' event is emitted when there is no more data in the stream to consume.
readable.on('end', () => {
  console.log('File read finished');
});

// The 'close' event is emitted when the stream and any of its underlying resources (such as file descriptors) are closed.
readable.on('close', () => {
  console.log('File closed');
});

// pipe() binds a Writable stream (destWritable here stands for any Writable) to the readable, switching it to flowing mode and pushing all of its data to the bound Writable; the data flow is managed automatically.
// readable.pipe(destWritable);

// 'error' can be emitted if the underlying stream is unable to generate data due to an internal failure, or if the implementation tries to push an invalid chunk.
readable.on('error', (err) => {
  console.log(err);
  console.log('File reading error occurred');
});

6. Writable streams

6.1 Flow and pause of writable streams

Writable streams are similar to readable streams: written data goes into an internal buffer first. When the underlying write is slow or writing is paused, data accumulates in that buffer.

"Back pressure" occurs when the producer writes too fast and fills up the buffer pool; the producer then needs to be told to pause production. Once the buffer is released, the Writable stream emits a 'drain' event so the producer can resume production.
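
A sketch of the standard back-pressure handshake: pause the producer when write() returns false, and resume it on 'drain' (the file names are placeholders):

import fs from 'fs';

const readable = fs.createReadStream('text.txt');
const writable = fs.createWriteStream('copy.txt');

readable.on('data', (chunk) => {
  // write() returns false once the writable buffer reaches highWaterMark
  if (!writable.write(chunk)) {
    readable.pause(); // tell the producer to stop
    // resume production once the buffer has drained
    writable.once('drain', () => readable.resume());
  }
});

readable.on('end', () => writable.end());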

6.2 Examples of writable streams
import path from 'path';
import fs from 'fs';

const filePath = path.join(path.resolve(), 'files', 'text.txt');
const copyFile = path.join(path.resolve(), 'files', 'copy.txt');

let str = '';

// Create a readable stream
const readable = fs.createReadStream(filePath);
// Deliver chunks as strings instead of Buffers
readable.setEncoding('utf8');

// Create a writable stream
const writable = fs.createWriteStream(copyFile);
// Set the default encoding for written strings
writable.setDefaultEncoding('utf8');

readable.on('open', (fd) => {
  console.log('Start reading file');
});

// The 'data' event is emitted whenever the stream transfers ownership of a chunk of data to the consumer
readable.on('data', (data) => {
  str += data;
  console.log('Read data');

  // Write the chunk to the writable stream
  writable.write(data, 'utf8');
});

writable.on('open', () => {
  console.log('Start writing data');
});

// If a call to stream.write(chunk) returns false, the 'drain' event is emitted once it is appropriate to resume writing data to the stream.
// That is, when production is faster than writing and the buffer fills up, reading from the underlying source stops;
// once the writable buffer is released, 'drain' tells the producer to continue.
writable.on('drain', () => {
  console.log('Continue writing');
});

// The 'finish' event is emitted after stream.end() is called and all data has been flushed to the underlying system.
writable.on('finish', () => {
  console.log('All data written');
});

readable.on('end', () => {
  // Notify the writable stream once all data has been read
  writable.end();
});

// The 'pipe' event is emitted when stream.pipe() is called on a readable stream, adding this writable to its set of destinations.
// readable.pipe(writable)
writable.on('pipe', () => {
  console.log('Pipe established');
});

writable.on('error', () => {
  console.log('Data write error occurred');
});