- Node.js Streams: Everything You Need to Know
- By Samer Buna
- The Nuggets Translation Project
- Translator: loveky
- Proofreaders: Zaraguo, Aladdin-ADD
Node.js streams: Everything you need to know
Streams in Node.js have a reputation for being hard to work with and even harder to understand. Now I have good news for you: that’s no longer the case.
Over the years, developers have created many packages with the sole purpose of making streams easier to work with. But in this article, I’ll focus on the native Node.js stream API.
“Streams are the best and most misunderstood idea in Node.”
– Dominic Tarr
What exactly are streams?
A stream is a collection of data — just like an array or string. The difference is that the data in the stream may not all be available at once, and you don’t have to put it all into memory at once. This makes streams useful when manipulating large amounts of data or when data is sent segment by segment from an external source.
However, the role of streams is not limited to manipulating large amounts of data. It also gives us the ability to combine code. Just as we can pipe a few simple Linux commands together to create powerful functionality, we can do the same with streams in Node.
Composability of Linux commands
const grep = ... // stream
const wc = ...   // stream

grep.pipe(wc)
Many built-in modules in Node implement the stream interface:
The screenshot is from my Pluralsight course, Advanced Node.js
The list above includes native Node.js objects that are also readable or writable streams. Some of these objects are both readable and writable streams, such as TCP sockets, zlib streams, and crypto streams.
It is important to note that these objects are closely related. While an HTTP response is a readable stream on the client side, it is a writable stream on the server side. This is because in the case of HTTP, we are basically reading data from one object (http.IncomingMessage) and writing data to another object (http.ServerResponse).
It is also important to note that the stdio streams (stdin, stdout, stderr) have the opposite type in the child process as in the parent. This makes it very simple to pipe to and from these streams from the parent process.
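Here is a minimal sketch of that relationship (the wc command is just an arbitrary example): from the parent's point of view, the child's stdin is writable and its stdout is readable, so we can pipe data in and out of it.

const { spawn } = require('child_process');

// From the parent's point of view, child.stdin is a writable stream
// and child.stdout is a readable stream.
const child = spawn('wc', ['-l']);

process.stdin.pipe(child.stdin);    // write data into the child
child.stdout.pipe(process.stdout);  // read the child's output back out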
A practical example of streams
Theory is great, but often not 100% convincing. Let’s look at an example that shows the difference streams can make in memory consumption.
First let’s create a large file:
const fs = require('fs');
const file = fs.createWriteStream('./big.file');
for(let i=0; i<= 1e6; i++) {
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}
file.end();
Let’s see what I used to create this big file. A writable stream!
With the FS module you can use a stream interface to read or write files. In the example above, we wrote 1 million rows of data to big.file through a writable stream.
Executing this script produces a file of about 400MB in size.
Here is a Node web server for sending big.file files:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
fs.readFile('./big.file', (err, data) => {
if (err) throw err;
res.end(data);
});
});
server.listen(8000);
When the server receives a request, it reads the file content with the asynchronous fs.readFile method and sends it to the client. It doesn’t look like we’re blocking the event loop or anything. Everything looks great, right? Right?
Let’s take a look at the real situation. We start the server, initiate connections, and monitor memory usage.
When I started the server, it used a normal amount of memory, 8.7 MB:
Then I connected to the server. Note what happened to the memory consumption:
Wow – memory consumption exploded to 434.8MB.
Before we write it to the response object, we basically load the entire contents of big.file into memory. It’s very inefficient.
The HTTP response object is also a writable stream. This means that if we have a readable stream that represents the contents of big.file, we can just pipe the two streams together and achieve the same result without consuming about 400MB of memory.
The createReadStream method in Node’s fs module gives us a readable stream for any file. We can simply pipe it into the response object:
const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
const src = fs.createReadStream('./big.file');
src.pipe(res);
});
server.listen(8000);
Now, when you connect to the server again, something magical happens (note the memory consumption):
What happened?
When the client requests this large file, we send the data block by block through the stream. This means we don’t need to cache the entire contents of the file in memory. Memory consumption only increased by about 25MB.
You can take this example to the extreme. Regenerate big.file with 5 million lines instead of 1 million, which takes the file to well over 2GB. That is larger than the default buffer size limit in Node.
If you try to read that file with fs.readFile, it will simply fail by default (you can change the buffer size limit, of course). But with fs.createReadStream, there is no problem at all streaming 2GB of data to the client. Even better, the memory consumption of the process stays roughly the same no matter how big the file is.
Ready to learn more about streams?
This article is part of my Node.js class at Pluralsight. You can find a video version of this section here.
Stream Quick Start
There are four fundamental stream types in Node.js: readable, writable, duplex, and transform streams.
- A readable stream is an abstraction of a source from which data can be read. The fs.createReadStream method is an example of a readable stream.
- A writable stream is an abstraction of a destination to which data can be written. The fs.createWriteStream method is an example of a writable stream.
- A duplex stream is both readable and writable. TCP sockets fall into this category.
- A transform stream is a special duplex stream that generates readable data based on the data written to it. For example, zlib.createGzip compresses data this way. You can think of a transform stream as a function whose input is the writable side and whose output is the readable side. You may also have heard transform streams referred to as “through streams”.
All streams are instances of EventEmitter. They emit events that can be used to read or write data. However, we can consume the data in a stream in a simpler way using the pipe method.
Pipe method
Here’s the magic to remember:
readableSrc.pipe(writableDest)
In this simple line of code, we pipe the output of a readable stream (the source) into the input of a writable stream (the destination). The source must be a readable stream and the destination must be a writable stream. Of course, they can both also be duplex/transform streams. In fact, if we pipe into a duplex stream, we can chain pipe calls just like we do in Linux:
readableSrc
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(finalWritableDest)
The pipe method returns the destination stream, which is what makes this chaining possible. For the streams a (readable), b and c (duplex), and d (writable), we can do:
a.pipe(b).pipe(c).pipe(d)

# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# Which, in Linux, is equivalent to:
$ a | b | c | d
The pipe method is the easiest way to consume streams. The general recommendation is to either use the pipe method or consume streams with events, but avoid mixing the two. Usually, when using the pipe method, you do not need events; but if you want to consume streams in a more customized way, events are the way to go.
Stream events
Besides reading from a readable source and writing to a writable destination, the pipe method automatically handles a few other things along the way, such as error handling, end-of-file, and the case where one stream is slower or faster than the other.
However, streams can also be read directly from events. Here is a simplified example of using events to simulate pipe reading and writing data:
// readable.pipe(writable)
readable.on('data', (chunk) => {
writable.write(chunk);
});
readable.on('end', () => {
writable.end();
});
Here are some of the events and methods used when using readable or writable streams:
The screenshot is from my Pluralsight course – Advanced Node.js
The events and methods are listed together because they are usually used together.
The two most important events on a readable stream are:

- The data event, which is emitted whenever the stream passes a chunk of data to its consumer
- The end event, which is emitted when the stream has no more data to give to the consumer

The two most important events on a writable stream are:

- The drain event, which is a signal that the writable stream can accept more data (see the sketch after this list)
- The finish event, which is emitted when all data has been flushed to the underlying system
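As a hedged sketch of the drain event in practice (the output file name here is made up for illustration): write() returns false once the stream's internal buffer is full, and drain tells us when it is safe to write again.

const fs = require('fs');

const out = fs.createWriteStream('./drain-demo.txt'); // hypothetical output file

let i = 0;
function writeSome() {
  let ok = true;
  while (i < 1e6 && ok) {
    // write() returns false when the internal buffer is full
    ok = out.write(`line ${i++}\n`);
  }
  if (i < 1e6) {
    // wait for the buffer to flush before writing the rest
    out.once('drain', writeSome);
  } else {
    out.end();
  }
}

writeSome();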
Events and methods can be combined for a more customized and optimized use of streams. To consume a readable stream, we can use the pipe/unpipe methods, or the read, unshift, and resume methods. To consume a writable stream, we can make it the destination of pipe/unpipe, or just write to it with the write method and call the end method when we’re done.
Paused and flowing modes of readable streams
Readable streams have two main modes that affect the way we can consume them:

- They can be either in paused mode
- Or in flowing mode
These modes are sometimes called pull and push modes.
All readable streams start in paused mode by default, but they can be switched to flowing mode and back to paused mode when needed. Sometimes the switching happens automatically.
When a readable stream is in paused mode, we can use the read() method to read data on demand. For a readable stream in flowing mode, the data flows continuously and we have to listen to events to consume it.
In flowing mode, data can actually be lost if no consumer is listening for it. This is why we need a data event handler when working with a readable stream in flowing mode. In fact, just adding a data event handler switches a paused stream into flowing mode, and removing the data event handler switches the stream back into paused mode. Some of this is done for compatibility with the older Node stream interface.
To manually switch between the two modes, you can use the resume() and pause() methods.
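Here is a minimal sketch of doing that by hand, reusing the big.file created earlier (the 100 ms delay is arbitrary): attaching a data handler switches the stream into flowing mode, and pause()/resume() throttle it manually.

const fs = require('fs');

const readable = fs.createReadStream('./big.file');

readable.on('data', (chunk) => {
  readable.pause(); // switch back to paused mode while we "process" the chunk
  console.log(`Got ${chunk.length} bytes, pausing briefly...`);
  setTimeout(() => readable.resume(), 100); // switch back to flowing mode
});

readable.on('end', () => console.log('Done reading.'));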
The screenshot is from my Pluralsight course – Advanced Node.js
When using the pipe method, it handles switching between these modes automatically, so you don’t have to worry about these details.
Implementing streams
When we talk about streams in Node.js, there are two main tasks:

- The task of implementing streams.
- The task of consuming them.

So far, we’ve only discussed how to consume streams. Let’s implement some!
Stream implementers are usually the ones who require the stream module.
Implementing a writable stream
To implement a Writable stream, we need to use the Writable class from the Stream module.
const { Writable } = require('stream');
There are many ways to implement a writable stream. For example, we can extend the Writable class:
class myWritableStream extends Writable {}
However, I prefer the simpler constructor approach: we create an object by passing options directly to the Writable constructor. The only required option is a write function, which exposes the chunk of data to be written.
const { Writable } = require('stream');
const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);
The write method takes three arguments:

- The chunk is usually a buffer, unless we configure the stream differently.
- The encoding argument is needed in that case, but we can usually ignore it.
- The callback is a function to call after we have finished processing the data chunk. It signals whether the write was successful or not. To signal a failure, call the callback with an error object (see the sketch after this list).
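As a hedged sketch of the error-signaling part (the 1 KB limit is invented purely for illustration), a writable stream can reject a chunk by handing an Error to the callback:

const { Writable } = require('stream');

const pickyStream = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.length > 1024) {
      // signal a failed write by passing an error object to the callback
      callback(new Error('Chunk too large'));
    } else {
      console.log(chunk.toString());
      callback(); // signal a successful write
    }
  }
});

process.stdin.pipe(pickyStream);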
In outStream, we simply console.log the chunk as a string and call the callback without an error object to indicate success. This is a very simple and probably not very useful echo stream: it logs back anything it receives.
To use this stream, we can use it with the readable stream process.stdin. Simply pipe process.stdin to outStream.
When we run the code above, any characters we type into process.stdin will be echoed back by the console.log in outStream.
This is not a very useful stream to implement because Node already has it built in: it is almost equivalent to process.stdout. By piping stdin into stdout, we get the exact same echo effect with a single line of code:
process.stdin.pipe(process.stdout);
Implementing a readable stream
To implement a readable stream, we require the Readable interface and construct an object from it:
const { Readable } = require('stream');
const inStream = new Readable({});
This is a very simple readable stream implementation. We can push data downstream with a push method.
const { Readable } = require('stream');
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);
When we push a null value, it means that there will be no further data for the stream.
To consume this readable stream, we can simply pipe it into the writable stream process.stdout.
When we execute the code above, all data read from inStream is displayed to standard output. Very simple, but not very efficient.
We pushed all the data before we connected the stream to process.stdout. A better approach would be to push data only on demand when the user requests it. We can do this by implementing the read() method in a readable stream configuration:
const inStream = new Readable({
  read(size) {
    // Someone wants to read the data
  }
});
When the read method is called on a readable stream, the implementation can push partial data into the queue. For example, we can push one letter at a time, starting with character code 65 (which represents A) and incrementing the code on every push:
const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;

inStream.pipe(process.stdout);
The read method keeps firing as the user reads the readable stream, and we keep pushing letters. We need to stop the loop somewhere, which is why we put in an if statement to push a null value when currentCharCode is greater than 90 (for Z).
This code is equivalent to the simple code we started with, but we’ve changed it to push data when the consumer needs it. You should always do this.
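For comparison, here is a sketch of consuming such a stream in paused mode, pulling data explicitly with read() instead of piping it (lettersStream below is just a copy of the inStream above):

const { Readable } = require('stream');

const lettersStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

lettersStream.currentCharCode = 65;

// Paused-mode consumption: pull chunks on demand instead of piping
lettersStream.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer is empty
  while ((chunk = lettersStream.read()) !== null) {
    process.stdout.write(chunk);
  }
});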
Implementing duplex/transform streams
With a duplex stream, we can implement both a readable and a writable stream in the same object. It is as if we inherit from both interfaces.
The following example implements a duplex stream that combines the readable and writable examples implemented above:
const { Duplex } = require('stream');
const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);
By combining the methods, we can use this duplex stream to read the letters from A to Z and also make use of its echo feature. We pipe the readable stdin stream into this duplex stream to use the echo feature, and we pipe the duplex stream itself into the writable stdout stream to see the letters A through Z.
It is important to understand that the readable and writable sides of a duplex stream operate completely independently of one another. A duplex stream simply groups the two features into a single object.
A transform stream is the more interesting kind of duplex stream, because its output is computed from its input.
For a transform stream, we do not need to implement the read or write methods; we only need to implement a transform method, which combines both of them. It has the signature of the write method, and we can use it to push data as well.
Here’s a transform stream that converts whatever you type to uppercase letters:
const { Transform } = require('stream');
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);
In this transform stream, we only implement the transform() method, yet we achieve the same effect as the duplex example above. In that method, we convert the chunk into its uppercase version and then push that version downstream.
Streams object mode
By default, streams expect Buffer or string values. We can set the objectMode flag to make a stream accept any JavaScript object.
Here is a simple example. The following combination of transform streams converts a comma-separated string of values into a JavaScript object, so "a,b,c,d" becomes {a: "b", c: "d"}.
const { Transform } = require('stream');
const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for (let i = 0; i < chunk.length; i += 2) {
      obj[chunk[i]] = chunk[i + 1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout);
We pass the commaSplitter a string (say, "a,b,c,d") and it pushes an array as its readable data (["a", "b", "c", "d"]). Adding the readableObjectMode flag to that stream is necessary because we are pushing an object downstream, not a string.
We then take the array output by commaSplitter and pipe it into the arrayToObject stream. We need the writableObjectMode flag so that this stream can accept an object. It also pushes an object downstream (the input array converted into an object), which is why we need the readableObjectMode flag there as well. The final objectToString stream accepts an object but pushes out a string, which is why we only need the writableObjectMode flag. What gets pushed downstream is a plain string.
Here's how to use the example above: feed it a comma-separated string such as a,b,c,d on standard input, and it prints {"a":"b","c":"d"}.
Node's built-in transform streams
Node has some very useful built-in transform streams, namely the zlib and crypto streams.
Here is a script that combines zlib.createGzip() with the fs readable/writable streams to compress a file:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'));
You can use this script to gzip any file passed as an argument. We read the file content through a readable stream, pipe it into zlib’s built-in transform stream, and then write the new gzipped file through a writable stream. Simple.
The great thing about using pipes is that we can combine them with events if necessary. Say, for example, I want to give the user some progress feedback while the script is running and display a completion message when it finishes. Since the pipe method returns the destination stream, we can chain the registration of event handlers as well:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.on('data', () => process.stdout.write('.'))
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
So using the pipe method, we can easily use streams. When needed, we can further customize the interaction with the flow through events.
What’s better about the pipe approach is that we can compose our program out of small pieces in a highly readable way. For example, instead of listening for the data event directly, we can create a transform stream to report progress and replace the .on() call with another .pipe() call:
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
const { Transform } = require('stream');
const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));
The reportProgress stream is a simple pass-through stream, but it reports progress to standard out as well. Note how I used the second argument of the callback() function inside the transform() method to pass the data along; doing so is equivalent to pushing the data with the push method.
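A small sketch of that equivalence (the stream names are arbitrary): both of these are plain pass-through transforms.

const { Transform } = require('stream');

// Pushing the chunk explicitly, then signaling completion...
const passThroughA = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk);
    callback();
  }
});

// ...is equivalent to letting callback's second argument do the push.
const passThroughB = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
});

process.stdin.pipe(passThroughA).pipe(passThroughB).pipe(process.stdout);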
The applications of combining streams are endless. For example, if we need to encrypt the file either before or after we gzip it, all we need to do is pipe another transform stream in the right place. We can use Node’s built-in crypto module for that:
const crypto = require('crypto');
// ...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_secret'))
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + '.zz'))
.on('finish', () => console.log('Done'));
The script above compresses and encrypts a given file so that only those who know the secret key can use the resulting file. We cannot extract the file using a normal decompression tool because it is encrypted.
To actually unzip any files compressed using the above script, we need to use Crypto and Zlib in reverse order:
fs.createReadStream(file)
.pipe(crypto.createDecipher('aes192', 'a_secret'))
.pipe(zlib.createGunzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file.slice(0, -3)))
.on('finish', () => console.log('Done'));
Assuming the file passed in is the compressed version, the code above creates a read stream from it, pipes it into the crypto createDecipher() stream (using the same secret), pipes the output into the zlib createGunzip() stream, and finally writes the resulting data out to a file without the compressed-file extension.
That’s all I have to say about this topic. Thanks for reading! See you next time!
If you found this article helpful, please tap the 💚 below. Follow me for more articles on Node.js and JavaScript.
I make online courses for Pluralsight and Lynda. My most recent courses are getting started with React.js, advanced Node.js, and learning full stack JavaScript.
I also do online and in-person training on JavaScript, Node.js, React.js, and GraphQL, from beginner to advanced levels. If you are looking for an instructor, please get in touch. I will be doing six live workshops at Forward.js this July, including one on advanced Node.js.
If you have questions about this post or any of my other posts, you can find me through this Slack account and ask in the #questions room.
The Nuggets Translation Project is a community that translates high-quality internet technical articles; the source articles are English posts shared on Nuggets. It covers Android, iOS, React, front end, back end, product, design, and more. Follow the Nuggets Translation Project for more quality translations.