The author has been using the pipe function frequently in development recently, knowing only that it is the "pipe" of streams but not how it works, so I went back and learned streams from the ground up, and organized the notes and source code reading into this article to share with everyone.
Stream is a very basic concept in NodeJs; many of the basic modules are implemented on top of streams, and streams play a very important role. Streams are also a difficult concept to grasp, mainly because of the lack of relevant documentation, and NodeJs beginners often spend a lot of time understanding them. Fortunately, for most NodeJs users who only develop Web applications, an incomplete understanding of streams does not prevent them from using NodeJs. However, understanding streams gives you a better understanding of the other modules in NodeJs, and in some cases processing data with streams gives better results.
How to understand streams
- To the consumer of a stream, we can think of the stream as an array: we only need to care about reading from it (consuming) and writing to it (producing).
- To the developer of a stream (who uses the Stream module to create a new instance), the focus is on how to implement some of the stream's methods. Usually two things matter: what the target resource is and how to operate on it. Once the target resource is determined, it must be operated on according to the stream's various states and events.
Buffer pool
All streams in NodeJs have buffer pools. The purpose of the buffer pool is to increase the efficiency of the stream: when both producing and consuming data take time, we can store produced data in the buffer pool before the next consumption. However, the buffer pool is not always in use; for example, when the buffer pool is empty, data is not put into the buffer pool after production but is consumed directly.
If data is produced faster than it is consumed, the excess data needs to wait somewhere. If data is produced more slowly than it is consumed, the data accumulates somewhere to a certain amount and is then consumed. (Developers cannot control how fast data is produced or consumed, only when to produce or consume it.)
The place where data waits, accumulates, and is then released is the buffer pool. The buffer pool is usually located in the computer's RAM (memory).
To take a common example of a buffer: when we watch online video with a fast network connection, the buffer is always filled immediately and sent to the system to play, and the next segment is buffered right away, so there is no stutter while watching. If the network is slow, a loading indicator appears, meaning the buffer is being filled; once filling is complete, the data is sent to the system and the video plays.
The Buffer pool of a NodeJs stream is a linked list of buffers. Each time data is added to the Buffer pool, a new Buffer node is created and inserted into the end of the list.
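As a rough illustration (this is a simplified sketch for intuition, not Node's actual internal BufferList implementation), such a buffer pool can be modeled as a singly linked list: push appends a new node at the tail, and consumption removes nodes from the head.
class BufferPool {
  constructor() {
    this.head = null
    this.tail = null
    this.length = 0
  }
  // Append a new buffer node at the end of the list
  push(chunk) {
    const node = { data: chunk, next: null }
    if (this.tail) this.tail.next = node
    else this.head = node
    this.tail = node
    this.length++
  }
  // Consume the buffer node at the head of the list
  shift() {
    if (!this.head) return null
    const { data } = this.head
    this.head = this.head.next
    if (!this.head) this.tail = null
    this.length--
    return data
  }
}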
EventEmitter
Stream in NodeJs is an abstract interface that implements EventEmitter, so I’ll start with a brief introduction to EventEmitter.
EventEmitter (on, once, off, emit) is a class for publishing and subscribing to events.
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()

// Bind a handler for eventA
eventEmitter.on('eventA', () => {
  console.log('eventA active 1');
});

// Bind a handler for eventB
eventEmitter.on('eventB', () => {
  console.log('eventB active 1');
});

eventEmitter.once('eventA', () => {
  console.log('eventA active 2');
});

// Trigger eventA
eventEmitter.emit('eventA')
// eventA active 1
// eventA active 2
Note that EventEmitter has two special events called newListener and removeListener. Whenever a listener is added to an event object, newListener is fired (eventEmitter.emit('newListener')), and whenever a listener is removed, removeListener is fired.
Note also that a handler bound with once is executed only once, and removeListener is fired before it executes, meaning the once-bound handler is removed first and then executed.
const { EventEmitter } = require('events')
const eventEmitter = new EventEmitter()

eventEmitter.on('newListener', (event, listener) => {
  console.log('newListener', event, listener)
})
eventEmitter.on('removeListener', (event, listener) => {
  console.log('removeListener', event, listener)
})
// newListener removeListener [Function (anonymous)]

eventEmitter.on('eventA', () => {
  console.log('eventA active 1');
});
// newListener eventA [Function (anonymous)]

function listenerB() { console.log('eventB active 1'); }
eventEmitter.on('eventB', listenerB);
// newListener eventB [Function (anonymous)]

eventEmitter.once('eventA', () => {
  console.log('eventA active 2');
});
// newListener eventA [Function (anonymous)]

eventEmitter.emit('eventA')
// eventA active 1
// removeListener eventA [Function: bound onceWrapper] { listener: [Function (anonymous)] }
// eventA active 2

eventEmitter.off('eventB', listenerB)
// removeListener eventB [Function: listenerB]
But that’s not important for the rest of the story.
Stream
Stream is an abstract interface that handles streaming data in Node.js. Stream is not an actual interface, but rather a general term for all streams. The actual interfaces are ReadableStream, WritableStream, and ReadWriteStream.
interface ReadableStream extends EventEmitter {
  readable: boolean;
  read(size?: number): string | Buffer;
  setEncoding(encoding: BufferEncoding): this;
  pause(): this;
  resume(): this;
  isPaused(): boolean;
  pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
  unpipe(destination?: WritableStream): this;
  unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
  wrap(oldStream: ReadableStream): this;
  [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
  writable: boolean;
  write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
  write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
  end(cb?: () => void): this;
  end(data: string | Uint8Array, cb?: () => void): this;
  end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}
interface ReadWriteStream extends ReadableStream, WritableStream { }
As you can see, both ReadableStream and WritableStream are interfaces that inherit from EventEmitter classes (ts interfaces can inherit from classes because they’re just merging types).
The corresponding implementation classes for these interfaces are Readable, Writable, and Duplex, respectively
There are four types of streams in NodeJs:
- Readable readable stream (implements ReadableStream)
- Writable writable stream (implements WritableStream)
- Duplex readable and writable stream (inherits Readable and then implements WritableStream)
- Transform transform stream (inherits Duplex)
Back pressure problem
Imagine a "pipe" between memory and the disk, and what flows through the "pipe" is the "stream". Data flows into the pipe from memory very quickly, but the downstream consumer (the disk) is much slower; when the pipe fills up, the excess data has nowhere to go. This is the back pressure problem.
The way NodeJs streams solve this is to set a watermark (highWaterMark) for each stream's buffer pool. When the amount of data in the pool reaches the watermark, pushing more data into the buffer pool returns false, indicating that the stream's buffered data has reached the watermark and no more data should be written. At that point the producer should stop producing data immediately, to prevent the buffer pool from growing too large and causing back pressure.
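For example, with a writable stream the usual pattern is to stop producing when write() returns false and to resume when the drain event fires. A minimal sketch (the file name is hypothetical):
const fs = require('fs')

const ws = fs.createWriteStream('./output.txt')
let i = 0

function produce() {
  let ok = true
  while (i < 100000 && ok) {
    // write() returns false once the buffered data reaches the watermark
    ok = ws.write(`line ${i++}\n`)
  }
  if (i < 100000) {
    // Stop producing and continue once the buffer pool has been drained
    ws.once('drain', produce)
  } else {
    ws.end()
  }
}
produce()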
Readable
Readable is a type of stream that has two modes and three states
Two reading modes:
- Flowing mode: data is read from the underlying system and written to the buffer pool; the data is then automatically passed to the registered event handlers through EventEmitter as quickly as possible
- Paused mode: in this mode data is not actively emitted through EventEmitter; the consumer must explicitly call the Readable.read() method to read data from the buffer pool, and reading triggers the corresponding EventEmitter events
Three states:
- readableFlowing === null (initial state)
- readableFlowing === false (paused mode)
- readableFlowing === true (flowing mode)
The readable.readableFlowing of a newly created stream is null.
It changes to true after a data event listener is added. Calling pause() or unpipe(), receiving back pressure, or adding a readable event listener sets readableFlowing to false; in that state, binding a data event listener does not switch readableFlowing back to true.
Calling resume() switches the readable stream's readableFlowing back to true.
Removing all readable event listeners is the only way to make readableFlowing become null again.
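These transitions can be observed directly through the readableFlowing property. A minimal sketch (the file name is hypothetical):
const fs = require('fs')

const rs = fs.createReadStream('./data.txt')
console.log(rs.readableFlowing) // null (initial state)

rs.on('data', () => {})         // adding a data listener switches the stream to flowing mode
console.log(rs.readableFlowing) // true

rs.pause()                      // switch back to paused mode
console.log(rs.readableFlowing) // false

rs.resume()                     // switch to flowing mode again
console.log(rs.readableFlowing) // true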
| Event name | Description |
| --- | --- |
| readable | Fired when there is new readable data in the buffer pool (fired every time a node is inserted into the buffer pool) |
| data | Fired after each piece of data is consumed, with the consumed data as the argument |
| close | Fired when the stream is closed |
| error | Fired when an error occurs on the stream |
| Method name | Description |
| --- | --- |
| read(size) | Consumes size bytes of data. Returns null if the data currently available is less than size; otherwise returns the consumed data. If size is omitted, all data in the buffer pool is consumed |
const fs = require('fs');
const readStreams = fs.createReadStream('./EventEmitter.js', {
  highWaterMark: 100 // Watermark of the buffer pool
})

readStreams.on('readable', () => {
  console.log('buffer full')
  readStreams.read() // Consume all the data in the buffer pool, return the result and fire the data event
})

readStreams.on('data', (data) => {
  console.log('data')
})
Github1s.com/nodejs/node…
A readable event is emitted when size is 0.
When the length of the data in the buffer pool reaches the watermark (highWaterMark), the readable stream does not actively ask the data source to produce more data; new data is only produced after the existing data has been consumed.
If a stream in paused mode never calls read to consume data, data and readable will never be fired again. When read is called to consume data, the stream first checks whether the amount of data remaining after this consumption would fall below the watermark; if so, it asks the data source to produce data before consuming. This means that by the time the logic after read has finished executing, the new data will most likely already have been produced, and readable will then be triggered again. This mechanism of producing the next batch of data ahead of time and storing it in the buffer pool is also why streams with caches are fast.
In flowing mode there are two cases:
- When the production rate is slower than the consumption rate: in this case there is no leftover data in the buffer pool after each piece of data is produced, so the produced data can simply be passed to the data event (it does not enter the buffer pool, so there is no need to call read to consume it), and production of new data starts immediately after the previous piece is consumed; data is triggered again, and so on until the stream ends.
- When the production rate is faster than the consumption rate: in this case there is still unconsumed data in the buffer pool after each piece of data is produced. The next piece of data is produced while the current data is being consumed, so by the time the old data is consumed, the new data has already been produced and added to the buffer pool.
The only difference between the two is whether there is still data in the buffer pool after data is produced: if there is, the new data is pushed to the buffer pool to wait for consumption; if not, the data is handed directly to the data event without entering the buffer pool.
Note that when a stream with data in its buffer pool switches from paused mode to flowing mode, read() is called in a loop to consume the data until null is returned.
Paused mode
In paused mode, when a readable stream is created, the _read method is automatically called to push data from the data source into the buffer pool until the buffer pool reaches the watermark. Whenever the data reaches the watermark, the readable stream fires a "readable" event, telling the consumer that data is ready to be consumed.
In general, the "readable" event indicates new activity in the stream: either there is new data, or the end of the stream has been reached. Therefore, a "readable" event is also fired when the end of the data source has been reached.
In the handler of the "readable" event, the consumer actively consumes data from the buffer pool via stream.read(size).
const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
  highWaterMark: 300,
  // The read option is used as the stream's _read method to fetch the source data
  read(size) {
    // Suppose our data source contains 1000 '1's
    let chunk = null
    // The process of reading data is usually asynchronous, such as an IO operation
    setTimeout(() => {
      if (count > 0) {
        let chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    }, 500)
  }
})

// readable is emitted each time data is successfully pushed into the buffer pool
myReadable.on('readable', () => {
  const chunk = myReadable.read() // Consume all data currently in the buffer pool
  console.log(chunk.toString())
})
Note that if the size passed to read(size) is greater than the watermark, a new watermark is computed: the smallest power of 2 that is greater than or equal to size.
// HWM cannot be larger than 1GB.
const MAX_HWM = 0x40000000;

function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB limit
    n = MAX_HWM;
  } else {
    // Round up to the next highest power of 2 to prevent increasing HWM excessively
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}
Flowing mode
All readable streams start in paused mode and can be switched to flowing mode in the following ways:
- adding a data event handler;
- calling the resume() method;
- using the pipe() method to send the data to a writable stream.
In flowing mode, the data in the buffer pool is automatically output to the consumer. After each output, the _read method is automatically called to put data from the data source into the buffer pool. If the buffer pool is empty, the data is passed directly to the data event without going through the buffer pool. This continues until the stream switches back to paused mode or the data source has been fully read (push(null)).
A readable stream can be switched back to paused mode in the following ways:
- if there are no pipe destinations, calling stream.pause();
- if there are pipe destinations, removing all of them. Calling stream.unpipe() can remove multiple pipe destinations.
const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
  highWaterMark: 300,
  read(size) {
    let chunk = null
    setTimeout(() => {
      if (count > 0) {
        let chunkLength = Math.min(count, size)
        chunk = '1'.repeat(chunkLength)
        count -= chunkLength
      }
      this.push(chunk)
    }, 500)
  }
})

myReadable.on('data', data => {
  console.log(data.toString())
})
Writable
Writable streams are simpler than readable streams.
When a producer calls write(chunk), the stream internally decides, based on some state (corked, writing, etc.), whether to cache the data in the buffer queue or to call _write. After each write it tries to empty the buffer queue. If the amount of data in the buffer queue exceeds the watermark (highWaterMark), write(chunk) returns false, and the producer should stop writing.
So when can writing continue? After all the data in the buffer has been _written, the buffer queue is emptied and the drain event is fired; the producer can then continue to write data.
When a producer needs to end writing data, it calls the stream.end method to notify the writable stream that it has ended.
const { Writable } = require('stream')

let fileContent = ''
const myWritable = new Writable({
  highWaterMark: 10,
  write(chunk, encoding, callback) { // Used as the _write method
    setTimeout(() => {
      fileContent += chunk
      callback() // Called after the write has finished
    }, 500)
  }
})

myWritable.on('close', () => {
  console.log('close', fileContent)
})

myWritable.write('123123') // true
myWritable.write('123123') // false
myWritable.end()
Note that after the data in the buffer pool reaches the watermark, there may be multiple buffer nodes in the pool. While emptying the buffer pool (calling _write in a loop), the writable stream does not try to consume a watermark-length chunk of data at a time the way a readable stream does; instead it consumes one buffer node at a time, even if that node's length does not match the watermark.
const { Writable } = require('stream')

let fileContent = ''
const myWritable = new Writable({
  highWaterMark: 10,
  write(chunk, encoding, callback) {
    setTimeout(() => {
      fileContent += chunk
      console.log('consumer', chunk.toString())
      callback() // Called after the write has finished
    }, 100)
  }
})

myWritable.on('close', () => {
  console.log('close', fileContent)
})

let count = 0
function productionData() {
  let flag = true
  while (count <= 20 && flag) {
    flag = myWritable.write(count.toString())
    count++
  }
  if (count > 20) {
    myWritable.end()
  }
}

productionData()
myWritable.on('drain', productionData)
The above is a writable stream with a watermark of 10. The data source is the consecutive numbers 0 to 20 as strings, and productionData is used to write the data.
- First, myWritable.write("0") is called. Because there is no data in the buffer pool, "0" does not enter the buffer pool but is handed directly to _write. The return value of myWritable.write("0") is true.
- When myWritable.write("1") is executed, the callback of _write has not been called yet, which means the previous write has not finished. To guarantee that data is written in order, "1" can only be created as a buffer and added to the buffer pool. The same applies to 2 to 9.
- When myWritable.write("10") is executed, the buffer length is 9 and has not reached the watermark, so "10" is also added to the buffer pool as a buffer and the buffer pool length becomes 11. Therefore myWritable.write("10") returns false, which means the buffered data is already enough and we need to wait for the drain event before producing data again.
- After 100ms, the callback of _write("0", encoding, callback) is called, indicating that "0" has been written. The stream then checks whether there is data in the buffer pool; if there is, it calls _write to consume the head node of the buffer pool ("1"), and this process repeats until the buffer pool is empty, at which point the drain event is fired and productionData is executed again.
- Calling myWritable.write("11") then triggers the same process as in step 1, and so on until the stream ends.
Duplex
Once you understand readable and writable streams, a duplex stream is easy to understand. A duplex stream actually inherits the readable stream and then implements the writable stream (that is how the source code is written, though it would be more accurate to say it implements both).
Duplex streams need to implement the following two methods:
- Implement the _read() method to produce data for a readable stream
- Implement the _write() method to consume data for a writable stream
How to implement the above two methods was described in the section on writable streams and readable streams above. It is important to note that there are two separate cache pools for the two streams, and their data sources are different
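A minimal sketch of a custom duplex stream (the data here is made up for illustration): the readable side produces its own data in _read, the writable side consumes whatever is written in _write, and the two sides do not share data or buffer pools.
const { Duplex } = require('stream')

let count = 0
const myDuplex = new Duplex({
  read(size) {
    // Produce data for the readable side
    if (count < 3) {
      this.push(`chunk-${count++}`)
    } else {
      this.push(null) // The readable side ends
    }
  },
  write(chunk, encoding, callback) {
    // Consume data written to the writable side
    console.log('written:', chunk.toString())
    callback()
  }
})

myDuplex.on('data', data => console.log('read:', data.toString()))
myDuplex.write('hello') // goes to _write, not to the readable side
myDuplex.end()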
Take NodeJs’s standard I/O stream as an example:
- When we enter data on the console, it fires its data event, which proves that it has readable stream functionality. Every time the user types enter, it calls the readable push method to push the produced data.
- We can also write to the console when we call its write method, but it does not trigger a data event, indicating that it has writable stream functionality and a separate buffer. The implementation of the _write method is to make the console display text.
// Whenever the user enters data in the console (_read), a data event is fired, which is a readable stream feature
process.stdin.on('data', data => {
  process.stdin.write(data);
})

// Produce data to the standard input stream every second (this is a writable stream feature, and the data is output directly to the console) without triggering data
setInterval(() => {
  process.stdin.write('Not data entered by the user in the console')
}, 1000)
Transform
A Duplex stream can be considered a readable stream with a writable stream. Both are independent, each with its own internal buffer. Read and write events occur independently.
                      Duplex Stream
                   ------------------|
             Read  <-----  External Source
     You           ------------------|
             Write ----->  External Sink
                   ------------------|
A Transform stream is also duplex, but its reads and writes are causally related: the two ends of the duplex stream are linked by some transformation, so a read requires a write to have happened first.
                    Transform Stream
           --------------|--------------
   You  Write  ---->                ---->  Read  You
           --------------|--------------
To create a Transform stream, the important thing is to implement the _transform method rather than _write or _read. _transform processes (consumes) the data written by the writable side and then produces data for the readable side.
const { Transform } = require('stream')

const transform = new Transform({
  highWaterMark: 10,
  transform(chunk, encoding, callback) { // Transform the data and call push to add the result to the buffer pool
    this.push(chunk.toString().replace('1', '@'))
    callback()
  },
  flush(callback) { // Executed before end is triggered
    this.push('<<<')
    callback()
  }
})

// Keep writing data with write
let count = 0
transform.write('>>>')
function productionData() {
  let flag = true
  while (count <= 20 && flag) {
    flag = transform.write(count.toString())
    count++
  }
  if (count > 20) {
    transform.end()
  }
}
productionData()
transform.on('drain', productionData)

let result = ''
transform.on('data', data => {
  result += data.toString()
})
transform.on('end', () => {
  console.log(result)
  // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})
Pipe
Pipes take the output of the previous program as the input of the next program, which is what pipes do in Linux. Pipes in NodeJs are similar in that they connect two streams, and the output of the upstream stream serves as the input of the downstream stream.
A pipe, source.pipe(dest, options), requires source to be readable and dest to be writable. Its return value is dest.
A stream in the middle of a pipe is both the downstream of the previous stream and the upstream of the next stream, so it needs to be a duplex stream that is both readable and writable. We usually use a Transform stream as the stream in the middle of a pipe.
Github1s.com/nodejs/node…
Stream.prototype.pipe = function(dest, options) {
  const source = this;

  function ondata(chunk) {
    if (dest.writable && dest.write(chunk) === false && source.pause) {
      source.pause();
    }
  }
  source.on('data', ondata);

  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }
  dest.on('drain', ondrain);
  // ... the rest of the code is omitted
}
The implementation of pipe is very clear: when the upstream stream fires a data event, the downstream stream's write method is called to write the data; if write returns false, source.pause() is called immediately to pause the upstream stream, mainly to prevent back pressure.
When the downstream stream finishes consuming the data, it calls source.resume() to make the upstream stream flow again.
Using a pipe, let's implement a program that replaces every 1 in the data file with @ and writes the output to the result file.
const { Transform } = require('stream')
const { createReadStream, createWriteStream } = require('fs')

// The transform stream in the middle of the pipe
function createTransformStream() {
  return new Transform({
    transform(chunk, encoding, callback) {
      this.push(chunk.toString().replace(/1/g, '@'))
      callback()
    }
  })
}

createReadStream('./data')
  .pipe(createTransformStream())
  .pipe(createWriteStream('./result'))
When there are only two streams in a pipe, its function is similar to that of a transform stream: one readable stream connected to one writable stream. But a pipe can chain many streams together.
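For example (the file names and the second transform stage are made up for illustration), a pipe can chain any number of streams, and each stream in the middle is the downstream of the previous one and the upstream of the next one:
const { Transform } = require('stream')
const { createReadStream, createWriteStream } = require('fs')

const replaceOnes = new Transform({
  transform(chunk, encoding, callback) {
    // callback(error, data) is shorthand for push(data) followed by callback()
    callback(null, chunk.toString().replace(/1/g, '@'))
  }
})

const toUpperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

createReadStream('./data')
  .pipe(replaceOnes)
  .pipe(toUpperCase)
  .pipe(createWriteStream('./result'))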
Related articles
- Node.js Design Pattern uses streams for encoding
- A brief introduction to flows in Nodejs
- Everything you need to know about Node.js Streams