Author: Xiao Lei

Personal homepage: Github

A Readable Stream is an abstraction over a data source. It can fetch and buffer data from the data source and hand that data to data consumers.

Next, we will look at how a Readable Stream retrieves data and makes it available to consumers in each of its two modes.

Flowing mode

In flowing mode, a readable stream automatically reads data from the underlying system and provides it to the consumer through events on the EventEmitter interface. Unless developers need to implement the readable stream themselves, simply calling readable.pipe() is enough to consume the data.
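
For example, a readable stream that Node already implements, such as a file stream, can be consumed with nothing more than a pipe() call. A minimal sketch (the file path is just a placeholder):

const fs = require('fs')

// fs.createReadStream returns a readable stream whose _read is already
// implemented, so consuming it only takes a call to pipe()
fs.createReadStream('./some-file.txt').pipe(process.stdout)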

Let’s take a concrete look at how a readable stream works under flowing mode with a simple example.

const { Readable } = require('stream')

let c = 97 - 1
// Instantiate a readable stream
const rs = new Readable({
  read () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      // Push data to the readable stream
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})

// Pipe the data of the readable stream to standard output and print it
rs.pipe(process.stdout)

process.on('exit', () => {
  console.error('\n_read() called ' + (c - 97) + ' times')
})

First let’s look at the implementation of the Readable constructor:

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // _readableState holds state values for the different phases of the readable stream, analyzed in detail below
  this._readableState = new ReadableState(options, this);

  // legacy
  this.readable = true;

  if (options) {
    // Override the internal _read method to customize how data is fetched from the data source
    if (typeof options.read === 'function')
      this._read = options.read;

    // Override the internal _destroy method
    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this);
}
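
As the constructor shows, the read option simply becomes the stream's _read. A quick sketch of what happens if it is omitted and the stream is consumed anyway (the exact error code varies between Node versions):

const { Readable } = require('stream')

// No read option is supplied, so the default _read is used
const broken = new Readable()

broken.on('error', (err) => {
  // Recent Node versions report something like ERR_METHOD_NOT_IMPLEMENTED;
  // older versions emit a plain "not implemented" error
  console.error('caught:', err.message)
})

// Consuming the stream triggers the internal _read and therefore the error
broken.resume()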

When we create the readable stream instance, we pass in a read method to customize how data is fetched from the data source. Anyone implementing a readable stream themselves must provide this method, otherwise the program will throw an error at runtime. The ReadableState constructor defines a number of state values for the different phases of a readable stream:

function ReadableState(options, stream) {
  options = options || {};
  ...

  // object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away
  // Whether this is object mode; if so, the data read from the buffer is an object
  this.objectMode = !!options.objectMode;

  if (isDuplex)
    this.objectMode = this.objectMode || !!options.readableObjectMode;

  // the point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  // High water mark: once the amount of data in the buffer exceeds the HWM,
  // the stream stops calling _read() to fetch more data from the source
  var hwm = options.highWaterMark;
  var readableHwm = options.readableHighWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;  // default value

  if (hwm || hwm === 0)
    this.highWaterMark = hwm;
  else if (isDuplex && (readableHwm || readableHwm === 0))
    this.highWaterMark = readableHwm;
  else
    this.highWaterMark = defaultHwm;

  // cast to ints.
  this.highWaterMark = Math.floor(this.highWaterMark);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  // buffer is the internal buffer of the readable stream
  this.buffer = new BufferList();
  // Length of the buffered data
  this.length = 0;
  this.pipes = null;
  this.pipesCount = 0;
  // Initial value of the flowing flag
  this.flowing = null;
  // Whether all of the source data has been read
  this.ended = false;
  // Whether the end event has been emitted
  this.endEmitted = false;
  // Whether data is currently being read from the source into the buffer
  this.reading = false;

  // a flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick. We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  this.sync = true;

  // whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;

  // has it been destroyed
  this.destroyed = false;

  // Crypto is kind of old and crusty. Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  // Default encoding
  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // the number of writers that are awaiting a drain event in .pipe()s
  // Number of piped writers that are waiting for a drain event
  this.awaitDrain = 0;

  // if true, a maybeReadMore has been scheduled
  this.readingMore = false;

  this.decoder = null;
  this.encoding = null;
  if (options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}
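
A few of these defaults can be checked directly by peeking at _readableState, the same way a later example in this article does. A small sketch (the numbers shown are the documented defaults for recent Node versions):

const { Readable } = require('stream')

const bytes = new Readable({ read () {} })
const objects = new Readable({ objectMode: true, read () {} })

// Default highWaterMark: 16 KiB for byte streams, 16 entries in object mode
console.log(bytes._readableState.highWaterMark)   // 16384
console.log(objects._readableState.highWaterMark) // 16

// flowing starts out as null: neither flowing nor explicitly paused
console.log(bytes._readableState.flowing)         // null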

In the above example, once the readable stream rs is instantiated, its pipe method is called. This formally starts the process of the readable stream fetching data from the data source in flowing mode and having it consumed by process.stdout.

Readable.prototype.pipe = function (dest, pipeOpts) {
  var src = this;
  var state = this._readableState;
  ...

  // The readable stream instance listens for the data event; data fetched from the source is passed to the consumer
  src.on('data', ondata);
  function ondata (chunk) {
    ...
    var ret = dest.write(chunk);
    ...
  }
  ...
}

A readable stream provided by Node can be switched to flowing mode (flowing = true) in any of three ways, as the sketch after the list illustrates:

  • Listening for the data event
  • Calling the stream.resume() method
  • Calling the stream.pipe() method
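
A small sketch that makes the flowing flag transitions visible (it peeks at the internal _readableState, so treat it as illustration only):

const { Readable } = require('stream')

const rs = new Readable({ read () { this.push(null) } })

// A newly created stream is in neither mode yet
console.log(rs._readableState.flowing) // null

// Attaching a data listener (like resume() or pipe()) switches it to flowing
rs.on('data', () => {})
console.log(rs._readableState.flowing) // true

// pause() switches it back out of flowing mode
rs.pause()
console.log(rs._readableState.flowing) // false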

In fact, all three come down to one method: stream.resume(); calling it switches the readable stream into flowing mode. Returning to the example above, rs.pipe() internally calls src.on('data', ondata) to listen for data events, so let's see what the on method does.

Readable.prototype.on = function (ev, fn) {
  ...
  // Listen for the data event
  if (ev === 'data') {
    // A readable stream starts out with flowing === null
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    ...
  }
  return res;
}

Listening for the data event makes the readable stream call the resume method:

Readable.prototype.resume = function() {
  var state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    // Switch to the flowing state
    state.flowing = true;
    resume(this, state);
  }
  return this;
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    // Start fetching data from the data source
    stream.read(0);
  }

  state.resumeScheduled = false;
  // Reset awaitDrain to 0 now that the stream is flowing
  state.awaitDrain = 0;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}

The resume method checks whether the readable stream is already in flowing mode and internally calls stream.read(0) to start retrieving data from the data source (the stream.read() method behaves differently depending on the argument it receives):

TODO: describe how the stream.read(size) method behaves when it receives different arguments.

Readable.prototype.read = function (n) {
  ...
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    // If there is no data in the buffer and the stream has ended
    if (state.length === 0 && state.ended)
      // End the readable stream
      endReadable(this);
    else
      // Emit the readable event
      emitReadable(this);
    return null;
  }
  ...

  // How much data can be read from the buffer
  n = howMuchToRead(n, state);

  // Determine whether data should be fetched from the data source
  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // if we currently have less than the highWaterMark, then also read some
  // If the buffer is empty, or the buffer length minus the amount to be read
  // (state.length - n) is less than the HWM, doRead is still set to true and
  // more data is read from the source
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    // Continue reading data
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // however, if we've ended, then there's no point, and if we're already
  // reading, then it's unnecessary.
  // If the stream has ended or a read is already in progress, set doRead to
  // false to indicate that no data needs to be read
  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // if the length is currently zero, then we *need* a readable event.
    // If the buffer is currently empty, set needReadable to true first; a
    // readable event will then be emitted once there is data in the buffer
    if (state.length === 0)
      state.needReadable = true;
    // call internal read method
    // Data is fetched from the source either synchronously or asynchronously,
    // depending on how the custom _read method is implemented (see the example above)
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    // In that case the reading flag is now false and we recalculate how much
    // data can be returned to the consumer
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  // ret is the data returned to the consumer
  var ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended)
      state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended)
      endReadable(this);
  }

  // As long as the data fetched from the source is not null (i.e. not EOF),
  // a data event is emitted every time data is read
  if (ret !== null)
    this.emit('data', ret);

  return ret;
}
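
Before moving on, here is a rough sketch of how read() behaves with different arguments from the consumer's point of view, on a paused stream whose buffer is filled by hand (the outputs in the comments assume the default utf8 handling and may differ slightly between Node versions):

const { Readable } = require('stream')

// A paused stream whose _read does nothing; we fill the buffer manually
const rs = new Readable({ read () {} })
rs.push('abcdef')

// read(n) returns at most n bytes from the internal buffer
console.log(rs.read(2)) // <Buffer 61 62>
// read() with no argument drains whatever is currently buffered
console.log(rs.read())  // <Buffer 63 64 65 66>
// read(0) returns null; it only triggers the stream to refill its buffer
console.log(rs.read(0)) // null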

To get data from the data source, the readable stream calls this._read(state.highWaterMark), which corresponds to the read() method implemented in our example:

const rs = new Readable({
  read () {
    if (c >= 'z'.charCodeAt(0)) return rs.push(null)

    setTimeout(() => {
      // Push data to the readable stream
      rs.push(String.fromCharCode(++c))
    }, 100)
  }
})

The method developers most commonly need to call themselves inside read is the stream.push method, which hands the data fetched from the data source over to the readable stream for consumption:

Readable.prototype.push = function (chunk, encoding) {
  ...
  // Process the data from the data source
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
}

function readableAddChunk (stream, chunk, encoding, addToFront, skipChunkCheck) {
  ...
      // Whether to add the data to the front of the buffer
      if (addToFront) {
        // Data can no longer be written after the end event has been emitted
        if (state.endEmitted)
          stream.emit('error',
                      new errors.Error('ERR_STREAM_UNSHIFT_AFTER_END_EVENT'));
        else
          addChunk(stream, state, chunk, true);
      } else if (state.ended) {
        // EOF has been set, but data is still being pushed
        stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
      } else {
        // A read has just completed, so reset the reading state to false
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            // Add the data to the tail
            addChunk(stream, state, chunk, false);
          else
            maybeReadMore(stream, state);
        } else {
          // Add the data to the tail
          addChunk(stream, state, chunk, false);
        }
      }
  ...
  return needMoreData(state);
}

// Process the data according to the stream state
function addChunk(stream, state, chunk, addToFront) {
  // flowing is the mode of the readable stream, length is the buffer length
  // When data is pushed asynchronously in flowing mode, the readable stream does
  // not buffer the data; it emits a data event directly for the consumer
  if (state.flowing && state.length === 0 && !state.sync) {
    // For a flowing-mode Readable, which reads from the underlying system
    // automatically, emit the data event directly and keep reading from the
    // data source with stream.read(0)
    stream.emit('data', chunk);
    // Continue fetching data from the data source
    stream.read(0);
  } else {
    // update the buffer info.
    // Length of the buffered data
    state.length += state.objectMode ? 1 : chunk.length;
    // Add the data to the head of the buffer
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      // Add the data to the tail of the buffer
      state.buffer.push(chunk);

    // Emit a readable event to notify the consumer that there is now data to read
    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}
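
Note the needMoreData(state) return value at the end of readableAddChunk: that boolean is what push() hands back to the caller, telling the _read implementation whether the buffer still has room. A small sketch (the highWaterMark of 4 bytes is arbitrary):

const { Readable } = require('stream')

const rs = new Readable({ highWaterMark: 4, read () {} })

// push() keeps returning true while the buffer is below the highWaterMark...
console.log(rs.push('ab')) // true  (2 bytes buffered)
// ...and returns false once the highWaterMark has been reached
console.log(rs.push('cd')) // false (4 bytes buffered)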

The actual handling of the data happens in the addChunk method. It is worth noting that in flowing mode the data may be consumed in one of two ways:

  1. Data retrieved from the data source is first buffered in the readable stream and then consumed by the consumer;
  2. Data never enters the readable stream's buffer and is consumed directly.

Which of the two cases applies depends on whether the developer calls the push method synchronously or asynchronously, which corresponds to the value of state.sync.

When the push method is called asynchronously, state.sync is false: the data fetched from the data source is handed to the consumer directly by firing a data event, without being buffered. The stream.read(0) method is then called so that the stream keeps fetching more data from the source for consumers.
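
This can be observed directly: with an asynchronous push, the chunk arrives in the data handler while the internal buffer is still empty. A small sketch (peeking at _readableState again, purely for illustration):

const { Readable } = require('stream')

let sent = false
const rs = new Readable({
  read () {
    if (sent) return this.push(null)
    // Asynchronous push: by the time the chunk arrives the stream is already
    // flowing, so addChunk emits the data event without buffering the chunk
    setImmediate(() => { sent = true; this.push('x') })
  }
})

rs.on('data', (chunk) => {
  // The chunk never went through the buffer, so the buffered length is 0 here
  console.log(chunk.toString(), rs._readableState.length) // x 0
})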

When the push method is called synchronously, state.sync is true: the data fetched from the data source is first buffered in the readable stream rather than being handed to the consumer directly through a data event. Looking at the code you might wonder: if the data is buffered, how does it keep flowing in flowing mode? The answer lies in the first call to the resume_ method:

function resume_(stream, state) {
  ...
  flow(stream);
  // Continue fetching data from the data source
  if (state.flowing && !state.reading)
    stream.read(0);
}

function flow(stream) {
  ...
  // While in the flowing state, stream.read() is called to fetch data from the
  // stream's buffer and hand it to consumers
  while (state.flowing && stream.read() !== null);
}

The stream.read() method is called inside the flow method to fetch the data from the readable stream buffer for consumption by the consumer, while the stream.read(0) call continues to fetch data from the data source.

That's the general flow of how a readable stream in flowing mode fetches data from a data source and makes it available to consumers.

Paused mode

In paused mode, consumers need to call the stream.read() method manually to retrieve data.

Here’s an example:

const { Readable } = require('stream')

let c = 97 - 1

const rs = new Readable({
  highWaterMark: 3,
  read () {
    if (c >= 'f'.charCodeAt(0)) return rs.push(null)
    setTimeout(() => {
      rs.push(String.fromCharCode(++c))
    }, 1000)
  }
})

rs.setEncoding('utf8')
rs.on('readable', () => {
  // console.log(rs._readableState.length)
  console.log('get the data from readable: ', rs.read())
})

By listening for a readable event, you can start a readable stream to fetch data from the data source.

Readable.prototype.on = function (ev, fn) {
  if (ev === 'data') {
    ...
  } else if (ev === 'readable') {
    // Listen for readable events
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }
}

function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  // Start fetching data from the data source
  self.read(0);
}

After self.read(0) is called in nReadingNextTick, the subsequent process is similar to the one analyzed above for a flowing-mode readable stream fetching data from the data source, and it ends with a call to addChunk that pushes the fetched data into the readable stream's buffer:

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    ...
  } else {
    // update the buffer info.
    // Length of the buffered data
    state.length += state.objectMode ? 1 : chunk.length;
    // Add the data to the head of the buffer
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      // Add the data to the tail of the buffer
      state.buffer.push(chunk);

    // Emit a readable event to notify the consumer that there is now data to read
    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

Once data has been added to the buffer, and needReadable (this field indicates whether a readable event needs to be emitted to tell the consumer there is data available to consume) is true, a readable event is emitted to tell the consumer that new data has been pushed into the readable stream's buffer. The maybeReadMore method is also called to asynchronously fetch more data from the data source:

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  var len = state.length;
  // In non-flowing mode, keep reading while the buffered data length is below the HWM
  while (!state.reading && !state.flowing && !state.ended &&
         state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    // No data was obtained
    if (len === state.length)
      // didn't get any data, stop spinning.
      break;
    else
      len = state.length;
  }
  state.readingMore = false;
}

Each time new data is pushed into the readable stream's buffer, a readable event is emitted, and the consumer then fetches the data by calling the stream.read() method.
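
The effect of maybeReadMore can be seen by attaching a readable listener and never consuming: the stream keeps pre-filling its buffer in the background until the highWaterMark is reached. A small sketch (again inspecting _readableState; the exact numbers assume a recent Node version):

const { Readable } = require('stream')

let calls = 0
const rs = new Readable({
  highWaterMark: 3,
  read () {
    // Push one byte per _read call
    this.push(String.fromCharCode(97 + calls++))
  }
})

// Attach a readable listener but never call read(), so the stream stays paused
rs.on('readable', () => {})

setImmediate(() => {
  // maybeReadMore has kept calling read(0) until the buffer reached the HWM
  console.log('buffered bytes:', rs._readableState.length) // typically 3
  console.log('_read called', calls, 'times')              // typically 3
})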

Back pressure

Back pressure occurs when the consumer (for example, a writable stream) consumes data more slowly than the readable stream provides it.

Let's look at the pipe method again:



Readable.prototype.pipe = function () {
  ...
  // Listen for the drain event
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);
  ...
  src.on('data', ondata);
  function ondata (chunk) {
    increasedAwaitDrain = false;
    // Write the data to the writable stream
    var ret = dest.write(chunk);
    if (false === ret && !increasedAwaitDrain) {
      ...
      src.pause();
    }
  }
  ...
}

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    // Decrease the awaitDrain count
    if (state.awaitDrain)
      state.awaitDrain--;
    // If awaitDrain is 0 and data listeners are bound on the readable stream
    // (EE.listenerCount returns the number of bound event callbacks)
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      // Re-enter flowing mode
      state.flowing = true;
      flow(src);
    }
  };
}

When dest.write(chunk) returns false, meaning the readable stream is providing data faster than the writable stream can consume it, the src.pause method is called to leave the flowing state, which pauses both reading from the data source and writing to the writable stream. Only when the writable stream emits the drain event is ondrain called to restore flowing mode, after which the readable stream resumes feeding data to the writable stream. For back pressure on the writable stream side, see the Writable Stream source code analysis.
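
The write()/drain handshake that pipe relies on can be reproduced in isolation. A minimal sketch with an artificially slow writable stream (the 4-byte highWaterMark and the 100 ms delay are arbitrary):

const { Writable } = require('stream')

const ws = new Writable({
  highWaterMark: 4,
  write (chunk, encoding, callback) {
    setTimeout(callback, 100) // simulate a slow consumer
  }
})

// write() returns false once the internal buffer reaches the highWaterMark;
// this is the signal pipe() reacts to by calling src.pause()
console.log(ws.write('abcd')) // false

ws.on('drain', () => {
  // pipe() resumes the readable stream when drain fires
  console.log('drained, safe to write again')
})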

The above analyzes the internal working mechanism of a readable stream through its two modes. There are further details in the source code that you can explore if you are interested.