Streams in Node.js are notoriously hard to use and even harder to understand.

In the words of Dominic Tarr: “Streams are the most useful and the least understood idea in Node.” Even Dan Abramov, co-author of Redux and a member of the React.js core team, is afraid of Node streams.

This article will help you understand streams and how to use them. So don’t be afraid; follow along! 🏖

What are streams?

Streams are one of the fundamental concepts that power Node.js applications. They are a method of handling data that lets you read input and write output sequentially.

Streams are an efficient way to handle reading/writing files, network communication, or any kind of end-to-end information exchange.

What makes streams unique is that they do not read a file into memory all at once, as traditional programs do. Instead, they read chunks of data piece by piece, processing the content without keeping it all in memory.

This makes streams very powerful when dealing with large amounts of data. For example, a file may be larger than the available memory, making it impossible to read the entire file into memory for processing. This is where streams come to the rescue!

By processing smaller chunks of data with streams, you can read files that are much larger than the memory you have free.

Take “streaming” services like YouTube or Netflix: they don’t make you download the whole video and audio at once. Instead, your browser receives a continuous stream of chunks and starts playing them immediately.

Streams, however, aren’t just for media or big data. They also give our code “composability”. Designing with composability in mind means that several components can be combined in a certain way to produce the same type of result. In Node.js, streams let you pass data between small, self-contained pieces of code that can be freely combined into something powerful.
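As a hedged sketch of that composability (the transform logic and strings below are invented for illustration), two tiny Transform streams can be chained together with pipe():

const { Readable, Transform } = require('stream');

// A small transform that upper-cases the incoming text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// A second transform that appends an exclamation mark to each chunk
const exclaim = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString() + '! ');
  }
});

// Compose the pieces: source -> upperCase -> exclaim -> stdout
Readable.from(['hello', 'streams'])
  .pipe(upperCase)
  .pipe(exclaim)
  .pipe(process.stdout);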

The advantages of streams

Streams provide two major advantages over other ways of processing data:

  1. Memory efficiency: you don’t need to load a large amount of data (or all of it) into memory before you can process it (see the sketch after this list).
  2. Time efficiency: you can start processing as soon as the first chunk of data arrives, which greatly reduces the time to first result compared with waiting for all of the data to be loaded.
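To make the memory point concrete, here is a minimal sketch (the file name big.file and the port are placeholders) contrasting a buffered read with a streamed read when serving a file over HTTP:

const fs = require('fs');
const http = require('http');

http.createServer((req, res) => {
  // Buffered: the whole file has to fit in memory before anything is sent
  // fs.readFile('big.file', (err, data) => {
  //   if (err) throw err;
  //   res.end(data);
  // });

  // Streamed: chunks are read and sent to the client as they become available
  fs.createReadStream('big.file').pipe(res);
}).listen(8000);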

Four types of streams in Node.js

  1. Writable: a stream you can write data to. For example, fs.createWriteStream() lets us write data to a file using a stream.
  2. Readable: a stream you can read data from. For example, fs.createReadStream() lets us read the contents of a file.
  3. Duplex: a stream that is both readable and writable. For example, net.Socket (a minimal Duplex sketch follows this list).
  4. Transform: a stream that can modify or transform the data as it is written and read. For example, when compressing a file, you write uncompressed data into the stream and read compressed data out of it.
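Since the rest of this article focuses on readable and writable streams, here is a minimal Duplex sketch (a toy echo stream invented for illustration) in which whatever is written to the writable side is pushed back out on the readable side:

const { Duplex } = require('stream');

const echo = new Duplex({
  write(chunk, encoding, callback) {
    this.push(chunk); // hand the written data back to the readable side
    callback();
  },
  read(size) {
    // nothing to do here: data is pushed from write()
  }
});

echo.on('data', (chunk) => console.log('echoed:', chunk.toString()));
echo.write('ping');
echo.write('pong');
echo.end();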

If you’ve ever used Node.js, you’ve probably come across streams. For example, in a Node.js-based HTTP server, request is a readable stream and response is a writable stream. You may have used the fs module, which lets you work with both readable and writable file streams. When you use Express, you are using streams to interact with the client, and streams are used in every database connection driver you might work with, because TCP sockets, the TLS stack and other connections are all based on Node.js streams.
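For instance, a minimal HTTP echo server is nothing but two streams connected together: the request (readable) is piped straight into the response (writable). This is only a sketch; the port below is arbitrary:

const http = require('http');

const server = http.createServer((req, res) => {
  // req is a readable stream, res is a writable stream
  req.pipe(res);
});

server.listen(3000);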

Example

1. Create a readable Stream

We first require the Stream module and then initialize a readable stream:

const Stream = require('stream')
// A no-op read() lets the stream be consumed without implementing _read()
const readableStream = new Stream.Readable({ read() {} })

Now that the stream has been initialized, we can send data to it:

readableStream.push('ping! ')
readableStream.push('pong! ')
readableStream.push(null)
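To actually see those chunks, you still need a consumer; a minimal sketch is to attach a data listener to the readableStream created above:

readableStream.on('data', (chunk) => {
  console.log(chunk.toString()); // logs the pushed chunks
});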

Async iterator

It is highly recommended to use async iterators when working with streams. According to Dr. Axel Rauschmayer, asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning the current “task” may be paused before an item is retrieved). It is also worth mentioning that the stream async iterator implementation uses the 'readable' event internally.

You can use an async iterator to read from a readable stream:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream('tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'

You can also collect the contents of a readable stream into a string:

import {Readable} from 'stream';
import {strict as assert} from 'assert';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

Remember not to mix async functions with EventEmitter, because there is currently no way to catch a rejection that happens inside an event handler, which makes it hard to track down errors and memory leaks. The current best practice is to always wrap the body of an async function in a try/catch block and handle the errors there, but this is error-prone. A pull request aims to address this problem and may be merged into Node core.
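As a hedged sketch of that practice, the body of the async function from the earlier example can be wrapped in a try/catch so that stream errors surface in one place:

async function logChunks(readable) {
  try {
    for await (const chunk of readable) {
      console.log(chunk);
    }
  } catch (err) {
    // errors emitted by the stream end up here instead of being lost
    console.error('stream failed:', err);
  }
}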

For more on asynchronous iteration over Node streams, read [this article](2ality.com/2019/11/nod…).

Readable.from(): creating a readable stream from an iterable

stream.Readable.from(iterable, [options]) is a utility method for creating a readable stream out of an iterable, which holds the data contained in the iterable. The iterable can be a synchronous or an asynchronous iterable. The options parameter is optional and can, among other things, be used to specify a text encoding.

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});

Two read modes

According to the Streams API, readable streams effectively operate in one of two modes: flowing and paused. A readable stream can be in object mode or not, regardless of whether it is in flowing mode or paused mode.

  • In flowing mode, data is read from the underlying system automatically and provided to your application as quickly as possible via events on the EventEmitter interface.

  • In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream.

In flowing mode, you read data from the stream by listening for the data event and attaching a callback. When a chunk of data becomes available, the readable stream emits a data event and your callback executes. Take a look at the following example:

var fs = require("fs");
var data = ' ';

var readerStream = fs.createReadStream('file.txt'); //Create a readable stream

readerStream.setEncoding('UTF8'); // Set the encoding to be utf8. 

// Handle stream events --> data, end, and error
readerStream.on('data'.function(chunk) {
  data += chunk;
});

readerStream.on('end'.function() {
  console.log(data);
});

readerStream.on('error'.function(err) {
  console.log(err.stack);
});

console.log("Program Ended");

fs.createReadStream() creates a readable stream. Initially, the stream is in a static (paused) state. As soon as you listen for the data event and attach a callback, it starts flowing. After that, chunks of data are read and passed to your callback. The stream implementer decides how often data events are emitted. For example, an HTTP request may emit a data event once every few KB of data are read. When reading a file, you might decide to emit a data event once a line is read.

The stream emits an end event when there is no more data to read. In the snippet above, we listen for this event to get notified when the end is reached.

In addition, if there is an error, the stream will emit an error event.

In paused mode, you just call read() on the stream instance repeatedly until every chunk of data has been read, as in the following example:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
  while ((chunk = readableStream.read()) != null) {
    data += chunk;
  }
});

readableStream.on('end', function() {
  console.log(data);
});

The read() function reads some data from the internal buffer and returns it. It returns null when there is nothing left to read. In the while loop we check for null and terminate the loop. Also note that the readable event is emitted whenever a chunk of data can be read from the stream.


Switching to flowing mode

All readable streams start in paused mode and can be switched to flowing mode in one of the following ways:

  • Adding a 'data' event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a writable stream.

A readable stream can switch back to paused mode in one of the following ways:

  • If there are no pipe destinations, by calling the stream.pause() method.
  • If there are pipe destinations, by removing all of them. Multiple pipe destinations can be removed by calling the stream.unpipe() method.

The important concept to remember is that a readable stream will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the readable stream will try to stop generating data. A readable stream can be paused and resumed, and data can be read progressively with readable.read(). If the 'readable' event handler is removed, the stream will start flowing again if a 'data' event handler exists.
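Here is a small sketch of switching between the two modes with pause() and resume(); the file name and the one-second delay are arbitrary:

const fs = require('fs');

const readable = fs.createReadStream('file.txt');

readable.on('data', (chunk) => {
  console.log(`received ${chunk.length} bytes`);
  readable.pause();                          // switch back to paused mode
  setTimeout(() => readable.resume(), 1000); // switch to flowing mode again after 1s
});

readable.on('end', () => console.log('done'));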

2. Create a Writable Stream

To write data to a writable stream, call write() on the stream instance. The following is an example:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});

The code above is straightforward. It simply reads chunks of data from the input stream and writes them to the destination using write(). This function returns a boolean indicating whether the operation was successful. If it returns true, the write succeeded and you can continue writing more data. If it returns false, the internal buffer is full (backpressure) and you should stop writing for the moment. The writable stream lets you know when you can continue writing by emitting a drain event.

Calling the writable.end() method signals that no more data will be written to the writable stream. If a callback is provided, it is attached as a listener for the finish event.

// Write 'hello, ' and then end with 'world! '.
const fs = require('fs');
const file = fs.createWriteStream('haha.md');
file.write('hello, ');
file.end('world! ');
// Writing more now is not allowed!
file.on('finish', function () {
  console.log('finished');
});

With a writable stream, you can consume the data coming from a readable stream:

const Stream = require('stream')

// A no-op read() lets the readable stream be piped without
// throwing ERR_METHOD_NOT_IMPLEMENTED
const readableStream = new Stream.Readable({ read() {} })
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString())
  next()
}

writableStream.on('finish', function () {
  console.log('finished')
})

readableStream.pipe(writableStream)

readableStream.push('ping! ')
readableStream.push('pong! ')
readableStream.push(null)

(Recommended) You can also use async iterators to write to a writable stream:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
import {strict as assert} from 'assert';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

Finish () is callback-based by default, but can be converted to A promise-based version via util.promisify() (line A).

This example uses the following two patterns:

Writing to the writable stream while handling backpressure (line B): if write() returns false, wait for the drain event before writing more:

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}

Closing the writable stream when writing is complete, and waiting until it has finished (line C):

writable.end();
await finished(writable);

pipeline()

Piping is a mechanism by which we provide the output of one stream as the input to another stream. It is normally used to take data from one stream and pass that stream's output on to another stream. There is no limit on pipe operations. In other words, pipes are used to process streamed data in multiple steps.

⚠️ Note! A chain of pipes is only possible if the destination stream is a Duplex or Transform stream (see the docs).

stream.pipeline() was introduced in Node 10.x. It is a module method used to pipe between streams, forwarding errors, cleaning up properly, and providing a callback when the pipeline is complete.

Here is an example using pipeline:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

pipeline() should be used instead of pipe(), because pipe() is not safe: it does not forward errors or clean up the streams properly when something goes wrong.
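Newer Node.js versions (15+) also expose a promise-based pipeline under stream/promises, which plays nicely with async/await. Here is a sketch of the same gzip pipeline using it (file names as above):

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function run() {
  await pipeline(
    fs.createReadStream('The.Matrix.1080p.mkv'),
    zlib.createGzip(),
    fs.createWriteStream('The.Matrix.1080p.mkv.gz')
  );
  console.log('Pipeline succeeded');
}

run().catch((err) => console.error('Pipeline failed', err));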

Stream Module

The Node.js Stream module provides the foundation upon which all streaming APIs are built.

The Stream module is a native module shipped by default in Node.js. Streams are instances of the EventEmitter class, which handles events asynchronously in Node. Because of that, streams are event-based by nature.

Get the Stream module:

const stream = require('stream');

The Stream module is useful for creating new types of stream instances. It is usually not necessary to use the Stream module just to consume streams; there are other, more common ways to do that.
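If you do need a new stream type, the usual approach is to subclass one of the base classes. Here is a hedged sketch of a small counting readable stream (the class name and its limit are invented for the example):

const { Readable } = require('stream');

// A readable stream that emits the numbers 1..5 and then ends
class Counter extends Readable {
  constructor(options) {
    super(options);
    this.current = 1;
  }

  _read() {
    if (this.current > 5) {
      this.push(null);                   // signal end of data
    } else {
      this.push(String(this.current++)); // push the next number as a string
    }
  }
}

new Counter().on('data', (chunk) => console.log(chunk.toString()));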

Streams APIs

Thanks to their advantages, many Node.js core modules provide native stream handling capabilities, most notably the following (a small combined example follows the list):

  • net.Socket is the main Node API that streams are based on; it underlies most of the following APIs
  • process.stdin returns a stream connected to stdin
  • process.stdout returns a stream connected to stdout
  • process.stderr returns a stream connected to stderr
  • fs.createReadStream() creates a readable stream to a file
  • fs.createWriteStream() creates a writable stream to a file
  • net.connect() initiates a stream-based connection
  • http.request() returns an instance of the http.ClientRequest class, which is a writable stream
  • zlib.createGzip() compress data using gzip (a compression algorithm) into a stream
  • zlib.createGunzip() decompress a gzip stream.
  • zlib.createDeflate() compress data using deflate (a compression algorithm) into a stream
  • zlib.createInflate() decompress a deflate stream
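To tie a few of these together, here is a tiny sketch that gzips whatever arrives on stdin and writes the result to stdout (you might run it as node gzip.js < input.txt > input.txt.gz; the file names are placeholders):

const zlib = require('zlib');

// stdin (readable) -> gzip (transform) -> stdout (writable)
process.stdin
  .pipe(zlib.createGzip())
  .pipe(process.stdout);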

Stream events

Here are some of the important events related to writable streams (a short usage sketch follows the list):

  • error – emitted to indicate that an error occurred while writing or piping.
  • pipe – emitted by the writable stream when a readable stream is piped into it.
  • unpipe – emitted when you call unpipe on the readable stream, stopping it from piping into the destination stream.
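A short sketch of listening for these events (the file names are placeholders):

const fs = require('fs');

const readable = fs.createReadStream('file.txt');
const writable = fs.createWriteStream('copy.txt');

writable.on('pipe', (src) => console.log('a readable stream was piped in'));
writable.on('unpipe', (src) => console.log('the readable stream was unpiped'));
writable.on('error', (err) => console.error('write failed:', err));

readable.pipe(writable);
readable.unpipe(writable); // triggers the unpipe event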

Conclusion

That covers the basics of streams. Streams, pipes, and chaining are among the core and most powerful features of Node.js. Streams can certainly help you write clean and performant code to perform I/O.

There is also a notable Node.js strategic initiative called BOB that aims to improve the Node.js streaming data interfaces, both within the Node.js core and, hopefully, as future public APIs.

References

  1. Understanding Streams in Node.js

An excellent article on streams

  1. A Visual Guide to NodeJS Streams