preface

Series 1 covered the basic uses of Node.js writable and readable streams. Series 2 will delve into the flowing and paused modes of readable flows in Node.js.

In fact, the flow mode and the pause mode correspond to the push model and the pull model.

Before I get into these two modes of Node.js flow, I’d like to expand on some of the “push and pull models” we already know (or see, but don’t realize).

Thinking about the “Pull” model

“Pull” means the consumer actively pulls data from the producer. The producer passively generates data when requested, while the consumer actively decides when to request it.

In practical terms, we can understand this from a few angles. If our development involves producers and consumers, there are two kinds of objects. When the consumer actively pulls data from the producer, that fits the concept of the pull model. The diagram below:

Below, I’ll list a few pull models.

Function

You’re familiar with functions: when you run a function, you get a value back. Seen from a different angle, that’s a pull model! Surprising, isn’t it? For example, the following function:

function createID() {
  return Math.random().toString(36).substr(2, 9);
}

const id = createID(); // e.g. 'adb8d7xjm'

The function itself is the producer, and the program executing the function is the consumer. When the program (the consumer) runs this function, it actively asks the function for a return value; the function (the producer) executes and returns an ID value to it.

In addition, “pull” generally also implies “blocking”. Of course, in a single-threaded JavaScript scenario, the meaning of this “blocking” is easy to understand.

Generator

The Generator introduced in ES6 also fits the “pull” model. Unlike a function, which can only be pulled for one value (because there is only one return), a generator can be pulled for multiple values (even an infinite number of them).

function* makeRangeIterator(start = 0, end = Infinity, step = 1) {
  for (let i = start; i < end; i += step) {
    yield i;
  }
}

const it = makeRangeIterator(1, 5, 2);

it.next(); // {value: 1, done: false}
it.next(); // {value: 3, done: false}
it.next(); // {value: undefined, done: true}

In the example code above, executing the generator function creates an iterator object (it, the consumer). Since the generator object itself is a state machine, in this scenario it is actually the producer. Each call to next() pulls a value from the generator. As shown in the figure:

Of course, generators can not only have values “pulled” from them, they can also have values “pushed” into them. That’s what makes them so powerful and complex. To give you a simple demonstration of “pushing” a value, I have written a slightly contrived generator function:

function* getCubeSize() {
  const width = 5;
  const depth = 10;
  const height = yield width;

  return width * height * depth;
}

const getHeightByWidth = w => w * 1.6;
const it = getCubeSize();
const { value: width } = it.next();
const { value: size } = it.next(getHeightByWidth(width)); // size === 400

The schematic diagram is as follows:

Thinking about “Push” Model

“Push” means the producer actively generates data and pushes it to the consumer. The producer actively generates data at its own pace, while the consumer passively reacts to the data it receives.

DOM Events

The classic push model is DOM events. There are a bunch of events in the DOM. Mouse events, keyboard events, browser events, and so on. Take the click event we’re most familiar with.

document.addEventListener('click', event => {
    const { x, y } = event;
    console.log('Cursor coordinate: ', x, y);
});

We registered an event listener on the document’s 'click' event. Every time we click the document, the browser calls the listener we specified, passing us the event description via the event parameter.


By the way, “push” usually means to wait. Subscribing to events does not block the main thread, but rather waits for the event to occur, triggering the automatic execution of the corresponding callback function to consume the data.

EventEmitter

The same is true for EventEmitter, which I’m sure you know: the classic publish-subscribe (or observer) pattern. Its general principle is very similar to the DOM events mentioned above, so I won’t dwell on it.

Promise

DOM events and EventEmitter can both push multiple values. A Promise also fits the push model, but it can only push a single value.

Promise.resolve('Hi').then(value => {
  console.log(value);
});

Only when the Promise itself decides to resolve a value is that value pushed to the consumer function via then(). As shown in the figure:

The community’s RxJS library is full of these pull and push concepts. Interested readers can check it out.

In fact, many objects in our daily development fit the “push” model: SSE (Server-Sent Events), setInterval, XMLHttpRequest, Service Workers, WebSocket, and so on. I have only picked a few here; readers can work through the rest on their own.
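Take setInterval as one more quick illustration: the timer (producer) decides when to invoke our callback, while the callback (consumer) passively reacts each time. A minimal sketch:

```javascript
// The timer actively pushes "ticks" to us at its own pace;
// our callback passively reacts to each one.
let ticks = 0;
const timer = setInterval(() => {
  ticks += 1;
  console.log("tick", ticks);
  if (ticks === 3) {
    clearInterval(timer); // stop the producer after three pushes
  }
}, 10);
```

Note that the consumer never controls the pacing; it can only stop subscribing (clearInterval), which mirrors removing an event listener.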

Of course, series 2 has spent a lot of time on “push” and “pull” so far, which may seem to have nothing to do with Node.js streams, so you might be a little confused. However, I believe it is very helpful: only by understanding this producer-consumer mechanism can we better understand the two modes in Node.js. The two sets of concepts are not far apart; they differ mainly in how they are expressed and in how their APIs are used. Over time, you will probably find that this push-and-pull perspective helps you understand asynchronous programming in JavaScript as well.

“Pull” model – Paused mode of flow

A stream in paused mode fits the general framework of the pull model.

All readable streams begin in pause mode, where stream.read() must be explicitly called to read data.

According to our understanding of the “pull” model accumulated above, in pause mode, the readable stream is the producer and the program itself is the consumer. At this point, the stream passively generates the data when requested, and the program actively decides when to request the data.

Suppose we read the file (what-is-a-stream.txt), which is 150 bytes long.

A stream is an abstract interface for working with streaming data in Node.js. The stream module provides an API for implementing the stream interface.

Let’s read the contents of this file while the stream is paused.

const fs = require("fs");

// "files" is the author's map from names to file paths (defined elsewhere)
const stream = fs.createReadStream(files["what-is-a-stream"], {
  highWaterMark: 50
});

stream.on("readable", () => {
  console.log("stream is readable!");
  let data;
  while (null !== (data = stream.read())) {
    console.log("Received:", data.toString());
  }
});

Here we use the ‘readable’ event, which will be emitted when there is data available to read from the stream. Note that to make this example more obvious, I passed the highWaterMark option 50 as the second parameter in createReadStream to set the readable buffer size. For normal streams, highWaterMark specifies the total number of bytes. For streams in object mode, highWaterMark specifies the total number of objects. Therefore, in the example above, the readable buffer is 50 bytes, and the file size of 150 bytes will be read three times.

As a result, four ‘readable’ events are emitted. (This is emitted one more time than expected because the ‘readable’ event will also be emitted when the end of the stream data is reached, but before the ‘end’ event.)

stream is readable!
Received: A stream is an abstract interface for working with
stream is readable!
Received:  streaming data in Node.js. The stream module prov
stream is readable!
Received: ides an API for implementing the stream interface.
stream is readable!

The example code above is illustrated as follows:

The only difference is that our program subscribes to the producer stream’s ‘readable’ event, which is the equivalent of the stream actively sending out the ‘I still have data to read, come consume me’ signal. The program can then decide when to consume data from the stream cache.

You could even read the data out on a timer! (Just don’t actually do this.)

setInterval(() => {
  let data;
  while (null !== (data = stream.read())) {
    console.log("Received:", data.toString());
  }
}, 30);

“Push” model – Flow Patterns of Flow (Flowing)

As we saw in series 1, readable streams inherit from EventEmitter. So in flow mode, data is automatically read from the underlying system and provided to the application as quickly as possible through events from the EventEmitter interface.

As soon as the stream gets a 'data' event listener, it switches into flowing mode and sends an endless stream of data chunks to the program.

The stream, as the producer, actively pushes the data, while our program (the event listener) is the consumer that passively receives it.

const stream = fs.createReadStream(files["what-is-a-stream"], {
  highWaterMark: 50
});

stream.on("data", chunk => {
  console.log("stream emit data");
  console.log("Received:", chunk.toString());
});

Again because of the 50-byte highWaterMark, the stream emitted three 'data' events.

stream emit data
Received: A stream is an abstract interface for working with
stream emit data
Received:  streaming data in Node.js. The stream module prov
stream emit data
Received: ides an API for implementing the stream interface.

If this process is described by a push model, as shown in the following figure:

There is, of course, a better and safer way to automatically manage data in flowing mode. (Safer because the overload and backpressure of the destination writable stream are handled automatically.) That is: readableStream.pipe(writableStream).

stream.pipe(process.stdout);

This mode suits scenarios where the stream does not require fine-grained control; a simple pipe accomplishes our goal. It is very simple.

Flow mode switching

The two modes of readable flows are a simplified abstraction from the more complex internal state management that occurs in readable flows.

Of course, these two modes of flow can be switched in the following ways. In other words, it is a switch between push and pull models.

Switch from the default paused mode to flowing mode (pull → push):

  1. Listen for the 'data' event: stream.on('data', handler)
  2. Call the stream.resume() method
  3. Call the stream.pipe() method to send the data to a writable stream

Switch from flowing mode back to paused mode (push → pull):

  1. If there are no pipe destinations, call stream.pause().
  2. If there are pipe destinations, remove them all. Multiple pipe destinations can be removed by calling stream.unpipe().

All of this can be found in the official documentation. The following sections walk through examples of each switching method and their caveats.

Toggle flow mode: stream.resume()

The readable.resume() method resumes a paused readable stream, switching it into flowing mode so that it emits 'data' events again.

As we saw above, once the stream has a 'data' listener it switches into flowing mode and starts emitting all of its data. If you call resume() before a 'data' listener is attached, data may be lost.

Consider the following code:

// Switch from the default paused mode to flowing mode
stream.resume();

// Wait 3ms before listening for the 'data' event; chunks may be lost during those 3ms
setTimeout(() => {
  stream.on("data", chunk => {
    console.log("stream emit data");
    console.log("Received:", chunk.toString());
  });
  stream.on("end", () => {
    console.log("stream emit end");
  });
}, 3);

Chunks can indeed be lost. For example, on my computer, running this code prints only two chunks.

stream emit data
Received:  streaming data in Node.js. The stream module prov
stream emit data
Received: ides an API for implementing the stream interface.
stream emit end

Obviously, the first block is missing. Therefore, this scenario needs special attention!

To switch pause mode: stream.pause()

The readable.pause() method causes streams in flowing mode to stop firing ‘data’ events and switch out of flowing mode. Any available data is retained in the internal cache.

For example, if you call pause() 3ms into the stream, any chunks that have not yet been emitted within those 3ms will stay in the internal cache.

// Listening for 'data' switches the stream from the default paused mode to flowing mode
stream.on("data", chunk => {
  console.log("Received:", chunk.toString());
});

// After 3ms, switch the stream into paused mode; it stops emitting 'data' events
setTimeout(() => {
  stream.pause();
}, 3);

Next, we pause after 3ms and then resume after 2000ms, printing the current second alongside each chunk.

// Listening for 'data' switches the stream from the default paused mode to flowing mode
stream.on("data", chunk => {
  console.log(`Received at ${new Date().getSeconds()}s:`, chunk.toString());
});

// After 3ms, switch the stream into paused mode; it stops emitting 'data' events
setTimeout(() => {
  stream.pause();
}, 3);

// After 2000ms, switch back to flowing mode; the stream continues emitting 'data' events
setTimeout(() => {
  stream.resume();
}, 2000);

The console prints the following:

Received at 42s:  A stream is an abstract interface for working with
Received at 44s:   streaming data in Node.js. The stream module prov
Received at 44s:  ides an API for implementing the stream interface.

However, it is important to note that the readable.pause() method will not work if a stream has a ‘readable’ event listener or if stream.read() is called.

Toggle pause mode: stream.unpipe()

The readable.unpipe() method unbinds writable streams that were previously bound with the stream.pipe() method.

When readableStream.pipe(writableStream) has been called, the readableStream has a pipe destination.

I’ll use the same example here, but I need to adjust highWaterMark to a smaller value. (The pipe() method in the example below manages the data flow automatically; with the original 50-byte readable buffer the file is read in three passes, which completes so quickly that it is hard to unpipe before the read finishes.) With 20 bytes the file takes about eight reads, which is slightly slower.

const stream = fs.createReadStream(files["what-is-a-stream"], {
  highWaterMark: 20
});

// Switch to flowing mode
stream.pipe(process.stdout);

setTimeout(() => {
  // Switch to paused mode
  stream.unpipe();
  // In paused mode, read the data manually
  stream.on("readable", () => {
    let data;
    while (null !== (data = stream.read())) {
      console.log("From paused mode:", data.toString());
    }
  });
}, 3);

Run the above code and the console prints the following:

A stream is an abstrFrom paused mode: act interface for wo
From paused mode: rking with streaming
From paused mode:  data in Node.js. Th
From paused mode: e stream module prov
From paused mode: ides an API for impl
From paused mode: ementing the stream 
From paused mode: interface.

Neither mode is better than the other; each has its own use cases. For most developers, though, pipe is sufficient in the majority of scenarios.

The state of a readable stream

We can obtain the current state of a readable stream through readable.readableFlowing. At any given moment a readable stream is in one of three states:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

For example, we looked at the readableFlowing state values based on the above code.

console.log('\nReadableFlowing [before pipe]:', stream.readableFlowing);
stream.pipe(process.stdout);
console.log('\nReadableFlowing [after pipe]:', stream.readableFlowing);

setTimeout(() => {
  stream.unpipe();
  console.log('\nReadableFlowing [after unpipe]:', stream.readableFlowing);
  stream.on("readable", () => {
    let data;
    while (null !== (data = stream.read())) {
      console.log("From paused mode:", data.toString());
    }
  });
  console.log('\nReadableFlowing [after readable]:', stream.readableFlowing);
}, 3);

The console prints the following:

ReadableFlowing [before pipe]: null

ReadableFlowing [after pipe]: true
A stream is an abstract interface for wo
ReadableFlowing [after unpipe]: false

ReadableFlowing [after readable]: false
From paused mode: rking with streaming
From paused mode:  data in Node.js. Th
From paused mode: e stream module prov
From paused mode: ides an API for impl
From paused mode: ementing the stream 
From paused mode: interface.

From this we learn that when there is no mechanism for consuming the stream’s data, readable.readableFlowing is null; afterwards, true and false indicate whether the stream is in flowing mode.

summary

This is the end of series 2. I believe you now have a basic understanding of the two modes of streams.

(See also the Netflix JavaScript Talks session “Version 7: The Evolution of JavaScript”.) The push-pull model and the paused and flowing modes of streams are indeed very similar in design concept; I believe there is definitely mutual influence between the two.

As in series 1, my point stands: when learning any technology, there is no need to confine yourself to rules and conventions. Giving full play to your imagination, then practicing and verifying, makes for an interesting learning process either way.

Cheering ~ clapping ~ rubbing sour hands and eyes 🙂

See you in the next series