What is a Stream?

A stream is a collection of data — just like an array or a string. The difference is that a stream's data may not be available all at once, and it does not need to fit in memory. This makes streams useful for working with large data sets or with chunks of data arriving from external sources.

However, streams are not only useful for big data; they also give us composability in code. Just as powerful Linux pipelines are formed by chaining smaller Linux commands together, we can do the same with streams in Node.

A stream in Node.js is an abstract interface for working with streaming data. The stream module provides the basic API. Using these APIs, you can easily build objects that implement the stream interface.

Node.js provides a variety of stream objects. For example, HTTP requests and process.stdout are both instances of streams.

Streams can be readable, writable, or both. All stream objects are instances of EventEmitter and implement its interface.

That is, a stream can emit events to report its state. This lets us register listeners on the stream for our own purposes: we subscribe to an event, and when that event fires, the stream notifies us so we can react to it.

Types of streams

There are four basic stream types in Node.js:

  • Readable – a readable stream (for example fs.createReadStream()).
  • Writable – a writable stream (e.g. fs.createWriteStream()).
  • Duplex – a stream that is both readable and writable (e.g. net.Socket).
  • Transform – a Duplex stream that can modify and transform data as it is written and read (e.g. zlib.createDeflate()).

Stream.Readable

Readable streams operate in one of two modes: flowing and paused.

1) In flowing mode, a readable stream automatically reads data from the underlying system and provides it to the application as quickly as possible through events on the EventEmitter interface.

2) In paused mode, the stream.read() method must be called explicitly to read a chunk of data from the stream.

Note: if a readable stream is switched into flowing mode and no consumer is there to handle the data, the data will be lost.

Readable streams are an abstraction of the sources that provide data.

Examples of readable streams include:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams: reading files
  • zlib streams: compression
  • crypto streams: encryption
  • TCP sockets
  • Child process stdout and stderr: the standard output and error output of a child process
  • process.stdin: standard input

All readable streams implement the interface defined by the stream.Readable class.

Reading data through a stream

  • After instantiating Readable (or a subclass of it), you have a readable stream
  • By implementing the _read method, you connect the stream to an underlying data source
  • The stream requests data from the underlying source by calling _read, and _read in turn calls the stream’s push method to hand over the data
  • Once the readable stream is connected to a data source, downstream consumers can call readable.read(n) to request data from the stream, or listen for the stream’s ‘data’ event to receive the data as it arrives

Here is a quick example of a readable stream:

  • Once you start listening for ‘data’ events, the stream reads a chunk of the file and emits it, then reads and emits the next chunk, over and over until the file is exhausted
  • In other words, by default, listening for the ‘data’ event keeps the cycle going: read data, fire the ‘data’ event, read again
  • In addition, an internal buffer is used, 64K by default. For example, if the file is 128K, the stream reads 64K and emits it, then reads the next 64K, emitting twice in total
  • The size of this buffer can be set with highWaterMark
let fs = require('fs');
// Create a readable stream
let rs = fs.createReadStream('./1.txt', {
    flags: 'r',        // what we want to do with the file
    mode: 0o666,       // permission bits
    encoding: 'utf8',  // chunks are Buffers by default; this shows them as strings
    start: 3,          // start reading at index 3
    // This is the only Node API I've seen where the end index is inclusive
    end: 8,            // read up to and including index 8
    highWaterMark: 3   // buffer size
});
rs.on('open', function () {
    console.log('File open');
});
rs.setEncoding('utf8'); // display chunks as strings
// We want the stream to have a mechanism to pause and resume emitting
rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // suspend reading and firing 'data' events
    setTimeout(function () {
        rs.resume(); // resume reading and firing 'data' events
    }, 2000);
});
// If an error occurs while reading the file, an 'error' event is raised
rs.on('error', function () {
    console.log("error");
});
// When the file has been read to the end, the 'end' event is emitted
rs.on('end', function () {
    console.log('Finished reading');
});
rs.on('close', function () {
    console.log('File Closed');
});

/** Output: File open, then the data chunks (one every 2s), then Finished reading and File Closed **/

Stream.Writable

1. The write() method of a Writable stream writes data to the stream.

