Reading notes: This article is divided into three parts: basic concepts, pipeline chains, and asynchronous iterators

The basic concepts section explains what a Node.js Stream is and how it works internally. Animations illustrate the internal workflow to give you a clearer picture of how the different stream types work and how they differ.

If you already have a clear understanding of the basic concepts, go straight to Parts 2 and 3 and use Part 1 as a reference. The pipeline chain section uses animations and demos to show the difference between pipe and pipeline.

The asynchronous iterator section is the focus of this article. It consists mainly of demo code that will help you quickly learn how to use async iterators in place of the raw Stream APIs.

Basic concepts of streams

What is a stream?

When a computer works on a task, it usually loads the data into memory, but when the data is too large it becomes impossible to keep all of it in memory at once. Another important reason is that memory I/O is much faster than disk and network I/O, and memory should not be left idle waiting for them, which is why buffers and streams are needed. For example, a file may be larger than the default Node.js buffer size, making it impossible to read the entire file into memory for processing.

You need a way to process the data in segments, and ideally you load a bit and process a bit, in order. So you can think of a Stream as the ordered processing of data blocks.

Streams have the advantage of not having to load a large amount of data into memory, or wait for all the data to arrive, before processing begins. Stream processing therefore has the following two characteristics:

  • Memory savings: there is no need to load large amounts of data into memory before processing it
  • Improved timeliness: processing can start on the first segment without waiting for all the data to load, which greatly improves timeliness (see the sketch after this list)
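To make the contrast concrete, here is a minimal sketch (the file path and the chunk handling are placeholders, not from the original article) that reads the same file once by buffering it entirely and once as a stream of chunks:

import * as fs from 'fs';

const bigFile = './temp/big.txt'; // hypothetical large file

// Buffered: the whole file must fit in memory before we can touch it
fs.readFile(bigFile, (err, data) => {
  if (err) throw err;
  console.log('buffered read, total bytes:', data.length);
});

// Streamed: chunks arrive one by one and can be processed immediately
const readable = fs.createReadStream(bigFile);
let bytes = 0;
readable.on('data', (chunk) => {
  bytes += chunk.length; // process each chunk as it arrives
});
readable.on('end', () => {
  console.log('streamed read, total bytes:', bytes);
});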

What are streams used for?

In general, processing large amounts of data or streaming media is the most common use case. Take a streaming video site or a music platform as an example: when you open a video, playback does not wait for the whole file to finish downloading; instead, the data arrives as a continuous series of chunks, so you can start watching from the moment you click play.

The design of processing ordered data blocks also gives our code β€œcomposability”. You can think of it as a β€œproduction line”: a complete data set is broken down into pieces and processed step by step, just like raw materials being sent down a production line, passing through each station until the final product comes out.

When building more complex systems, it is common to break them down into functionally independent parts whose interfaces follow certain specifications and are somehow connected to perform more complex tasks together.

Implementations of streams commonly used in Node.js

In the Node.js ecosystem, many built-in modules implement stream interfaces. Here is a record of common streams:

Image credit: Freecodecamp

Basic concepts of Node.js Stream

There are four basic stream types in Node.js:

  • Writable – a stream to which data can be written (for example, fs.createWriteStream() lets us write data to a file as a stream).
  • Readable – a stream from which data can be read (for example, fs.createReadStream() lets us read data from a file as a stream).
  • Duplex – a stream that is both readable and writable (for example, net.Socket).
  • Transform – a Duplex stream that can modify or transform the data as it is written and read. Its output is derived from its input, and like a Duplex stream it implements both the Readable and Writable interfaces (for example, zlib.createDeflate()).

In addition to the basic types of streams, you need to understand the following stream characteristics:

Events: All streams are instances of EventEmitter, so each stream type emits its own set of events, which is how a stream tells the outside world what it is doing.

Independent buffers: Readable and writable streams each have their own internal buffer, while duplex and transform streams implement both interfaces at the same time and therefore hold both buffers internally.

Character encoding: When we read and write files we usually operate on byte streams, so we need to pay attention to the encoding set in the options parameter, since it affects the content and size of each chunk. Readable and writable streams have different default encodings, and each stream type differs, so check the default option settings before using a stream to avoid data backlog problems.

import * as fs from 'fs';

const readableStream = fs.createReadStream(smallFile, { encoding: 'utf-8', highWaterMark: 1 * 256 });
/**
 * 1. fs.createWriteStream defaults to utf-8. If no encoding is set at creation time, the encoding passed to write() prevails.
 * 2. It can also be set with writable.setDefaultEncoding(encoding), with the same effect as above.
 * 3. Once a default encoding is set, only that type can be written.
 * 4. The encoding type directly affects the number of bytes per chunk.
 */
const writeableStream = fs.createWriteStream(upperFile, { encoding: 'utf-8', highWaterMark: 1 * 25 });
// The chunks read from the file are much larger than what the writable stream can accept at one time,
// so the 'drain' event is used to wait for the backlog in the writable stream's buffer to empty.

HighWaterMark: Both readable and writable streams store data in internal buffers. For non-object-mode streams, highWaterMark specifies a total number of bytes. In fact, highWaterMark is only a threshold: it does not limit how much data can be written into the buffer unless the Node.js buffer maximum is exceeded. One thing to be clear about is that the data is being buffered, not cached; the difference between a buffer and a cache can be seen in this article.
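As a quick way to see these thresholds and buffer sizes, here is a small sketch (the file paths and highWaterMark values are placeholders) that prints the highWaterMark and the number of bytes currently sitting in each internal buffer:

import * as fs from 'fs';

const readable = fs.createReadStream('./temp/big.txt', { highWaterMark: 64 * 1024 });
const writable = fs.createWriteStream('./temp/out.txt', { highWaterMark: 16 * 1024 });

console.log('readable highWaterMark:', readable.readableHighWaterMark); // 65536
console.log('writable highWaterMark:', writable.writableHighWaterMark); // 16384

readable.on('data', (chunk) => {
  writable.write(chunk);
  // How many bytes are currently buffered (not yet consumed / not yet flushed)
  console.log('readable buffered:', readable.readableLength);
  console.log('writable buffered:', writable.writableLength);
});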

Readable streams

Two modes of readable streams

From the description of the β€œproduction line” above, you can see that a readable stream, as the source, has two modes of operation: flowing and paused. You can think of them as passive consumption and active consumption; the difference between them is the way the data is consumed.

  • In flowing mode, data is automatically read from the underlying system and delivered to the application as quickly as possible through events on the EventEmitter interface.
  • In paused mode, data piles up in the internal buffer and stream.read() must be called explicitly to read a chunk.

All readable streams start in pause mode and can be switched to flow mode by:

  • Adding a 'data' event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a writable stream.

Readable streams can be switched back to pause mode in the following ways:

  • If there are no pipe targets, calling stream.pause().
  • Removing all pipe targets, if any; calling stream.unpipe() can remove multiple pipe targets.

By default, all readable streams start in paused mode, but they can easily be switched to flowing mode and back again when necessary. When a readable stream is in paused mode, we can use the read() method to read data from the stream on demand. In flowing mode, however, data is read continuously and may be lost if it is not consumed in time, so in flowing mode we need to fetch and process data through the 'data' event.

Flow mode animation demo

Pause mode animation demo

Key readable stream events

Among the readable stream events, it is important to understand the two events used to read data from a readable stream:

  • Event: 'data' [flowing mode] <passive consumption> When a 'data' handler is added, data in the readable stream is pushed to the event callback. You need to consume the chunk yourself, or the data is lost if you do not process it. (If you do not manually switch the readable stream to paused mode, it stays in flowing mode and data keeps arriving until all readable data has been read.)
  • Event: 'readable' [paused mode] <active consumption> This event is emitted by the readable stream when data is ready, and you need to call stream.read() in the callback to actively consume it. (In paused mode, data pushed by multiple stream.push() calls can be read in one go with stream.read(): the data currently in the buffer is merged and read together.)

The ‘readable’ event indicates that the stream has new dynamics: either there is new data, or the stream has read all the data. For the former, stream.read() returns the available data. For the latter, stream.read() returns null.

In Node.js, both readable and writable streams store data in internal buffers, so when a readable stream keeps pushing data into a writable stream, backpressure can build up in the writable stream's internal buffer. With the 'data' event, the read speed rarely matches the downstream consumption rate, which can easily cause a data backlog and hurt performance. For very large amounts of data, using the 'readable' event keeps the readable stream in paused-style consumption and therefore gives better performance, but it is correspondingly harder to understand than the 'data' event.

1 // πŸ‘¨πŸ« Readable and data events exist simultaneously, with the Readable event preempting control of the Data event only after read(), Readablestream. on('readable', function (this: any) {3 // paused 4 let chunk; 5 while ((chunk = readableStream.read() ! == null)) {6 // flowing 7 // do something 8} 9 // When readableStream.read() is null, there is no new data 10}); 11 readableStream.on('end', () => {12 console.log(' no data left '); 13});Copy the code

Compare and understand the use of ‘data’ events

1 // πŸ‘¨πŸ« δΈ» 钘 : add 'data' event to trigger a readableStream mode switch paused --> flowing 2 readablestream. on('data', (chunk: String) => {3 // πŸ“Œ Manually, if necessary, switch flow state pause 4 readableStream.pause(); 5 // do something 6 if (whenYouDone) {7 // πŸ“Œ 8 readablestream.resume (); 9}}); 11 readableStream.on('end', () => {12 console.log(' no data left '); 13});Copy the code

Note that this discussion is about using a readable stream on its own. When it comes to combining streams, using events directly is not recommended; the next step is to use pipes to form a pipe chain.

Data backlog animation demo

Writable streams

A writable stream is actually much easier to understand than a readable stream, and you can think of it as the end of the “production line” that leads to the final destination.

Key writable stream events

Among the writable stream events, the important thing is to understand how a writable stream handles the mismatch between write speed and read speed, and which events are involved:

  • Event: ‘drain’If the callstream.write(chunk)If false is returned, the current buffer data is greater than or equal to writablehighWaterMarkIs triggered when it is possible to continue writing data to the stream'drain'Events.

This is a little confusing, so think of the writable stream as a funnel with a mark on it. If the water level is below the mark we are in a safe state and can keep pouring. When the water level reaches or passes the mark, it is not recommended to keep pouring; once the funnel has drained the water it currently holds, it notifies you through the 'drain' event that you can continue.

Note the wording above; this is where streams get confusing. The highWaterMark of a writable stream is only a warning: streams do not force developers to handle the backlog. So we can still ignore stream.write(chunk) === false and keep calling stream.write(chunk) to write data. The unprocessed data simply keeps piling up in the writable stream's internal buffer, until the backlog saturates the Node.js buffer and the process is forcibly interrupted.

Supplement: programming languages are designed with the physical characteristics of the device in mind and therefore set some β€œsafety limits”. For example, Node.js's default buffer maximum differs by device architecture.

On 32-bit architectures, this value currently is 2^30 – 1 (~1GB).

On 64-bit architectures, this value currently is 2^31 – 1 (~2GB)

Returning to writable streams: we said earlier that streams have their own independent buffers. Calling writeableStream.write() returns true if the total size of the writable stream's internal buffer is less than the threshold set by highWaterMark, and false once the internal buffer reaches or exceeds highWaterMark. If you manage this manually through events, you need to pause writing (typically by pausing the readable stream that feeds it) until the writable stream emits the 'drain' event.

1 // πŸ‘¨πŸ« δΈ» 钘 : add 'data' event to trigger a readableStream mode switch paused --> flowing 2 readablestream. on('data', (chunk: String) => {3 // πŸ“Œ Switch stream status pause 4 readableStream.pause(); 5 // return false if water level β‰₯ scale, 6 const writeResult = WriteableStream. write(chunk, (err) => {7 if (err) {8 console.error(' write error :', err); 9 process.exitCode = 1; 10}}); 12 if (writeResult) {13 // πŸ“Œ restore flow 14 readablestream.resume (); 15}}); 17 18 // If the number of bytes written to writeableStream.write() is greater than the writable stream highWaterMark, Writeablestream. on('drain', function () {20 console.log(chalk. Gray (' writable stream ')); 22 readablestream.resume (); 23});Copy the code

Writable stream animation demo

Duplex streams and transform streams

In fact, duplex and transform streams are easy to understand once you know how readable and writable streams work. Both Duplex and Transform implement the Readable and Writable interfaces at the same time; in other words, a Transform stream is a special kind of Duplex stream. The difference is that a duplex stream can be thought of as a readable stream bundled with a writable stream: the two sides are independent, each has its own internal buffer, and read and write events happen independently. A transform stream is also duplex, but its reads depend on its writes: data must be written before it can be read, in order.

                      Duplex Stream
                   ------------------|
             Read  <-----  External Source
  You        ------------------|
             Write -----> External Sink
                   ------------------|
  You don't get what you write. It is sent to another source.

                      Transform Stream
             --------------|--------------
  You  Write ---->                ----> Read  You
             --------------|--------------
  You write something, it is transformed, then you read something.

For example, the Socket module is a typical Duplex stream: when a Socket receives a message it can also send one by writing, and the reads and writes are unrelated to each other. A Transform stream, on the other hand, applies to steps that have an execution order: implementing a Transform requires implementing a transform() method, which is where the data is transformed as it passes through.
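As a minimal sketch of the transform() method mentioned above (the upper-casing logic is just an arbitrary example, not from the original article), a Transform stream that upper-cases text chunks could look like this:

import { Transform } from 'stream';

// A Transform stream whose output is derived from its input:
// each written chunk is upper-cased and then becomes readable.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // push the transformed data to the readable side
    this.push(chunk.toString().toUpperCase());
    // signal that this chunk has been processed (pass an Error here to fail)
    callback();
  }
});

process.stdin.pipe(upperCase).pipe(process.stdout);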

Pipe chain

Understanding the composition of streams

Reading the following carefully will help you understand and use streams better. As mentioned above, if you think of a stream as a β€œproduction line”, then a pipeline chain is a more powerful production line that combines different task units.

Image source: Google Search (from sohu article)

You could say that when a production line is created, only its purpose is defined: which upstream source (readable stream) it starts from and which downstream consumer (writable stream) processes the result. The line is not started at that point, and as the source of production runs dry, the line will eventually shut down; it cannot run forever.

Therefore, there needs to be a switch at the source of the production line to control its running state. If downstream processing is slow, the line needs to pause until downstream catches up, to avoid unnecessary production accidents. When the upstream has no more input, it needs to shut down and notify downstream, but that does not mean the other task units in the pipeline have finished their work. Each processing unit is independent and has its own events to respond to, so if one unit fails, the administrator should stop the whole line immediately to avoid losses.

So when combining different streams, you need to pay attention not only to the source but also to whether every production unit is working properly. When the β€œproduction line” is complex enough, β€œpaying attention to everything” is not a good idea and places a huge mental burden on the developer, so a more efficient and modern approach is needed.

Operating streams entirely with event callbacks is the equivalent of manual mode: it is inefficient and greatly increases the chance of error. Node.js also provides a semi-automatic mode, pipe, which is the officially recommended way to consume stream data. As Node.js evolved, an even better fully automatic mode appeared: pipeline. Of course, if you need finer-grained manipulation of the data you can mix events and pipes, but be aware that this can lead to ambiguous behavior.

pipe and pipe chains

A pipe connects a readable stream to a writable stream. If you want to combine more than two streams into a single pipeline, you need Duplex or Transform streams in the middle, forming a pipe chain.

Take the earlier code example and, to make it more complex, add a compression unit and a β€œfailure unit” (to simulate an error in the middle of the pipe chain).

import * as path from 'path';
import * as fs from 'fs';
import * as chalk from 'chalk';
import { createGzip } from 'zlib';
import { Transform } from 'stream';

const smallFile = path.join(__dirname, '../../temp/1KB.txt');
const upperFile = path.join(__dirname, '../../temp/upper_text_pipe.gz');

const readableStream = fs.createReadStream(smallFile, { encoding: 'utf-8', highWaterMark: 1 * 256 });
const writeableStream = fs.createWriteStream(upperFile, { encoding: 'utf-8', highWaterMark: 1 * 10 });

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    console.log('chunk', chunk);
    this.push(chunk.toString().toUpperCase());
    callback(new Error('failure unit error')); // simulate an error in the middle of the chain
  }
});

readableStream.pipe(createGzip()).pipe(upperCaseTr).pipe(writeableStream);

upperCaseTr.on('error', (err) => {
  console.log('upperCaseTr error', err);
  // pipe does not close the other streams automatically, so do it manually
  writeableStream.destroy();
  readableStream.destroy();
});

// end
readableStream.on('end', function () {
  console.log(chalk.redBright('end'));
});

// close
readableStream.on('close', function () {
  console.error('readableStream close');
});

readableStream.on('data', (chunk: string) => {
  console.log(chalk.green('before compression -->', Buffer.byteLength(chunk)));
});

writeableStream.on('drain', function () {
  console.log(chalk.gray('writable stream drained'));
});

writeableStream.on('close', function () {
  console.log(chalk.redBright('close'));
});

writeableStream.on('finish', function () {
  console.log(chalk.redBright('finish'));
});

writeableStream.on('error', function (err) {
  console.error('writable stream error', err);
});

As you can see from the code above, combining different streams with events is quite tedious: you need to pay attention to the state of each stream and handle each stream's events separately. The code above is only a readable stream combined with a writable stream (plus two intermediate units); with any more, the complexity grows exponentially.

It is also hard to use Promises this way, which is a poor experience for modern front-end development. And in a pipeline chain, the entire production line should stop as soon as any one processing unit fails.

Pipe Animation Demonstration

A more practical example

Create a Node Server that accesses a large file locally and sends it to the front end:

import * as fs from 'fs';
import { createServer } from 'http';
import * as path from 'path';

const server = createServer();

server.on('request', async (req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.
  const readableStream = fs.createReadStream(path.join(__dirname, '../../../temp/big.txt'));

  readableStream.pipe(res);

  readableStream.on('close', () => {
    console.log('readableStream close');
  });
  res.on('close', () => {
    console.log('response close');
  });
  res.on('end', () => {
    console.log('response end');
  });
});

server.listen(8000);

If the client browser closes before the transfer is complete, the transfer should, in theory, stop. But pipe does not handle this for you: you need to listen for events such as res.on('error') or res.on('close') yourself and decide whether to stop the readable stream.
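A minimal sketch of that manual handling (using the same file path as the demo above) might look like this:

import * as fs from 'fs';
import { createServer } from 'http';
import * as path from 'path';

createServer((req, res) => {
  const readableStream = fs.createReadStream(path.join(__dirname, '../../../temp/big.txt'));

  readableStream.pipe(res);

  // If the client disconnects before the transfer finishes, stop reading the file
  res.on('close', () => {
    readableStream.destroy();
  });

  // If reading the file fails, tear down the response as well
  readableStream.on('error', (err) => {
    res.destroy(err);
  });
}).listen(8000);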

Summing up pipe

Source location

  • pipe only manages the data flow, preventing read/write speed mismatches and data backlog
  • Handling errors in a pipe chain is cumbersome
  • If the readable stream emits an error during processing, the writable stream target is not closed automatically; each stream must be closed manually to prevent memory leaks
  • There is no way to know the complete state of the pipeline (whether it has finished)

Use pipeline instead of pipe

pipeline was not introduced until the [email protected] release, and for consumers the official recommendation is to use pipeline instead of pipe to ensure safety.

Source: Node.js official documentation screenshot

Use pipeline to transform the Demo code above

import * as fs from 'fs';
import { createServer } from 'http';
import * as path from 'path';
import { pipeline } from 'stream';

const server = createServer();

server.on('request', async (req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.
  const readableStream = fs.createReadStream(path.join(__dirname, '../../../temp/big.txt'));

  pipeline(readableStream, res, (error) => {
    console.log('pipeline', error);
  });
  readableStream.on('close', () => {
    console.log('readableStream close');
  });
  res.on('close', () => {
    console.log('response close');
  });
  res.on('end', () => {
    console.log('response end');
  });
});

server.listen(8000);

The result shows that if an error occurs, pipeline closes all the streams in the pipeline chain, which also means the pipeline chain itself is closed. Monitoring the state of the chain through the callback is also much easier than with pipe.

Summing up pipeline

Source location

  • Provides data management, preventing read/write speed mismatches and data backlog
  • Errors in the pipe chain are easy to handle
  • If a readable stream emits an error during processing, all streams in the current pipeline chain are closed, avoiding memory leaks
  • The pipeline chain has its own running state, which is easier to reason about

Reviewing Node.js Stream

With the introduction above, you should now have a more complete picture of streams: we have covered the basic concepts of Node.js Stream and seen how to use streams and pipeline in a safer and more efficient way to build combined pipeline chains.

However, Stream still has a relatively high cost to learn and use. Even though most Node.js modules already wrap these capabilities, implementing a stream yourself is still quite troublesome. pipeline only makes it more convenient to combine streams into a pipeline chain; operating on an individual stream is still awkward.

For example, when you only need a readable stream to read data and nothing else, errors and other issues can still come up while it runs, so when pipeline is not an option you still have to fall back to events. In other words, streams do not play well with Promises, and certainly not with async/await in the style of development we are used to.
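One small mitigation, when you do use pipeline, is to promisify it so it can be awaited. A minimal sketch (the file paths are placeholders, not from the original article):

import * as fs from 'fs';
import { pipeline } from 'stream';
import { promisify } from 'util';
import { createGzip } from 'zlib';

// pipeline's last argument is an error-first callback, so promisify works on it
const pipelineAsync = promisify(pipeline);

async function run() {
  try {
    await pipelineAsync(
      fs.createReadStream('./temp/big.txt'),
      createGzip(),
      fs.createWriteStream('./temp/big.txt.gz')
    );
    console.log('pipeline finished');
  } catch (error) {
    console.error('pipeline failed', error);
  }
}

run();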

How can we improve the development experience of streams in Node.js? Next, let's consume streams using async iterators.

Async Iterator and Stream

What is an asynchronous iterator?

To loop the iterator with async/await, read the following code:

async function* generator() {
  yield "aaa";
}

for await (let chunk of generator()) {
  chunk.toUpperCase();
}

This is not the focus of this article, so if you are not familiar with the concept, look at the following article reference:

  • Asynchronous iterators
  • for await...of (MDN)

Asynchronous iterators and readable streams

In the case of a readable stream, reading data from it is similar to taking items out of an array. If we assume an array is the data source of the readable stream, then the working model of a readable stream is simply reading the contents out one by one, in order.

First implement a readable stream as our reference group:

import { Readable } from 'stream';

const array: number[] = [];
for (let i = 0; i < 1024; i++) {
  array.push(i);
}

const readableStream = new Readable({
  objectMode: true,
  read() {
    for (let i = 0; i < array.length; i++) {
      readableStream.push(array[i]);
    }
    readableStream.push(null);
  }
});

readableStream.on('data', (chunk) => {
  console.log(chunk);
});

Convert the array to a readable stream using Readable.from()

Node.js 12 provides a built-in method, stream.Readable.from(iterable, [options]), to create a readable stream from an iterator or iterable.

import { Readable } from 'stream';

const array: number[] = [];
for (let i = 0; i < 1024; i++) {
  array.push(i);
}

const readableStream = Readable.from(array);

readableStream.on('data', (chunk) => {
  console.log(chunk);
});

The run result is consistent with the readable stream reference group above.

Iterable objects using a generator

From the API above, Readable.from() can accept an iterator, and a generator function returns an iterator, so by the transitive relationship (a === b, b === c β†’ a === c) a generator should work too πŸ€”.

So we have:

import { Readable } from 'stream';

function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}

const readableStream = Readable.from(generator());

readableStream.on('data', (chunk) => {
  console.log(chunk);
});

The run result is consistent with the readable stream reference group above.

Asynchronous iterator

import { Readable } from 'stream';

// or: function* generator()
async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}

const readableStream = Readable.from(generator());

async function run() {
  for await (let chunk of readableStream) {
    console.log(chunk);
  }
}

run();

This works because the for...of statement creates a loop over iterable objects (Array, Map, Set, String, TypedArray, the arguments object, and so on), invoking a custom iteration hook and executing statements for each distinct value. A generator's result can be iterated directly, so the same effect can be achieved without the stream:

import { promisify } from 'util';
const sleep = promisify(setTimeout);

// or: function* generator()
async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}

async function run() {
  for await (let chunk of generator()) {
    console.log(chunk);
    await sleep(1000);
  }
}

run();

If the generator needs stream features, it is better to convert it into a readable stream with Readable.from(), so that it gains the characteristics of a stream (events, backpressure handling, the ability to be piped).
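For instance, here is a minimal sketch (the destination file is a placeholder) of what the wrapped generator gains: it can be piped like any other readable stream.

import * as fs from 'fs';
import { Readable } from 'stream';

async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield `${i}\n`;
  }
}

// The wrapped generator is now a real readable stream:
// it can be paused, piped, and it respects backpressure.
const readableStream = Readable.from(generator());

readableStream.pipe(fs.createWriteStream('./temp/numbers.txt'));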

Asynchronous iterators and transform streams

If we need to combine multiple streams, as mentioned above, the best way is to use pipeline. Readable streams created from asynchronous iterators can be used in a pipeline, so is it possible to create the intermediate processing units the same way? Take the transform stream as an example:

import { Readable, Transform, pipeline } from 'stream';
import { createWriteStream } from 'fs';

async function* generate() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}

async function* transform(source: Readable) {
  for await (let chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

const readableStream = Readable.from(generate());

pipeline(
  readableStream,
  Transform.by(transform), // proposed API, not yet available in Node.js (see below)
  createWriteStream(''),
  (error) => {
    console.log(error);
  }
);

We know that implementing a transform stream requires implementing a transform() function, and since we have already explained how transform streams work, we can imagine that this could be done with an async iterator as well. The Transform.by() API used above is not yet implemented in Node.js, but may land in a later version around Node.js@15.
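As a side note, in more recent Node.js versions (roughly 13.10 and later; this sketch assumes such a runtime and uses a placeholder output path), pipeline itself accepts an async generator function as an intermediate step, which achieves much the same thing without Transform.by():

import { Readable, pipeline } from 'stream';
import { createWriteStream } from 'fs';

async function* generate() {
  for (let i = 0; i < 1024; i++) {
    yield `${i}\n`;
  }
}

pipeline(
  Readable.from(generate()),
  // an async generator used directly as the transform step
  async function* (source) {
    for await (const chunk of source) {
      yield chunk.toString().toUpperCase();
    }
  },
  createWriteStream('./temp/out.txt'), // placeholder path
  (error) => {
    if (error) console.error('pipeline failed', error);
    else console.log('pipeline finished');
  }
);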

Writable streams

Unfortunately, writable streams do not yet combine as nicely with asynchronous iterators, but writable streams are relatively simple anyway. The only caveat is the 'drain' event, used to avoid a data backlog. A common approach is to wrap the writable stream with a Promise.

import Stream, { Readable, Writable } from 'stream';
import { promisify } from 'util';
import { once } from 'events';

const finished = promisify(Stream.finished);

const myWritable = new Writable({
  highWaterMark: 4,
  objectMode: true,
  defaultEncoding: 'utf-8', // default encoding for write()
  write(chunk, encoding, callback) {
    console.log('myWritable write:', chunk.toString());
    console.log('myWritableLength:', myWritable.writableLength);
    // setTimeout(() => { // simulate async work
    if (chunk.toString() === '123') {
      return callback(new Error('write error'));
    }
    callback();
    // }, 0);
  }
});

async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}

const readableStream = Readable.from(generator());

readableStream.on('close', () => {
  console.log('readableStream close');
});

myWritable.on('close', () => {
  console.log('myWritable close');
});

async function run() {
  try {
    const write = buildWrite(myWritable);

    for await (let chunk of readableStream) {
      console.log(chunk);
      const writeResult = await write(chunk);
      console.log('--->', writeResult);
    }
    await finished(readableStream);
  } catch (error) {
    console.log(error);
  }
}

function buildWrite(stream: Writable) {
  // Capture stream errors so they can be surfaced on the next write
  let streamError: any = null;
  stream.on('error', (error) => {
    streamError = error;
  });
  return function (chunk: Buffer) {
    if (streamError) {
      return Promise.reject(streamError);
    }
    const res = stream.write(chunk);
    if (res) {
      return Promise.resolve(true);
    }
    return once(stream, 'drain');
  };
}

run();

You can run the above code and watch the writable stream run.

To summarize

With that in mind, it’s time to wrap up this post with a summary of how to embrace Node.js Stream in 2021.

From all of this, Node.js Stream has begun to fully embrace asynchronous iterators, but getting all the way there is still a process. Asynchronous iterators not only greatly reduce the cost of using streams, but are also more in line with modern development habits.

Since Node.js@12, readable streams can be created directly from iterators or iterables with Readable.from(). This API simplifies creating readable streams and, in terms of code composability, opens up more possibilities.

As of Node.js@14, duplex streams, transform streams, and writable streams do not support this yet, but APIs for this support are expected around Node.js@15 so that the whole pipeline chain can fully embrace asynchronous iterators.

The appearance of pipeline, which has the same automatic data management as pipe, reduces the complexity of using a pipeline chain compared with the original pipe: it greatly simplifies error handling and automatically closes all the stream processing units in the chain.

To put it simply:

  • Use pipeline instead of pipe
  • Readable streams can be created directly with Readable.from()
  • Wrap writable streams with a Promise to avoid data backlog problems
  • An async iterator can be used to consume a readable stream (an async iterator can iterate over iterables and iterators as well as readable streams)

Finally, if there are any mistakes or questions in this article, please feel free to point them out in the comments section so we can discuss them together. Pointing out mistakes not only helps me correct my understanding but also helps other readers. If you have any questions, please contact me.

Author: Zhang Haozhe

References

  • All you need to know about Duplex
  • Better understand Buffer
  • Pipeline instead of Promise.all
  • Node Stream