Node.js Streams: Everything You Need to Know

Node.js streams have a reputation among developers for being not only difficult to use, but also difficult to understand. The good news is that they no longer have to be unmanageable. Over the past few years, developers have built many third-party packages to make working with streams easier, but in this article I'll focus on Node.js's native stream interface.

Streams are Node's best and most misunderstood idea.

– Dominic Tarr

What is a stream?

A stream is a collection of data, much like an array or a string. The difference is that a stream's data doesn't have to be available all at once, and it doesn't have to fit in memory. These two characteristics make streams very efficient when processing large amounts of data, or data that arrives from an external source one chunk at a time.

The composable nature of streams gives them another kind of power when handling large amounts of data: just as small Linux commands can be combined into rich command pipelines, Node.js streams can be piped together to build data pipelines.

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input
grep.pipe(wc)

Many Node.js built-in modules implement the stream interface. A few common examples:
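
  • process.stdin (a readable stream) and process.stdout / process.stderr (writable streams)
  • fs read and write streams (fs.createReadStream and fs.createWriteStream)
  • HTTP requests and responses (readable on one side of the connection, writable on the other)
  • TCP sockets, zlib streams, and crypto streams (both readable and writable)
  • Child process stdin (writable) and stdout / stderr (readable)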

Notice that some of these native Node.js objects are both readable and writable streams, such as TCP sockets, zlib streams, and crypto streams.

It is also worth noting that some of these objects are closely related. For example, HTTP objects are readable streams on the client and writable streams on the server. This is because in an HTTP exchange, the program reads data from one object (http.IncomingMessage) and writes data to another (http.ServerResponse).
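To make that concrete, here is a minimal sketch (the variable name echoServer and port 8080 are arbitrary choices for this illustration): the request is the readable http.IncomingMessage and the response is the writable http.ServerResponse, so the request body can simply be piped back to the client.

const http = require('http');

// req is an http.IncomingMessage (a readable stream);
// res is an http.ServerResponse (a writable stream).
const echoServer = http.createServer((req, res) => {
  // Pipe the readable request body straight into the writable response.
  req.pipe(res);
});

echoServer.listen(8080);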

A practical example of streams

The theory sounds great, but it doesn't quite convey how powerful streams are. Let's look at an example that shows the difference streams can make in memory consumption.

Let’s create a big file first:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

In the sample code above, the fs module exposes the stream interface for reading and writing files. The loop writes the same line to big.file a million times through a writable stream.

Running that script generates a file of roughly 400 megabytes.

Here is the Node server code for manipulating this large file:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

When the service receives a request, the program serves the big file with the asynchronous fs.readFile function. It looks as if such code doesn't block the event loop, right?

Well, let's see what happens when we start the server and then request that file.

When the server starts, it consumes about 8.7 megabytes of memory.

Then request the service, noting the memory usage:

Wow, the memory footprint suddenly jumps to 434.8 MB.

Essentially, the program buffers the entire contents of big.file in memory before writing it out to the HTTP response object. This is very inefficient.

The HTTP response object is also a writable stream. If we pipe a readable stream representing the contents of big.file into that writable stream, the program achieves the same result without holding nearly 400 megabytes in memory.

The fs module in Node.js can create a readable stream for any file with the createReadStream method. The program can then pipe that readable stream into the HTTP response object:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

When the service is requested again, a magical thing happens (note the memory footprint):

What happened?

When a client requests the big file, the program streams it one chunk at a time, which means the data is never buffered in memory all at once. Memory usage rises by only about 25 megabytes.

We can push this test case to its limit. Regenerate big.file with 5 million lines instead of 1 million; the file grows to about 2 GB, which is larger than the default buffer limit in Node.js.

With fs.readFile, you simply cannot serve a file that large by default (unless you change the default buffer limit). With fs.createReadStream, however, streaming 2 GB to the requester is no problem at all, and the memory footprint of the service barely changes.

Types of streams

There are four types of streams in Node.js: Readable, Writable, Duplex, and Transform.

  • Readable stream: an abstraction of a source from which data can be consumed, such as the stream returned by fs.createReadStream
  • Writable stream: an abstraction of a destination to which data can be written, such as the stream returned by fs.createWriteStream
  • Duplex stream: a stream that is both readable and writable, such as a TCP socket
  • Transform stream: a duplex stream that modifies or transforms the data as it is written and read. For example, zlib.createGzip compresses data with gzip. You can think of a transform stream as a function whose input is its writable side and whose output is its readable side. Transform streams are also sometimes called "through streams"; a minimal sketch of the built-in PassThrough stream follows this list.
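
As mentioned in the last item above, Node's stream module ships a built-in PassThrough class: a transform stream that simply forwards its input to its output unchanged. A minimal sketch (the variable name pass is arbitrary):

const { PassThrough } = require('stream');

// PassThrough is a built-in transform stream whose readable output
// is exactly what was written to it; useful as a placeholder or a tap.
const pass = new PassThrough();

process.stdin.pipe(pass).pipe(process.stdout);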

All streams are instances of EventEmitter: they emit events that let us read data from them and write data to them. However, we can also consume stream data in a simpler way, using the pipe method.

The pipe function

Here's the magic line to remember:

readableSrc.pipe(writableDest)

In this simple line, the output of a readable stream (the data source) is piped into the input of a writable stream (the destination). The source must be a readable stream and the destination must be a writable stream; of course, either of them can also be a duplex or transform stream. In fact, if we pipe into a duplex stream, we can chain pipe calls just like we do in Linux:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWritableDest)

The pipe function returns the destination stream, which is what makes the chained calls above possible. In the code below, stream a is readable, streams b and c are duplex, and stream d is writable.

a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d

The pipe method is the simplest way to consume streams. It is generally recommended to either use pipe or consume streams with events, but to avoid mixing the two. Usually when you use the pipe method you don't need events, but if you need to consume a stream in a more customized way, events are the way to go.

Stream events

Beside reading from a readable source and writing to a writable destination, the pipe method manages a few things automatically along the way. For example, it handles errors and end-of-file, and the case where one stream is slower or faster than the other.

However, streams can also be consumed directly with events. Here is the simplified, event-based equivalent of the pipe method for reading and writing data:

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

There is a set of important events and functions available on readable and writable streams.

These events and functions are related, because they are usually used together. The most important events on a readable stream are:

  • The data event, emitted whenever the stream passes a chunk of data to the consumer
  • The end event, emitted when there is no more data to be consumed from the stream

The most important events on a writable stream are:

  • The drain event, a signal that the writable stream can accept more data
  • The finish event, emitted when all data has been flushed to the underlying system

Events and functions can be combined to customize and optimize how streams are used. To consume a readable stream, we can use the pipe/unpipe methods, or the read/unshift/resume methods. To consume a writable stream, we can make it the destination of pipe/unpipe, or write to it with the write method and call the end method when we're done.
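To make the write method and the drain event concrete, here is a minimal sketch (the output file letters.txt is a made-up name for this illustration) that writes the letters A through Z to a file, backs off whenever write returns false, resumes on the drain event, and calls end when done:

const fs = require('fs');

const out = fs.createWriteStream('./letters.txt'); // hypothetical output file

let code = 65; // character code for 'A'

function writeLetters() {
  let ok = true;
  // write() returns false when the stream's internal buffer is full
  while (code <= 90 && ok) {
    ok = out.write(String.fromCharCode(code++) + '\n');
  }
  if (code <= 90) {
    // wait for the buffer to drain before writing the rest
    out.once('drain', writeLetters);
  } else {
    out.end(); // no more data to write
  }
}

writeLetters();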

Paused and flowing modes of readable streams

Readable streams have two modes that affect the way programs can consume them:

  • They can be either in paused mode
  • Or in flowing mode

These modes are sometimes referred to as pull and push modes.

All readable streams start in paused mode by default, but they can easily be switched to flowing mode and back to paused when needed. Sometimes the switching happens automatically.

When a readable stream is in paused mode, we can use the read method to read from it on demand. For a readable stream in flowing mode, however, the data keeps coming and we have to listen to events to consume it.

In flowing mode, data can actually be lost if no consumer is available to handle it. That's why we need a data event handler whenever a readable stream is in flowing mode. In fact, just adding a data event handler switches a paused stream into flowing mode, and removing the data event handler switches it back to paused mode. Some of this is done for backward compatibility with the older Node streams interface.

Developers can manually switch between these two modes using the resume and pause methods.
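For example, here is a minimal sketch of doing that by hand with process.stdin: attaching a data handler switches it into flowing mode, pause switches it back to paused mode, and resume (after one second here) switches it to flowing again:

// Attaching a 'data' handler puts process.stdin into flowing mode.
process.stdin.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes; pausing for 1 second`);
  process.stdin.pause();                           // back to paused mode
  setTimeout(() => process.stdin.resume(), 1000);  // flowing again
});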

When an application consumes a readable stream with the pipe method, the developer doesn't have to worry about these modes, because pipe manages them automatically.

Implementing streams

When we work with streams in Node.js, there are two different tasks:

  • The task of implementing streams
  • The task of consuming streams

So far, we've only been talking about consuming streams. Let's implement some!

To implement streams, we need to require the stream module in our program.

Implementing a writable stream

To implement a writable stream, we use the Writable constructor from the stream module.

const { Writable } = require('stream');

There are many ways to implement a writable stream. We can, for example, extend the Writable constructor:

class myWritableStream extends Writable {
}

However, I prefer the simpler constructor approach: we just create an object from the Writable constructor and pass it a number of options. The only required option is a write function, which receives the chunk of data to be written.

const { Writable } = require('stream');

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

The write function takes three arguments:

  • The chunk is usually a buffer, unless we configure the stream differently
  • The encoding argument is needed in some cases, but we can usually ignore it
  • The callback is a function we call after we're done processing the data chunk. It signals whether the write succeeded; to signal a failure, call the callback with an error object.

In outStream, the program simply logs each chunk as a string and then calls the callback without an error to signal success. This is a very simple, and probably not very useful, echo stream: it prints back whatever it receives.

To use this stream, we can simply pipe process.stdin, which is a readable stream, into outStream.

When the program executes, any data entered through process.stdin is printed out by the console.log function in outStream.

However, this functionality is already provided by Node.js built-ins, so this is not a very useful stream to implement. It is very similar to process.stdout, and we can achieve the same result with the following line:

process.stdin.pipe(process.stdout);

Implementing a readable stream

To implement a readable stream, we require the Readable interface and construct an object from it:

const { Readable } = require('stream');
const inStream = new Readable({});

This is the simplest way to implement a readable stream, where developers can push data directly for consumption.

const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

Pushing null signals that the readable stream has no more data to offer.

A developer can pipe a readable stream to process.stdout for consumption.

By executing this code, the program reads the data from the readable stream and prints it out. Very simple, but not very efficient.

In the previous code, we essentially push all the data into the stream before piping it to process.stdout. A better approach is to push data on demand, when a consumer asks for it. We can do that by implementing the read method in the readable stream configuration:

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

When the read method is called on the readable stream, the implementation can push part of the data into the queue. For example, we can push one letter at a time, starting with character code 65 (which represents A) and incrementing it on every push:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

While a consumer is reading the readable stream, the read method keeps firing and the implementation pushes more letters. We terminate the cycle by pushing null to the queue, which in the code above happens when the character code goes beyond 90 (which represents Z).

This code is equivalent to the simpler version we started with, but now the data is pushed on demand, when the consumer asks for it, which is more efficient. This implementation is therefore recommended.

Implementing duplex and transform streams

With a duplex stream, we implement both the readable and writable sides on the same object, as if the object inherited from both interfaces.

Here is a duplex stream, which combines the readable and writable stream examples already implemented above:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

With this duplex stream, the program echoes whatever we type and also reads the letters A to Z in order: we pipe the stdin readable stream into the duplex stream to use its writable side, and pipe the duplex stream into the stdout writable stream to consume its readable side, which prints the letters A through Z.

It's important to understand that in a duplex stream, the readable and writable sides operate completely independently of each other; a duplex stream is simply an object that has both sets of features.

A transform stream is more interesting than a duplex stream, because its output is computed from its input.

For a transform stream, we don't have to implement the read or write methods; we only need to implement a transform method, which combines both of them.

Here is a transform stream that converts whatever you type to uppercase and passes the converted data on to its destination:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

In this example, which we consume exactly like the duplex stream above, we only implemented a transform method: it converts each chunk to uppercase and pushes the result to the readable side.

The object mode of streams

By default, streams only accept Buffer and string values. However, by setting the objectMode flag, we can make a stream accept any JavaScript object.

The following example demonstrates this. A chain of streams converts a comma-separated string into a JavaScript object, so "a,b,c,d" becomes {a: "b", c: "d"}.

const { Transform } = require('stream');
const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    const obj = {};
    for (let i = 0; i < chunk.length; i += 2) {
      obj[chunk[i]] = chunk[i + 1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout);

The commaSplitter transform stream converts an input string (for example, "a,b,c,d") into an array (["a", "b", "c", "d"]). It sets the readableObjectMode flag because the data pushed in its transform method is an object (an array), not a string.

The readable output of commaSplitter is then piped into the transform stream arrayToObject. Since it receives objects, it needs the writableObjectMode flag; and since it pushes objects as well (it converts the incoming array into an object), it also needs the readableObjectMode flag. The last stream, objectToString, accepts an object but pushes out a string, which is why it only needs the writableObjectMode flag: its readable output is a normal string (the serialized object).

Node's built-in transform streams

Node has some very useful built-in transform streams, such as the zlib and crypto streams.

The following code uses the zlib.createGzip() stream, combined with the readable and writable streams of the fs module, to compress a file:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

The program pipes a readable stream of the file into zlib's built-in transform stream, and then into a writable stream for the compressed file. So you can gzip any file simply by passing its path as an argument to this script.
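For example, assuming the script above were saved as zip.js (a file name chosen here just for illustration), running node zip.js big.file would produce big.file.gz alongside the original file.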

Another reason to choose the pipe method is that it can be combined with events where needed. Say we want the program to print a progress marker while the script is working and a "Done" message when it finishes. Since pipe returns the destination stream, we can chain event handler registrations onto the pipeline:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('. '))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

The pipe method makes it easy to consume streams, and we can still customize our interaction with the target streams through events where needed.

What's really powerful, though, is composing the whole pipeline piece by piece with pipe in an easy-to-read way. For example, instead of listening to the data event as above, we can pass a transform stream through the pipeline to report progress:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('. ');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

reportProgress here is a simple pass-through transform stream that reports progress to the console while the data flows through. Note that it uses callback(null, chunk) to push the data in the transform method; this is equivalent to calling this.push(chunk) as in the earlier examples.

There are many more ways to compose streams. For example, say we need to compress the file and then encrypt it, or the other way around. All the program has to do is pipe the file through the streams in the required order, here using the crypto module:

const crypto = require('crypto');
// ...
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

The code above compresses and then encrypts the file, so only those who know the password can use the output: it cannot be decompressed with an ordinary unzip tool because it is encrypted.

To decompress any file produced by the code above, we simply use the crypto and zlib streams in the reverse order:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

Assuming the passed-in file is the compressed version, the code above pipes a readable stream of that file into crypto's createDecipher() stream, pipes the output into zlib's createGunzip() stream, and finally writes the result to a file without the extension part (file.slice(0, -3) strips the '.zz').

The above is my summary of this topic. Thank you for reading and I look forward to seeing you next time.