A readable stream is a stream that produces data for a program to consume. Common ways of producing data include reading files from disk, reading network requests, and so on. Let’s look again at the example of streams we saw earlier:
const fs = require('fs');
const rs = fs.createReadStream(filePath);
rs is a readable stream that produces data by reading a file from disk, and the console’s standard input, process.stdin, is also a readable stream:
process.stdin.pipe(process.stdout);
This one line echoes console input back to the console; process.stdin produces data by reading the user’s input from the console.
Keeping that definition of a readable stream in mind, let’s build one ourselves.
Custom readable streams
Besides the readable streams the system provides, there are streams implemented by libraries, such as gulp’s source stream:
gulp.src(['*.js', 'dist/**/*.scss'])
So if you want to produce data in your own particular way for a program to consume, how do you start?
Two simple steps
- Inherit the Readable class from the stream module
- Override the _read method and call this.push to put the produced data into a queue for reading
The Readable class already does most of the work of a readable stream; to implement a custom readable stream, you simply inherit from it and put the logic that produces data inside the _read method.
For example, a stream that produces a random number every 100 milliseconds (admittedly not very useful):
const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
  constructor(max) {
    super();
  }

  _read() {
    const ctx = this;
    setTimeout(() => {
      const randomNumber = parseInt(Math.random() * 10000);
      // Only strings or Buffers can be pushed; append a newline so the output is easier to read
      ctx.push(`${randomNumber}\n`);
    }, 100);
  }
}

module.exports = RandomNumberStream;
The class-inheritance part of the code is simple; the interesting part is the implementation of the _read method, which has a few points worth noting:
- The Readable class provides a default _read method, but it does nothing; what we are doing here is overriding it
- The _read method takes a size argument that tells it how much data a read call wants, but it is only advisory; many implementations ignore it, as we do here (more on this later)
- this.push pushes data into a buffer, which we'll also come back to later
- The contents of push can only be strings or buffers, not numbers
- The push method takes a second argument, encoding, which specifies the encoding when the first argument is a string (a short sketch of these last two points follows this list)
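A minimal sketch of those last two points; OneShotStream is a throwaway name used only for illustration, and the push(null) at the end (covered in the next section) just signals that there is no more data:
const { Readable } = require('stream');

class OneShotStream extends Readable {
  _read() {
    this.push(Buffer.from('9527\n')); // a Buffer is accepted
    this.push('9527\n', 'utf8');      // a string plus an explicit encoding is accepted
    // this.push(9527);               // a plain number is rejected with a TypeError
    this.push(null);                  // no more data
  }
}

new OneShotStream().pipe(process.stdout);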
Let’s run the RandomNumberStream and see what happens:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream();
rns.pipe(process.stdout);
You’ll see a steady stream of numbers printed to the console: we’ve implemented a readable stream of random numbers. There are still a few small problems to solve, though.
How to Stop
We push a number into the buffer every 100 milliseconds, but just as reading a local file eventually reaches the end, there has to be a way to signal that the data is finished. How do we stop?
By pushing a null into the buffer. Let’s modify the code so the consumer can specify how many random numbers are needed:
const Readable = require('stream').Readable;

class RandomNumberStream extends Readable {
  constructor(max) {
    super();
    this.max = max;
  }

  _read() {
    const ctx = this;
    setTimeout(() => {
      if (ctx.max) {
        const randomNumber = parseInt(Math.random() * 10000);
        // Only strings or Buffers can be pushed; append a newline so the output is easier to read
        ctx.push(`${randomNumber}\n`);
        ctx.max -= 1;
      } else {
        ctx.push(null);
      }
    }, 100);
  }
}

module.exports = RandomNumberStream;
The code uses a max counter so the consumer can specify, at instantiation time, how many random numbers it needs:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);
rns.pipe(process.stdout);
You can see that the console prints only five numbers.
Why setTimeout and not setInterval
Careful readers may have noticed that to produce a random number every 100 milliseconds we used setTimeout rather than setInterval. Why does a one-off delay, rather than a repeating timer, still give the correct result?
This requires understanding the two modes in which a stream operates:
- Flowing mode: data is read from the underlying system and provided to the application as quickly as possible
- Paused mode: the read() method must be called explicitly to read chunks of data
A stream starts out in paused mode, where reading data requires the program to call read() explicitly. In this example we get data without ever calling read() because the pipe() method switches the stream into flowing mode, and _read() is then called automatically, again and again, until the data is exhausted. So each call to _read() only needs to produce data once.
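One way to see the repeated calls is to log each one; a throwaway sketch (the LoggingRandomNumberStream subclass exists only for this experiment):
const RandomNumberStream = require('./RandomNumberStream');

class LoggingRandomNumberStream extends RandomNumberStream {
  _read(size) {
    // printed every time Node asks the stream for more data
    console.log('_read called, size =', size);
    super._read(size);
  }
}

new LoggingRandomNumberStream(5).pipe(process.stdout);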
Switching between flowing mode and paused mode
A stream can be switched from the default paused mode to flowing mode in the following ways:
- Adding a 'data' event listener to start listening for data
- Calling the resume() method to start the flow of data
- Calling the pipe() method to forward the data to a writable stream
There are two ways to switch from flowing mode back to paused mode (a short sketch follows the list):
- If there are no pipe destinations, call the pause() method
- If there are pipe destinations, remove all 'data' event listeners and call the unpipe() method to remove the pipe destinations
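A minimal sketch of switching modes, reusing the RandomNumberStream from above:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

// attaching a 'data' listener switches the stream into flowing mode
rns.on('data', chunk => process.stdout.write(chunk));

// pause() switches it back to paused mode (there are no pipe destinations here)
rns.pause();

// resume() puts it back into flowing mode one second later
setTimeout(() => rns.resume(), 1000);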
Data event
The pipe() method makes data flow from the readable stream to the writable stream, but to the user it looks like a black box: how does the data actually move? The switch between flowing and paused mode involves two important mechanisms:
- Flowing mode corresponds to the 'data' event
- Paused mode corresponds to the read() method
These two mechanisms are what allow an application to drive the flow of data. Let’s look at the flowing-mode 'data' event first. As soon as we listen for a readable stream’s 'data' event, the stream switches into flowing mode:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);
rns.on('data', chunk => {
console.log(chunk);
});
You can see that the console prints something like the following
<Buffer 39 35 37 0a>
<Buffer 31 30 35 37 0a>
<Buffer 38 35 31 30 0a>
<Buffer 33 30 35 35 0a>
<Buffer 34 36 34 32 0a>
A 'data' event is emitted whenever the readable stream produces data that can be consumed, and once a 'data' listener is attached, the data is delivered to it as fast as possible. The listener receives the data, a Buffer by default, as its first argument; that is the chunk printed on the console. To display it as a readable number, call the Buffer’s toString() method.
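For example, to see the numbers as text instead of raw Buffers:
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('data', chunk => {
  // chunk is a Buffer; toString() turns it back into the text we pushed
  console.log(chunk.toString());
});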
Another event, 'end', is emitted when all of the data has been consumed:
rns.on('end', () => {
console.log('done');
});
An 'error' event can also be emitted while the data is being processed; you can listen for it to handle exceptions:
rns.on('error', (err) => {
console.log(err);
});
read(size)
A stream in paused mode requires the program to call the read() method explicitly to get data. The read() method pulls some data out of the internal buffer and returns it, and returns null when no more data is available.
When read() is called with a size argument, it returns that many bytes, or null if that many bytes are not available; if size is not specified, it returns all of the data in the internal buffer.
This creates an awkward situation: in flowing mode the stream conveniently produces data and then notifies the program with a 'data' event, but in paused mode the program has to pull the data itself without knowing when it is ready, and polling for it would be inefficient.
For this, Node.js provides a 'readable' event, which is emitted when there is data available to be read from the stream (or when the end of the stream has been reached):
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('readable', () => {
  let chunk;
  while ((chunk = rns.read()) !== null) {
    console.log(chunk);
  }
});
It is worth noting that read() does not necessarily return data every time it is called after a 'readable' event: it returns null when the available data has not reached size, which is why the program includes the null check.
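For instance, here is a sketch that asks read() for a fixed number of bytes from our random-number stream (the 4-byte size is arbitrary, purely to show the effect):
const RandomNumberStream = require('./RandomNumberStream');
const rns = new RandomNumberStream(5);

rns.on('readable', () => {
  let chunk;
  // read(4) returns null as soon as fewer than 4 bytes are buffered
  // (except at the very end of the stream, when the remainder is returned)
  while ((chunk = rns.read(4)) !== null) {
    console.log(chunk.toString());
  }
});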
Will data be lost?
const stream = fs.createReadStream('/dev/input/event0');
stream.on('readable', callback);
One question remains: a readable stream starts producing data as soon as it is created, so if some data is produced, and a 'readable' event emitted, before the event listener is bound, could data be lost in extreme cases?
It won’t be. Both producing data and emitting events are asynchronous operations on the event queue, and thanks to process.nextTick, listeners attached with on() are guaranteed to be bound before any data is produced. See the discussion of the Event Loop in the Timer section for more information.
You may still have questions about the 'data' event: when exactly is the 'readable' event emitted, how much data does read() return each time, and when does it return null? These will be explained, together with the source code, in the back-pressure section of the article on writable streams.