transform stream

through2 is a small wrapper library around Node's Transform stream. The source code is only about 100 lines long, but it is worth reading.

The Transform concept

A Transform is a stream that can be both read from and written to; it is a special form of Duplex. In a plain Duplex the writable and readable sides are unrelated: each has its own buffer and pipeline, and they do not interfere with each other. A Transform ties its input to its output: whatever is written is passed through a conversion function and becomes what is read.
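
To make the link between input and output concrete, here is a minimal sketch that uses only Node's built-in stream module. The upperCase name is made up for illustration; the point is that whatever is written to the stream shows up, converted, on its readable side.

```javascript
const { Transform } = require('stream');

// A Transform whose readable side is fed by its writable side:
// every chunk written comes out upper-cased.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // Passing data as the second callback argument is equivalent to this.push(data).
    callback(null, chunk.toString().toUpperCase());
  }
});

upperCase.write('hello ');
upperCase.write('world');

// Whatever was written is now waiting, transformed, in the readable buffer.
const output = upperCase.read().toString();
console.log(output); // HELLO WORLD
```

With a plain Duplex, the same two write() calls would not make anything readable unless the implementation connected the two sides itself.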

Transform stream: a simple case

The following implements a transform stream that replaces every occurrence of a target string with a specified string:


// replaceStream.ts

import { Transform, TransformCallback } from "stream";

export default class ReplaceStream extends Transform {
  constructor(
    private searchString: string,
    private replaceString: string,
    private tailPiece: string = ""
  ) {
    super();
  }

  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback) {
    // Prepend the tail held back from the previous chunk, then split on the search string.
    const pieces = (this.tailPiece + chunk).split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    // Hold back the end of the last piece: it may be the start of a match
    // that continues in the next chunk.
    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    this.push(pieces.join(this.replaceString));

    callback();
  }

  /** In some cases, the conversion operation may need to send some additional data at the end of the stream. */
  _flush(callback: TransformCallback) {
    this.push(this.tailPiece);
    this.push("\n");
    this.push("haha");
    callback();
  }
}

// replaceStreamTest.ts

import ReplaceStream from "./replaceStream";

const re = new ReplaceStream("World", "Nodejs");

re.on("data", chunk => console.log(chunk.toString()));

// The match "World" is deliberately split across two chunks.
re.write("hello W");
re.write("orld");
re.end();


Create a new class that inherits from the stream.Transform base class. The constructor takes two arguments, searchString and replaceString, which specify the string to match and the string to replace it with. An internal variable, tailPiece, is also initialized for use by the _transform() method; it carries the end of one chunk over to the next so that a match split across two chunks is still found.

The _transform() method has the same signature as _write(), but instead of writing the data to an underlying resource, it uses this.push() to push it into the internal buffer, just as the _read() method of a readable stream does. This shows that the two sides of a transform stream are actually connected.

The _flush() method takes a single callback as its argument. It must call that callback once all remaining work is complete, which terminates the stream.
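
The chunk-boundary bookkeeping is easier to follow when traced outside a stream. The sketch below is a hypothetical helper (createReplacer is not part of the original code) that repeats the same split-and-hold-back logic as plain functions. Its flush() returns only the held-back tail and omits the extra demo strings that ReplaceStream pushes in _flush().

```javascript
// Hypothetical helper: the same bookkeeping as ReplaceStream, minus the stream.
function createReplacer(searchString, replaceString) {
  let tailPiece = '';
  const tailPieceLen = searchString.length - 1;

  return {
    // Mirrors _transform(): returns what would be pushed for this chunk.
    transform(chunk) {
      const pieces = (tailPiece + chunk).split(searchString);
      const lastPiece = pieces[pieces.length - 1];
      // Hold back the last few characters: they may be the start of a match.
      tailPiece = lastPiece.slice(-tailPieceLen);
      pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
      return pieces.join(replaceString);
    },
    // Mirrors _flush(): whatever is still held back goes out at the end.
    flush() {
      return tailPiece;
    }
  };
}

const replacer = createReplacer('World', 'Nodejs');
const first = replacer.transform('hello W'); // 'hel'       ('lo W' is held back)
const second = replacer.transform('orld');   // 'lo Nodejs' ('lo W' + 'orld' contains the match)
const rest = replacer.flush();               // ''          (nothing left to emit)
console.log(first + second + rest);          // hello Nodejs
```

Holding back searchString.length - 1 characters is enough because a match that straddles two chunks can have at most that many characters in the earlier chunk.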

through2: core source code walkthrough

through2 is a thin wrapper around the Transform stream, and it is very simple to use. Here is the core code.

function through2 (construct) {
  return function throughHOC (options, transform, flush) {
    if (typeof options == 'function') {
      flush     = transform
      transform = options
      options   = {}
    }

    if (typeof transform != 'function')
      transform = noop

    if (typeof flush != 'function')
      flush = null

    return construct(options, transform, flush)
  }
}

This is the factory function from which all three APIs of through2.js are generated.

through2 is a higher-order function that receives a single parameter, construct. In this project it is called three times, with a different construct each time, to produce the three APIs.

through2 in turn returns a function, named throughHOC here for readability, which takes three parameters:

  • options: the options passed to the Transform constructor
  • transform: the actual conversion function
  • flush: a function that takes a single callback as its argument and must call it once all remaining work is finished, terminating the stream

The throughHOC function first normalizes its arguments:

  • if options is a function, it is treated as the transform function (and the second argument as flush), and options falls back to a default empty object;
  • if transform is not a function, it is reset to the default conversion function, noop;
  • if flush is not a function, it is reset to null.

Once the arguments are normalized, they are passed to construct. The construct function is what actually implements each of the three APIs.
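
The normalization rules can be checked in isolation by handing the factory a stub construct that simply reports what it received. In this sketch the inspect stub and the sample callbacks are hypothetical, and noop, which the excerpt does not show, is assumed to pass each chunk through unchanged.

```javascript
// Assumed default transform: pass every chunk through unchanged.
function noop(chunk, enc, callback) {
  callback(null, chunk);
}

// The factory from the listing above.
function through2(construct) {
  return function throughHOC(options, transform, flush) {
    if (typeof options == 'function') {
      flush = transform;
      transform = options;
      options = {};
    }
    if (typeof transform != 'function') transform = noop;
    if (typeof flush != 'function') flush = null;
    return construct(options, transform, flush);
  };
}

// Hypothetical stub: instead of building a stream, report what it received.
const inspect = through2((options, transform, flush) => ({ options, transform, flush }));

const myTransform = (chunk, enc, cb) => cb(null, chunk);
const myFlush = (cb) => cb();

const a = inspect(myTransform, myFlush);        // options omitted
const b = inspect({ objectMode: true });        // only options given
const c = inspect({}, myTransform, 'not a fn'); // invalid flush

console.log(a.options, a.transform === myTransform, a.flush === myFlush); // {} true true
console.log(b.transform === noop, b.flush);                               // true null
console.log(c.flush);                                                     // null
```

Because the shuffling happens before construct runs, each of the three APIs can assume it always receives an options object, a transform function, and either a flush function or null.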

Before looking at the API methods, consider how the Transform class is extended into DestroyableTransform:

function DestroyableTransform(opts) {
  Transform.call(this, opts)
  this._destroyed = false
}

inherits(DestroyableTransform, Transform)

DestroyableTransform.prototype.destroy = function(err) {
  if (this._destroyed) return
  this._destroyed = true
  
  var self = this
  process.nextTick(function() {
    if (err)
      self.emit('error', err)
    self.emit('close')
  })
}

DestroyableTransform inherits from Transform and implements destroy(). When destroy() is called, the close event has to be emitted manually; an error event is emitted first if an error was passed in.
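
The sketch below exercises that behavior. It repeats the class on top of Node's built-in stream and util modules, which is an assumption about where Transform and inherits come from, and records the events in a hypothetical events array to show two things: the events are emitted on the next tick rather than synchronously, and a second destroy() call is ignored.

```javascript
const { Transform } = require('stream');
const { inherits } = require('util');

function DestroyableTransform(opts) {
  Transform.call(this, opts);
  this._destroyed = false;
}
inherits(DestroyableTransform, Transform);

DestroyableTransform.prototype.destroy = function (err) {
  if (this._destroyed) return; // a second call is ignored
  this._destroyed = true;

  const self = this;
  process.nextTick(function () {
    if (err) self.emit('error', err);
    self.emit('close');
  });
};

const events = [];
const t = new DestroyableTransform();
t.on('error', (err) => events.push('error: ' + err.message));
t.on('close', () => events.push('close'));

t.destroy(new Error('boom'));
t.destroy(new Error('ignored')); // no effect, already destroyed

const eventsRightAfterDestroy = events.slice(); // still empty: emission is deferred
setImmediate(() => console.log(events));        // [ 'error: boom', 'close' ]
```

Deferring the events gives the caller a chance to attach listeners after calling destroy() in the same tick.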

Here are the functions that implement the three apis:

The main method


let construct = function (options, transform, flush) {
  var t2 = new DestroyableTransform(options)

  t2._transform = transform

  if (flush)
    t2._flush = flush

  return t2
}

module.exports = through2(construct)

The through2 function receives the construct function above as its argument. construct instantiates a DestroyableTransform, passing along options, the configuration supplied by the caller, and then overrides the _transform and _flush methods on the instance.

through2.obj

The only difference between this API and the main method is that object mode is turned on: objectMode defaults to true and highWaterMark defaults to 16. Options passed by the caller still override these defaults.

var t2 = new DestroyableTransform(Object.assign({ objectMode: true, highWaterMark: 16 }, options))
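
As a rough illustration of what object mode changes, the following sketch applies the same two defaults to a plain built-in Transform instead of going through through2.obj. The double stream and its record shape are invented for the example.

```javascript
const { Transform } = require('stream');

// The same defaults that through2.obj applies, written out by hand.
const double = new Transform({
  objectMode: true,
  highWaterMark: 16, // counted in objects, not bytes, when objectMode is on
  transform(record, encoding, callback) {
    // In object mode a chunk is an arbitrary JavaScript value, not a Buffer.
    callback(null, { id: record.id, value: record.value * 2 });
  }
});

double.write({ id: 'a', value: 1 });
double.write({ id: 'b', value: 21 });

// In object mode each read() returns exactly one object.
const firstRecord = double.read();  // { id: 'a', value: 2 }
const secondRecord = double.read(); // { id: 'b', value: 42 }
console.log(firstRecord, secondRecord);
```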

through2.ctor

This API returns a subclass of DestroyableTransform rather than a Transform stream instance. The only difference from the main method is that you have to instantiate the returned constructor yourself.


let construct = function (options, transform, flush) {
  function Through2 (override) {
    if (!(this instanceof Through2))
      return new Through2(override)

    this.options = Object.assign({}, options, override)

    DestroyableTransform.call(this, this.options)
  }

  inherits(Through2, DestroyableTransform)

  Through2.prototype._transform = transform

  if (flush)
    Through2.prototype._flush = flush

  return Through2
}

module.exports.ctor = through2(construct)

Use:

const through2 = require('through2')
const Ctor = through2.ctor(function(chunk, enc, callback) {
  console.log('chunk', chunk.toString());
  callback(null, chunk);
});
const th2 = new Ctor();
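
The constructor returned by this API can also take per-instance option overrides, and it works with or without new. The sketch below is a simplified stand-in rather than through2 itself: it skips the argument-normalizing factory, uses the built-in Transform in place of DestroyableTransform, and the ctor and Upper names are chosen only for the example.

```javascript
const { Transform } = require('stream');
const { inherits } = require('util');

// Simplified ctor-style factory; plain Transform stands in for DestroyableTransform.
function ctor(options, transform) {
  function Through2(override) {
    if (!(this instanceof Through2)) return new Through2(override);
    // Per-instance overrides win over the defaults captured by the factory.
    this.options = Object.assign({}, options, override);
    Transform.call(this, this.options);
  }
  inherits(Through2, Transform);
  Through2.prototype._transform = transform;
  return Through2;
}

const Upper = ctor({ highWaterMark: 8 }, function (chunk, enc, callback) {
  callback(null, chunk.toString().toUpperCase());
});

const s1 = new Upper();                    // uses the factory defaults
const s2 = Upper({ highWaterMark: 1024 }); // `new` is optional; option overridden

s1.write('abc');
const shout = s1.read().toString();
console.log(shout);                                              // ABC
console.log(s1.options.highWaterMark, s2.options.highWaterMark); // 8 1024
console.log(s2 instanceof Upper);                                // true
```

Putting _transform on the prototype means every instance shares one function, which is the practical reason to prefer a constructor when many identical streams are created.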

Refactoring through2 with TypeScript

I have to marvel at this project. It is only a thin wrapper around Transform, yet there is a lot to learn from it. The project itself adds just two extra APIs, but once you are familiar with the source you can build further extensions of your own. That flexibility comes from the Transform stream itself.

After studying the source, I rewrote the code in TypeScript. The result is clearer and deepened my understanding, and the exercise is well worth doing.

Source code: through2-ts