Preface

A few days ago I wrote about Node's readable stream; today I want to add the writable stream and pipe.

Writable streams: Writable

fs.createWriteStream call example

  • The first chunk is written to the target file directly
  • Later chunks are held in a cache until they can be written to the target file; write() returns false once the pending data reaches highWaterMark
const fs = require("fs");
const path = require("path");
const bPath = path.join(__dirname, "b.txt");
let ws = fs.createWriteStream(bPath, {
  flags: "w",
  encoding: "utf-8",
  autoClose: true,
  start: 0,
  highWaterMark: 3,
});
ws.on("open", function (fd) {
  console.log("open", fd);
});
ws.on("close", function () {
  console.log("close");
});
// write() accepts a string or a Buffer and returns a boolean
ws.write("1");
// flag tells whether the pending data is still below highWaterMark
let flag = ws.write("1");
console.log({ flag }); // true
flag = ws.write("1");
console.log({ flag }); // false
flag = ws.write("1");
console.log({ flag }); // false
flag = ws.write("14444444");
console.log({ flag }); // false
ws.end(); // end() = write + close; "close" is not emitted unless end() is called
  • The result: the four logged flags are true, false, false, false, as noted in the comments
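
To see where these flags come from, the accounting can be modelled in a few lines. This is only a sketch, not Node's implementation: the helper name simulateFlags is hypothetical, and it assumes every write is issued in the same tick, so nothing is flushed between calls.

```javascript
// Hypothetical helper: models the return values of consecutive write() calls.
// Assumption: all writes happen synchronously, so no bytes are flushed in between.
function simulateFlags(chunks, highWaterMark) {
  let pending = 0; // bytes accepted but not yet written
  return chunks.map((chunk) => {
    pending += Buffer.byteLength(chunk);
    // write() reports true only while the pending amount stays below highWaterMark
    return pending < highWaterMark;
  });
}

console.log(simulateFlags(["1", "1", "1", "1", "14444444"], 3));
// [ true, true, false, false, false ]
```

The call example above logs only the last four of these five values, because its first write discards the return value.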

Custom writable stream: initWriteStream

Inherit from EventEmitter for publish/subscribe

const EventEmitter = require("events");
const fs = require("fs");
class WriteStream extends EventEmitter {}
module.exports = WriteStream;

A linked-list queue for caching pending writes

Implementation of linked list & queue

const Queue = require("./Queue");
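
The Queue module itself is not shown here, so the following is a minimal sketch of what it might look like, not the author's implementation. The only things taken from this article are the two method names the stream uses, offer and poll; the ListNode class, the size field, and returning undefined from an empty queue are assumptions.

```javascript
// Hypothetical singly linked list queue exposing the two methods the stream relies on.
class ListNode {
  constructor(value) {
    this.value = value;
    this.next = null;
  }
}

class Queue {
  constructor() {
    this.head = null; // next item to poll
    this.tail = null; // most recently offered item
    this.size = 0;
  }

  // Append a value at the tail.
  offer(value) {
    const node = new ListNode(value);
    if (this.tail) {
      this.tail.next = node;
    } else {
      this.head = node;
    }
    this.tail = node;
    this.size++;
  }

  // Remove and return the value at the head, or undefined when the queue is empty.
  poll() {
    if (!this.head) return undefined;
    const { value } = this.head;
    this.head = this.head.next;
    if (!this.head) this.tail = null;
    this.size--;
    return value;
  }
}

const q = new Queue();
q.offer("a");
q.offer("b");
console.log(q.poll(), q.poll(), q.poll()); // a b undefined
```

Both operations run in constant time, which is the reason to prefer a linked list over Array.prototype.shift for a queue that may grow large.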

Initialize the instance's default data: constructor()

  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "w";
    this.encoding = options.encoding || "utf8";
    // Octal by default; the three 6s grant read and write to owner, group, and others
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose !== false; // defaults to true, as in fs.createWriteStream
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.len = 0; // number of bytes waiting to be written
    this.needDrain = false; // whether "drain" needs to be emitted
    // The cache queue holds every chunk except the first one,
    // because the first chunk is written to the target file directly
    this.cache = new Queue();
    this.writing = false; // whether a write is currently in progress
    this.start = options.start || 0;
    this.offset = this.start; // position in the file of the next write
    this.open();
  }
  • this.mode: file permissions, default 0o666 (the 0o prefix means octal)

    1. The three digits correspond, in order, to the permissions of the user who owns the file, of the group the file belongs to, and of all other users
    2. Each digit is the sum of r (read, 4), w (write, 2), and x (execute, 1; for example, an executable file in the folder can be run directly)
    3. So by default all three groups of users can read and write the file (6 = 4 + 2)
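
The digit arithmetic above can be checked with a small decoder. This is an illustrative sketch; describeMode is a hypothetical helper name, not part of Node's fs module.

```javascript
// Hypothetical helper: renders a numeric mode such as 0o666 as an ls-style string.
function describeMode(mode) {
  const flags = [
    [4, "r"],
    [2, "w"],
    [1, "x"],
  ];
  let result = "";
  // Walk the three octal digits in order: owner, group, others.
  for (const shift of [6, 3, 0]) {
    const digit = (mode >> shift) & 0o7;
    for (const [bit, letter] of flags) {
      result += digit & bit ? letter : "-";
    }
  }
  return result;
}

console.log(describeMode(0o666)); // rw-rw-rw-
console.log(describeMode(0o755)); // rwxr-xr-x
```

Note that when a file is created, the operating system also applies the process umask, so the permissions on disk can be narrower than the mode requested.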

open()

  • Call fs.open()
  • In its callback, emit the instance's open event, passing the fd returned by fs.open as the argument
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) return this.emit("error", err); // surface open failures instead of ignoring them
      this.fd = fd;
      this.emit("open", fd);
    });
  }

write()

  • Convert the data passed in to a Buffer
  • Check whether the total length of pending data has reached highWaterMark; if it has, write() returns false and needDrain is set (this check does not depend on whether it is the first write)
  • Wrap the cb passed in so that it also calls clearBuffer to work through the cache
  • Check whether a write is already in progress; if not (the first write, or the cache has been emptied), call _write directly (to be implemented)
  • Otherwise, offer the chunk to the tail of the cache queue, where it waits to be written to the target file
  write(chunk, encoding = this.encoding, cb = () => {}) {
    // Convert all data to a Buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    this.len += chunk.length;
    // console.log({chunk},this.len )
    let returnValue = this.len < this.highWaterMark;
    // drain is needed once the expected amount is reached; this.len is reduced after each actual write
    this.needDrain = !returnValue;
    // Wrap the user's callback so the cache is processed after every write
    let userCb = cb;
    cb = () => {
      userCb();
      // Move on to the next cached chunk
      this.clearBuffer();
    };
    // Decide whether to call _write directly
    if (!this.writing) {
      // The first write, or the cache queue has been emptied
      this.writing = true;
      // console.log("first write");
      this._write(chunk, encoding, cb);
    } else {
      // A write is in progress: queue this chunk
      this.cache.offer({ chunk, encoding, cb });
    }
    return returnValue;
  }

clearBuffer(): empty the cache queue in order

  • The queue is processed in first-in, first-out order
  • this.cache.poll() takes the head item each time and passes it to this._write to be written to the target file
  • If poll() returns nothing, either this was the first write or the cache queue has been emptied; set this.writing = false so that the next write goes directly to the target file
  • If this.needDrain is set, meaning the pending data reached highWaterMark earlier, emit drain now that everything has been written
  clearBuffer() {
    let data = this.cache.poll();
    // console.log('this.cache',this.cache)
    if (data) {
      // There is cached data: write it to the file
      this._write(data.chunk, data.encoding, data.cb);
    } else {
      this.writing = false;
      if (this.needDrain) {
        // The expected amount was reached earlier, so trigger drain
        this.emit("drain");
      }
    }
  }

_write()

  • Fs.open () is asynchronous and fd will be a number after a successful read
  • Decide whether to subscribe to an open based on the fd type and call back yourself (until fd type is number)
  • Fs. write to write the current chunk.
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== "number") {
      return this.once("open", () => this._write(chunk, encoding, cb));
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => {
      this.offset += written; // Maintain the offset
      this.len -= written; // Reduce the pending byte count
      cb(); // Write succeeded
      // console.log(this.cache);
    });
  }
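
The "retry once open fires" idea in _write can be shown in isolation. The sketch below is a stand-in, not part of the stream: LazyResource, use, and the descriptor value 3 are all made up for the demo, and open() is called by hand instead of by fs.open.

```javascript
const EventEmitter = require("events");

// Hypothetical stand-in for a resource that becomes usable only after "open".
class LazyResource extends EventEmitter {
  constructor() {
    super();
    this.fd = undefined; // no descriptor yet
    this.log = [];
  }

  use(label) {
    if (typeof this.fd !== "number") {
      // Not open yet: retry this exact call once the "open" event fires.
      return this.once("open", () => this.use(label));
    }
    this.log.push(`${label} on fd ${this.fd}`);
  }

  // Simulates the fs.open callback; 3 is an arbitrary descriptor for the demo.
  open() {
    this.fd = 3;
    this.emit("open", this.fd);
  }
}

const res = new LazyResource();
res.use("first");
res.use("second");
console.log(res.log.length); // 0, nothing ran before open
res.open();
console.log(res.log); // [ 'first on fd 3', 'second on fd 3' ]
```

Because once listeners run in the order they were registered, calls made before the file is open are replayed in their original order.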

Test custom Writable

const path = require("path");
const WriteStream = require("./initWriteStream");
const bPath = path.join(__dirname, "b.txt");
let ws = new WriteStream(bPath, {
  highWaterMark: 3,
});
let i = 0;
function write() {
  // Write the digits 0-9
  let flag = true;
  while (i < 10 && flag) {
    flag = ws.write(i++ + "");
    console.log(flag);
  }
}
ws.on("drain", function () {
  // drain ⌚️ fires only after the writes reached the expected amount and the data has been emptied
  console.log("done");
  write();
});
write();
  • Ten digits are written in sequence; the pending data reaches highWaterMark three times, and each time the cache is emptied in order before drain fires, three times in total

  • Check that the expected value is correctly written to the target file
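
The drain cycle described in this test can also be traced without touching the file system. The following is an in-memory model under stated assumptions, not the stream itself: ModelStream, flushOne, and the onDrain callback are hypothetical names, the "file" is a plain string, an array stands in for the queue, and each low-level write completes only when flushOne() is called by hand.

```javascript
// Hypothetical in-memory model of the stream: the "file" is a string and
// every low-level write stays pending until flushOne() is called by hand.
class ModelStream {
  constructor(highWaterMark, onDrain) {
    this.highWaterMark = highWaterMark;
    this.onDrain = onDrain; // stands in for the "drain" event
    this.len = 0;
    this.needDrain = false;
    this.writing = false;
    this.cache = []; // plain array used as a FIFO queue
    this.pending = null; // the single in-flight low-level write
    this.file = "";
  }

  write(chunk) {
    this.len += chunk.length;
    const ok = this.len < this.highWaterMark;
    this.needDrain = !ok;
    if (!this.writing) {
      this.writing = true;
      this._write(chunk);
    } else {
      this.cache.push(chunk);
    }
    return ok;
  }

  _write(chunk) {
    this.pending = chunk; // completes later, like fs.write
  }

  // Completes the in-flight write, then moves on to the next cached chunk.
  flushOne() {
    const chunk = this.pending;
    if (chunk === null) return false;
    this.pending = null;
    this.file += chunk;
    this.len -= chunk.length;
    const next = this.cache.shift();
    if (next !== undefined) {
      this._write(next);
    } else {
      this.writing = false;
      if (this.needDrain) this.onDrain();
    }
    return true;
  }
}

const flags = [];
let drains = 0;
let digit = 0;
const ms = new ModelStream(3, () => {
  drains++;
  writeDigits();
});

function writeDigits() {
  let flag = true;
  while (digit < 10 && flag) {
    flag = ms.write(String(digit++));
    flags.push(flag);
  }
}

writeDigits();
while (ms.flushOne()) {} // play the role of the event loop
console.log(flags.join(" ")); // true true false true true false true true false true
console.log(drains, ms.file); // 3 0123456789
```

Every third write returns false, each of those batches ends with one drain, and the final digit is written without a drain because the pending data never reaches highWaterMark again.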

Finally, if you found this article helpful, please remember to give it a like. Thank you very much!