I had never really understood Streams in Node.js, so I recently studied them in depth, and this article is my attempt to explain them. The code for this article is at github.com/Maricaya/no…
What is a Stream?
A stream is an abstract data structure. Just like an array or a string, a stream is a collection of data.
The difference is that a stream hands over its data a little at a time, so the whole data set never has to sit in memory.
For example, the request and response objects of an HTTP server are Streams.
Stream, illustrated
A stream is like a stream of water, except that by default there is no water in it. Calling stream.write() puts water into the stream, that is, writes data.
In the upper-left corner is the segment that produces the data, called the source. At the bottom is the sink, where the data is consumed. The dots flowing from top to bottom are the pieces of data written each time, called chunks.
Why do I need a Stream?
Can't Node.js read and write data without streams?
It can, but the non-stream way is to read the entire file into memory and then write it out, which is fine for small files.
For large files, though, it is unbearable.
A stream splits the file into small pieces and transports them one by one. The data flows through like water, which greatly reduces the pressure on the server.
A Stream example
In case that doesn't convince you, let's run an experiment to see whether streams are really necessary for reading and writing large files.
First create a large file:
Creating a large file with a Stream
We first create a writable stream and write to the file many times. Remember to close the stream at the end, and we get a large file.
// Import the file module
const fs = require('fs');
const stream = fs.createWriteStream('../big_file.txt');
for (let i = 0; i < 1000000; i++) {
  stream.write(`This is line ${i}\n`);
}
stream.end()
console.log('done')
Using readFile
Let's first read the file with fs.readFile and see what happens.
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
  fs.readFile('../big_file.txt', (error, data) => {
    if (error) throw error
    response.end(data)
    console.log('done')
  })
})
server.listen(8889)
console.log(8889)
When we visit http://localhost:8889, the server reads this large file asynchronously.
Everything seems fine.
However, if we check the Node.js process in the task manager, its memory usage is around 130 MB.
That is roughly 130 MB for a single request. Accept 10 concurrent requests and you are looking at over 1 GB. Memory consumption on the server is far too high.
How do we solve this problem? Use a Stream.
Using a Stream
Let's try rewriting the example above with a Stream.
Create a readable stream with createReadStream and connect it to the response stream with a pipe.
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
  const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
})
server.listen(8888)
Again we check the Node.js memory usage: this time it stays at roughly 30 MB or less.
Because only a small piece of data passes through at a time, it never takes up much memory.
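If you would rather not rely on the task manager, a rough way to watch this yourself is to log process.memoryUsage() once a response has been sent. This is just a minimal sketch (the exact numbers vary by platform and Node.js version):

```js
const fs = require('fs')
const http = require('http')
const server = http.createServer()

server.on('request', (request, response) => {
  const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
  // Log the resident set size once the response has been fully written
  response.on('finish', () => {
    const mb = (process.memoryUsage().rss / 1024 / 1024).toFixed(1)
    console.log(`memory in use: ${mb} MB`)
  })
})

server.listen(8888)
```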
Pipes
As long as stream1 has data, it flows into stream2.
Like the code above:
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
Here stream is a file stream and response is our HTTP response stream. Originally the two streams had nothing to do with each other; now we want to pass the data from the file stream into the HTTP stream. That's easy: just use a pipe.
The usual pattern:
stream1.pipe(stream2)
- stream1 is the stream that produces the data: a readable stream.
- stream2 is the stream that receives the data: a writable stream.
Chaining
A stream of water can flow through any number of pipes, and so can a stream of data.
There are two equivalent ways to write it:
a.pipe(b).pipe(c)
// equivalent to
a.pipe(b)
b.pipe(c)
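As a concrete illustration of a.pipe(b).pipe(c) (just a sketch that assumes big_file.txt exists; we will meet zlib again at the end of the article), the chain below pipes a file read stream through a gzip stream and into a file write stream:

```js
const fs = require('fs')
const zlib = require('zlib')

fs.createReadStream('./big_file.txt')              // a: readable
  .pipe(zlib.createGzip())                         // b: a.pipe(b)
  .pipe(fs.createWriteStream('./big_file.txt.gz')) // c: b.pipe(c)
```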
Pipe principle
A pipe can also be thought of as a wrapper around two event listeners:
- Listen for the data event: as soon as stream1 has data, feed it to stream2.
- Listen for the end event: when stream1 ends, end stream2 as well.
stream1.on('data', (chunk) => {
stream2.write(chunk)
})
stream1.on('end', () => {
stream2.end()
})
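The real pipe() does more than these two listeners: it also handles backpressure and unpipes on certain errors. If you want the whole chain to be cleaned up when any stream fails, one option (a sketch using the built-in stream.pipeline, available since Node.js 10) is:

```js
const fs = require('fs')
const { pipeline } = require('stream')

pipeline(
  fs.createReadStream('./big_file.txt'),
  fs.createWriteStream('./big_file_copy.txt'), // hypothetical destination file
  (error) => {
    // Called exactly once, with an error if any stream in the chain failed
    if (error) {
      console.error('pipeline failed:', error)
    } else {
      console.log('pipeline succeeded')
    }
  }
)
```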
The prototype chain of a Stream object
Knowing the Stream prototype chain makes it easier to remember the Stream API.
fs.createReadStream(path)
If s = fs.createReadStream(path), then the object hierarchy of s is:
- Its own properties, constructed by fs.ReadStream
- Prototype: stream.Readable.prototype
- Second-level prototype: stream.Stream.prototype
- Third-level prototype: events.EventEmitter.prototype, the prototype that all streams inherit from
- Fourth-level prototype: Object.prototype, the prototype that all objects inherit from
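You can check this chain yourself. The sketch below assumes big_file.txt exists; the constructor names printed may differ slightly between Node.js versions:

```js
const fs = require('fs')
const stream = require('stream')
const events = require('events')

const s = fs.createReadStream('./big_file.txt')

console.log(s instanceof fs.ReadStream)       // true
console.log(s instanceof stream.Readable)     // true
console.log(s instanceof stream.Stream)       // true
console.log(s instanceof events.EventEmitter) // true

// Walk the prototype chain explicitly
let proto = Object.getPrototypeOf(s)
while (proto) {
  console.log(proto.constructor && proto.constructor.name)
  proto = Object.getPrototypeOf(proto)
}
```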
Events and methods supported by Stream
With that in mind, let’s look at the events and methods that Stream supports.
Just get a general impression; you can look them up when you need them.
| | Readable Stream | Writable Stream |
|---|---|---|
| Events | data, end, error, close, readable | drain (ready for more writes), finish (all writes flushed), error, close, pipe, unpipe |
| Methods | pipe(), unpipe(), read()… | write(), destroy()… |
Stream categories
There are four categories:

| Name | Characteristics |
|---|---|
| Readable | readable |
| Writable | writable |
| Duplex | readable and writable (bidirectional) |
| Transform | readable and writable (it transforms the data) |
Both Readable and Writable are unidirectional, while the other two are bidirectional.
Readable and Writable are easy to understand, but what is the difference between the other two?
A Duplex stream has an independent readable side and writable side, whereas with a Transform stream, what you read out is a transformed version of what you wrote in.
Babel, for example, converts ES6 to ES5: we write ES6 in on one side and read ES5 out on the other. It's like a car wash: a black car goes in, a white car comes out.
Readable Stream
Two states: paused and flowing
A readable stream has two states: paused and flowing.
You can think of a readable stream as a content producer: static when it is not emitting anything, flowing when it is.
- A readable stream starts in the paused state by default.
- Adding a data event listener switches it to flowing.
- Removing the data event listener does not automatically switch it back, though (for backwards-compatibility reasons).
- pause() switches it to paused.
- resume() switches it to flowing.
const http = require('http')
const fs = require('fs')
const server = http.createServer()
server.on('request', (request, response) => {
  // The stream starts out in the paused state
  const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
  stream.pause() // pause it
  setTimeout(() => {
    // resume after 3 seconds
    stream.resume()
  }, 3000)
})
server.listen(8888);
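If you want to watch the state change directly, the readableFlowing property reflects it (a small sketch; the property starts out as null because no consumer has been attached yet):

```js
const fs = require('fs')

const stream = fs.createReadStream('./big_file.txt')
console.log(stream.readableFlowing) // null: no consumer attached yet

// Adding a data listener switches the stream to flowing
stream.on('data', (chunk) => { /* consume the chunk */ })
console.log(stream.readableFlowing) // true

stream.pause()
console.log(stream.readableFlowing) // false

stream.resume()
console.log(stream.readableFlowing) // true
```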
Writable Stream
The drain event
Drain means the stream has "drained" and is ready for more water, i.e. you can continue writing data. When we call stream.write(chunk), we may get back false.
False means you are writing too fast and data has backed up inside the stream.
At that point we should stop writing and listen for the drain event instead.
When the drain event fires, we can continue writing.
This is hard to grasp from the description alone, so here is an example adapted from the official Node.js documentation:
const fs = require('fs');

// Write data to the file 1,000,000 times
function writeOneMillionTimes(writer, data) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last write
        writer.write(data);
      } else {
        // Check whether we can keep writing:
        // ok === false means we are writing faster than the stream can drain
        ok = writer.write(data);
        if (ok === false) {
          console.log('Backed up, stop writing');
        }
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // The buffer has drained, so it is safe to write again
      writer.once('drain', () => {
        console.log('Drained, resume writing');
        write();
      });
    }
  }
}

const writer = fs.createWriteStream('../big_file.txt');
writeOneMillionTimes(writer, 'hello world');
The finish event
The finish event fires after stream.end() has been called and all buffered data has been flushed to the underlying system.
When we write data to a file, it is not saved straight to disk; it goes into a buffer first and is only written out once enough data has accumulated.
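A minimal sketch of the finish event, reusing the writable stream from the beginning of the article (the file name here is just an example):

```js
const fs = require('fs')

const stream = fs.createWriteStream('./finish_demo.txt')

for (let i = 0; i < 1000; i++) {
  stream.write(`This is line ${i}\n`)
}

stream.on('finish', () => {
  // Fires after end() has been called and all buffered data
  // has been handed off to the underlying system
  console.log('All writes are done')
})

stream.end()
```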
Create your own stream
Next, let's look at how to create streams of our own.
We will go through each of the four types in turn.
Writable
const {Writable} = require('stream')
const outStream = new Writable({
  // Called for each chunk that someone writes to the stream
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    // Signal that this chunk has been handled, so the next one can come
    callback()
  }
})
process.stdin.pipe(outStream);
Save the file as writable.js and run it with node. Whatever you type is echoed straight back.
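You don't have to pipe stdin into it; you can also call write() and end() on it directly. A small usage sketch:

```js
const { Writable } = require('stream')

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  }
})

// Write to the stream directly instead of piping stdin into it
outStream.write('hello')
outStream.write('stream')
outStream.end()
```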
Readable
Read all the data at once
const {Readable} = require('stream')
const inStream = new Readable()
inStream.push('hello world') // Write data
inStream.push('hello node')
inStream.push(null) // There is no data left
// Pipe the readable stream into the writable stream process.stdout
inStream.pipe(process.stdout)
Push all data into inStream and pipe it into the writable stream process.stdout.
This way, when we run the file with Node, we can read all the data from inStream and print it out.
It’s simple, but it’s not efficient.
A better approach is to push on demand, supplying data only when the consumer asks for it.
Push data only when read is called
With this approach the data is supplied on demand: we push another chunk each time read is called.
For example, in the code below we push one character at a time, starting from character code 65 (which is 'A').
As the consumer reads, read keeps getting called, and we push more characters.
Once every character has been pushed, we push null to end the stream.
const {Readable} = require('stream')
const inStream = new Readable({
  read(size) {
    const char = String.fromCharCode(this.currentCharCode++)
    this.push(char)
    console.log(`Pushed ${char}`)
    // Stop once we have pushed everything up to 'Z'
    if (this.currentCharCode > 90) { // 90 is 'Z'
      this.push(null)
    }
  }
})
inStream.currentCharCode = 65 // A
inStream.pipe(process.stdout)
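Instead of piping into process.stdout, you could also consume the same stream with a data listener; attaching the listener switches it to flowing and keeps triggering read(). A sketch:

```js
const { Readable } = require('stream')

const inStream = new Readable({
  read(size) {
    const char = String.fromCharCode(this.currentCharCode++)
    this.push(char)
    if (this.currentCharCode > 90) { // past 'Z'
      this.push(null)
    }
  }
})
inStream.currentCharCode = 65 // 'A'

inStream.on('data', (chunk) => console.log('received:', chunk.toString()))
inStream.on('end', () => console.log('no more data'))
```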
Duplex Stream
Once you have seen readable and writable streams, a Duplex Stream is straightforward:
implement both the write and the read method.
const {Duplex} = require('stream')
const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  },
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  }
})
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform Stream
For a Transform Stream, we implement a single transform method, which plays the roles of both write and read.
Here is a simple Transform example that echoes back whatever you type, in uppercase:
const {Transform} = require('stream')
const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    // 1. Read the incoming data: chunk.toString()
    // 2. Push out the transformed data: this.push(...)
    this.push(chunk.toString().toUpperCase())
    callback()
  }
})

// Listen for user input, run it through upperCaseTr,
// then print the transformed output
process.stdin.pipe(upperCaseTr)
  .pipe(process.stdout)
Node.js built-in Transform Streams
Take the classic interview answer: gzip compression.
In Node.js it takes four lines of code:
const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.')) // print a dot to show progress
  .pipe(fs.createWriteStream(file + '.gz'))
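Decompression works the same way in the other direction (a sketch; it assumes you pass it a .gz file produced by the snippet above and writes the unzipped copy next to it):

```js
const fs = require('fs')
const zlib = require('zlib')

const file = process.argv[2] // e.g. big_file.txt.gz

fs.createReadStream(file)
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream(file.replace(/\.gz$/, '.unzipped')))
  .on('finish', () => console.log('Done'))
```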
Stream is ubiquitous in Node.js
| Readable Stream | Writable Stream |
|---|---|
| HTTP response (on the client) | HTTP request (on the client) |
| HTTP request (on the server) | HTTP response (on the server) |
| fs read stream | fs write stream |
| zlib stream | zlib stream |
| TCP socket | TCP socket |
| child process stdout & stderr | child process stdin |
| process.stdin | process.stdout, process.stderr |
| … | … |
The data backlog problem
There is one more very important issue with streams: the data backlog, also known as backpressure.
What do we do when there is too much data and the pipeline gets clogged?
The Node.js website has a dedicated guide, "Backpressuring in Streams", explaining how to handle it, so look it up if you run into the problem.
I won't repeat it here.
Conclusion
Let's summarize what we have learned about Stream.
- Why use a Stream? Because it greatly reduces memory pressure when reading and writing large files.
- A pipe is a key concept in Stream: it connects streams together.
- Stream objects inherit from EventEmitter.
- Streams come in four categories:
  - Readable, with two states: paused and flowing.
  - Writable, with two important events: drain and finish.
  - Duplex: readable and writable.
  - Transform: readable and writable, transforming the data as it passes through.
- Finally, we saw how to create each of the four kinds of Stream, and where Streams show up in Node.js.