
I have been using the pipe function a lot in recent development work, knowing only that it "pipes" streams without understanding how it actually works. So I started learning from streams themselves and organized the notes and source-code reading into this article to share with everyone.

Stream is a very basic concept in NodeJs: many core modules are built on streams, and they play a very important role. Streams are also a concept that is hard to grasp, mainly because good documentation is scarce, and NodeJs beginners often spend a lot of time understanding them. Fortunately, for most NodeJs users who only build web applications, an incomplete understanding of streams does not prevent them from using NodeJs. Still, understanding streams gives you a better grasp of the other NodeJs modules, and in some cases processing data with streams is simply the better choice.

How to understand streams

  • To the consumer of a stream, we can think of the stream as an array: we only need to care about reading from it (consuming) and writing to it (producing).

  • For the stream developer (who uses the Stream module to create a new instance), the focus is on how to implement some of the stream's methods. This usually comes down to two things: what the target resource is and how to operate on it. Once the target resource is determined, it must be operated on according to the stream's different states and events.

Buffer pool

All streams in NodeJs have a buffer pool. The purpose of the buffer pool is to increase the efficiency of the stream: when producing and consuming data both take time, we can store produced data in the buffer pool ahead of the next consumption. The buffer pool is not always in use, though; for example, when the buffer pool is empty, newly produced data is not put into the buffer pool but is consumed directly.

If data is produced faster than it is consumed, the excess data has to wait somewhere. If data is produced more slowly than it is consumed, the data accumulates somewhere up to a certain amount before being consumed. (Developers cannot control how fast data is produced or consumed; they can only decide when to produce or consume it.)

The place where data waits and accumulates before being consumed is the buffer pool. The buffer pool is usually located in the computer's RAM (memory).

To take a familiar example of a buffer: when we watch online video with a fast connection, the buffer is always filled immediately, its contents are sent to the system to play, and the next segment is buffered right away, so there is no stutter while watching. If the connection is slow, a loading indicator appears, meaning the buffer is being filled; once filling is complete, the data is sent to the system and the video plays.

The buffer pool of a NodeJs stream is a linked list of Buffers. Each time data is added to the buffer pool, a new Buffer node is created and inserted at the end of the list.
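As a mental model only (a minimal sketch of the idea, not Node's actual internal implementation), a linked-list buffer pool could look like this:

// Illustration only: a simplified linked-list buffer pool
class SimpleBufferPool {
    constructor() {
        this.head = null   // oldest node
        this.tail = null   // newest node
        this.length = 0    // total bytes buffered
    }
    // producing: append a new Buffer node at the end of the list
    push(chunk) {
        const node = { data: chunk, next: null }
        if (this.tail) this.tail.next = node
        else this.head = node
        this.tail = node
        this.length += chunk.length
    }
    // consuming: remove and return the oldest node
    shift() {
        if (!this.head) return null
        const { data } = this.head
        this.head = this.head.next
        if (!this.head) this.tail = null
        this.length -= data.length
        return data
    }
}

const pool = new SimpleBufferPool()
pool.push(Buffer.from('hello'))
pool.push(Buffer.from('world'))
console.log(pool.length)             // 10
console.log(pool.shift().toString()) // 'hello'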

EventEmitter

Stream in NodeJs is an abstract interface that implements EventEmitter, so I’ll start with a brief introduction to EventEmitter.

EventEmitter (with methods such as on, once, off, and emit) is a class for publishing and subscribing to events.

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

// Bind a handler for eventA
eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});

// Bind a handler for eventB
eventEmitter.on('eventB', () => {
    console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});

// trigger eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2

Note that EventEmitter has two built-in events called newListener and removeListener. Whenever a listener is added to an event, newListener fires (via eventEmitter.emit('newListener')), and removeListener fires whenever a listener is removed.

Note also that a handler bound with once is executed only once, and removeListener fires before it executes, meaning that the once-bound handler is removed first and then triggered.

const { EventEmitter } = require('events')

const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener) => {
    console.log('newListener', event, listener)
})

eventEmitter.on('removeListener', (event, listener) => {
    console.log('removeListener', event, listener)
})
// newListener removeListener [Function (anonymous)]


eventEmitter.on('eventA', () => {
    console.log('eventA active 1');
});
// newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
    console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB [Function: listenerB]

But that’s not important for the rest of the story.

Stream

Stream is an abstract interface that handles streaming data in Node.js. Stream is not an actual interface, but rather a general term for all streams. The actual interfaces are ReadableStream, WritableStream, and ReadWriteStream.

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }

As you can see, both ReadableStream and WritableStream are interfaces that inherit from EventEmitter classes (ts interfaces can inherit from classes because they’re just merging types).

The corresponding implementation classes for these interfaces are Readable, Writable, and Duplex, respectively

There are four types of streams in NodeJs (a quick check follows the list below):

  • Readable streams (implement ReadableStream)
  • Writable streams (implement WritableStream)
  • Duplex streams, readable and writable (inherit Readable and then implement WritableStream)
  • Transform streams (inherit Duplex)
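As a quick sanity check, all four classes can be imported from the built-in stream module and are all EventEmitters:

const { Readable, Writable, Duplex, Transform } = require('stream')
const { EventEmitter } = require('events')

// every stream class ultimately inherits from EventEmitter
console.log(new Readable() instanceof EventEmitter)  // true
console.log(new Writable() instanceof EventEmitter)  // true
console.log(new Duplex() instanceof EventEmitter)    // true
console.log(new Transform() instanceof EventEmitter) // true

// Duplex inherits Readable, and Transform inherits Duplex
console.log(new Duplex() instanceof Readable)        // true
console.log(new Transform() instanceof Duplex)       // true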

Back pressure problem

Imagine a "pipe" between memory and the disk, with a "stream" flowing through it. Data flows into the pipe very quickly, but the other end drains it slowly; when the pipe fills up, the data backs up, and that is the back pressure problem.

The solution in NodeJs streams is to give every stream's buffer pool a watermark value (highWaterMark). When the amount of buffered data reaches this watermark, pushing more data into the buffer pool returns false, indicating that the current stream has reached the watermark and no more data should be written for now. At that point we should stop producing data immediately to prevent the cache pool from growing too large and causing back-pressure problems.
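A minimal sketch of that signal, using a hand-made Readable in paused mode (the no-op read() and the tiny watermark are assumptions for illustration only):

const { Readable } = require('stream')

// paused-mode stream with a small watermark and no automatic production
const readable = new Readable({ highWaterMark: 8, read() {} })

console.log(readable.push('1234'))   // true  -> pool is still below the watermark, keep producing
console.log(readable.push('5678'))   // false -> watermark reached, stop producing for now
console.log(readable.readableLength) // 8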

Readable

Readable is a type of stream that has two modes and three states

Two read modes:

  1. Flowing mode: data is read from the underlying system and written into the buffer; when the buffer has data, it is automatically passed via EventEmitter to the registered event handlers as quickly as possible
  2. Paused mode: in this mode EventEmitter is not triggered automatically to deliver data; the Readable.read() method must be called explicitly to read data from the buffer, and read() triggers the corresponding EventEmitter events.

Three states:

  1. readableFlowing === null (initial state)
  2. readableFlowing === false (paused mode)
  3. readableFlowing === true (flowing mode)

A newly created stream's readable.readableFlowing is null.

It changes to true once a data event listener is added. Calling pause() or unpipe(), receiving back pressure, or adding a readable event listener sets readableFlowing to false; in that state, binding a data event listener does not switch readableFlowing back to true.

Calling resume() switches the readable stream's readableFlowing back to true.

Removing all readable event listeners is the only way to make readableFlowing become null again.
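These transitions can be observed directly (the no-op read() below is only an assumption so the stream can be constructed for illustration):

const { Readable } = require('stream')

const readable = new Readable({ read() {} })
console.log(readable.readableFlowing) // null  (initial state)

readable.on('data', () => {})
console.log(readable.readableFlowing) // true  (flowing mode)

readable.pause()
console.log(readable.readableFlowing) // false (paused mode)

readable.resume()
console.log(readable.readableFlowing) // true  (flowing again)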

Common events:

  • readable: triggered when there is new readable data in the buffer (triggered every time a node is inserted into the cache pool)
  • data: triggered every time data is consumed, with the consumed data as its argument
  • close: triggered when the stream is closed
  • error: triggered when an error occurs on the stream

Common methods:

  • read(size): consumes size bytes of data; returns null if less data than size is currently available, otherwise returns the consumed data. If size is not passed, all data in the cache pool is consumed

const fs = require('fs');

const readStreams = fs.createReadStream('./EventEmitter.js', {
    highWaterMark: 100 // cache pool watermark (highWaterMark)
})

readStreams.on('readable', () => {
    console.log('buffer is full')
    readStreams.read() // consume all the data in the cache pool, return the result and fire the data event
})


readStreams.on('data', (data) => {
    console.log('data')
})

Github1s.com/nodejs/node…

A readable event is emitted when size is 0.

Once the length of data in the cache pool reaches the watermark (highWaterMark), the stream does not actively request more data from the producer; it waits until data has been consumed before producing more.

If a paused stream never calls read() to consume data, data and readable will not be emitted again later. When read() is called to consume, it checks whether the data remaining after consumption is below the watermark; if it is, more data is requested from the producer before the next consumption. This means that by the time the logic following read() has finished running, the new data has most likely already been produced, and readable then fires again. This mechanism of producing the next batch of data ahead of time and storing it in the cache pool is what makes cached streams fast.
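A small way to observe this, assuming a local file './data.txt' that is larger than the watermark, is to log readableLength before and after each read():

const fs = require('fs')

// assumes './data.txt' exists and is larger than the watermark
const readStream = fs.createReadStream('./data.txt', { highWaterMark: 100 })

readStream.on('readable', () => {
    console.log('before read:', readStream.readableLength) // data already produced into the pool
    readStream.read()                                       // consume the whole pool
    console.log('after read :', readStream.readableLength) // 0 -> _read is asked to produce more
})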

There are two situations for a stream in flowing mode:

  • When the production rate is slower than the consumption rate: in this case no data is left in the cache pool after each round of production, so the produced data can simply be handed straight to the data event (it never enters the cache pool, so there is no need to call read() to consume it), and production of new data starts immediately after the last consumption; data fires again, and so on until the end of the stream.
  • When the production rate is faster than the consumption rate: in this case unconsumed data is still left in the cache pool after each round of production, so the next chunk is produced while the current data is being consumed, and the new data is added to the cache pool after the old data has been consumed.

The only difference between the two is whether data remains in the cache pool after production: if it does, the produced data is pushed into the cache pool to wait to be consumed; if it does not, the data is handed directly to data without entering the cache pool.

Note that when a stream whose cache pool contains data switches from paused mode to flowing mode, read() is called in a loop to consume the data until null is returned.

Paused mode

A readable stream is in paused mode when it is created. After creation, the _read method is called automatically to push data from the data source into the buffer pool until the buffer pool reaches the watermark. Every time the data reaches the watermark, the readable stream triggers a "readable" event, telling the consumer that data is ready to be consumed.

In general, the ‘readable’ event indicates that the stream has a new dynamic: either there is new data, or the end of the stream has been reached. Therefore, a ‘readable’ event will also be emitted before the data from the data source is read;

The consumer's "readable" event handler actively consumes the data in the buffer pool via stream.read(size).

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    // the read option is used as the stream's _read method to fetch the source data
    read(size) {
        // suppose our source data consists of 1000 '1's
        let chunk = null
        // reading data is usually an asynchronous process, such as an IO operation
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

// 'readable' is emitted each time data is successfully pushed to the cache pool
myReadable.on('readable', () => {
    const chunk = myReadable.read() // consume all data currently in the cache pool
    if (chunk != null) {            // read() returns null once the stream has ended
        console.log(chunk.toString())
    }
})

Note that if the size passed to read(size) is greater than the watermark, a new watermark is calculated: the next power of two greater than or equal to size (size <= 2^n, with the smallest such n).

// HWM cannot be larger than 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB limit
    n = MAX_HWM;
  } else {
    // Take the next highest power of 2 to prevent increasing HWM excessively in tiny amounts
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}
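For example, assuming the helper above is available, sizes larger than the current watermark are rounded up to the next power of two:

console.log(computeNewHighWaterMark(300))  // 512
console.log(computeNewHighWaterMark(1024)) // 1024 (already a power of two)
console.log(computeNewHighWaterMark(1025)) // 2048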

Flowing mode

All readable streams start in paused mode and can be switched to flowing mode in the following ways:

  • adding a "data" event handler;
  • calling the resume() method;
  • using the pipe() method to send the data to a writable stream

In flowing mode, the data in the buffer pool is automatically output to the consumer. After each output, the _read method is also called automatically to put data from the data source back into the buffer pool; if the buffer pool is empty, the data is passed directly to the data event without going through the buffer pool. This continues until flowing mode switches back to paused mode, or the data source has been read to the end (push(null));

A readable stream can be switched back to paused mode in the following ways:

  • if there are no pipe targets, by calling stream.pause();
  • if there are pipe targets, by removing all of them; calling stream.unpipe() can remove multiple pipe targets.
const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    read(size) {
        let chunk = null
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

myReadable.on('data', data => {
    console.log(data.toString())
})

Writable

Writable streams are simpler than readable streams.

When a producer calls write(chunk), the stream internally decides, based on some state (corked, writing, etc.), whether to cache the data in the buffer queue or to call _write. After each write it tries to empty the buffer queue. If the amount of data in the buffer queue exceeds the watermark (highWaterMark), write(chunk) returns false, and the producer should stop writing.

So when can writing continue? Once all the data in the buffer has been successfully _written and the buffer queue has been emptied, the drain event is emitted and the producer can continue writing data.

When a producer needs to end writing data, it calls the stream.end method to notify the writable stream that it has ended.

const { Writable } = require('stream')

let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) { // used as the _write method
        setTimeout(() => {
            fileContent += chunk
            callback() // called after the write has finished
        }, 500)
    }
})

myWritable.on('close', () => {
    console.log('close', fileContent)
})
myWritable.write('123123') // true
myWritable.write('123123') // false
myWritable.end()

Note that once the data in the cache pool reaches the watermark, the pool may contain several buffer nodes. While emptying the cache pool (calling _write in a loop), it does not consume data of watermark length in one go the way a readable stream does; instead it consumes one buffer node at a time, even if that node's length does not match the watermark.

const { Writable } = require('stream')


let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {
        setTimeout(() => {
            fileContent += chunk
            console.log('consumer', chunk.toString())
            callback() // called after the write has finished
        }, 100)
    }
})

myWritable.on('close', () => {
    console.log('close', fileContent)
})

let count = 0
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = myWritable.write(count.toString())
        count++
    }
    if (count > 20) {
        myWritable.end()
    }
}
productionData()
myWritable.on('drain', productionData)

The code above uses a writable stream with a watermark of 10; the data source is the consecutive numbers from 0 to 20, written as strings, and productionData is used to write the data.

  1. The first call is myWritable.write("0"). Since there is no data in the cache pool yet, "0" does not enter the cache pool but is handed directly to _write; the return value of myWritable.write("0") is true.
  2. When myWritable.write("1") is executed, the callback of _write has not been called yet, which means the previous chunk has not finished writing. To preserve the order of writes, a buffer is created instead to add "1" to the cache pool. The same applies to 2-9.
  3. When myWritable.write("10") is executed, the buffer length is 9, which has not yet reached the watermark, so "10" is still added to the cache pool as a buffer and the cache pool length becomes 11. Therefore myWritable.write("10") returns false, meaning the buffer already holds enough data and we must wait for the drain event before producing data again.
  4. After 100 ms, the callback of _write("0", encoding, callback) is called, indicating that "0" has been written. It then checks whether there is data in the cache pool; if so, it calls _write to consume the head node of the cache pool ("1"), and this repeats until the cache pool is empty, at which point the drain event fires and productionData runs again.
  5. Calling myWritable.write("11") restarts the process from step 1, and so on until the stream ends.

Duplex

Once you understand readable and writable streams, a duplex stream is easy to understand. A duplex stream actually inherits the readable stream and implements the writable stream (that is how the source code is written, though it would be more accurate to say it implements both).

Duplex streams need to implement both of the following methods:

  1. Implement the _read() method to produce data for a readable stream
  2. Implement the _write() method to consume data for a writable stream

How to implement these two methods was covered in the sections on readable and writable streams above. Note that the two sides have two separate cache pools, and their data sources are also different.
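A minimal sketch of a custom duplex stream; the in-memory counter and array below are assumptions standing in for two different underlying resources:

const { Duplex } = require('stream')

let readCount = 0
const written = []   // assumed in-memory "target resource" for the writable side

const duplex = new Duplex({
    // readable side: produce data into the readable cache pool
    read(size) {
        readCount++
        // end the readable side after three chunks
        this.push(readCount <= 3 ? `chunk-${readCount}` : null)
    },
    // writable side: consume data coming from write()
    write(chunk, encoding, callback) {
        written.push(chunk.toString())
        callback()
    }
})

duplex.on('data', data => console.log('readable side:', data.toString()))
duplex.write('hello')
duplex.write('world')
duplex.end(() => console.log('writable side :', written))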

Take NodeJs’s standard I/O stream as an example:

  • When we type into the console, its data event fires, which proves that it has readable-stream functionality. Each time the user presses Enter, it is like calling the readable stream's push method to push the produced data.
  • When we call its write method, we can also write content to the console, but this does not trigger a data event; this shows that it has writable-stream functionality with its own separate buffer, and that the _write implementation simply makes the console display the text.
// Whenever the user enters data on the console (_read produces it), a data event is emitted: a readable-stream feature
process.stdin.on('data', data => {
    process.stdin.write(data);
})

// Data is written to the standard I/O stream every second (a writable-stream feature, output directly to the console) without triggering data
setInterval(() => {
    process.stdin.write('Not data entered by the user console')
}, 1000)

Transform

A Duplex stream can be thought of as a readable stream bundled with a writable stream. The two are independent, each with its own internal buffer, and read and write events occur independently.

                             Duplex Stream
                          ------------------|
                    Read  <-----               External Source
            You           ------------------|  
                    Write ----->               External Sink
                          ------------------|

The Transform stream is also duplex, but its reads and writes are causally related: the two ends of the duplex stream are linked by some transformation, so a read requires a write to have occurred first.

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------

To create a Transform stream, the important thing is to implement the _transform method rather than _write or _read. _transform processes (consumes) the data written by the writable side and then produces data for the readable side.

const { Transform } = require('stream')

const transform = new Transform({
    highWaterMark: 10,
    transform(chunk, encoding, callback) { // transform the data and push the result into the cache pool
        this.push(chunk.toString().replace('1', '@'))
        callback()
    },
    flush(callback) { // executed before 'end' fires
        this.push('<<<')
        callback()
    }
})


// keep writing data with write
let count = 0
transform.write('>>>')
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = transform.write(count.toString())
        count++
    }
    if (count > 20) {
        transform.end()
    }
}
productionData()
transform.on('drain', productionData)


let result = ''
transform.on('data', data => {
    result += data.toString()
})
transform.on('end', () => {
    console.log(result)
    // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})

Pipe

Pipes take the output of the previous program as the input of the next program, which is what pipes do in Linux. Pipes in NodeJs are similar in that they connect two streams, and the output of the upstream stream serves as the input of the downstream stream.

A pipe, source.pipe(dest, options), requires source to be readable and dest to be writable. Its return value is dest.

A stream in the middle of a pipeline is both the upstream of the next stream and the downstream of the previous stream, so it needs to be a duplex stream that is both readable and writable; we typically use a Transform stream as the middle stream of a pipeline.

Github1s.com/nodejs/node…

Stream.prototype.pipe = function(dest, options) {
  const source = this;

  function ondata(chunk) {
    if (dest.writable && dest.write(chunk) === false && source.pause) {
      source.pause();
    }
  }

  source.on('data', ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on('drain', ondrain);
  // ... the rest of the code is omitted
}

The implementation of pipe is very clear: when the upstream stream emits a data event, the downstream stream's write method is called to write the data; if write returns false, source.pause() is called immediately to pause the upstream stream, mainly to prevent back pressure.

When the downstream stream finishes consuming the data, it calls source.resume() to make the upstream stream flow again.

Let's use a pipe to implement an example that replaces all the 1s in a data file with @ and writes the output to a result file.

const { Transform } = require('stream')
const { createReadStream, createWriteStream } = require('fs')

// A transform stream in the middle of the pipe
function createTransformStream() {
    return new Transform({
        transform(chunk, encoding, callback) {
            this.push(chunk.toString().replace(/1/g, '@'))
            callback()
        }
    })
}

createReadStream('./data')
    .pipe(createTransformStream())
    .pipe(createWriteStream('./result'))

When there are only two streams in a pipe, its function is similar to a Transform stream: one readable stream connected to one writable stream. But a pipe can chain together many streams.
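For instance, assuming the createTransformStream helper above and the same './data' file, a pipe can chain more than two streams, with another duplex (transform) stream such as gzip in the middle:

const { createReadStream, createWriteStream } = require('fs')
const { createGzip } = require('zlib')

// assumed file names, for illustration only
createReadStream('./data')
    .pipe(createTransformStream())          // replace 1 with @
    .pipe(createGzip())                     // a second transform stream in the chain: gzip-compress the result
    .pipe(createWriteStream('./result.gz')) // writable destination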

Related articles

  • Node.js Design Pattern uses streams for encoding
  • A brief introduction to flows in Nodejs
  • Everything you need to know about Node.js Streams