What is a Stream

A stream is an abstract data interface implemented by many objects in Node.js. Every stream is an instance of EventEmitter and either emits data (in chunks, which are Buffers by default) or consumes data.

Types of streams

Node.js has four types of streams:

  • Writable: A stream to which data can be written (for example, fs.createWriteStream()).
  • Readable: A stream from which data can be read (for example, fs.createReadStream()).
  • Duplex: A stream that is both Readable and Writable (for example, net.Socket).
  • Transform: A Duplex stream that can modify or convert data as it is written and read (for example, zlib.createDeflate()).
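
To make the Transform type more concrete, here is a minimal sketch of a custom Transform stream. The createUpperCase and collect helpers and the sample input are illustrative assumptions, not part of the original article; Readable.from is used only to get an in-memory source.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical Transform: upper-cases every chunk that passes through it.
function createUpperCase() {
    return new Transform({
        transform(chunk, encoding, callback) {
            // chunk arrives as a Buffer by default; hand the converted data downstream
            callback(null, chunk.toString().toUpperCase());
        }
    });
}

// Hypothetical helper: collects everything a stream produces into one string.
function collect(stream) {
    return new Promise(function (resolve, reject) {
        let result = '';
        stream.on('data', function (chunk) { result += chunk; });
        stream.on('end', function () { resolve(result); });
        stream.on('error', reject);
    });
}

// Readable (source) -> Transform (upper-case); the Transform is readable too.
const upper = Readable.from(['node', 'js ', 'stream']).pipe(createUpperCase());
const upperResult = collect(upper);
upperResult.then(function (text) { console.log(text); });
```

Because a Transform is also a Duplex stream, it can sit in the middle of a chain: data is written into one side and the converted data is read from the other.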

The role of the Stream

Let’s start with an example:

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(3000);

While this code works, it has a significant problem: for each client request, fs.readFile reads the entire data.txt file into memory before returning the result to the client. If data.txt is very large, the application may consume a lot of memory while serving many concurrent requests, which can lead to slow connections for users.

Second, the above code can create a poor user experience because the user needs to wait for the file’s contents to be fully read into memory before receiving anything.

Here, however, is where Stream comes in:

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server2.listen(4000);

In the code above, fs.createReadStream creates a Readable stream for data.txt. Calling .pipe() on that stream listens for its 'data' and 'end' events internally and sends data.txt to the client chunk by chunk, instead of waiting for the entire file to load into memory before sending anything.

.pipe() can be thought of as a "pipe/channel" method: every readable stream (including Duplex and Transform streams) has it, and it connects the output of that stream to the input of a writable stream.
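
Roughly, what .pipe() does can be sketched by hand with the 'data', 'end' and 'drain' events. This is a simplified illustration under stated assumptions, not the actual Node.js implementation: manualPipe is a hypothetical helper, and in-memory streams stand in for the file and the HTTP response.

```javascript
const { Readable, Writable } = require('stream');

// Simplified, hypothetical stand-in for readable.pipe(writable).
function manualPipe(readable, writable) {
    readable.on('data', function (chunk) {
        // write() returns false when the writable's internal buffer is full
        if (writable.write(chunk) === false) {
            readable.pause();                       // stop reading for now
            writable.once('drain', function () {    // buffer has emptied
                readable.resume();                  // continue reading
            });
        }
    });
    readable.on('end', function () {
        writable.end();                             // nothing more to write
    });
    return writable;
}

// Usage with in-memory streams.
const received = [];
const sink = new Writable({
    write(chunk, encoding, callback) {
        received.push(chunk.toString());
        callback();
    }
});
const piped = new Promise(function (resolve) {
    sink.on('finish', function () { resolve(received); });
});
manualPipe(Readable.from(['chunk 1', 'chunk 2', 'chunk 3']), sink);
```

The pause/resume step is the important part: it keeps a fast source from flooding a slow destination, which is one reason to prefer .pipe() over wiring 'data' handlers manually.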

For ease of understanding, the two approaches (without streams and with streams) can be pictured as the following scenarios:

[Figure: not using streams]

[Figure: using streams]

Using a stream can greatly improve response time and reduce the stress on server memory.

Common events

All Stream objects are instances of EventEmitter. Common events are:

  • data: Emitted whenever the stream hands ownership of a chunk of data to the consumer, that is, when data is available to read.
  • end: Emitted when there is no more data in the stream to consume.
  • error: Emitted when an error occurs while reading or writing.
  • finish: Emitted when all data has been flushed to the underlying system.
  • close: Emitted when the stream and any of its underlying resources (such as a file descriptor) have been closed.
  • pipe: Emitted on a writable stream when stream.pipe() is called on a readable stream, adding that writable stream to its set of destinations.
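
As a rough illustration of these events, the sketch below records the order in which some of them fire when a small in-memory readable is piped into a writable. The source, target and events names are assumptions made for this example only.

```javascript
const { Readable, Writable } = require('stream');

const events = [];   // event names, in the order they were observed

const source = Readable.from(['a', 'b']);
const target = new Writable({
    write(chunk, encoding, callback) { callback(); }   // discard the data
});

target.on('pipe', function () { events.push('pipe'); });
source.on('data', function () { events.push('data'); });
source.on('end', function () { events.push('end'); });
source.on('error', function (err) { console.error('read error:', err); });
target.on('error', function (err) { console.error('write error:', err); });

const observed = new Promise(function (resolve) {
    target.on('finish', function () {
        events.push('finish');
        resolve(events);
    });
});

source.pipe(target);
observed.then(function (list) { console.log(list.join(' -> ')); });
```

Note that 'data' and 'end' belong to the readable side, while 'pipe' and 'finish' belong to the writable side.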

Readable stream

A Readable stream produces data that you can send to a Writable, Transform or Duplex stream by calling the pipe() method:

let { Readable } = require('stream');

let readable = new Readable();
let source = ['a', 'b', 'c'];

readable.setEncoding('utf8');

// Called automatically whenever the stream needs more data
readable._read = function () {
    let data = source.shift() || null;   // null signals the end of the data
    console.log('read:', data);
    this.push(data);
};

readable.on('end', function () {
    console.log('end');
});
readable.on('data', function (data) {
    console.log(data);
});

/*
Output:
read: a
read: b
a
read: c
b
read: null
c
end
*/

A Readable stream works through the _read method defined on the instance (or the read option passed to the constructor), which is called automatically when data is needed.

  • _read signals the end of the data by calling this.push(null).
  • _read() is called automatically once a 'data' listener is attached to the readable.

data event: emitted when a chunk of data has been read; the callback passed to on('data') receives the content.

end event: emitted when the stream is "exhausted", which requires two conditions:

  • push(null) has been called, declaring that no new data will be produced.
  • All data remaining in the internal buffer has been read.
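
The two conditions can be observed with a small experiment. This is a sketch under assumptions: the readable here has data pushed into it manually, and the variable names are made up for illustration. Even though push(null) has already been called, 'end' does not fire until the buffered data is actually consumed.

```javascript
const { Readable } = require('stream');

const readable = new Readable({ read() {} });   // data is pushed manually below
readable.push('x');
readable.push(null);            // condition 1: no more data will be produced

let ended = false;
readable.on('end', function () { ended = true; });

const endCheck = new Promise(function (resolve) {
    setImmediate(function () {
        // Nothing has consumed the buffered 'x' yet
        const endedBeforeConsuming = ended;
        readable.on('data', function () {});    // condition 2: drain the buffer
        readable.on('end', function () {
            resolve({
                endedBeforeConsuming: endedBeforeConsuming,
                endedAfterConsuming: ended
            });
        });
    });
});
endCheck.then(function (result) { console.log(result); });
```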

Writable stream

A Writable stream is an abstraction of the "destination" to which data is written. Writable streams act as the downstream side, consuming data provided by an upstream source.

let { Writable } = require('stream');

var writable = new Writable({
    write: function (data, _, next) {
        console.log(data);
        next && next();
    }
});

writable.write('a');
writable.write('b');
writable.write('c');
writable.end();

/*
Output:
<Buffer 61>
<Buffer 62>
<Buffer 63>
*/

The write method

  • The third argument of the write option (or the _write() method), next, is a callback. Calling next() signals that the current chunk has been handled, so the next chunk can be written.
  • The end() method must be called to tell the writable that all data has been written.
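
The sketch below illustrates both points with a deliberately slow writable. The slowWritable name, the tiny highWaterMark and the use of setImmediate to delay next() are assumptions chosen to make the behaviour visible, not recommendations.

```javascript
const { Writable } = require('stream');

const written = [];
const slowWritable = new Writable({
    highWaterMark: 2,               // tiny buffer (in bytes) so it fills up at once
    write(chunk, encoding, next) {
        written.push(chunk.toString());
        setImmediate(next);         // report completion asynchronously
    }
});

// Register the listener before writing so the event cannot be missed.
const finished = new Promise(function (resolve) {
    slowWritable.on('finish', function () {
        resolve({ written: written, firstWriteOk: firstWriteOk });
    });
});

// write() returns false once the buffered data reaches highWaterMark;
// the chunk is still accepted, but the caller should wait for 'drain'.
const firstWriteOk = slowWritable.write('abc');
slowWritable.write('d');
slowWritable.end('e');              // end() may carry one final chunk

finished.then(function (result) { console.log(result); });
```

Each chunk is handed to write only after next() has been called for the previous one, and 'finish' fires only after end() has been called and every chunk has been processed.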

Piping streams

A pipe provides a mechanism for connecting an output stream to an input stream. Typically we use it to get data from one stream and pass it to another stream.

As shown in the picture above, we can compare a file to a bucket of water, where the water is the content of the file. Connecting two buckets with a pipe lets the water flow from one bucket to the other, so a large file is copied gradually rather than all at once.

In the following example, we read the contents of one file and write the contents to another file.

Set the content of input.txt to the following:

Nodejs Stream

Create the main.js file as follows:

var fs = require("fs");

// Create a readable stream
var readerStream = fs.createReadStream('input.txt');

// Create a writable stream
var writerStream = fs.createWriteStream('output.txt');

// Pipe: read input.txt and write its content to output.txt
readerStream.pipe(writerStream);

console.log("Program completed");

Chaining streams

Chaining is a mechanism for building a sequence of operations by connecting the output of one stream to another stream. It is commonly used together with pipes.
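
As one possible sketch of a chain, the example below sends a string through gzip compression and back through decompression entirely in memory. It uses stream.pipeline instead of repeated .pipe() calls, which is a substitution made here because pipeline reports errors from any stage through a single callback; the collector and roundTrip names are assumptions for illustration.

```javascript
const { Readable, Writable, pipeline } = require('stream');
const zlib = require('zlib');

// Writable that gathers the final output of the chain.
const chunks = [];
const collector = new Writable({
    write(chunk, encoding, callback) {
        chunks.push(chunk);
        callback();
    }
});

const roundTrip = new Promise(function (resolve, reject) {
    pipeline(
        Readable.from(['Nodejs ', 'Stream']),   // source
        zlib.createGzip(),                      // compress
        zlib.createGunzip(),                    // decompress again
        collector,                              // destination
        function (err) {
            if (err) { reject(err); return; }
            resolve(Buffer.concat(chunks).toString());
        }
    );
});
roundTrip.then(function (text) { console.log(text); });
```

Each stage in the middle of the chain is a Transform stream, so it is writable on one side and readable on the other.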

The next step is to use pipes and chains to compress and decompress files.

Create the compress.js file as follows:

var fs = require("fs");
var zlib = require('zlib');

// Compress input.txt to input.txt.gz
fs.createReadStream('input.txt')
    .pipe(zlib.createGzip())
    .pipe(fs.createWriteStream('input.txt.gz'));

console.log("File compression completed.");