2021 Node.js Stream Hitchhiker’s Guide
This article is divided into three parts: basic concepts, pipe chains, and asynchronous iterators.
The basic concepts section explains what a Node.js Stream is and how it works internally. Animations illustrate the internal workflow to give you a clearer picture of how the different modes work and how they differ.
If you already have a solid grasp of the basics, go straight to Parts 2 and 3 and treat Part 1 as a reference. The chapter on pipe chains shows the difference between pipe and pipeline with animations and demos.
The asynchronous iterator part is the focus of this article. It relies mainly on demo code to show you, quickly, how to use async iterators instead of the raw Stream API.
Basic concepts of streams
What is a stream?
When a computer processes a task, it usually loads the data into memory, but when the data is too large it cannot all fit in memory at once. Another important reason is that memory I/O is much faster than disk and network I/O, and you cannot keep memory waiting indefinitely, so buffers and streams are needed. For example, a file may be larger than the default Node.js buffer size, making it impossible to read the entire file into memory for processing.
So we need a way to process data in segments, ideally loading and processing it in order, piece by piece. You can therefore think of a Stream as ordered, chunk-by-chunk data processing.
Compared with other ways of handling data, the advantage of streaming is that you do not need to load a large amount of data into memory, nor do you need to wait until all the data has been loaded before you start processing.
Therefore, stream-based data processing has the following two characteristics (see the sketch after this list):
- Memory saving: there is no need to load a large amount of data into memory before processing it.
- Better timeliness: data can be processed as soon as the first segment arrives, without waiting for all of it to load, which greatly improves the timeliness of data processing.
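A quick sketch of the contrast, assuming a large local file at a hypothetical path: fs.readFile buffers the whole file before you can touch it, while fs.createReadStream hands you chunks as they arrive.
import * as fs from 'fs';
const bigFile = './temp/big.txt'; // hypothetical large file
// Buffered: the callback runs only after the entire file is in memory
fs.readFile(bigFile, (err, data) => {
  if (err) throw err;
  console.log('buffered bytes:', data.length);
});
// Streamed: chunks are processed as soon as they are read
fs.createReadStream(bigFile)
  .on('data', (chunk) => console.log('chunk bytes:', chunk.length))
  .on('end', () => console.log('done'));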
What are streams used for?
Processing large files or streaming media is the most common use case. Take a video site or music platform as an example: when you open a video, you do not wait for the whole file to download before playback starts; instead the data arrives as a continuous stream, so you can start watching from the moment you click play.
The design of streams, processing ordered chunks of data, also gives our code "composability". You can think of it as a production line, where a complete set of data is broken down into different pieces for processing, just as a production line sends raw materials through each step in one direction until the final product comes out.
When building a more complex system, it is usually broken down into functionally independent parts whose interfaces follow certain specifications and which are connected in some way to accomplish a larger task together.
Common stream implementations in Node.js
In the Node.js ecosystem, many built-in modules implement the stream interface, for example fs read/write streams, http request and response objects, net.Socket, zlib compression streams, and process.stdin/stdout.
Basic concepts of Node.js Stream
There are four basic stream types in Node.js:
- Writable: a stream that data can be written to (for example, fs.createWriteStream() lets us write data to a file as a stream).
- Readable: a stream that data can be read from (for example, fs.createReadStream() lets us read data from a file as a stream).
- Duplex: a stream that is both readable and writable (such as net.Socket).
- Transform: a Duplex stream that can modify or transform data as it is written and read. A Transform stream is also a Duplex stream, but its output is derived from its input. Like a Duplex stream, a Transform stream implements both the Readable and Writable interfaces (for example, zlib.createDeflate()).
In addition to the basic stream types, you need to understand the following stream characteristics:
Events: all streams are instances of EventEmitter; different streams emit different events, which are how they report their working state to the outside world.
Separate buffers: readable and writable streams each have their own independent buffer. Duplex and Transform streams implement both the readable and writable interfaces, so they have both a readable buffer and a writable buffer.
Character encoding: when we read and write files we are actually operating on byte streams, so we need to pay attention to the encoding option when creating a stream, which affects the content and size of each chunk. The default encodings of readable and writable streams are not the same, and they differ from stream to stream, so check the default options before using a stream to avoid data backlogs.
// fs.createReadStream: the default encoding is null
const readableStream = fs.createReadStream(smallFile, { encoding: 'utf-8', highWaterMark: 1 * 256 });
/**
 * 1. fs.createWriteStream: the default encoding is utf-8, but if it is not set at creation time,
 *    the encoding follows the data actually passed to write().
 * 2. It can also be set with writable.setDefaultEncoding(encoding), which has the same effect.
 * 3. If a default encoding is set, only data of that type can be written.
 * 4. The encoding type directly affects the number of bytes. If it is not set in the code below,
 *    it affects write()'s handling of compressed data (the number of bytes written to the file
 *    will not match the highWaterMark definition).
 */
const writeableStream = fs.createWriteStream(upperFile, { encoding: 'utf-8', highWaterMark: 1 * 25 });
// The chunk size read from the file is much larger than the number of bytes written to the
// writable stream at a time, so the 'drain' event is emitted while waiting for the writable
// stream's buffer to drain
highWaterMark: both readable and writable streams store data in an internal buffer. For non-object-mode streams, highWaterMark specifies a total number of bytes. highWaterMark is only a threshold: it does not limit the size of the internal buffer unless the Node.js buffer limit itself is exceeded.
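A small sketch of that point, with made-up numbers: write() keeps accepting data past the threshold, it just starts returning false.
import { Writable } from 'stream';
const sink = new Writable({
  highWaterMark: 4, // bytes, deliberately tiny
  write(chunk, _encoding, callback) {
    setTimeout(callback, 100); // simulate a slow consumer
  },
});
console.log(sink.write('aa'));   // true  - buffer is below the threshold
console.log(sink.write('bbbb')); // false - threshold reached, but the chunk is still buffered, not dropped
console.log(sink.writableLength, sink.writableHighWaterMark); // typically 6 4 with these numbers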
One thing to be clear about here is that data is buffered, not cached. The difference between a buffer and a cache can be found in this article.
Readable streams
Two modes of readable streams
Following the "production line" description above, a readable stream, as the source, works in two modes: flowing and paused. You can think of them as passive and active consumption; they differ in how data consumption is triggered.
- In flowing mode, data is read from the underlying system automatically and provided to the application as quickly as possible through events on the EventEmitter interface.
- In paused mode, data piles up in the internal buffer, and stream.read() must be called explicitly to read a chunk.
All readable streams start in paused mode and can be switched to flowing mode by:
- adding a 'data' event handler;
- calling the stream.resume() method;
- calling the stream.pipe() method to send the data to a writable stream.
A readable stream can be switched back to paused mode by:
- calling stream.pause() if there is no pipe destination;
- removing all pipe destinations if there are any; multiple pipe destinations can be removed by calling stream.unpipe().
By default, all readable streams start in paused mode, but they can easily be switched to flowing mode and, if necessary, can switch back and forth between the two modes.
When a readable stream is in paused mode, we can use the read() method to read data from the stream on demand. In flowing mode, however, data is read continuously and may be lost if it is not consumed in time, so in flowing mode we need to fetch and process data through the 'data' event.
Flowing mode animation demo
Paused mode animation demo
Key readable stream events
When working with a readable stream, you need to focus on two events that are used to read data from it:
- Event: 'data' (passive consumption). When a 'data' event handler is added, the readable stream pushes data to the event callback as soon as data is available. You need to consume the chunk yourself, otherwise the data is lost. (If you do not manually switch the readable stream to paused mode, it stays in flowing mode and data keeps flowing in until it has all been read.)
- Event: 'readable' (active consumption). The readable stream triggers this event callback when data is ready, and you need to call stream.read() in the callback to consume the data actively. (In paused mode, a single stream.read() after multiple stream.push() calls reads all the data currently in the buffer at once.)
The 'readable' event indicates that the stream has something new to report: either there is new data, or the end of the stream has been reached. In the former case stream.read() returns the available data; in the latter it returns null.
In Node.js, both readable and writable streams store data in internal buffers, so when a readable stream keeps pushing data into a writable stream it is likely to put backpressure on the writable stream's internal buffer.
Therefore, when the 'data' event is used and the read rate does not match the downstream consumption rate, data can easily back up and hurt performance. With very large amounts of data, using the 'readable' event lets the stream switch modes automatically and usually gives better performance, but it is correspondingly harder to understand than the 'data' event.
// 👨🏫 If 'readable' and 'data' listeners exist at the same time, the 'readable' event takes
// control away from 'data'; data only flows to 'data' handlers after read() is called
readableStream.on('readable', function (this: any) {
  // paused
  let chunk;
  while ((chunk = readableStream.read()) !== null) {
    // flowing
    // do something
  }
  // When readableStream.read() returns null, there is no new data
});
readableStream.on('end', () => {
  console.log('No more data');
});
To understand the use of the ‘data’ event:
// 👨🏫 Adding a 'data' listener triggers the readable stream mode switch: paused --> flowing
readableStream.on('data', (chunk: string) => {
  // 📌 If necessary, manually switch the stream to paused
  readableStream.pause();
  // do something
  if (whenYouDone) {
    // 📌 resume the flow
    readableStream.resume();
  }
});
readableStream.on('end', () => {
  console.log('No more data');
});
Note that for now we are talking about using a readable stream on its own. If you are going to combine streams, events are not recommended. Next, we will look at how to compose streams with pipe to form pipe chains.
Data backlog animation demo
Writable streams
A writable stream is much easier to understand than a readable stream; it can be thought of as the end of the "production line", leading to the final destination.
Key writable stream events
Among writable stream events, the most important thing to understand is how a writable stream behaves when the write speed does not match the read speed:
- Event: 'drain'. If a call to stream.write(chunk) returns false, indicating that the data currently in the buffer has reached or exceeded the writable stream's highWaterMark, the 'drain' event is emitted once it becomes possible to continue writing data to the stream.
This may not be easy to understand, so think of a writable stream as a funnel with a scale marked on it. If the water level is below the scale, it is safe to keep pouring. When the water level reaches or passes the scale, it is not recommended to keep pouring; once the funnel has drained, the 'drain' event notifies you that you can continue.
Note the wording above; this is where Stream gets confusing. The highWaterMark of a stream only serves as a warning: Stream does not force any backpressure control on the developer. We can ignore stream.write(chunk) === false and keep calling stream.write(chunk); unwritten data is simply stored in the writable stream's internal buffer until the Node.js buffer limit is reached, at which point the process is forcibly interrupted.
A side note: programming languages take the physical characteristics of devices into account and set "safety limits". For example, the maximum Node.js buffer size depends on the architecture:
- On 32-bit architectures, this value is currently 2^30 - 1 (about 1 GiB).
- On 64-bit architectures, this value is currently 2^31 - 1 (about 2 GiB).
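If you want to check this limit on your own machine, the buffer module exposes it as a constant; a one-line check:
import { constants } from 'buffer';
// Maximum size allowed for a single Buffer on this platform
console.log(constants.MAX_LENGTH);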
Coming back to writable streams: as we said, streams have their own independent buffers, and writeableStream.write() returns true as long as the total size of the writable stream's internal buffer is below the threshold set by highWaterMark.
Once the internal buffer reaches or exceeds highWaterMark, false is returned. If you are driving this manually with events, you then need to pause the readable source until the writable stream emits the 'drain' event.
// 👨🏫 Adding a 'data' listener triggers the readable stream mode switch: paused --> flowing
readableStream.on('data', (chunk: string) => {
  // 📌 Switch the stream to paused
  readableStream.pause();
  // If the water level reaches or passes the scale, write() returns false and we wait for the 'drain' event
  const writeResult = writeableStream.write(chunk, (err) => {
    if (err) {
      console.error('Write error:', err);
      process.exitCode = 1;
    }
  });
  if (writeResult) {
    // 📌 resume the flow
    readableStream.resume();
  }
});
// If writeableStream.write() has buffered more bytes than highWaterMark, the 'drain' event is
// emitted once the buffer has drained
writeableStream.on('drain', function () {
  console.log(chalk.gray('writable stream drain'));
  // 📌 resume the flow
  readableStream.resume();
});
Writable stream animation demo
Duplex and Transform streams
In fact, once you understand how readable and writable streams work, duplex and transform streams are easy to understand. Duplex and Transform streams both implement the Readable and Writable interfaces; in other words, a Transform stream is a special kind of Duplex stream.
The difference is that a duplex stream can be thought of as a readable stream bundled with a writable stream. The two sides are independent, each with its own internal buffer, and read and write events happen independently. A transform stream is also duplex, but its reads and writes are sequential: data can only be read after it has been written.
                      Duplex Stream
                   ------------------|
             Read  <-----  External Source
  You              ------------------|
             Write ----->  External Sink
                   ------------------|
  You don't get what you write. It is sent to another source.

                      Transform Stream
                   --------------|--------------
  You        Write ---->                ---->  Read   You
                   --------------|--------------
  You write something, it is transformed, then you read something.
The net.Socket module is a typical Duplex: a socket receives messages and sends messages (by writing) at the same time, and the two directions are independent of each other.
A transform stream, on the other hand, is suited to sequential processing steps; implementing a Transform requires implementing the transform() method, which represents the data in transit. A minimal Duplex sketch follows.
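To make the independence of the two sides concrete, here is a minimal Duplex sketch (the class name and data are made up for illustration): the readable side pushes values from an internal counter, while the writable side just logs whatever is written, and neither depends on the other.
import { Duplex } from 'stream';
class CounterDuplex extends Duplex {
  private counter = 0;
  _read() {
    // Readable side: supply data from our own source (a counter here)
    if (this.counter < 3) {
      this.push(`tick ${this.counter++}\n`);
    } else {
      this.push(null); // readable side is done
    }
  }
  _write(chunk: Buffer, _encoding: string, callback: (error?: Error | null) => void) {
    // Writable side: consume incoming data, independently of the readable side
    console.log('received:', chunk.toString());
    callback();
  }
}
const duplex = new CounterDuplex();
duplex.on('data', (chunk) => console.log('read:', chunk.toString()));
duplex.write('hello');
duplex.end();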
Pipe chain
Understanding stream composition
Read the following carefully; it will help you understand and use streams better. As mentioned earlier, a stream can be thought of as a "production line", and a pipe chain is equivalent to a full production line combining different task units.
You could say that when a production line is created, only its purpose is defined (where the upstream (readable) stream starts and how the downstream (writable) stream should handle the output), but the line is not yet running. As the source of production, the line will also shut down when the upstream raw material runs out; it cannot keep running forever.
Therefore there needs to be a switch at the source to control the running state. If downstream processing is slow, the source needs to stop and wait until downstream has caught up before starting again, to avoid unnecessary production accidents. If there is no more input, the upstream needs to shut down and notify the downstream, but that does not mean the other task units on the line have finished their work. Each processing unit is independent and has its own events, so if one unit fails, the operator should stop the whole production line immediately to avoid losses.
So when composing different streams, you need to pay attention not only to the source but also to whether each processing unit is working properly. When the "production line" is complex enough, "paying attention to everything" is not a good idea and creates a lot of mental overhead for developers; a more efficient, modern approach is needed.
Operating streams entirely through event callbacks is the manual mode: inefficient and error-prone. Node.js also offers a semi-automatic mode, pipe, which the official documentation recommends for consuming stream data. As Node.js has evolved there is now a better, fully automatic mode: pipeline. Events and pipes can be mixed for more fine-grained control of the data, but be aware that this can lead to ambiguous behavior.
Pipe and pipe chain
A pipe connects a readable stream to a writable stream. If you want to combine more than two streams, use Duplex or Transform streams in the middle to form a pipe chain.
To make the earlier code a bit more complicated, a compression unit and a "failure unit" (simulating a failure in the middle of the pipe chain) are added below.
import * as path from 'path';
import * as fs from 'fs';
import * as chalk from 'chalk';
import { createGzip } from 'zlib';
import { Transform } from 'stream';
// Prepare the input file in advance; it is about 1KB in size
const smallFile = path.join(__dirname, '../../temp/1KB.txt');
const upperFile = path.join(__dirname, '../../temp/upper_text_pipe.gz');
const readableStream = fs.createReadStream(smallFile, { encoding: 'utf-8', highWaterMark: 1 * 256 });
const writeableStream = fs.createWriteStream(upperFile, { encoding: 'utf-8', highWaterMark: 1 * 10 }); // Set it small to trigger 'drain'
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    console.log('chunk', chunk);
    this.push(chunk.toString().toUpperCase());
    // Simulated failure
    callback(new Error('error'));
  },
});
// Compose the pipe chain
readableStream.pipe(createGzip()).pipe(upperCaseTr).pipe(writeableStream);
upperCaseTr.on('error', (err) => {
  console.log('upperCaseTr error', err);
  // If a middle segment fails, the rest of the pipe chain should be closed manually
  writeableStream.destroy();
  readableStream.destroy();
});
// end
readableStream.on('end', function () {
  console.log(chalk.redBright('Readable stream end'));
});
// close
readableStream.on('close', function () {
  console.error('Readable stream close');
});
readableStream.on('data', (chunk: string) => {
  console.log(chalk.green('Before compression -->', Buffer.byteLength(chunk)));
});
writeableStream.on('drain', function () {
  console.log(chalk.gray('writable stream drain'));
});
writeableStream.on('close', function () {
  console.log(chalk.redBright('Writable stream close'));
});
writeableStream.on('finish', function () {
  console.log(chalk.redBright('Writable stream Finish'));
});
writeableStream.on('error', function (err) {
  console.error('Writable stream error', err);
});
As you can see from the code above, using events to combine different streams is tedious: you have to watch the state of each stream and react to different events in different ways. The code above combines only a handful of streams, and the complexity grows rapidly as more are added.
It is also hard to use promises with this style, which is a poor experience for modern development. And for a pipe chain, when any processing unit fails, the whole production line should stop.
Pipe animation demo
A more practical example
Create a Node server that needs to read a large local file and send it to the front end:
import * as fs from 'fs';
import { createServer } from 'http';
import * as path from 'path';
const server = createServer();
server.on('request', async (req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.
  const readableStream = fs.createReadStream(path.join(__dirname, '../../../temp/big.txt'));
  readableStream.pipe(res);
  readableStream.on('close', () => {
    console.log('readableStream close');
  });
  res.on('close', () => {
    console.log('response close');
  });
  res.on('end', () => {
    console.log('response end');
  });
});
server.listen(8000);
If the client browser closes the connection before the transfer is complete, the transfer should, in theory, be stopped. Using pipe does not handle this for you; you need to listen for events on res yourself (such as 'close' or 'error') to decide whether to destroy the readable stream, as sketched below.
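A sketch of the manual cleanup this implies, reusing the hypothetical file path from the demo above: when the response closes before the readable stream has finished, we destroy the source ourselves.
import * as fs from 'fs';
import { createServer } from 'http';
import * as path from 'path';
const server = createServer((_req, res) => {
  const readableStream = fs.createReadStream(path.join(__dirname, '../../../temp/big.txt'));
  readableStream.pipe(res);
  // pipe() does not destroy the source when the destination goes away,
  // so do it manually when the response closes early or errors out
  res.on('close', () => {
    if (!readableStream.destroyed) {
      readableStream.destroy();
      console.log('client disconnected, readable stream destroyed');
    }
  });
  res.on('error', (err) => {
    readableStream.destroy(err);
  });
});
server.listen(8000);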
Summary of pipe
Source location
- Only data management is provided, to smooth out inconsistent read/write speeds and avoid data backlogs.
- Handling errors in a pipe chain is cumbersome.
- If the readable stream emits an error during processing, the writable destination is not closed automatically; every stream has to be closed manually to prevent memory leaks.
- There is no way to know the overall state of the pipeline (whether it has finished or not).
Use pipeline instead of pipe
Pipeline was not introduced until Node.js 10; for stream consumers, pipeline is now directly recommended as a safer replacement for pipe.
Using pipeline, let's modify the demo code above.
import * as fs from 'fs';
import { createServer } from 'http';
import * as path from 'path';
import { pipeline } from 'stream';
const server = createServer();
server.on('request', async (req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.
  const readableStream = fs.createReadStream(path.join(__dirname, '../../../temp/big.txt'));
  pipeline(readableStream, res, (error) => {
    console.log('pipeline', error);
  });
  readableStream.on('close', () => {
    console.log('readableStream close');
  });
  res.on('close', () => {
    console.log('response close');
  });
  res.on('end', () => {
    console.log('response end');
  });
});
server.listen(8000);
As you can see, if an error occurs, pipeline closes every stream in the pipe chain, which also means the chain itself is shut down. Monitoring the state of the chain through the callback is also much easier than with pipe.
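For comparison, here is how the earlier pipe-chain demo (read file, gzip, failing transform, write file) could be expressed with pipeline; the file paths are the same assumptions as in the pipe demo.
import * as path from 'path';
import * as fs from 'fs';
import { createGzip } from 'zlib';
import { Transform, pipeline } from 'stream';
const smallFile = path.join(__dirname, '../../temp/1KB.txt');
const upperFile = path.join(__dirname, '../../temp/upper_text_pipe.gz');
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback(new Error('error')); // simulated failure, as in the pipe demo
  },
});
pipeline(
  fs.createReadStream(smallFile),
  createGzip(),
  upperCaseTr,
  fs.createWriteStream(upperFile),
  (error) => {
    // A single callback: on error, every stream in the chain is destroyed automatically
    console.log('pipeline finished with', error);
  }
);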
Summary of pipeline
Source location
- Provides data management to smooth out inconsistent read/write speeds and effectively avoid data backlogs.
- Errors in the pipe chain are easy to handle.
- If the readable stream emits an error during processing, all streams in the current pipe chain are closed, avoiding memory leaks.
- The pipe chain has its own overall state, which is easier to reason about.
Reviewing Node.js Stream
From the introduction above you should now have a fairly complete picture of streams: the basic concepts of Node.js Stream, and how to combine streams into pipe chains more safely and efficiently with pipe and pipeline.
However, Stream is still relatively expensive to learn and use. Even though most Node.js modules wrap these capabilities for you, implementing a stream yourself is still quite hard, and pipeline only makes it convenient to compose pipe chains; using an individual stream is still a hassle.
For example, if you only need a readable stream to read data and nothing else, errors and other issues can still occur on that stream, so when pipeline does not apply you still have to fall back to events. This also shows that Stream does not play well with promises, let alone with the async/await style we are more familiar with.
So how can the Stream development experience in Node.js be improved?
The answer: use async iterators with streams.
Async Iterator and Stream
What is an asynchronous iterator?
To see how an iterator can be looped over with async/await, look at the following code:
async function* generator() {
  yield 'aaa';
}
(async () => {
  for await (let chunk of generator()) {
    chunk.toUpperCase();
  }
})();
This is not the focus of this article, so if you are not familiar with the concept, check out the following references:
- Asynchronous iterators
- for await...of on MDN
Asynchronous iterators and readable streams
When you use a readable stream, reading data from it is somewhat similar to taking data out of an array. If you imagine an array as the data source of a readable stream, then all you need to do is read its contents out one by one in order, and you have the working model of a readable stream.
First implement a readable stream as our reference group:
import { Readable } from 'stream';
const array = [];
for (let i = 0; i < 1024; i++) {
  array.push(i);
}
const readableStream = new Readable({
  objectMode: true,
  read() {
    for (let i = 0; i < array.length; i++) {
      readableStream.push(array[i]);
    }
    readableStream.push(null);
  },
});
readableStream.on('data', (chunk) => {
  console.log(chunk);
});
Convert the array to a readable stream using Readable.from()
Node.js 12 provides a built-in method, stream.Readable.from(iterable, [options]), for creating readable streams from iterators or iterables.
import { Readable } from 'stream';
const array = [];
for (let i = 0; i < 1024; i++) {
  array.push(i);
}
const readableStream = Readable.from(array);
readableStream.on('data', (chunk) => {
  console.log(chunk);
});
The running results are consistent with the above readable stream reference group results.
Use generator iterables
From the API above: since Readable.from() accepts an iterable or an iterator, and a generator returns an iterator, we can infer that a generator can be used as well (a === b and b === c, therefore a === c 🤔).
So:
import { Readable } from 'stream';
function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}
const readableStream = Readable.from(generator());
readableStream.on('data', (chunk) => {
  console.log(chunk);
});
The running results are consistent with the above readable stream reference group results.
Asynchronous iterator
import { Readable } from 'stream';
// or: function* generator()
async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}
const readableStream = Readable.from(generator());
async function run() {
  for await (let chunk of readableStream) {
    console.log(chunk);
  }
}
run();
Because the for await...of statement creates an iteration loop over iterable objects (including Array, Map, Set, String, TypedArray, arguments, and so on), calling any custom iteration hooks and executing statements for each value, you can also iterate over the generator function directly, with the same effect:
import { Readable } from 'stream';
import { promisify } from 'util';
const sleep = promisify(setTimeout);
// or: function* generator()
async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}
async function run() {
  for await (let chunk of generator()) {
    console.log(chunk);
    await sleep(1000);
  }
}
run();
If a generator needs the characteristics of a stream, it is best to use Readable.from() to convert it into a readable stream, so that it actually gains those characteristics.
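A small sketch of what that buys you, with a hypothetical output path: once wrapped, the generator can be paused, resumed, and piped like any other readable stream.
import { Readable } from 'stream';
import { createWriteStream } from 'fs';
async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield `${i}\n`;
  }
}
// Wrapping the generator gives it stream features: pipe(), pause()/resume(),
// highWaterMark-based buffering, and the usual stream events
const readableStream = Readable.from(generator());
readableStream.pipe(createWriteStream('./temp/numbers.txt')); // hypothetical path
readableStream.on('end', () => console.log('all values written'));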
Asynchronous iterators and Transform streams
If we need to combine multiple streams, as mentioned earlier, the best way is to use pipeline. Readable streams created from asynchronous iterators can be combined into pipelines; can the intermediate processing units be created the same way?
Take a transform stream as an example:
import { Readable, Transform, pipeline } from 'stream';
import { createWriteStream } from 'fs';
async function* generate() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}
async function* transform(source: Readable) {
  for await (let chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}
const readableStream = Readable.from(generate());
pipeline(
  readableStream,
  Transform.by(transform), // Not officially supported yet
  createWriteStream(' '),
  (error) => {
    console.log(error);
  }
);
We know that implementing a transform stream requires implementing the transform() function, and its working principle was described above, so it seems reasonable that it could be implemented with an asynchronous iterator. However, this approach is not implemented in Node.js yet and may land in a later version, around Node.js 15. A workaround is sketched below.
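Until then, one option is to hand-roll the intermediate unit as an ordinary Transform and still get the benefits of pipeline. A minimal sketch, with a hypothetical output path:
import { Readable, Transform, pipeline } from 'stream';
import { createWriteStream } from 'fs';
async function* generate() {
  for (let i = 0; i < 1024; i++) {
    yield `value-${i}\n`;
  }
}
// An ordinary Transform doing what the async generator version above intends
const upperCaseTr = new Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});
pipeline(Readable.from(generate()), upperCaseTr, createWriteStream('./temp/upper.txt'), (error) => {
  if (error) console.error(error);
});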
Asynchronous iterators and writable streams
Unfortunately, asynchronous iterators do not yet combine neatly with writable streams, but writable streams are much simpler; the only thing to watch is the 'drain' event, to avoid a data backlog.
A writable stream can therefore be wrapped with promises, which is a common pattern.
import Stream, { Readable, Writable } from 'stream';
import { promisify } from 'util';
import { once } from 'events';
const finished = promisify(Stream.finished);
const myWritable = new Writable({
  highWaterMark: 4,
  objectMode: true,
  defaultEncoding: 'utf-8', // default encoding for write()
  write(chunk, encoding, callback) {
    console.log('myWritable write:', chunk.toString());
    console.log('myWritableLength:', myWritable.writableLength); // check before callback() is called
    // setTimeout(() => { // simulate async work
    if (chunk.toString() === '123') {
      return callback(new Error('Write error'));
    }
    callback();
    // }, 0);
  },
});
async function* generator() {
  for (let i = 0; i < 1024; i++) {
    yield i;
  }
}
const readableStream = Readable.from(generator());
readableStream.on('close', () => {
  console.log('readableStream close');
});
myWritable.on('close', () => {
  console.log('myWritable close');
});
async function run() {
  try {
    const write = buildWrite(myWritable);
    for await (let chunk of readableStream) {
      console.log(chunk);
      const writeResult = await write(chunk);
      console.log('->', writeResult);
    }
    await finished(readableStream);
  } catch (error) {
    console.log(error);
  }
}
// Wrapper for writable streams
function buildWrite(stream: Writable) {
  let streamError: any = null;
  // A writable stream needs an 'error' listener to catch errors
  stream.on('error', (error) => {
    streamError = error;
  });
  return function (chunk: Buffer) {
    if (streamError) {
      return Promise.reject(streamError);
    }
    const res = stream.write(chunk);
    if (res) {
      return Promise.resolve(true);
    }
    return once(stream, 'drain');
  };
}
run();
You can run the code above to see how the writable stream behaves.
To summarize
With all that in mind, it is time to wrap up with a summary of how to embrace Node.js Streams in 2021.
From everything above you can see that Node.js Stream is starting to fully embrace asynchronous iterators, although getting there is still a process. Asynchronous iterators not only greatly reduce the cost of using a stream, they also fit modern development practices better.
Since Node.js 12, readable streams can be created directly from iterators or iterables using Readable.from(). This API makes readable streams simpler to create and opens up more possibilities for composing code.
However, as of Node.js 14, duplex, transform, and writable streams do not support this yet; the corresponding APIs are expected around Node.js 15, so that pipe chains can fully embrace asynchronous iterators as well.
pipeline, which has the same automatic data management as pipe, also reduces the complexity of pipe chains to some extent (compared to the original pipe approach): it greatly simplifies error handling and automatically closes every Stream processing unit in the chain.
To put it simply (a combined sketch follows this list):
- Use pipeline instead of pipe.
- A readable stream can be created directly with Readable.from().
- Wrap writable streams with promises to avoid data backlogs.
- Readable streams can be consumed with asynchronous iterators (asynchronous iterators can iterate over iterables and iterators as well as readable streams).
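Putting those points together, a minimal end-to-end sketch (the file path is hypothetical): an async generator becomes a readable stream via Readable.from(), and a promisified pipeline writes it out while handling backpressure and cleanup.
import { Readable, pipeline } from 'stream';
import { createWriteStream } from 'fs';
import { promisify } from 'util';
const pipelinePromise = promisify(pipeline);
async function* generate() {
  for (let i = 0; i < 1024; i++) {
    yield `${i}\n`;
  }
}
async function run() {
  try {
    // Readable.from() turns the async generator into a readable stream;
    // pipeline manages backpressure and closes both streams on error
    await pipelinePromise(Readable.from(generate()), createWriteStream('./temp/out.txt')); // hypothetical path
    console.log('done');
  } catch (error) {
    console.error(error);
  }
}
run();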
Finally, if you find any mistakes or have any questions, please point them out in the comments section. Pointing out mistakes not only helps me correct my own understanding, it also helps other readers.
If you have any questions, please contact me.
References
Understanding Node.js Stream
Simplify node.js streams through asynchronous iteration
Node Stream everything you need to know
Duplex Duplex
Better understand buffers
Pipeline instead of Promise.all
Node Stream Presentation video