Author: Xiao Lei
Personal homepage: Github
A Readable Stream is an abstraction of a data source. It fetches data from the source, buffers it, and supplies it to data consumers.
Below, we look at how a Readable Stream retrieves data and makes it available to consumers in each of its two modes: flowing mode and paused mode.
Flowing mode
In flowing mode, a readable stream automatically reads data from the underlying system and provides it to the consumer through events of the EventEmitter interface. Unless you need to implement a readable stream yourself, the simplest way to consume the data is the readable.pipe() method.
Let's look at how a readable stream works in flowing mode with a simple example.
const { Readable } = require('stream')

let c = 97 - 1 // starts one below the char code of 'a' (97)

// Instantiate a readable stream
const rs = new Readable({
  read () {
    // Signal EOF once 'z' has been pushed
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)
    setTimeout(() => {
      // Push data to the readable stream asynchronously
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})

// Pipe the data of the readable stream to standard output and print it out
rs.pipe(process.stdout)

process.on('exit', () => {
  console.error('\n_read() called ' + (c - 97) + ' times')
})
First let’s look at the implementation of the Readable constructor:
function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // _readableState holds state values for the different phases of the readable stream; analyzed in detail below
  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    // Override the internal _read method to customize how data is fetched from the data source
    if (typeof options.read === 'function')
      this._read = options.read;

    // Override the internal _destroy method
    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this);
}
When we create a readable stream instance, we pass in a read method that customizes how data is fetched from the data source. If you implement a readable stream yourself, you must provide this method; otherwise the default _read() reports a "not implemented" error as soon as the stream tries to read. The ReadableState constructor defines a number of state values for the different phases of a readable stream:
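The two usual ways of supplying _read() can be compared in a short sketch: passing read in the options (which the constructor assigns to this._read, as in the excerpt above) or subclassing Readable and overriding _read directly. LetterStream and readFromOptions are illustrative names chosen for this example, not part of Node.

```javascript
const { Readable } = require('stream')

// Style 1: hand read() to the constructor; it is stored as this._read.
function readFromOptions () {
  this.push(null) // this source is empty: signal EOF immediately
}
const viaOptions = new Readable({ read: readFromOptions })

// Style 2: subclass Readable and override _read() directly.
class LetterStream extends Readable {
  constructor (letters, options) {
    super(options)
    this.letters = [...letters] // copy so the caller's array is untouched
  }

  _read () {
    // Push one letter per call; push(null) signals EOF.
    const next = this.letters.shift()
    this.push(next === undefined ? null : next)
  }
}

const letters = new LetterStream(['a', 'b', 'c'])
let text = ''
letters.on('data', (chunk) => { text += chunk })
letters.on('end', () => console.log(text)) // prints "abc"
```

Both styles end up in the same place: the stream's machinery only ever calls this._read().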
function ReadableState(options, stream) {
options = options || {};
...
// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
// Whether it is object mode; if so, each item read from the buffer is an object
this.objectMode = !!options.objectMode;
if (isDuplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;
// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
// High water mark: once the amount of data in the buffer reaches the HWM, _read() is no longer called to fetch data from the data source
var hwm = options.highWaterMark;
var readableHwm = options.readableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024; // the default value
if (hwm || hwm === 0)
this.highWaterMark = hwm;
else if (isDuplex && (readableHwm || readableHwm === 0))
this.highWaterMark = readableHwm;
else
this.highWaterMark = defaultHwm;
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift()
// buffer is the internal buffer of the readable stream
this.buffer = new BufferList();
// Buffer data length
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
// The initial value of flowing mode
this.flowing = null;
// Whether the end of the source data has been reached (push(null) was called)
this.ended = false;
// Whether the 'end' event has been emitted
this.endEmitted = false;
// Whether data is being read from the source into the buffer
this.reading = false;
// a flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
this.sync = true;
// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
// has it been destroyed
this.destroyed = false;
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
// Default encoding for strings passed to push()
this.defaultEncoding = options.defaultEncoding || 'utf8';
// Number of pipe destinations waiting for a 'drain' event
// the number of writers that are awaiting a drain event in .pipe()s
this.awaitDrain = 0;
// if true, a maybeReadMore has been scheduled
this.readingMore = false;
this.decoder = null;
this.encoding = null;
if (options.encoding) {
if (!StringDecoder) StringDecoder = require('string_decoder').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
this.encoding = options.encoding;
}
}
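The highWaterMark selection in ReadableState can be restated as a small pure function so the precedence is easy to see: an explicit highWaterMark wins (0 included), then readableHighWaterMark for duplex streams, then the default. resolveHighWaterMark is a hypothetical helper, not a Node API; it mirrors the excerpt line by line. The 16 / 16 * 1024 defaults are the ones in the source analyzed here; newer Node releases may use a different default or validate the value differently.

```javascript
// Hypothetical helper (not a Node API): mirrors the highWaterMark logic in ReadableState.
function resolveHighWaterMark (options = {}, isDuplex = false) {
  const objectMode = !!options.objectMode ||
    (isDuplex && !!options.readableObjectMode)
  const defaultHwm = objectMode ? 16 : 16 * 1024 // 16 objects or 16 KiB

  const hwm = options.highWaterMark
  const readableHwm = options.readableHighWaterMark
  let result
  if (hwm || hwm === 0) {
    result = hwm // explicit value wins; 0 is valid
  } else if (isDuplex && (readableHwm || readableHwm === 0)) {
    result = readableHwm // readable side of a Duplex
  } else {
    result = defaultHwm
  }
  return Math.floor(result) // cast to an integer, as in the source
}

console.log(resolveHighWaterMark())                                   // 16384
console.log(resolveHighWaterMark({ objectMode: true }))               // 16
console.log(resolveHighWaterMark({ highWaterMark: 0 }))               // 0
console.log(resolveHighWaterMark({ readableHighWaterMark: 5 }, true)) // 5
console.log(resolveHighWaterMark({ readableHighWaterMark: 5 }))       // 16384
```

Note the last call: readableHighWaterMark is ignored unless the stream is a duplex.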
In the example above, once the readable stream rs has been instantiated, its pipe method is called. This starts the process in which the readable stream fetches data from the data source in flowing mode and process.stdout consumes it.
Readable.prototype.pipe = function (dest, pipeOpts) {
  var src = this;
  var state = this._readableState;
  ...
  // The readable stream listens for its own data events: each chunk fetched from the data source is passed on to the consumer (dest)
  src.on('data', ondata);
  function ondata(chunk) {
    ...
    var ret = dest.write(chunk);
    ...
  }
  ...
};
A readable stream provided by Node can be switched to flowing mode (flowing = true) in three ways:
- listening for the data event
- calling the stream.resume() method
- calling the stream.pipe() method
In fact, all three come down to one method, stream.resume(), which switches the readable stream into flowing mode. Back to the example above: calling rs.pipe() internally calls src.on('data', ondata) to listen for data events, so let's see what on() does.
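The three entry points can be checked side by side through the public readable.readableFlowing property, which exposes _readableState.flowing. This sketch uses a no-op read() as a placeholder source; the comments show the value each step should produce.

```javascript
const { Readable, Writable } = require('stream')

// A source whose read() does nothing; we only inspect its mode flag here.
const makeSource = () => new Readable({ read () {} })

const a = makeSource()
const states = [a.readableFlowing] // null: no consumer attached yet
a.on('data', () => {})             // 1. attaching a 'data' listener
states.push(a.readableFlowing)     // true
a.pause()
states.push(a.readableFlowing)     // false: explicitly paused
a.resume()                         // 2. calling resume()
states.push(a.readableFlowing)     // true

const b = makeSource()
b.pipe(new Writable({ write (chunk, encoding, callback) { callback() } })) // 3. pipe()
states.push(b.readableFlowing)     // true

console.log(states) // [ null, true, false, true, true ]
```

The false after pause() matters: as the on() excerpt shows, a later 'data' listener only calls resume() when flowing !== false.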
Readable.prototype.on = function (ev, fn) {
  ...
  // Listen for data events
  if (ev === 'data') {
    // A readable stream starts with flowing === null
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    ...
  }
  return res;
};
When a data listener is attached, the readable stream calls the resume method:
Readable.prototype.resume = function() {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    // Switch to flowing mode
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    // Defer the actual work to the next tick
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    // Start fetching data from the data source
    stream.read(0);
  }
  state.resumeScheduled = false;
  // Reset awaitDrain to 0 now that the stream is flowing
  state.awaitDrain = 0;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}
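Because resume() hands the real work to process.nextTick, the 'resume' event and the first read(0) happen only after the current synchronous code has finished. A quick sketch that records the order of events (the no-op read() is just a placeholder source):

```javascript
const { Readable } = require('stream')

const r = new Readable({ read () {} }) // no-op source; we only watch events
const order = []
r.on('resume', () => order.push('resume event'))

r.resume()                   // sets flowing = true and schedules resume_()
order.push('after resume()') // runs first: resume_() has not run yet
process.nextTick(() => order.push('later tick')) // queued after resume_()

setImmediate(() => console.log(order))
// [ 'after resume()', 'resume event', 'later tick' ]
```

The 'later tick' entry lands after 'resume event' because nextTick callbacks run in the order they were queued.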
The resume method switches the readable stream into flowing mode, and on the next tick resume_ calls stream.read(0) to start fetching data from the data source (stream.read() behaves differently depending on the argument it receives):
Readable.prototype.read = function (n) {
...
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
// If the buffer is empty and the source has ended
if (state.length === 0 && state.ended)
// End the stream
endReadable(this);
else
// Emit the readable event
emitReadable(this);
return null;
}
...
// How much data can be read from the buffer
n = howMuchToRead(n, state);
// Determine whether data should be fetched from the data source
// if we need a readable event, then we need to do some reading.
var doRead = state.needReadable;
debug('need readable', doRead);
// if we currently have less than the highWaterMark, then also read some
// If the buffer is empty, or the buffer length minus the amount to be read (state.length - n) is below HWM,
// more data must be fetched, so doRead is set to true
if (state.length === 0 || state.length - n < state.highWaterMark) {
// Continue reading data
doRead = true;
debug('length less than watermark', doRead);
}
// however, if we've ended, then there's no point, and if we're already
// reading, then it's unnecessary.
// If the stream has ended or a read is already in progress, set doRead to false: no data needs to be read
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
// If the current buffer length is 0, needReadable is first set to true, and then a readable event will be emitted when there is data in the buffer
if (state.length === 0)
state.needReadable = true;
// call internal read method
// Get data from the data source synchronously or asynchronously, depending on how the custom _read method is implemented (see the example code above)
this._read(state.highWaterMark);
state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
// If _read pushed data synchronously, reading is already false, so recalculate how much data to return to the consumer
if (!state.reading) n = howMuchToRead(nOrig, state);
}
// ret is the data returned to the consumer
var ret;
if (n > 0)
ret = fromList(n, state);
else
ret = null;
if (ret === null) {
state.needReadable = true;
n = 0;
} else {
state.length -= n;
}
if (state.length === 0) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if (!state.ended) state.needReadable = true;
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended) endReadable(this);
}
// As long as the data returned is not null (i.e. not EOF), a data event is emitted every time data is read
if (ret !== null)
this.emit('data', ret);
return ret;
}
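The branches of read(n) above can be observed from the outside in paused mode. This sketch pushes data by hand into a stream whose read() does nothing; the comments give what each call should return according to the code paths above (Buffer results are converted to strings for readability).

```javascript
const { Readable } = require('stream')

const r = new Readable({ read () {} }) // no-op: we push manually below
r.push('abcdef')                       // buffered, stream not yet ended

const results = []
results.push(String(r.read(2)))  // 'ab': enough bytes are buffered
results.push(r.read(10))         // null: only 4 bytes left and the stream has not ended
results.push(r.read(0))          // null: read(0) only triggers fetching, never returns data
r.push(null)                     // EOF: state.ended becomes true
results.push(String(r.read(10))) // 'cdef': ended, so whatever remains is returned

console.log(results) // [ 'ab', null, null, 'cdef' ]
```

The read(10) case is the howMuchToRead() rule at work: asking for more than is buffered returns null until the stream has ended.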
To fetch data from the data source, the readable stream calls this._read(state.highWaterMark), which corresponds to the read() method implemented in our example:
const rs = new Readable({
  read () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)
    setTimeout(() => {
      // Push data to the readable stream
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})
Inside a custom read method, the call developers make most often is stream.push(), which hands data fetched from the data source to the readable stream so that consumers can use it.
Readable.prototype.push = function (chunk, encoding) {
  ...
  // Process the data from the data source
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  ...
  // Whether to add the data to the head of the buffer (unshift)
  if (addToFront) {
    // Data cannot be unshifted after 'end' has been emitted
    if (state.endEmitted)
      stream.emit('error', new errors.Error('ERR_STREAM_UNSHIFT_AFTER_END_EVENT'));
    else
      addChunk(stream, state, chunk, true);
  } else if (state.ended) { // EOF has been pushed, but data is still being pushed
    stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
  } else {
    // A read has just completed, so reset the reading state to false
    state.reading = false;
    if (state.decoder && !encoding) {
      chunk = state.decoder.write(chunk);
      if (state.objectMode || chunk.length !== 0)
        // Add data to the tail
        addChunk(stream, state, chunk, false);
      else
        maybeReadMore(stream, state);
    } else {
      // Add data to the tail
      addChunk(stream, state, chunk, false);
    }
  }
  ...
  return needMoreData(state);
}

// Process the data according to the stream state
function addChunk(stream, state, chunk, addToFront) {
  // flowing is the mode of the readable stream; length is the length of the buffer.
  // When data arrives asynchronously in flowing mode and the buffer is empty, the chunk is not
  // buffered: it is handed straight to the consumer through a data event
  if (state.flowing && state.length === 0 && !state.sync) {
    // In flowing mode the stream reads from the underlying system automatically, so emit data directly
    stream.emit('data', chunk);
    // Continue fetching data from the data source
    stream.read(0);
  } else {
    // update the buffer info.
    // Buffered length: 1 per object in objectMode, otherwise chunk.length
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      // Add data to the head
      state.buffer.unshift(chunk);
    else
      // Add data to the tail
      state.buffer.push(chunk);
    // Emit a readable event to notify consumers that there is now data to read
    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}
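The return needMoreData(state) at the end of readableAddChunk is what push() returns to the data source: true while the buffer is below highWaterMark, false once it is reached. A small sketch with a no-op read() and manual pushes (the highWaterMark of 3 is chosen only to make the threshold easy to hit):

```javascript
const { Readable } = require('stream')

// read() is a no-op: we push by hand to watch the buffer fill up.
const r = new Readable({ highWaterMark: 3, read () {} })

const results = [
  r.push('a'),  // true:  1 byte buffered, below highWaterMark
  r.push('bc'), // false: 3 bytes buffered, highWaterMark reached
  r.push('d')   // false: still at or above it; the data is buffered anyway
]
console.log(results, r.readableLength) // [ true, false, false ] 4
```

A false return is advisory: the chunk is still stored, and a well-behaved source simply stops pushing until _read() is called again.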
The data is processed in the addChunk method. Note that in flowing mode the data can reach the consumer in two different ways:
- data fetched from the data source is first buffered in the readable stream and then consumed;
- data bypasses the readable stream's buffer and goes directly to the consumer.
Which path is taken depends on whether the developer calls push synchronously or asynchronously, reflected in the value of state.sync (and on whether the buffer is currently empty).
When push is called asynchronously, state.sync is false: the data fetched from the data source is delivered to the consumer by emitting a data event instead of being buffered. stream.read(0) is then called to fetch the next piece of data from the data source, and so on.
When push is called synchronously, state.sync is true: the fetched data is first stored in the readable stream's buffer instead of being delivered through a data event. Reading the code, you may wonder how buffered data ever flows in flowing mode. The answer lies in resume_, which runs when flowing mode is entered:
function resume_(stream, state) {
  ...
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0); // Continue to fetch data from the data source
}

function flow(stream) {
  ...
  // While flowing, call stream.read() to take data from the buffer and hand it to consumers
  while (state.flowing && stream.read() !== null);
}
The stream.read() method is called inside the flow method to fetch the data from the readable stream buffer for consumption by the consumer, while the stream.read(0) call continues to fetch data from the data source.
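The two delivery paths can be reduced to a toy model. The functions below are hypothetical simplifications of addChunk() and flow() (not Node's code): a plain object stands in for _readableState, an array for the BufferList, and a deliver callback for emitting 'data'.

```javascript
// Hypothetical, simplified model of addChunk() and flow() from the excerpts above.
function addChunk (state, chunk, deliver) {
  if (state.flowing && state.buffer.length === 0 && !state.sync) {
    deliver(chunk)           // async push while flowing, empty buffer: skip the buffer
  } else {
    state.buffer.push(chunk) // sync push (or paused, or buffer not empty): buffer first
  }
}

function flow (state, deliver) {
  // Like `while (state.flowing && stream.read() !== null);`: drain the buffer while flowing
  while (state.flowing && state.buffer.length > 0) {
    deliver(state.buffer.shift())
  }
}

const delivered = []
const deliver = (chunk) => delivered.push(chunk)
const state = { flowing: true, sync: true, buffer: [] }

addChunk(state, 'a', deliver) // sync push: 'a' waits in the buffer
const bufferedAfterSyncPush = state.buffer.length
flow(state, deliver)          // resume_() -> flow(): 'a' is delivered
state.sync = false
addChunk(state, 'b', deliver) // async push with an empty buffer: delivered immediately
console.log(delivered, bufferedAfterSyncPush, state.buffer.length) // [ 'a', 'b' ] 1 0
```

The empty-buffer condition is what keeps the two paths from reordering data: a chunk is only sent straight through when nothing older is still waiting.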
That's the general flow by which a readable stream in flowing mode fetches data from a data source and makes it available to consumers.
Paused mode
In paused mode, the consumer has to call the stream.read() method manually to retrieve data.
Here’s an example:
const { Readable } = require('stream')

let c = 97 - 1 // starts one below the char code of 'a' (97)

const rs = new Readable({
  highWaterMark: 3,
  read () {
    if (c >= 'f'.charCodeAt(0)) return rs.push(null)
    setTimeout(() => {
      rs.push(String.fromCharCode(++c))
    }, 1000)
  }
})

rs.setEncoding('utf8')

rs.on('readable', () => {
  // console.log(rs._readableState.length)
  console.log('get the data from readable: ', rs.read())
})
Listening for the readable event starts the readable stream fetching data from the data source:
Readable.prototype.on = function (ev, fn) {
  ...
  if (ev === 'data') {
    ...
  } else if (ev === 'readable') {
    // Listen for readable events
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }
  ...
};

function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  // Start fetching data from the data source
  self.read(0);
}
After nReadingNextTick calls self.read(0), the rest of the process is similar to the one analyzed above for fetching data in flowing mode. It ends with a call to addChunk, which pushes the fetched data into the readable stream's buffer:
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
...
} else {
// update the buffer info.
// The length of data
state.length += state.objectMode ? 1 : chunk.length;
// Add data to the head
if (addToFront)
state.buffer.unshift(chunk);
else
// Add data to the tail
state.buffer.push(chunk);
// Emit a readable event to notify consumers that there is now data to read
if (state.needReadable)
emitReadable(stream);
}
maybeReadMore(stream, state);
}
Once data has been added to the buffer and needReadable (which indicates whether a readable event must be emitted to tell consumers that data is available) is true, a readable event is emitted to tell consumers that new data has been pushed into the readable stream's buffer. maybeReadMore is also called to fetch more data from the data source asynchronously:
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  // Not reading, not flowing, not ended, and the buffer holds less than HWM
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // didn't get any data, stop spinning.
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}
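The effect of maybeReadMore can be seen by counting _read() calls on an endless source that pushes one byte synchronously per call. The highWaterMark of 3 and the readCalls counter are choices made for this sketch; a single read(0) kicks things off, and by the time setImmediate runs, the nextTick-scheduled maybeReadMore_ loop should have topped the buffer up to highWaterMark without any consumer asking for data.

```javascript
const { Readable } = require('stream')

let readCalls = 0
// An endless source that pushes one byte synchronously per _read() call.
const r = new Readable({
  highWaterMark: 3,
  read () {
    readCalls++
    this.push('x')
  }
})

r.read(0) // start fetching without consuming anything
const afterKick = { readCalls, buffered: r.readableLength }

setImmediate(() => {
  // maybeReadMore_() has kept calling read(0) until length reached highWaterMark
  console.log(afterKick, { readCalls, buffered: r.readableLength })
  // { readCalls: 1, buffered: 1 } { readCalls: 3, buffered: 3 }
})
```

The loop stops at exactly highWaterMark because of the state.length < state.highWaterMark condition in the while loop above.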
Each time new data is pushed into the readable stream's buffer, a readable event is emitted, and the consumer then gets the data by calling stream.read().
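A common way to consume in paused mode is to drain the buffer inside the readable handler by calling read() until it returns null, rather than reading once per event as in the earlier example. A minimal sketch with manually pushed data:

```javascript
const { Readable } = require('stream')

const rs = new Readable({ read () {} }) // no-op: data is pushed by hand
rs.push('hello ')
rs.push('world')
rs.push(null)

const seen = []
rs.on('readable', () => {
  let chunk
  // read() returns null once the buffer is empty; keep going until then
  while ((chunk = rs.read()) !== null) {
    seen.push(String(chunk))
  }
})
rs.on('end', () => console.log(seen.join(''))) // prints "hello world"
```

Draining fully also lets read() notice EOF (state.ended with an empty buffer), which is what eventually triggers the end event.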
Back pressure
Back pressure occurs when the data consumer (for example, a writable stream) consumes data more slowly than the readable stream provides it.
Let's look at pipe() again:
Readable.prototype.pipe = function (dest, pipeOpts) {
  ...
  // Listen for the drain event
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);
  ...
  src.on('data', ondata);
  function ondata(chunk) {
    increasedAwaitDrain = false;
    // Write the data to the writable stream
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      ...
      src.pause();
    }
  }
  ...
};

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    // Decrease the number of pipe destinations still awaiting drain
    if (state.awaitDrain)
      state.awaitDrain--;
    // If awaitDrain is 0 and data listeners are bound on the readable stream (EE.listenerCount returns the number of bound listeners)
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      // Re-enter flowing mode
      state.flowing = true;
      flow(src);
    }
  };
}
When dest.write(chunk) returns false, meaning the readable stream is supplying data faster than the writable stream can handle it, src.pause() is called to leave flowing mode. This stops the readable stream from passing data to the writable stream and, once its buffer reaches highWaterMark, from fetching more data from the data source. Only when the writable stream emits the drain event is ondrain called to restore flowing mode, after which the readable stream resumes feeding data into the writable stream. For back pressure on the writable side, see the source code analysis of Writable streams.
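The pause/drain handshake can be sketched with public APIs only. simplePipe is a hypothetical helper, far simpler than the real pipe() (single destination, no error handling, no unpipe, no awaitDrain counting); it pauses the source whenever dest.write() returns false and resumes it on drain. The slow Writable with a 2-byte highWaterMark and setImmediate callbacks is an assumption chosen so that back pressure kicks in quickly.

```javascript
const { Readable, Writable } = require('stream')

// Hypothetical helper (not Node's API): the core of the pipe() excerpt above.
function simplePipe (src, dest) {
  const stats = { pauses: 0 }
  src.on('data', (chunk) => {
    // write() returns false once dest has buffered highWaterMark bytes or more
    if (!dest.write(chunk)) {
      stats.pauses++
      src.pause()                            // leave flowing mode
      dest.once('drain', () => src.resume()) // re-enter it when dest catches up
    }
  })
  src.on('end', () => dest.end())
  return stats
}

// A fast source: ten one-letter chunks ('a'..'j'), pushed synchronously.
let code = 'a'.charCodeAt(0)
const src = new Readable({
  read () {
    this.push(code <= 'j'.charCodeAt(0) ? String.fromCharCode(code++) : null)
  }
})

// A slow sink: each write completes on a later turn of the event loop.
const received = []
const dest = new Writable({
  highWaterMark: 2,
  write (chunk, encoding, callback) {
    received.push(String(chunk))
    setImmediate(callback)
  }
})

const stats = simplePipe(src, dest)
// Prints the letters followed by how many times the source was paused
dest.on('finish', () => console.log(received.join(''), 'pauses:', stats.pauses))
```

All ten letters still arrive in order; the pauses only change when they are produced, which is the point of back pressure.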
The above analyzes the internal working mechanism of readable streams through their two modes. Some details are left out; if you are interested, read the relevant source code.