This article is part of the Node.js CheatSheet: Node.js syntax fundamentals, frameworks, and practical skills. You can also read the JavaScript CheatSheet or Modern Web Development Basics & Engineering Practices to learn more about JavaScript/Node.js in action.
Stream is a fundamental concept in Node.js. Like EventEmitter, it centers on event-driven data processing in IO pipelines. Like an array or a map, a stream is a collection of data, but it represents data that is not necessarily held in memory all at once. Node.js streams come in the following types:
- Readable Stream: a producer of data, such as process.stdin
- Writable Stream: a consumer of data, such as process.stdout or process.stderr
- Duplex Stream: a stream that is both readable and writable
- Transform Stream: a Duplex Stream that transforms the data passing through it
Stream itself defines a set of interface specifications that many Node.js built-in modules follow. The fs module, for example, uses the Stream interface to read and write files; likewise, each incoming HTTP request is a readable stream and the HTTP response is a writable stream.
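As a quick sketch of that last point (assuming an index.html file exists next to the script), an HTTP server can pipe a file's readable stream straight into the response:
const http = require('http');
const fs = require('fs');

http.createServer(function (req, res) {
  // req is a readable stream, res is a writable stream
  fs.createReadStream('index.html').pipe(res);
}).listen(3000);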
Readable Stream
const stream = require('stream');
const fs = require('fs');

// The file to count is passed as the first command-line argument
const readableStream = fs.createReadStream(process.argv[2], { encoding: 'utf8' });

// Manually set the stream data encoding
// readableStream.setEncoding('utf8');

let wordCount = 0;

readableStream.on('data', function (data) {
  wordCount += data.split(/\s{1,}/).length;
});

readableStream.on('end', function () {
  // Don't count the end of the file
  console.log('%d %s', --wordCount, process.argv[2]);
});
When we create a readable stream, the data does not start flowing immediately; attaching a listener for the 'data' event switches the stream into flowing mode. From then on, it reads a small chunk of data at a time and passes it to our callback. How often 'data' events fire is decided by the implementer: when reading a file it may be once per line, while during HTTP request processing it may be once every few kilobytes. Looking at the implementation in nodejs/readable-stream/_stream_readable, we can see that the on function calls the resume method, which in turn calls the flow function to read the stream:
// function on
if (ev === 'data') {
  // Start flowing on next tick if stream isn't explicitly paused
  if (this._readableState.flowing !== false) this.resume();
}

// ...

// function flow
while (state.flowing && stream.read() !== null) {}
We can also listen for the 'readable' event and read the data manually:
let data = '';
let chunk;

readableStream.on('readable', function () {
  while ((chunk = readableStream.read()) !== null) {
    data += chunk;
  }
});

readableStream.on('end', function () {
  console.log(data);
});
Readable Stream also includes the following common methods:
- readable.pause(): pauses the flowing stream; in other words, it stops emitting 'data' events.
- readable.resume(): the opposite of the method above; it makes a paused stream flow again (a short pause/resume sketch follows this list).
- readable.unpipe(): removes a destination. If an argument is passed in, it stops the readable stream from piping to that particular destination; otherwise all destinations are removed.
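As a minimal sketch of pause and resume, assuming the readableStream created above, the handler below pauses the stream on every chunk and resumes it one second later, throttling consumption:
readableStream.on('data', function (chunk) {
  console.log('received a chunk of length %d', chunk.length);
  // Stop emitting 'data' events for a second, then continue
  readableStream.pause();
  setTimeout(function () {
    readableStream.resume();
  }, 1000);
});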
In everyday development, we can use stream-wormhole to drain a readable stream, i.e. consume and discard its data:
const sendToWormhole = require('stream-wormhole');

sendToWormhole(readStream, true);
Writable Stream
readableStream.on('data', function (chunk) {
  writableStream.write(chunk);
});

readableStream.on('end', function () {
  // End the writable stream only after the readable side has finished
  writableStream.end();
});
Once end() has been called and all buffered data has been flushed, the stream emits a 'finish' event. Note that after calling end(), you can no longer write data to the writable stream.
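A small sketch of this behavior, assuming writableStream is something like fs.createWriteStream('out.txt'):
writableStream.on('finish', function () {
  console.log('all data has been flushed to the underlying system');
});

writableStream.write('some data');
writableStream.end('last chunk');
// writableStream.write('more'); // would now emit an error: write after end
We can also implement a custom writable stream ourselves by passing a write function to the Writable constructor: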
const { Writable } = require('stream');

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);
Writable Stream also emits some important events related to its interaction with Readable Streams:
- error: emitted when an error occurs while writing or piping
- pipe: emitted when a readable stream is piped into this writable stream
- unpipe: emitted when the readable stream calls unpipe and removes this writable stream as a destination (see the sketch after this list)
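A minimal sketch of listening for these events, assuming the readableStream and writableStream from the examples above:
writableStream.on('pipe', function (src) {
  console.log('a readable stream was piped into this writable stream');
});

writableStream.on('unpipe', function (src) {
  console.log('the readable stream was detached from this writable stream');
});

writableStream.on('error', function (err) {
  console.error('write failed:', err);
});

readableStream.pipe(writableStream);
readableStream.unpipe(writableStream);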
Pipe
const fs = require('fs');
const inputFile = fs.createReadStream('REALLY_BIG_FILE.x');
const outputFile = fs.createWriteStream('REALLY_BIG_FILE_DEST.x');
// Once the pipe is set up, the data starts flowing
inputFile.pipe(outputFile);
If pipe is invoked multiple times in sequence, a chain is constructed:
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('output.txt'));
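One caveat of chained pipe calls is that errors are not forwarded along the chain and the streams are not destroyed automatically on failure. Since Node.js 10, the built-in stream module also provides a pipeline helper for this; a sketch of the same gunzip chain using it:
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('output.txt'),
  function (err) {
    // Called once, with the first error from any stream in the chain, or null on success
    if (err) {
      console.error('pipeline failed:', err);
    } else {
      console.log('pipeline succeeded');
    }
  }
);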
Pipes are also commonly used for file handling in web servers. Taking an Egg.js application as an example, we can obtain a file stream from the Context and pipe it into a writable file stream:
📎 See Backend Boilerplate/egg for complete code
const awaitWriteStream = require('await-stream-ready').write;
const sendToWormhole = require('stream-wormhole');

// ...

const stream = await ctx.getFileStream();
const filename =
  md5(stream.filename) + path.extname(stream.filename).toLocaleLowerCase();

// Build the absolute path of the target file
const target = path.join(this.config.baseDir, 'app/public/uploads', filename);

// Create a writable stream for the target file
const writeStream = fs.createWriteStream(target);

try {
  // Write the upload stream to the file asynchronously
  await awaitWriteStream(stream.pipe(writeStream));
} catch (err) {
  // If an error occurs, consume the remaining upload stream before rethrowing
  await sendToWormhole(stream);
  throw err;
}

// ...
As in the introduction to distributed systems, we cannot avoid dealing with so-called backpressure in a typical stream-processing scenario. Both Writable Stream and Readable Stream actually keep data in internal buffers, which can be inspected via writable.writableBuffer and readable.readableBuffer respectively. The write function returns false when the amount of pending data exceeds the highWaterMark or the write stream is currently busy, and the pipe function enables the backpressure mechanism automatically:
When Node.js's stream machinery detects that write has returned false, the backpressure system kicks in automatically: it suspends data transfer from the current Readable Stream until the consumer is ready again.
(Diagram: data flows from the Readable Stream through .pipe(destination) into the Writable Stream's .write(chunk). For each chunk the question is asked: "Is this chunk too big? Is the queue busy?" If not, the chunk is added to the buffer queue and .write() is emitted (or .end() when there is no chunk left); if so, write returns false and .pause() is emitted on the readable side. Once the queue is empty again, .drain() and .resume() are emitted and the flow continues.)
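Without pipe, the same mechanism can be handled by hand. The sketch below reuses the big-file example from above: it checks the return value of write and, whenever it is false, pauses the readable stream until the writable stream emits 'drain':
const fs = require('fs');

const source = fs.createReadStream('REALLY_BIG_FILE.x');
const destination = fs.createWriteStream('REALLY_BIG_FILE_DEST.x');

source.on('data', function (chunk) {
  // write returns false once the internal buffer exceeds highWaterMark
  const ok = destination.write(chunk);
  if (!ok) {
    // Stop reading until the writable stream has drained its buffer
    source.pause();
    destination.once('drain', function () {
      source.resume();
    });
  }
});

source.on('end', function () {
  destination.end();
});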
Duplex Stream
A Duplex Stream can be seen as an aggregation of a readable stream and a writable stream: it consists of two independent streams with separate internal buffers, and reads and writes happen asynchronously and independently of each other:
               Duplex Stream
             ------------------|
       Read  <-----              External Source
 You         ------------------|
       Write ----->              External Sink
             ------------------|
We can use Duplex to simulate simple socket operations:
const { Duplex } = require('stream');

class Duplexer extends Duplex {
  constructor(props) {
    super(props);
    this.data = [];
  }

  _read(size) {
    const chunk = this.data.shift();
    if (chunk == 'stop') {
      this.push(null);
    } else {
      if (chunk) {
        this.push(chunk);
      }
    }
  }

  _write(chunk, encoding, cb) {
    this.data.push(chunk);
    cb();
  }
}

const d = new Duplexer({ allowHalfOpen: true });

d.on('data', function (chunk) {
  console.log('read: ', chunk.toString());
});

d.on('readable', function () {
  console.log('readable');
});

d.on('end', function () {
  console.log('Message Complete');
});

d.write('....');
In development we often need to forward a readable stream directly into a writable stream; by inserting a PassThrough stream in between, we can also listen in on the data as it flows through:
const { PassThrough } = require('stream');
const fs = require('fs');
const duplexStream = new PassThrough();
// can be piped from a readable stream
fs.createReadStream('tmp.md').pipe(duplexStream);
// can pipe to writable stream
duplexStream.pipe(process.stdout);
// We can also listen in on the data events; chunks arrive as Buffers
duplexStream.on('data', console.log);
Transform Stream
A Transform Stream is a Duplex Stream that implements the _transform method; it can both be written to and read from, transforming the data as it passes through:
              Transform Stream
  --------------|--------------
  You   Write  ---->  ---->  Read   You
  --------------|--------------
Here we implement a simple Base64 encoder:
const util = require('util');
const Transform = require('stream').Transform;
function Base64Encoder(options) {
Transform.call(this, options);
}
util.inherits(Base64Encoder, Transform);
Base64Encoder.prototype._transform = function(data, encoding, callback) {
callback(null, data.toString('base64'));
};
process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);
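For reference, the same encoder can be written with the class syntax supported in newer Node.js versions; a sketch with identical behavior (note that, like the original, it encodes each chunk independently, so the output is only strictly correct when every chunk length is a multiple of 3 bytes):
const { Transform } = require('stream');

class Base64Encoder extends Transform {
  _transform(chunk, encoding, callback) {
    // Emit the base64 encoding of each incoming chunk
    callback(null, chunk.toString('base64'));
  }
}

process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);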