Chunk is the data to be written — a Buffer or a string. This parameter is required; all the others are optional. If chunk is a string, encoding specifies its character encoding, and write() encodes the chunk into bytes accordingly. Callback is a function executed once the data has been completely flushed to the stream. The write method returns a Boolean that is true when the data has been fully handled (not necessarily written to the device yet).

2. The end() method of a Writable stream ends the stream. Its three arguments are all optional. Chunk and encoding have the same meaning as in write(). Callback, when provided, is attached to the writable's 'finish' event, so it is called when that event fires.

Common events:

Drain event: when a stream is not in the drained state, a call to write() caches the data chunk and returns false. Once all currently cached chunks have been drained (accepted by the operating system for output), the ‘drain’ event is emitted.

Finish event: the ‘finish’ event is emitted after the stream.end() method has been called and all buffered data has been flushed to the underlying system.

Writable streams are an abstract ‘destination’ for writing data. Examples of writable streams include:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams: writing files
  • zlib streams: compression
  • crypto streams: encryption
  • TCP sockets
  • Child process stdin: the standard input of a child process
  • process.stdout, process.stderr: standard output and error output

Here’s a simple example of a writable stream. (When you write data to a writable stream, it is not written to the file immediately; it first goes into a cache whose size is highWaterMark, 16K by default. Only once the cache fills up is the data actually written out to the file.)

let fs = require('fs');
let ws = fs.createWriteStream('./2.txt', {
    flags: 'w',
    mode: 0o666,
    start: 3,
    highWaterMark: 3 // the default is 16K
});
  • write() returns false if the cache is full, true if it is not
  • In other words, true means you can keep writing; false means you should stop
  • In principle, once write() returns false you should not write any more — but if you do, the data is not lost: it is cached in memory and written out once the cache has drained
let flag = ws.write('1');
console.log(flag); // true
flag = ws.write('2');
console.log(flag); // true
flag = ws.write('3');
console.log(flag); // false
flag = ws.write('4');
console.log(flag); // false

The ‘drain’ event

If a call to the stream.write(chunk) method returns false, the stream will fire the ‘drain’ event when it is appropriate to resume writing data to the stream.


It is recommended that once write() returns false, no more chunks be written until the ‘drain’ event fires.

Here’s a simple example:

let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
    flags: 'w',
    mode: 0o666,
    start: 0,
    highWaterMark: 3
});
let count = 9;
function write() {
    let flag = true; // the cache is not full yet
    // The write method itself is synchronous, but the actual file writing is asynchronous;
    // the callback runs only after the data has really been written to the file
    while (flag && count > 0) {
        console.log('before', count);
        flag = ws.write(count + '', 'utf8', (function (i) {
            return () => console.log('after', i);
        })(count));
        count--;
    }
}
write(); // 9 8 7
// Listen for the cache-drained event
ws.on('drain', function () {
    console.log('drain');
    write(); // 6 5 4, then 3 2 1
});
ws.on('error', function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
drain
... (the pattern repeats for 6 5 4 and then 3 2 1)
**/

When no more data needs to be written, the end method can be called to close the writable side of the stream; once end() has been called, nothing further can be written.

For example, after calling ws.end(), a subsequent ws.write('x') reports the error write after end.

The pipe method

  • This follows the classic Linux concept of pipes: the output of one stage is the input of the next
  • pipe is the simplest and most direct way to connect two streams; it implements the entire process of data transfer internally, so there is no need to manage the data flow yourself during development
  • The pipe method pulls all the data out of a readable stream and writes it to the destination, automatically managing the flow so that the amount of buffered data stays at an acceptable level — sources and destinations of different speeds will not exhaust the available memory. By default, when the source stream emits ‘end’, end() is called on the destination, so no more data can be written to it. Pass {end: false} in the options to keep the destination stream open
The principle behind the pipe method:
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if (!flag) rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
Use of pipe:
let fs = require('fs');
let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 });
let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 });
rs.pipe(ws);
// Remove the target writable stream
rs.unpipe(ws);
  • When listening for the readable stream’s ‘data’ event, the callback fires once for every chunk
  • By pausing the source when write() returns false, the producer’s and consumer’s speeds can be balanced
rs.on('data', function (data) {
    console.log(data);
    let flag = ws.write(data);
    if (!flag) {
        rs.pause();
    }
});