The previous article covered how to use readable streams and walked through a source-level implementation; have a look at it if you haven't already. Writable streams are similar in many places, so I won't repeat what was introduced there and will go straight to usage. One thing worth mentioning up front: writable streams have the concept of a cache.

The use of writable streams

Create writable streams

  • For the detailed default parameter configuration, see the instructions on the official website. It is similar to readable streams, so here is the code directly:
let fs = require('fs');
let ws = fs.createWriteStream('2.txt', {
    flags: 'w',          // File operation mode; the default is 'w'
    mode: 0o666,
    autoClose: true,
    highWaterMark: 3,    // The default is 16 * 1024
    encoding: 'utf8'
});


The ws.write() method and the drain event

  • Every call to write() returns a flag. It is false once the total length waiting to be written reaches the highWaterMark
  • true means we can keep writing. false means the cache is full and we should stop writing for now, to avoid consuming too much memory
  • While the cache is full, the data keeps being written to the file. Once everything in the cache has been written and the cache is empty, the writable stream emits the 'drain' event

Not sure? That’s ok. Let’s look at the following example to help us understand

let fs = require("fs");
let ws = fs.createWriteStream('1.txt',{
    flags:'w',
    encoding:'utf8',
    start:0,
    highWaterMark:3
});
let i =9;
function write() {
    let flag = true;
    while (flag && i >= 0) {
        flag = ws.write(i-- + ''); // Write 9876543210 to 1.txt
        console.log(flag);
    }
}
ws.on('drain', () => { // Fired after the cache filled up and has since been written out completely
    console.log('drained');
    write();
});
write(); // Call the write method the first time
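The return values can also be observed without touching the disk. The following is only a sketch and not part of the original example: it assumes an in-memory sink built with stream.Writable from Node's core, which completes each write asynchronously via setImmediate, so all three one-byte writes are counted before any of them finishes.

```javascript
const { Writable } = require('stream');

// In-memory sink (illustrative only): it discards each chunk and reports completion later.
const sink = new Writable({
    highWaterMark: 3,
    write(chunk, encoding, callback) {
        setImmediate(callback);
    }
});

// Three 1-byte writes: the buffered length goes 1, 2, 3.
const flags = ['a', 'b', 'c'].map((s) => sink.write(s));
console.log(flags); // [ true, true, false ]

// Emitted once the buffered data has been flushed after a write returned false.
sink.on('drain', () => console.log('drain: safe to write again'));
```

The third write returns false because the buffered length (3) is no longer below the highWaterMark (3).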

With that, we should have a basic grasp of the usage, so let's start writing our own implementation. It has a lot in common with the readable stream implementation, which the previous article covered in detail, so I won't repeat those parts. Let's get to it.

Writable stream implementation principles

1. Declare the WriteStream constructor (preparation)

let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter{
    constructor(path,options={}){
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.start = options.start || 0;
        this.pos = this.start;
        this.mode = options.mode || 0o666;
        this.autoClose = options.autoClose !== false; // Defaults to true unless explicitly set to false
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        // Whether a write to the file is currently in progress
        this.writing = false;
        // The cache: writes that are waiting for their turn
        this.cache = [];
        // Total length of the data that has not been written yet
        this.len = 0;
        // Whether the drain event needs to be triggered
        this.needDrain = false;
        this.open(); // Once the open event has fired, fd is guaranteed to exist
    }
}
  • For now we just declare the variables; what each one is for will become clear when we use it
  • The main idea: the first write really goes to the file, while any write issued during an in-flight write is put in the cache and later taken out of the cache and written in order
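The "first write goes out, later writes queue up" idea can be sketched in isolation. The class below is a hypothetical model, not the article's WriteStream: QueueModel, its written array (which stands in for the file), and the use of setImmediate in place of an asynchronous fs.write are all assumptions made for illustration.

```javascript
// Hypothetical model of the writing flag and the cache.
class QueueModel {
    constructor() {
        this.writing = false; // is a write currently in flight?
        this.cache = [];      // writes waiting for their turn
        this.written = [];    // stands in for the file
    }
    write(chunk) {
        if (this.writing) {
            this.cache.push(chunk);   // busy: remember it for later
        } else {
            this.writing = true;
            this._write(chunk);       // idle: write immediately
        }
    }
    _write(chunk) {
        // setImmediate stands in for an asynchronous fs.write call.
        setImmediate(() => {
            this.written.push(chunk);
            const next = this.cache.shift();
            if (next !== undefined) {
                this._write(next);    // keep emptying the cache in order
            } else {
                this.writing = false; // cache empty: idle again
            }
        });
    }
}

const model = new QueueModel();
['a', 'b', 'c'].forEach((c) => model.write(c));
console.log(model.cache); // [ 'b', 'c' ]  ('a' is in flight, the rest are queued)
setTimeout(() => console.log(model.written), 50); // [ 'a', 'b', 'c' ]
```

Only one write is ever in flight, which is what keeps the chunks in order.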

2. Implement the open and destroy methods

  • These two methods are the same as in the readable stream, so here is the code without further introduction
open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err) {
            this.emit('error', err); // An error occurred when opening the file: emit the error event
            if (this.autoClose) {
                this.destroy(); // Close the file (triggers the close event)
            }
            return;
        }
        this.fd = fd; // Save the file descriptor
        this.emit('open', this.fd); // Trigger the open event
    });
}
destroy() {
    if (typeof this.fd != 'number') { // The file was never opened: just trigger the close event
        return this.emit('close');
    }
    fs.close(this.fd, () => { // The file is open: close it, then trigger the close event
        this.emit('close');
    });
}

3. Implement the write method, which is what runs when the client calls ws.write()

write(chunk, encoding = this.encoding, callback) { // Called by the client
    // chunk must be a Buffer or a string; a string is converted to a Buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.len += chunk.length; // Maintain the length of the data waiting to be written
    let ret = this.len < this.highWaterMark; // Compare against highWaterMark to see whether the cache is full
    this.needDrain = !ret; // Whether the drain event needs to be triggered later
    if (this.writing) { // false by default
        // A write is in progress: put this one in the cache
        this.cache.push({ chunk, encoding, callback });
    } else { // First write
        this.writing = true;
        this._write(chunk, encoding, () => this.clearBuffer()); // The method that really writes
    }
    return ret; // false means stop writing for now; writing more would take up more memory
}
  • The key part is the _write() method, which performs the actual write to the file

4. Implement the key _write() method

_write(chunk, encoding, clearBuffer) {
    if (typeof this.fd != 'number') {
        // write() is called synchronously, so fd may not have been obtained yet; wait for open, then write
        return this.once('open', () => this._write(chunk, encoding, clearBuffer));
    }
    // From here on fd is guaranteed to exist
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
        if (err) {
            return this.emit('error', err); // Do not advance the position on a failed write
        }
        this.pos += bytesWritten;
        this.len -= bytesWritten; // The in-memory length decreases after each write
        this.writing = false;
        clearBuffer(); // Clear the cache
    });
}
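The "wait for open, then retry" step in _write() is a small pattern of its own. The sketch below shows it in isolation; LazyHandle, finishOpen(), use(), and the fd value 42 are hypothetical names and values invented for this illustration, and finishOpen() merely simulates fs.open completing.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for a stream whose file descriptor arrives later.
class LazyHandle extends EventEmitter {
    constructor() {
        super();
        this.fd = undefined;
        this.log = [];
    }
    // Simulates fs.open() completing; 42 is an arbitrary illustrative value.
    finishOpen(fd = 42) {
        this.fd = fd;
        this.emit('open', fd);
    }
    use(label) {
        if (typeof this.fd !== 'number') {
            // Not ready yet: retry exactly once when 'open' fires.
            return this.once('open', () => this.use(label));
        }
        this.log.push(`${label} ran with fd ${this.fd}`);
    }
}

const handle = new LazyHandle();
handle.use('first');      // deferred, nothing is logged yet
console.log(handle.log);  // []
handle.finishOpen();
console.log(handle.log);  // [ 'first ran with fd 42' ]
```

Using once() rather than on() matters here: the retry should run a single time, not on every later 'open' event.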

5. Clear the cache

clearBuffer() {
    let buffer = this.cache.shift(); // Take the first entry out of the cache
    if (buffer) { // There is something in the cache
        this.writing = true; // Still busy: writes arriving now must keep going into the cache
        this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
    } else { // There is nothing left in the cache
        this.writing = false;
        if (this.needDrain) { // The drain event needs to be triggered
            this.needDrain = false;
            this.emit('drain');
        }
    }
}
  • A quick analysis:
  • If a write is in progress, new content is first placed in the cache, this.cache, which defaults to []
  • _write() is called with chunk, encoding, and a callback, () => this.clearBuffer()
  • After each write completes, the next entry is taken from the cache and written, until the cache is empty
  • The drain event is emitted when the cache array is empty and an earlier write() returned false

That's basically it. The real source code is more than 3000 lines; this is only a simple implementation. If something isn't clear, write it out a few more times (PS: I wrote it many times myself). Let's test it and see whether it works.
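A test along these lines could look like the sketch below. It assembles the steps above into a single file with some simplifications that are my own assumptions: destroy() and the per-write callback are left out, the writing flag is cleared only once the cache is empty, and the target file name in the OS temp directory is hypothetical.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(file, options = {}) {
        super();
        this.path = file;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.pos = options.start || 0;
        this.mode = options.mode || 0o666;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.writing = false;   // is a write in flight?
        this.cache = [];        // pending writes
        this.len = 0;           // bytes accepted but not yet written
        this.needDrain = false;
        this.open();
    }
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        });
    }
    write(chunk, encoding = this.encoding) {
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
        this.len += chunk.length;
        const ret = this.len < this.highWaterMark;
        this.needDrain = !ret;
        if (this.writing) {
            this.cache.push({ chunk, encoding });
        } else {
            this.writing = true;
            this._write(chunk, encoding, () => this.clearBuffer());
        }
        return ret;
    }
    _write(chunk, encoding, done) {
        if (typeof this.fd !== 'number') {
            // fd not available yet: retry once the file has been opened
            return this.once('open', () => this._write(chunk, encoding, done));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWritten) => {
            if (err) return this.emit('error', err);
            this.pos += bytesWritten;
            this.len -= bytesWritten;
            done();
        });
    }
    clearBuffer() {
        const next = this.cache.shift();
        if (next) {
            this._write(next.chunk, next.encoding, () => this.clearBuffer());
        } else {
            this.writing = false;
            if (this.needDrain) {
                this.needDrain = false;
                this.emit('drain');
            }
        }
    }
}

// Hypothetical target file in the OS temp directory.
const target = path.join(os.tmpdir(), 'writestream-sketch.txt');
const ws = new WriteStream(target, { highWaterMark: 3 });
const flags = [];
let i = 9;
function write() {
    let flag = true;
    while (flag && i >= 0) {
        flag = ws.write(i-- + '');
        flags.push(flag);
    }
}
ws.on('drain', () => {
    console.log('drain');
    write();
});
write();
console.log(flags); // [ true, true, false ]  (right after the first synchronous batch)
```

Because this sketch has no end() or destroy(), the file descriptor is only released when the process exits; a fuller version would close it explicitly.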

  • Perfect. As always, the source code is posted on GitHub for reference