
Streams in Node.js have a reputation for being hard to work with, and even harder to understand. Well, I’ve got good news for you – that’s no longer the case.

Over the past few years, developers have created numerous packages with the sole purpose of making processing streams easier. But in this article, I’m going to focus on the native Node.js Stream.

“Streams are Node’s best and most misunderstood idea.” – Dominic Tarr

What exactly are streams?

A stream is a collection of data – like an array or a string. The difference is that a stream might not be available all at once, and it doesn’t have to fit in memory. This makes streams very powerful when working with large amounts of data, or with data that comes from an external source one chunk at a time.

However, streams aren’t only about working with large amounts of data. They also give us the power of composability in our code, just as we can compose powerful Linux commands by piping smaller Linux commands together.


const grep = ... // A stream for the grep output
const wc = ...   // A stream for the wc input

grep.pipe(wc)

Many modules built into Node.js implement the streaming interface:

  • Readable streams include HTTP responses (on the client), HTTP requests (on the server), fs read streams, zlib streams, crypto streams, TCP sockets, child process stdout and stderr, and process.stdin.
  • Writable streams include HTTP requests (on the client), HTTP responses (on the server), fs write streams, zlib streams, crypto streams, TCP sockets, child process stdin, and process.stdout/process.stderr.


Some of the native Node.js objects in the list above are both readable and writable streams, such as TCP sockets, zlib streams, and crypto streams.

Notice that the objects are also closely related. While an HTTP response is a readable stream on the client, it’s a writable stream on the server. This is because in the HTTP case, we’re basically reading from one object (http.IncomingMessage) and writing to another (http.ServerResponse).

Also note that the stdio streams (stdin, stdout, stderr) have the inverse stream types when it comes to child processes: from the main process, a child’s stdin is writable while its stdout and stderr are readable. This makes it very convenient to pipe to and from these streams using the main process stdio streams.
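For example, here is a minimal sketch (not from the original article; the wc command is just an illustrative choice) that pipes the main process stdio to and from a child process:

const { spawn } = require('child_process');

// Count the bytes we type on stdin using the external wc command
const child = spawn('wc', ['-c']);

// From the parent's point of view, child.stdin is writable and
// child.stdout is readable, so we can pipe to and from them
process.stdin.pipe(child.stdin);
child.stdout.pipe(process.stdout);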

A stream example

The theory is great, but often not 100% convincing. Let’s look at an example showing the difference streams can make in code when it comes to memory consumption.

Let’s start by creating a large file:

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for (let i = 0; i <= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit. \n');
}

file.end();

What did I create this big file with? A writable stream!

The fs module can be used to read from and write to files using a stream interface. In the example above, we write to big.file through a writable stream, one million lines in a loop.

Running the above script produces a file of about 400MB.

Here is a simple Node web server that does nothing but serve big.file:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;

    res.end(data);
  });
});

server.listen(8000);

When the server gets a request, it serves the big file using the asynchronous fs.readFile method, so it doesn’t look like we’re blocking the event loop or anything. Everything is great, right? Right?

Well, let’s see what happens when we run the server above while monitoring memory usage.

When I ran the server, it started out with a normal amount of memory, 8.7 MB:


Then I connected to the server. Note what happened to the memory consumption:


Wow – Memory consumption skyrocketed to 434.8 MB.

Before writing it out to the response object, we loaded the entire content of big.file into memory. That is very inefficient.

The HTTP response object (res in the code above) is also a writable stream. This means that if we have a readable stream that represents the content of big.file, we can just pipe the one into the other and achieve the same result without consuming around 400 MB of memory.

Node’s fs module can give us a readable stream for any file using the createReadStream method. We can pipe that into the response object:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

Now when we connect to the server, something amazing happens (look at the memory consumption):


What happened?

When a client asks for that big file, we stream it one chunk at a time, which means we don’t buffer the whole thing in memory at all. Memory usage grew by about 25 MB, and that was it.

You can push this example to its limits. Regenerate big.file so that it’s bigger than 2 GB, which is actually larger than the default buffer limit in Node.

If you try to serve that file using fs.readFile, you simply can’t by default (although you can change the limit). But with fs.createReadStream, there is no problem at all streaming 2 GB of data to the requester, and, best of all, the process memory usage stays roughly the same.

Are you ready to learn more about streams now?

Streams 101

There are four basic stream types in Node.js: Readable, Writable, Duplex, and Transform.

  • A readable stream is an abstraction for a source from which data can be consumed. An example is the fs.createReadStream method.
  • A writable stream is an abstraction for a destination to which data can be written. An example is the fs.createWriteStream method.
  • A duplex stream is both readable and writable. An example is a TCP socket.
  • A transform stream is basically a duplex stream that can be used to modify or transform the data as it is written and read. An example is zlib.createGzip, which compresses data using gzip. You can think of a transform stream as a function whose input is the writable stream part and whose output is the readable stream part. You may have heard transform streams referred to as “through streams”.
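To make these four types concrete, here is a minimal sketch (the copy.file name is just an illustration, not from the original article) that creates one native instance of each:

const fs = require('fs');
const zlib = require('zlib');
const net = require('net');

const readable = fs.createReadStream('./big.file');   // Readable
const writable = fs.createWriteStream('./copy.file'); // Writable
const duplex = new net.Socket();                       // Duplex (a TCP socket)
const transform = zlib.createGzip();                   // Transform (gzip compression)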

All streams are instances of EventEmitter. They emit events that can be used to read and write data. However, we can consume stream data in a simpler way using the pipe method.

The pipe method

Here’s a magical line of code to remember:

readableSrc.pipe(writableDest)

In this simple line, we’re piping the output of a readable stream – the source of data – as the input of a writable stream – the destination. The source has to be a readable stream and the destination has to be a writable one. Of course, they can both be duplex/transform streams as well. In fact, if we’re piping into a duplex stream, we can chain pipe calls just like we do in Linux:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWritableDest)

The pipe method returns the destination stream, which is what enables us to do the chaining above. For streams a (readable), b and c (duplex), and d (writable), we can do:

a.pipe(b).pipe(c).pipe(d)
# Which is equivalent to:
a.pipe(b)
b.pipe(c)
c.pipe(d)
# Which, in Linux, is equivalent to:
$ a | b | c | d

The pipe method is the easiest way to consume a stream. It’s generally recommended to consume streams with either the pipe method or events, but avoid mixing them. Usually when you’re using the pipe method, you don’t need to use events, but if you need to consume streams in more custom ways, events would be the way to go.

Stream events

Beside reading from a readable source stream and writing to a writable destination stream, the pipe method automatically manages a few things along the way. For example, it handles errors, end-of-files, and the cases when one stream is slower or faster than the other.

However, streams can also be consumed with events directly. Here’s the simplified event-based equivalent of what the pipe method mainly does to read and write data:

// readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

Here is a list of some of the important events and functions that can be used with readable and writable streams:

  • Readable stream events: data, end, error, close, readable
  • Readable stream functions: pipe(), unpipe(), read(), unshift(), resume(), pause(), isPaused(), setEncoding()
  • Writable stream events: drain, finish, error, close, pipe, unpipe
  • Writable stream functions: write(), end(), cork(), uncork(), setDefaultEncoding()


The events and functions are related, because they are usually used together.

The most important events on readable streams are:

  • The data event, which is emitted whenever the stream passes a chunk of data to the consumer.
  • The end event, which is emitted when there is no more data to be consumed from the stream.

The most important events on writable streams are:

  • The drain event, which is a signal that the writable stream can receive more data.
  • The finish event, which is emitted when all data has been flushed to the underlying system.

Events and functions can be combined to make for a custom and optimized use of streams. To consume a readable stream, we can use the pipe/unpipe methods, or the read/unshift/resume methods. To consume a writable stream, we can make it the destination of pipe/unpipe, or just write to it with the write method and call the end method when we’re done.
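For example, here is a minimal sketch (not one of the original examples; the lines.file name and the helper function are hypothetical) of consuming a writable stream directly with the write method, respecting the drain event, and calling end when done:

const fs = require('fs');

const dest = fs.createWriteStream('./lines.file'); // hypothetical output file

function writeOneMillionLines(stream) {
  let i = 0;

  function writeSome() {
    let ok = true;
    while (i < 1e6 && ok) {
      // write() returns false once the internal buffer is full
      ok = stream.write(`This is line ${i++}\n`);
    }
    if (i < 1e6) {
      // Wait for the buffer to drain before writing more
      stream.once('drain', writeSome);
    } else {
      stream.end();
    }
  }

  writeSome();
}

writeOneMillionLines(dest);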

Paused and flowing modes of readable streams

Readable streams have two main modes that affect the way we can consume them:

  • They can be either in the paused mode
  • Or in the flowing mode

Those modes are sometimes referred to as pull and push modes.

All readable streams start in the paused mode by default, but they can easily be switched to flowing mode and back to paused when needed. Sometimes, the switching happens automatically.

When a readable stream is in the paused mode, we can use the read() method to read from the stream on demand. For a readable stream in the flowing mode, however, the data is continuously flowing and we have to listen to events to consume it.

In the flowing mode, data can actually be lost if no consumers are available to handle it. This is why we need a data event handler when we have a readable stream in flowing mode. In fact, just adding a data event handler switches a paused stream into flowing mode, and removing the data event handler switches the stream back to paused mode. Some of this is done for backward compatibility with the older streams interface.

To manually switch between these two stream modes, you can use the resume() and pause() methods.
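For example, here is a minimal sketch (not one of the original examples) that reads big.file with a data handler, which switches the stream into flowing mode, and then manually pauses and resumes it:

const fs = require('fs');

const readable = fs.createReadStream('./big.file');

readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes`);
  readable.pause();                           // switch back to paused mode
  setTimeout(() => readable.resume(), 1000);  // switch to flowing mode again
});

readable.on('end', () => console.log('No more data'));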


When consuming readable streams using the pipe method, we don’t have to worry about these modes because pipe manages them automatically.

Implement Streams

When we talk about streams in Node.js, there are two main tasks:

  • The task of implementing streams
  • The task of consuming them

So far we’ve been talking only about consuming streams. Let’s implement some!

Stream implementers are usually the ones who require the stream module

Implement a writable stream

To implement a Writable stream, we need to use the Writable constructor from the stream module.

const { Writable } = require('stream');

We can implement a writable stream in many ways. We can, for example, extend the Writable constructor.

class myWritableStream extends Writable {
}

However, I prefer the simpler constructor approach. We just create an object from the Writable constructor and pass it a number of options. The only required option is a write function, which exposes the chunk of data to be written.

const { Writable } = require('stream');

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

process.stdin.pipe(outStream);

The write method takes three arguments.

  • The chunk is usually a buffer, unless we configure the stream differently.
  • The encoding argument is needed in that case, but we can usually ignore it.
  • The callback is a function that we need to call after we’re done processing the data chunk. It’s what signals whether the write was successful or not. To signal a failure, call the callback with an error object.

In outStream, we simply console.log the chunk as a string and then call the callback with no error to indicate success. This is a very simple and probably not very useful echo stream: it prints out whatever it receives.

To consume this stream, we can simply use it with process.stdin, which is a readable stream, so we can just pipe process.stdin into our outStream.

When we run the code above, any characters we type into process.stdin will be printed out by outStream’s console.log.

This is not a very useful stream to implement, because it’s already implemented and built-in: it’s very much equivalent to process.stdout. We can pipe stdin directly into stdout and get the exact same echo feature with this single line:

process.stdin.pipe(process.stdout);
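As a side note on the callback argument, here is a hypothetical sketch (not from the original article) of a writable stream whose write implementation signals a failure by calling the callback with an error object:

const { Writable } = require('stream');

const pickyStream = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.length > 1024) {
      // Signal a failure; the stream will emit an 'error' event
      callback(new Error('Chunk is too big'));
      return;
    }
    console.log(chunk.toString());
    callback(); // No error argument means the write succeeded
  }
});

process.stdin.pipe(pickyStream);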

Implement a readable stream

To implement a readable stream, we require the Readable interface and construct an object from it:

const { Readable } = require('stream');
const inStream = new Readable({});

There is a simple way to implement a readable stream: we can just directly push the data that we want the consumers to consume.

const { Readable } = require('stream'); 
const inStream = new Readable();
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');
inStream.push(null); // No more data
inStream.pipe(process.stdout);

When we push a null value, that signals that the stream has no more data.

To consume this simple readable stream, we can simply pipe it into the writable stream process.stdout.

When we run the above code, we will read all the data from inStream and print it to standard output. Very simple, but not very efficient.

We’re basically pushing all the data into the stream before piping it to process.stdout. The much better way is to push data on demand, when a consumer asks for it. We can do that by implementing the read method in the readable stream configuration:

const inStream = new Readable({
  read(size) {
    // there is a demand on the data... Someone wants to read it.
  }
});

When the read method is called on a readable stream, the implementation can push partial data to the queue. For example, we can push one letter at a time, starting with character code 65 (which represents A), and increment it on every push:

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inStream.currentCharCode = 65;

inStream.pipe(process.stdout);

While a consumer is reading the readable stream, the read method keeps firing and we push more letters. We need to stop this cycle somewhere, which is why we push null when currentCharCode is greater than 90 (which represents Z).

This code is equivalent to the simpler version we started with, but now we push data on demand, when the consumer asks for it. You should always do that.
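As a side note, instead of the pipe call above, we could also consume this on-demand stream in paused mode. Here is a minimal sketch (not one of the original examples) that uses the readable event and the read method:

inStream.on('readable', () => {
  let chunk;
  // read() returns null when there is no more buffered data to consume
  while ((chunk = inStream.read()) !== null) {
    process.stdout.write(chunk);
  }
});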

Implement duplex and transform streams

With a duplex stream, we can implement both a readable and a writable stream in the same object. It’s as if we inherit from both interfaces.

Here is an example duplex stream that combines the writable and readable examples implemented above:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },

  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
});

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

By combining the methods, we can use this duplex stream to read the letters from A to Z, and we can also use it for its echo feature. We pipe the readable stdin stream into the duplex stream to use the echo feature, and we pipe the duplex stream itself into the writable stdout stream to see the letters A through Z.

It’s important to understand that the readable and writable sides of a duplex stream operate completely independently of one another. A duplex stream simply groups the two features into one object.

A transform stream is the more interesting duplex stream, because its output is computed from its input.

For a transform stream, we don’t have to implement the read or write methods; we only need to implement a transform method, which combines both of them. It has the signature of the write method, and we can use it to push data as well.

Here’s a simple transform stream that echoes back anything you type into it, converted to uppercase:

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

In this transform stream, which we consume exactly like the duplex stream in the previous example, we only implemented a transform method. In it, we convert the chunk to its uppercase version and then push that version as the readable part.

Streams object mode

By default, streams expect Buffer/String values. There is an objectMode flag that we can set to have the stream accept any JavaScript object.

Here’s a simple example to demonstrate that. The following combination of transform streams makes a feature to map a string of comma-separated values into a JavaScript object, so “a,b,c,d” becomes {a: b, c: d}.

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,

  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    const obj = {};
    for (let i = 0; i < chunk.length; i += 2) {
      obj[chunk[i]] = chunk[i + 1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout);

We pass the input string (for example, “a,b,c,d”) through the commaSplitter stream, which pushes an array as its readable data ([“a”, “b”, “c”, “d”]). Adding the readableObjectMode flag on that stream is necessary because we’re pushing an object there, not a string.

We then take the array and pipe it into the arrayToObject stream. We need a writableObjectMode flag to make that stream accept an object. It will also push an object (the input array mapped into an object), which is why we need the readableObjectMode flag there as well. The last objectToString stream accepts an object but pushes out a string, which is why we only need a writableObjectMode flag there. The readable part is a normal string (the stringified object).


Node’s built-in transform streams

Node has some very useful built-in transform streams, namely the zlib and crypto streams.

Here is an example that uses the zlib.createGzip() stream combined with the fs readable/writable streams to create a file-compression script:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));

You can use this script to gzip any file you pass as an argument. We pipe a readable stream for that file into the zlib built-in transform stream, and then into a writable stream for the new gzipped file. Simple.

The cool thing about using pipes is that we can actually combine them with events if we need to. Say, for example, I want the user to see a progress indicator while the script is working and a “Done” message when the script is finished. Since the pipe method returns the destination stream, we can chain the registration of event handlers as well:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

So with the pipe method, we get to easily consume streams, but we can still customize our interaction with those streams using events where needed.

What’s even better about the pipe method is that we can use it to compose our program piece by piece, in a much more readable way. For example, instead of listening for the data event above, we can simply create a transform stream to report progress, and replace the .on() call with another .pipe() call:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

The reportProgress stream is a simple pass-through stream, but it also reports the progress to standard output. Note how I used the second argument of the callback() function to push the data inside the transform() method. This is equivalent to pushing the data first.

The applications of combining streams are endless. For example, if we need to encrypt the file before or after we gzip it, all we need to do is pipe another transform stream in the exact order we need. We can use Node’s crypto module for that:

const crypto = require('crypto');
// ...
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

The script above compresses and then encrypts the passed file, and only those who have the secret can use the resulting file. We can’t unzip this file with the normal unzip utilities because it’s encrypted.

To actually be able to unzip anything zipped with the script above, we need to use the crypto and zlib streams in the reverse order, which is simple:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

Assuming the passed file is the compressed and encrypted version, the code above creates a read stream from it, pipes it into the crypto createDecipher() stream (using the same secret), pipes the output of that into the zlib createGunzip() stream, and then writes things back out to a file without the extension part.

Thanks for reading! ✌, see you next time!