
Overview

Stream is a fundamental concept in Node.js: many core modules are built on top of streams, so streams play a very important role. At the same time, streams are a difficult concept to grasp and the related documentation is sparse, so Node.js beginners often have to spend a lot of time before they truly understand them. Fortunately, for most Node.js users who only build web applications, not fully understanding streams does not prevent them from using Node.js. However, understanding streams gives you a better understanding of other Node.js modules, and in some scenarios it is simply better to use streams to process data.

Consider using Node.js to handle the following scenario:

Download a tar package over HTTP, decompress it, and upload the decompressed content to TOS (object storage).

Before learning about Node.js streams, you might approach it like this:

Download and decompress the package to local disk, then read the local files and upload them to TOS.

This approach has several problems:

  • It is inefficient: it adds extra local-file read/write I/O;

  • It can easily cause a memory overflow;

Streams are a good fit for this kind of problem: if you think in terms of streams, the task becomes straightforward, as sketched below.
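For example, a minimal sketch of the scenario above using stream.pipeline, assuming uploadToTOS is a hypothetical helper that wraps your storage SDK in a writable stream (the URL is a placeholder and the tar-extraction step is omitted for brevity):

import { get } from 'https';
import { createGunzip } from 'zlib';
import { pipeline, Writable } from 'stream';

// Hypothetical writable stream wrapping the TOS upload SDK.
function uploadToTOS(key: string): Writable {
    return new Writable({
        write(chunk, encoding, callback) {
            // send `chunk` to object storage here
            callback();
        },
    });
}

get('https://example.com/archive.tar.gz', res => {
    pipeline(
        res,                         // readable: HTTP response body
        createGunzip(),              // transform: decompress on the fly
        uploadToTOS('archive.tar'),  // writable: upload as the data arrives
        err => {
            if (err) console.error('pipeline failed', err);
            else console.log('upload finished');
        }
    );
});

The data flows from the network through the decompressor to the upload in small chunks, so nothing is written to local disk and memory usage stays bounded.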

What is a stream?

Understanding streams

Here's the official definition: a stream is an abstract interface in Node.js for working with streaming data.

The official definition is a little hard to grasp, but streams can be understood from two perspectives:

  • For the average developer (the consumer of a stream), a stream can be thought of as a collection of data, much like an array; you only need to care about reading (consuming) and writing (producing) the data.

  • For the stream developer (someone creating a new stream class with the stream module), the focus is on how to implement the abstract interface, which usually comes down to two things: what the underlying resource is and how to operate on its data. Once the resource is determined, actions are taken based on the state of the stream and the state of the resource.

Types of streams

There are four types of streams:

  • Readable streams

  • Writable streams

  • Duplex streams

  • Transform streams

Readable and writable streams are the foundation; once you understand these two, the other two are much easier to grasp.

Readable streams

A readable stream is an abstraction of the source that provides the data.

Implementing a readable stream

Implementing a readable stream means implementing the _read method, which defines where the data comes from and how it is pushed into the buffer pool. Note that data is not pushed directly to the consumer; it goes through the buffer pool first. When the buffer pool is full (its size exceeds highWaterMark), "backpressure" occurs and push returns false.

import { Readable } from 'stream';

class MyReadable extends Readable {
    /**
     * @param size the suggested amount of data to read
     */
    _read(size?: number) {
        // getDataFromSomeWhere() is a placeholder for reading from the real data source
        const data = getDataFromSomeWhere();
        // res will be false once the buffer pool exceeds highWaterMark;
        // pushing null means the source is finished
        const res = this.push(data || null);
    }
}

Readable stream modes

Readable streams can be consumed in one of two modes: flowing or paused.

Paused mode

When a readable stream is created it starts in paused mode. The _read method is called automatically to push data from the source into the buffer pool until the pool reaches the high-water mark (highWaterMark). Each time the buffered data reaches the high-water mark, the stream emits a 'readable' event to tell the consumer that data is ready to be consumed.

In general, the 'readable' event indicates that the stream has something new to report: either new data has arrived, or the end of the stream has been reached. So the 'readable' event is also emitted once the data source has been fully read.

In the 'readable' event handler, the consumer actively pulls data out of the buffer pool with stream.read(size).

Demo:

import { Readable } from 'stream';

class Source {
    private data: string[] = Array(10).fill('byte');
    public getData() {
        return this.data.pop();
    }
}

class MyReadable extends Readable {
    source = new Source();
    _read() {
        const data = this.source.getData();
        // console.log('source:' + data)
        this.push(data || null);
    }
}

const myReadable = new MyReadable();
myReadable.setEncoding('utf-8');
myReadable.on('readable', () => {
    console.log('readable');
    let data = '';
    while ((data = myReadable.read())) {
        console.log(data);
    }
});
myReadable.on('end', () => {
    console.log('end');
});
Flowing mode

All readable streams start in paused mode and can be switched to flowing mode by:

  • Adding a handler for the 'data' event;

  • Calling the stream.resume() method;

  • Calling the pipe() method to send the data to a writable stream.

In flowing mode, the data in the buffer pool is automatically delivered to the consumer. After each delivery, the _read method is called automatically to refill the buffer pool, and this continues until the stream switches back to paused mode or the data source is exhausted (push(null)).

Readable streams can be switched back to paused mode in the following ways:

  • If there is no pipe destination, by calling stream.pause();

  • If there are pipe destinations, by removing all of them; stream.unpipe() can remove multiple pipe destinations.

Demo:

import { Readable } from 'stream';

class Source {
    private data: string[] = Array(10).fill('byte');
    public getData() {
        return this.data.pop();
    }
}

class MyReadable extends Readable {
    source = new Source();
    /**
     * @param size the suggested amount of data to read
     */
    _read(size?: number) {
        const data = this.source.getData();
        // console.log('source:' + data)
        this.push(data || null);
    }
}

const myReadable = new MyReadable();
myReadable.setEncoding('utf-8');
myReadable
    .on('data', chunk => {
        console.log(chunk);
    })
    .on('end', () => {
        console.log('end');
    });

The difference between the two modes

The mode is really a concern of the consumer: it determines how the consumer reads data from the stream. The differences are:

  • In flowing mode, data is read from the underlying system automatically and is supplied to the application as quickly as possible through events on the EventEmitter interface.

  • In paused mode, you must explicitly call stream.read() to read a chunk of data.

pipe

Readable streams have a pipe method that takes a writable stream as its argument and returns that target writable stream, so pipe calls can be chained. Calling pipe automatically switches the readable stream into flowing mode.
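A minimal sketch of pipe in action, piping a small readable stream into process.stdout (itself a writable stream); Readable.from is a built-in helper that builds a readable stream from an iterable:

import { Readable } from 'stream';

// A tiny readable stream that emits three chunks and then ends.
const readable = Readable.from(['a', 'b', 'c']);

// pipe() switches the readable stream into flowing mode and returns the
// destination, so further .pipe() calls could be chained onto it.
readable.pipe(process.stdout);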

Writable streams

A writable stream is an abstraction of the destination the data is written to. Implementing a writable stream means implementing its _write method, which defines how data is written to the destination when the producer supplies it.

import { Writable } from 'stream';

class MyWritable extends Writable {
    /**
     * @param str the chunk to write
     * @param encoding the chunk encoding
     * @param cb must be called after each successful write; if it is not called,
     *           the buffer queue will not be cleared automatically.
     */
    _write(str: string, encoding: string, cb: (err?: any) => void) {
        console.log(str);
        console.log(encoding);
        cb();
    }
}

Writable streams are simpler than readable streams. When a producer calls write(chunk), the stream internally decides, based on its state (corked, writing, etc.), whether to cache the data in the buffer queue or to call _write. After each write it tries to flush the buffer queue. If the size of the buffer queue exceeds highWaterMark, the call to write(chunk) returns false and the producer should stop writing.

So when can the producer continue writing? Once all of the buffered data has been written successfully and the buffer queue is empty, the 'drain' event is emitted, signalling that the producer may continue writing.

When the producer has finished writing data, it calls stream.end() to notify the writable stream that it is done.

Demo:

import { Writable } from 'stream';

class MyWritable extends Writable {
    /**
     * @param str the chunk to write
     * @param encoding the chunk encoding
     * @param cb must be called after each successful write; if it is not called,
     *           the buffer queue will not be cleared automatically.
     */
    _write(str: Buffer, encoding: string, cb: (err?: any) => void) {
        console.log(str.toString('utf8'));
        console.log(encoding);
        cb();
    }
}

const myWritable = new MyWritable();
console.log(myWritable.write('hello world', 'utf8'));
console.log(myWritable.write('hello world1', 'utf8'));
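As a follow-up, a minimal sketch of a producer respecting backpressure: it stops writing when write() returns false and resumes on 'drain' (slowWritable and its tiny highWaterMark are illustrative, not from the original article):

import { Writable } from 'stream';

// A writable stream with a deliberately small buffer so backpressure kicks in quickly.
const slowWritable = new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
        // simulate a slow destination
        setTimeout(callback, 100);
    },
});

let remaining = 10;
function writeSome() {
    while (remaining > 0) {
        remaining--;
        const ok = slowWritable.write(`chunk-${remaining}`);
        if (!ok) {
            // buffer queue exceeded highWaterMark: stop and resume on 'drain'
            slowWritable.once('drain', writeSome);
            return;
        }
    }
    slowWritable.end();
}
writeSome();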

Duplex streams

Once you understand readable and writable streams, duplex streams are easy: a duplex stream simply implements both the readable and writable interfaces at the same time. Note that a duplex stream maintains the two buffers independently, and the source the readable side reads from is not necessarily the same target the writable side writes to.

Demo:

import { Duplex } from 'stream';

class MyDuplex extends Duplex {
    readEnd = false;
    _read() {
        if (!this.readEnd) {
            this.push('read');
            this.readEnd = true;
        }
        this.push(null);
    }
    _write(chunk, encoding, callback) {
        console.log(chunk.toString('utf8'));
        callback();
    }
}

const duplex = new MyDuplex();
duplex
    .on('data', chunk => {
        console.log(chunk.toString());
    })
    .on('end', () => {
        console.log('read end');
    });
duplex.write('write');

Transform streams

Like a duplex stream, a transform stream implements both the writable (_write) and readable (_read) sides; the difference is that its input is related to its output (in practice you subclass Transform and implement the _transform method, which plays both roles). Note that although input and output are related, they are not necessarily one-to-one or synchronous. In compression, for example, several writes may be compressed into a single chunk before one compressed chunk is made available to consumers. Conversely, a single write may produce several output chunks: during a conversion, 1 KB of input might be expanded to 1 MB and pushed into the buffer in multiple chunks, and if the buffer pool exceeds the high-water mark, backpressure occurs.

The observant reader may notice that a transform stream is both writable and readable. Can it be combined with the pipe method described above to build arbitrarily long pipe chains? Indeed, both transform streams and duplex streams can sit in the middle of a pipe chain:

readable
    .pipe(transform1)
    .pipe(transform2)
    .pipe(transformN)
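For instance, a minimal sketch of a transform stream that upper-cases whatever passes through it (the UpperCase class is illustrative, not from the original article):

import { Transform } from 'stream';

// A transform stream whose readable output is the upper-cased writable input.
class UpperCase extends Transform {
    _transform(chunk: Buffer, encoding: string, callback: (err?: Error | null, data?: any) => void) {
        // pass the transformed data on to the readable side
        callback(null, chunk.toString('utf8').toUpperCase());
    }
}

process.stdin.pipe(new UpperCase()).pipe(process.stdout);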

Some common events and methods

A stream maintains a series of internal states, and different streams go through state changes at different stages. Since streams inherit from EventEmitter, the relevant events are emitted when the state changes.

|                           | Readable stream | Writable stream |
| ------------------------- | --------------- | --------------- |
| Events                    | 'data'          | 'drain'         |
| Instance methods          | read()          | write()         |
| Methods to be implemented | _read           | _write          |

Node.js built-in streams

Many modules in Node.js are implemented using streams.

| Readable                      | Writable                     | Duplex     | Transform      |
| ----------------------------- | ---------------------------- | ---------- | -------------- |
| HTTP response (on the client) | HTTP request (on the client) | TCP socket | crypto streams |
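For example, a minimal sketch that combines several built-in streams, gzip-compressing a file by piping an fs read stream through a zlib transform into an fs write stream (the file paths are placeholders):

import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { pipeline } from 'stream';

pipeline(
    createReadStream('input.txt'),       // readable: file source
    createGzip(),                        // transform: compress on the fly
    createWriteStream('input.txt.gz'),   // writable: file destination
    err => {
        if (err) console.error('compression failed', err);
        else console.log('compression finished');
    }
);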

What are the applications?

  • Streaming rendering with chunked transfer

Demo:

import { createServer } from 'http';
import { Readable } from 'stream';

const source = [
    `<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Test</title>
<script src="https://unpkg.com/[email protected]/umd/react-dom.production.min.js"></script>
</head>`,
    `<body>
<div>chunk1</div>`,
    `<div>chunk2</div>
</body>`,
    `</html>`
];

function getRS() {
    let hasRead = false;
    const dataSource = [...source];
    return new Readable({
        read() {
            if (hasRead) {
                return;
            }
            hasRead = true;
            dataSource.forEach((d, index) => {
                setTimeout(() => {
                    // push each chunk with a delay; after the last chunk, signal the end
                    this.push(d);
                    if (index === dataSource.length - 1) {
                        this.push(null);
                    }
                }, index * 1000);
            });
        }
    });
}

const server = createServer((req, res) => {
    let reqData = '';
    const readable = getRS();
    req.on('data', chunk => {
        reqData += chunk;
    });
    req.on('end', () => {
        console.log('req end');
        res.statusCode = 200;
        readable.pipe(res);
        readable.on('end', () => {
            console.log('end');
        });
    });
});
server.listen(3000);
  • Large file reading

  • Upload and download large files

  • Data encryption and decryption

  • Data compression and decompression
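As an illustration of the encryption and decryption use case, a minimal sketch that pipes a file through a cipher transform stream (the key, IV and file paths are placeholder values):

import { createReadStream, createWriteStream } from 'fs';
import { createCipheriv, randomBytes } from 'crypto';
import { pipeline } from 'stream';

// Placeholder key and IV; in real code they would come from a key store.
const key = randomBytes(32);
const iv = randomBytes(16);

pipeline(
    createReadStream('secret.txt'),           // readable: plaintext source
    createCipheriv('aes-256-cbc', key, iv),   // transform: encrypt chunk by chunk
    createWriteStream('secret.txt.enc'),      // writable: ciphertext destination
    err => {
        if (err) console.error('encryption failed', err);
        else console.log('encryption finished');
    }
);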

What are the drawbacks of streams?

  • Documentation is not friendly enough

The stream module is hard to understand largely because the Node.js documentation is not very friendly: the official website only provides API documentation, the important knowledge is fragmented rather than systematic, and for many implementation details you have to read the source code to figure things out. This is not very friendly for beginners.

  • Stream composition is not powerful enough

Readable streams currently offer only the pipe method for composition, which is not powerful enough; in some scenarios you have to do a lot of extra work by hand, for example when you want to consume data from two streams in one place. Those familiar with Rx will know that Observables can relate to each other in many ways, and Rx provides methods for combining different Observables. There are community tools that can convert streams into Rx objects and process the data that way.

  • Improper use may cause performance problems

Streams maintain an internal buffer whose default size is 16 KB, but it can grow automatically in some cases, so there is a risk of OOM if a stream is not consumed in time or is never terminated. In addition, a stream reads and writes data in many small operations, so if each read or write to the data source carries a fixed overhead, the total overhead grows linearly with the number of operations.
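As one mitigation, the buffer size can be tuned with the highWaterMark option when a stream is created, trading memory for fewer and larger operations (the file name and value below are only examples):

import { createReadStream } from 'fs';

// Use a 1 MB buffer instead of the 64 KB default for fs read streams,
// reducing the number of read operations at the cost of more memory.
const readable = createReadStream('big-file.bin', { highWaterMark: 1024 * 1024 });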

Conclusion

This article has tried to introduce Node.js streams from a high-level perspective so that readers gain a systematic understanding of them. Many details have been left out; if you are interested, take a look at the stream source code in Node.js.