Developers who have worked with Node.js probably know that the concept of a Stream can be difficult to understand and deal with.

This article will help you understand the concept of streams and how to work with them. Don’t worry. We’ll figure it out.

What is a Stream?

Streams are one of the fundamental concepts that power Node.js applications. They are a method of handling data that reads input or writes output sequentially.

Streams are an efficient way to handle reading and writing files, network communication, or any end-to-end exchange of information.

What makes streams unique is that instead of reading a file into memory all at once, as a traditional program would, they read data chunk by chunk and process the content as it arrives, without keeping it all in memory.

This makes streams very powerful when working with large amounts of data; for example, a file may be larger than your free memory, so it is impossible to read the entire file into memory in order to process it.

Take streaming services like YouTube or Netflix: instead of making you download the video and audio in their entirety up front, the browser receives the video as a continuous stream of chunks, so viewers can start watching almost immediately.

However, streams are not just for handling media or big data; they also give code “composability.” Designing with composability in mind means that several components can be combined in a certain way to produce the same type of result. In Node.js, you can compose powerful pieces of code by using streams to pipe data into or out of other, smaller pieces of code.

Why streams

Streams have two major advantages over other data-handling methods (a short sketch contrasting the two approaches follows this list):

  1. Memory efficiency: Processing can be done without loading large amounts of data into memory
  2. Time efficiency: Start processing data as soon as it is available, rather than waiting for all data to be transferred
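
As a rough sketch (the file names are placeholders), here is the difference between the buffered approach and the streamed approach when copying a large file:

const fs = require('fs');

// Buffered: the whole file must be loaded into memory before any of it is written out
fs.readFile('big.file', (err, data) => {
  if (err) throw err;
  fs.writeFile('copy-buffered.file', data, (err) => { if (err) throw err; });
});

// Streamed: data is copied chunk by chunk, so memory usage stays low
// and writing starts as soon as the first chunk has been read
fs.createReadStream('big.file').pipe(fs.createWriteStream('copy-streamed.file'));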

Four types of Streams in Node.js

  1. Writable stream: a stream to which data can be written. For example, fs.createWriteStream() lets you write data to a file using a stream.
  2. Readable stream: a stream from which data can be read. For example, fs.createReadStream() lets you read the contents of a file.
  3. Duplex stream: a stream that is both readable and writable. For example, net.Socket.
  4. Transform stream: a stream that can modify or transform the data as it is written and read. For example, in a file-compression operation, you can write compressed data to a file and read decompressed data from it (see the sketch after this list).
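
A minimal sketch (the file names are made up) with one example of each type from the core modules:

const fs = require('fs');
const zlib = require('zlib');

const readable = fs.createReadStream('input.txt');      // Readable stream
const writable = fs.createWriteStream('input.txt.gz');  // Writable stream
const transform = zlib.createGzip();                    // Transform stream: what comes out is a gzipped version of what goes in

// A net.Socket (e.g. returned by net.connect()) is a Duplex stream:
// it is readable and writable, with independent read and write sides.

readable.pipe(transform).pipe(writable); // gzip input.txt into input.txt.gz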

If you’ve ever used Node.js, you’ve probably already encountered streams. For example, in a Node.js-based HTTP server, the request is a readable stream and the response is a writable stream. There is also the fs module, which gives you both readable and writable file streams. Whenever you use Express, you are using streams to interact with the client, and streams are used in various database connection drivers, because TCP sockets, the TLS stack, and other connections are all based on Node.js streams.
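
As a small illustration (the port is arbitrary), an HTTP handler receives the request as a readable stream and sends the response through a writable stream:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // req is readable: you could, for example, listen to its 'data' events to collect the request body.
  // res is writable: here we stream a file into it instead of buffering the file first.
  fs.createReadStream(__filename).pipe(res);
}).listen(3000);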

How do I create a readable stream

Import the module and initialize it:

const Stream = require('stream')
const readableStream = new Stream.Readable({
  read() {} // _read() must be supplied; a no-op is enough here because we push the data manually
})


After initialization, we can send data to it:

readableStream.push('ping! ')
readableStream.push('pong! ')

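Note that nothing is logged until something consumes the stream. A minimal way to drain the stream created above (relying on the no-op read() shown earlier):

readableStream.push(null) // Signal that no more data will be pushed

readableStream.on('data', (chunk) => {
  console.log(chunk.toString()) // 'ping! ' and then 'pong! '
})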

Async iterator

It is highly recommended to use async iterators when working with streams. Asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously, meaning the current “task” may be paused before an item is retrieved. It is also worth mentioning that the async iterator implementation for streams uses the readable event internally.

When reading data from a readable stream, an async iterator can be used:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test! \n'


It is also possible to collect the contents of a readable stream in a string:

import { Readable } from 'stream';
import * as assert from 'assert';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning! ', { encoding: 'utf8' });
assert.equal(await readableToString2(readable), 'Good morning! ');


Notice that in this case, we have to use an asynchronous function because we want to return a Promise.

Don’t confuse asynchronous functions with EventEmitter, because there is currently no way to catch a rejection emitted from an event handler, which makes it difficult to track down bugs and memory leaks. The current best practice is to always wrap the body of an asynchronous function in a try/catch block and handle errors there, but this is error-prone. There is a pull request that aims to solve this problem, if it makes it into Node core.
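
A sketch of that pattern (the function name is just for illustration), wrapping the body of an async function that iterates a stream in try/catch:

async function logChunksSafely(readable) {
  try {
    for await (const chunk of readable) {
      console.log(chunk);
    }
  } catch (err) {
    // Handle the rejection here, otherwise it is easy to lose track of it
    console.error('Stream failed:', err);
  }
}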

Readable.from(): Creates a Readable stream from iterables

stream.Readable.from(iterable, [options]) is a utility method for creating a readable stream from an iterable that holds the data. The iterable can be a synchronous or an asynchronous iterable. The options argument is optional and can, among other things, be used to specify a text encoding.

const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});


Two read modes

According to the Streams API, a readable stream operates in one of two modes: flowing and paused. Regardless of whether a stream is in flowing mode or paused mode, it can be in object mode or non-object mode.

  • In flowing mode, data is read from the underlying system automatically and provided to the application as quickly as possible through the EventEmitter interface.

  • In paused mode, the stream.read() method must be explicitly called to read a data block from the stream.

In flowing mode, to read data from a stream you can listen for the data event and attach a callback. When a chunk of data is available, the readable stream emits a data event and your callback is executed. Take a look at the following snippet:

var fs = require("fs");
var data = '';

var readerStream = fs.createReadStream('file.txt'); // Create a readable stream

readerStream.setEncoding('utf8'); // Set the encoding to utf8

// Handle stream events --> data, end, and error
readerStream.on('data', function(chunk) {
   data += chunk;
});

readerStream.on('end', function() {
   console.log(data);
});

readerStream.on('error', function(err) {
   console.log(err.stack);
});

console.log("Program Ended");


The call to fs.createReadStream() gives you a readable stream. Initially, the stream is in a static state. As soon as you listen for the data event and attach a callback, it starts flowing. Chunks of data are then read and passed to your callback. The implementer of the stream decides how often data events are emitted. For example, an HTTP request may emit a data event once every few kilobytes of data are read, while a stream reading from a file may emit a data event for every line.

The end event is emitted by the stream when there is no more data to read (the end of the file has been reached). In the code above, we listen for this event to be notified when the stream ends.

Also, if there is an error, the stream emits an error event to notify you.

In paused mode, you just call read() on the stream instance repeatedly until every chunk of data has been read, as shown below:

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.on('readable', function() {
    while ((chunk=readableStream.read()) != null) {
        data += chunk;
    }
});

readableStream.on('end', function() {
    console.log(data)
});

The read() function reads some data from the internal buffer and returns it. When there is nothing left to read, it returns null. So in the while loop, we check for null and terminate the loop. Note that the readable event is emitted when a chunk of data can be read from the stream.


All readable streams start out in paused mode, but they can be switched to flowing mode in one of the following ways:

  • Adding a 'data' event handler
  • Calling the stream.resume() method
  • Calling the stream.pipe() method to send the data to a Writable

A readable stream can switch back to paused mode in one of the following ways:

  • If there are no pipe destinations, by calling the stream.pause() method
  • If there are pipe destinations, by removing all of them. Multiple pipe destinations can be removed by calling the stream.unpipe() method.

The important concept to keep in mind is that a Readable will not generate data until a mechanism for either consuming or ignoring that data is provided, and if that mechanism is disabled or taken away, the Readable will attempt to stop generating data. Adding a readable event handler automatically makes the stream stop flowing, and the data must be consumed via readable.read(). If the readable event handler is removed, the stream starts flowing again if a data event handler exists.
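
A small sketch of switching between the two modes (the file name is a placeholder):

const fs = require('fs');
const stream = fs.createReadStream('file.txt'); // starts out in paused mode

stream.on('data', (chunk) => {             // adding a 'data' handler switches it to flowing mode
  console.log(chunk.length);
  stream.pause();                          // back to paused mode...
  setTimeout(() => stream.resume(), 1000); // ...and flowing again one second later
});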

How do I create writable streams

To write data to a writable stream, you need to call write() on the stream instance. As follows:

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
    writableStream.write(chunk);
});


The code above is straightforward. It simply reads chunks of data from the input stream and writes them to the destination using write(). The function returns a boolean indicating whether the operation succeeded. If it returns true, the write succeeded and you can keep writing more data. If it returns false, the stream cannot accept more data for the moment because its internal buffer is full. The writable stream tells you when you can start writing again by emitting a drain event.

Calling the writable.end() method signals that no more data will be written to the Writable. If provided, the optional callback function is attached as a listener for the finish event.

// Write 'hello, ' and then end with 'world!'
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!'); // Writing any more data now is not allowed!

Using a writable stream, you can consume data from a readable stream:

const Stream = require('stream')

const readableStream = new Stream.Readable({
  read() {} // _read() must be supplied; a no-op works because we push the data manually
})
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

readableStream.pipe(writableStream)

readableStream.push('ping! ')
readableStream.push('pong! ')

writableStream.end()


You can also write to a writable stream using an async iterator, and this is likewise recommended:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
import * as assert from 'assert';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait for completion; this throws if there was an error
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');


The default version of stream.finished() is callback-based, but it can be turned into a promise-based version via util.promisify() (line A).
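
Note that on newer Node.js versions (15 and later), the stream/promises module exposes promise-based versions of finished() and pipeline() directly, so the promisify step can be skipped:

import { finished } from 'stream/promises';

// ...
writable.end();
await finished(writable); // resolves on 'finish', rejects on 'error'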

In this example, the following two patterns are used:

Writing to the writable stream while handling backpressure (a short burst of load causes the system to receive data faster than it can process it) (line B):

if (!writable.write(chunk)) {
  await once(writable, 'drain');
}


Close the writable stream and wait for the write to complete (line C) :

writable.end();
await finished(writable);


pipeline()

A pipe is a mechanism that takes the output of one stream and uses it as the input of another stream. It is typically used to take data from one stream and pass that stream’s output on to another stream. There is no limit on how many pipe operations you chain; in other words, pipes are used to process streamed data in multiple steps.

Node 10.x introduced stream.pipeline(). It is a module-level method for piping between streams that forwards errors, cleans up properly, and provides a callback when the pipeline is complete.

Here is an example using pipeline:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

pipeline() should be used instead of pipe(), because pipe() is unsafe: it does not forward errors or properly destroy the streams when one of them fails.

The Stream module

The Node.js stream module is the foundation upon which all stream APIs are built.

The stream module is a built-in module that Node.js provides by default. A stream is an instance of the EventEmitter class, which Node uses to handle events asynchronously. Streams are therefore event-based by nature.

To use the Stream module, simply:

const stream = require('stream');


The stream module is useful for creating new stream instances. It is usually not necessary to use the stream module just to consume streams.

Stream-based Node.js APIs

Thanks to their advantages, many Node.js core modules provide native stream-handling capabilities, most notably these (a small example combining a few of them follows the list):

  • net.Socket is the main stream-based Node API and underlies most of the following APIs
  • process.stdin returns a stream connected to stdin
  • process.stdout returns a stream connected to stdout
  • process.stderr returns a stream connected to stderr
  • fs.createReadStream() creates a readable stream for a file
  • fs.createWriteStream() creates a writable stream for a file
  • net.connect() initiates a stream-based connection
  • http.request() returns an instance of the http.ClientRequest class, which is a writable stream
  • zlib.createGzip() compresses data into a stream using gzip (a compression algorithm)
  • zlib.createGunzip() decompresses a gzip stream
  • zlib.createDeflate() compresses data into a stream using deflate (a compression algorithm)
  • zlib.createInflate() decompresses a deflate stream
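
For example, a tiny sketch that chains a few of these together, gzipping whatever arrives on stdin and writing the result to stdout (the script name in the comment is just an example):

const zlib = require('zlib');

// Usage: node gzip.js < input.txt > input.txt.gz
process.stdin.pipe(zlib.createGzip()).pipe(process.stdout);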

Streams cheat sheet

Type        Function
Readable    Data provider
Writable    Data receiver
Transform   Provider and receiver
Duplex      Provider and receiver (independent)

See documentation for more: Stream (nodejs.org)

Streams

const Readable = require('stream').Readable
const Writable = require('stream').Writable
const Transform = require('stream').Transform


Piping

clock()              // A readable stream
  .pipe(xformer())   // A transform stream
  .pipe(renderer())  // A writable stream

Methods

stream.push(/*... */)        // Emit a chunk
stream.emit('error', error)  // Raise an error
stream.push(null)            // Close a stream


Events

const st = source()  // Assume source() is a readable stream

st.on('data', (data) => { console.log('<-', data) })
st.on('error', (err) => { console.log('!', err.message) })
st.on('close', () => { console.log('** bye') })
st.on('finish', () => { console.log('** bye') })

Flowing mode

// Enable and disable flowing mode
st.resume()
st.pause()

// Flowing mode is also enabled automatically by adding a 'data' handler
st.on('data', /*... */)

A readable stream

function clock () {
  const stream = new Readable({
    objectMode: true,
    read() {} // implement read() yourself if you need to read on demand
  })

  setInterval(() => {
    stream.push({ time: new Date() })
  }, 1000)

  return stream
}

Readable streams are data generators; write data into them with stream.push().

Transform streams

function xformer () {
  let count = 0

  return new Transform({
    objectMode: true,
    transform: (data, _, done) => {
      done(null, { ...data, index: count++ })
    }
  })
}

Pass the transformed chunk to done(null, chunk).

Writable streams

function renderer () {
  return new Writable({
    objectMode: true,
    write: (data, _, done) => {
      console.log('<-', data)
      done()
    }
  })
}

String it all together

clock()              // A readable stream
  .pipe(xformer())   // A transform stream
  .pipe(renderer())  // A writable stream

Here are some important events related to writable streams (a minimal sketch follows the list):

  • error – emitted when an error occurred while writing or piping data
  • pipe – emitted by the writable stream when a readable stream is piped into it
  • unpipe – emitted when you call unpipe on the readable stream and it stops piping into this target stream
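
A minimal sketch using two PassThrough streams to watch these events fire:

const { PassThrough } = require('stream');

const source = new PassThrough();
const dest = new PassThrough();

dest.on('pipe', () => console.log('a readable stream was piped into dest'));
dest.on('unpipe', () => console.log('the readable stream was unpiped from dest'));
dest.on('error', (err) => console.error('writing or piping failed:', err));

source.pipe(dest);   // fires 'pipe' on dest
source.unpipe(dest); // fires 'unpipe' on dest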

Conclusion

That’s all you need to know about streams. Streams, pipes, and chaining are among the core and most powerful features of Node.js. Streams can really help you write clean and efficient code for handling I/O.

In addition, there is a much-anticipated Node.js strategic initiative called BOB, which aims to improve Node.js’s streaming data interfaces, both for internal use within the Node.js core and, hopefully, as a public API in the future.

For more technical articles, follow the WeChat official account: 1024 Translation Station.