transform stream
through2 is a small wrapper library around the Node.js Transform stream. Its source is only about 100 lines long, but it is worth reading.
Transform
concept
A Transform is a stream that can be both read from and written to; it is a special form of Duplex. In a plain Duplex, the writable and readable sides are unrelated: their buffers and pipes do not interfere with each other. A Transform ties its input to its output: whatever is written passes through a conversion function, and the result becomes readable.
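A minimal sketch of that difference, using only Node's built-in stream module. The names written, duplex and upperCase are hypothetical and chosen for illustration: in the Duplex, the written data never reaches the readable side, while in the Transform every written chunk comes out converted.

```javascript
const { Duplex, Transform } = require('stream')

// Duplex: the writable and readable sides are implemented separately.
const written = []
const duplex = new Duplex({
  write (chunk, encoding, callback) {
    written.push(chunk.toString()) // lands on the writable side only
    callback()
  },
  read () {
    this.push('from the read side') // unrelated to what was written
    this.push(null)
  }
})

// Transform: the readable output is computed from the written input.
const upperCase = new Transform({
  transform (chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase())
  }
})

duplex.write('ignored by the reader')
upperCase.write('hello')

const duplexOut = String(duplex.read())
const upperOut = String(upperCase.read())

console.log(written)   // what the Duplex's writable side received
console.log(duplexOut) // what the Duplex's readable side produced
console.log(upperOut)  // the Transform's output, derived from its input
```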
A simple transform stream example
The following implements a transform stream that replaces every occurrence of a target string with a specified string:
// replaceStream.ts
import { Transform, TransformCallback } from "stream";

export default class ReplaceStream extends Transform {
  constructor(
    private searchString: string,
    private replaceString: string,
    private tailPiece: string = ""
  ) {
    super();
  }

  _transform(chunk: Buffer | string, encoding: BufferEncoding, callback: TransformCallback) {
    // Prepend the tail held back from the previous chunk, then split on the search string.
    const pieces = (this.tailPiece + chunk.toString()).split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;
    // Hold back the end of this chunk: it may be the start of a match that continues in the next chunk.
    this.tailPiece = lastPiece.slice(-tailPieceLen);
    pieces[pieces.length - 1] = lastPiece.slice(0, -tailPieceLen);
    this.push(pieces.join(this.replaceString));
    callback();
  }

  /** In some cases, the conversion needs to send additional data at the end of the stream. */
  _flush(callback: TransformCallback) {
    this.push(this.tailPiece);
    this.push("\n");
    this.push("haha");
    callback();
  }
}

// replaceStreamTest.ts
import ReplaceStream from "./replaceStream";

// The match is case-sensitive, so the written text must contain "World" exactly.
const re = new ReplaceStream("World", "Nodejs");
re.on("data", chunk => console.log(chunk.toString()));
re.write("hello W");
re.write("orld");
re.end();
We create a new class that inherits from the stream.Transform base class. Its constructor takes two arguments, searchString and replaceString, which specify the string to look for and the string to replace it with. An internal variable, tailPiece, is also initialized for use by the _transform() method.
The _transform() method has practically the same signature as the _write() method of a writable stream, but instead of writing the data to an underlying resource it pushes it into the internal buffer with this.push(), just as the _read() method of a readable stream would. This shows that the two sides of a transform stream are actually connected.
The _flush() method takes a single callback as its argument and must call it once all remaining work is done, so that the stream can end.
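To see why the tail piece is held back, the per-chunk step of _transform() can be sketched as a pure function and traced by hand. The helper name replaceChunk is hypothetical, and the sketch assumes a search string longer than one character (with a one-character search string the tail length would be 0 and slice(-0) would hold back the whole piece).

```javascript
// Hypothetical helper: the body of _transform() as a pure function.
function replaceChunk (tail, chunk, searchString, replaceString) {
  const pieces = (tail + chunk).split(searchString)
  const lastPiece = pieces[pieces.length - 1]
  const tailLen = searchString.length - 1
  // Hold back the last (searchString.length - 1) characters: they could be
  // the beginning of a match that is completed by the next chunk.
  const nextTail = lastPiece.slice(-tailLen)
  pieces[pieces.length - 1] = lastPiece.slice(0, -tailLen)
  return { output: pieces.join(replaceString), tail: nextTail }
}

let tail = ''
let result = ''
for (const chunk of ['hello W', 'orld']) {
  const step = replaceChunk(tail, chunk, 'World', 'Nodejs')
  // 1st chunk: output "hel",       tail "lo W"
  // 2nd chunk: output "lo Nodejs", tail ""
  result += step.output
  tail = step.tail
}
result += tail // what _flush() would push at the end of the stream

console.log(result) // "hello Nodejs"
```

The match "World" is split across the two chunks, and it is only found because "lo W" was carried over and prepended to "orld".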
through2
Core source code interpretation
through2.js is a thin wrapper around the Transform stream and is very simple to use. Here is the core code.
function through2 (construct) {
  return function throughHOC (options, transform, flush) {
    if (typeof options == 'function') {
      flush = transform
      transform = options
      options = {}
    }
    if (typeof transform != 'function')
      transform = noop
    if (typeof flush != 'function')
      flush = null
    return construct(options, transform, flush)
  }
}
This is the factory function from which all three through2.js APIs are generated.
through2 is a higher-order function that receives a single parameter, construct. The project calls it three times, with a different construct function for each of the three APIs.
through2 in turn returns a function, named throughHOC here for easier reference, which takes three parameters:
- options: the options passed to the Transform class instance;
- transform: the actual conversion function;
- flush: a function that takes a single callback as its argument and must call it once all operations have finished, so that the stream can end.
The throughHOC function first normalizes its arguments:
- if options is a function, then options is really the conversion function (and the second argument is flush), so the arguments are shifted and options is set to a default value, an empty object;
- if options is present but transform is not a function, transform is reset to the default conversion function, noop;
- if flush is not a function, it is reset to null.
Once the arguments are normalized, they are passed to the construct function. construct is the function that actually implements each of the three APIs.
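The normalization rules can be checked with a small self-contained sketch. It does not use the through2 package: makeFactory is a hypothetical stand-in for the factory shown above, noop here is an assumed pass-through default, and the construct function passed in simply echoes the arguments it receives so they can be inspected.

```javascript
// Assumed default: a transform that passes every chunk through unchanged.
function noop (chunk, enc, callback) { callback(null, chunk) }

// Hypothetical stand-in for the factory above.
function makeFactory (construct) {
  return function (options, transform, flush) {
    if (typeof options === 'function') {
      flush = transform
      transform = options
      options = {}
    }
    if (typeof transform !== 'function') transform = noop
    if (typeof flush !== 'function') flush = null
    return construct(options, transform, flush)
  }
}

// A construct that only reports what it was given.
const inspect = makeFactory((options, transform, flush) => ({ options, transform, flush }))

const upper = (chunk, enc, cb) => cb(null, chunk.toString().toUpperCase())

const a = inspect(upper)                     // function first: options becomes {}
const b = inspect({ objectMode: true })      // no transform: falls back to noop
const c = inspect({}, upper, 'not-a-function') // invalid flush: becomes null

console.log(a.options, a.transform === upper, a.flush)
console.log(b.options, b.transform === noop)
console.log(c.flush)
```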
Before looking at the API methods, consider how the project extends the Transform class with DestroyableTransform:
// Transform and inherits are required at the top of the source file.
function DestroyableTransform(opts) {
  Transform.call(this, opts)
  this._destroyed = false
}
inherits(DestroyableTransform, Transform)
DestroyableTransform.prototype.destroy = function(err) {
  if (this._destroyed) return
  this._destroyed = true
  var self = this
  process.nextTick(function() {
    if (err)
      self.emit('error', err)
    self.emit('close')
  })
}
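DestroyableTransform inherits from Transform and implements destroy(). The first call to destroy() marks the stream as destroyed and, on the next tick, emits the error event (if an error was passed) followed by the close event, which is fired manually here. Any later call returns immediately.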
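A minimal sketch of that behaviour, assuming Node's built-in stream.Transform and util.inherits in place of whatever the library itself requires. The events array is a hypothetical recorder added for illustration.

```javascript
const { Transform } = require('stream')
const { inherits } = require('util')

function DestroyableTransform (opts) {
  Transform.call(this, opts)
  this._destroyed = false
}
inherits(DestroyableTransform, Transform)

DestroyableTransform.prototype.destroy = function (err) {
  if (this._destroyed) return // second and later calls are ignored
  this._destroyed = true
  const self = this
  process.nextTick(function () {
    if (err) self.emit('error', err)
    self.emit('close')
  })
}

const t = new DestroyableTransform()
const events = []
t.on('error', (e) => events.push('error: ' + e.message))
t.on('close', () => events.push('close'))

t.destroy(new Error('boom'))
t.destroy() // ignored: the stream is already marked as destroyed

// Nothing has been emitted yet; the events fire on the next tick.
console.log(events.length)
process.nextTick(() => console.log(events))
```

Deferring the events with process.nextTick gives the caller a chance to attach listeners after calling destroy() in the same tick.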
Here are the functions that implement the three APIs:
The main method
let construct = function (options, transform, flush) {
  var t2 = new DestroyableTransform(options)
  t2._transform = transform
  if (flush)
    t2._flush = flush
  return t2
}
module.exports = through2(construct)
The main export is the result of calling through2 with the construct function above. construct instantiates DestroyableTransform with the options supplied by the caller, then overrides the instance's _transform and _flush methods with the functions that were passed in.
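As a rough illustration of what installing _transform and _flush on an instance gives you, here is a sketch with the same shape as the construct function above, built directly on Node's stream.Transform rather than on DestroyableTransform. The counter stream and the total variable are hypothetical.

```javascript
const { Transform } = require('stream')

// Same shape as the construct function above, using Node's own Transform.
function construct (options, transform, flush) {
  const t = new Transform(options)
  t._transform = transform
  if (flush) t._flush = flush
  return t
}

let total = 0
const counter = construct(
  {},
  function (chunk, enc, callback) {
    total += chunk.length // count bytes while passing the data through
    callback(null, chunk)
  },
  function (callback) {
    // Runs once, after the last chunk, before the stream ends.
    this.push('\ntotal bytes: ' + total + '\n')
    callback()
  }
)

counter.on('data', (chunk) => process.stdout.write(chunk))
counter.write('abc')
counter.write('de')
counter.end()
```

Because the methods are assigned on the instance, each stream created this way carries its own transform and flush functions without declaring a new class.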
through2.obj
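The only difference between this API and the main method is that object mode is turned on: objectMode defaults to true and highWaterMark defaults to 16 (in object mode the high-water mark counts objects, not bytes). Because these defaults are merged with Object.assign, options passed by the caller still take precedence.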
var t2 = new DestroyableTransform(Object.assign({ objectMode: true, highWaterMark: 16 }, options))
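A small sketch of the merged defaults in use, again on top of Node's built-in Transform. The helper name objStream and the addId stream are hypothetical.

```javascript
const { Transform } = require('stream')

// Hypothetical helper mirroring the line above: defaults first, caller options last.
function objStream (options, transform) {
  const merged = Object.assign({ objectMode: true, highWaterMark: 16 }, options)
  const t = new Transform(merged)
  t._transform = transform
  return t
}

// In object mode each chunk is an arbitrary JavaScript value, not a Buffer.
const addId = objStream({}, function (record, enc, callback) {
  callback(null, Object.assign({ id: 1 }, record))
})

addId.write({ name: 'a' })
const out = addId.read()

console.log(out)
console.log(addId.readableHighWaterMark)

// Caller options win over the defaults.
const small = objStream({ highWaterMark: 2 }, (record, enc, cb) => cb(null, record))
console.log(small.readableHighWaterMark)
```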
through2.ctor
This API returns a subclass of DestroyableTransform rather than a transform stream instance. The only difference from using the main method is that you have to instantiate the returned constructor yourself, optionally passing options that override the ones given to ctor.
let construct = function (options, transform, flush) {
  function Through2 (override) {
    // Allow calling the constructor without `new`.
    if (!(this instanceof Through2))
      return new Through2(override)
    // Per-instance options override the options given to ctor.
    this.options = Object.assign({}, options, override)
    DestroyableTransform.call(this, this.options)
  }
  inherits(Through2, DestroyableTransform)
  Through2.prototype._transform = transform
  if (flush)
    Through2.prototype._flush = flush
  return Through2
}
module.exports.ctor = through2(construct)
Use:
const through2 = require('through2')
const Ctor = through2.ctor(function(chunk, enc, callback) {
console.log('chunk', chunk.toString());
callback(null, chunk);
});
const th2 = new Ctor();
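The per-instance override can be illustrated with a self-contained sketch that follows the same pattern on Node's built-in Transform. The function name ctor and the Passthrough class are hypothetical stand-ins and do not use the through2 package.

```javascript
const { Transform } = require('stream')
const { inherits } = require('util')

// Hypothetical stand-in for the ctor construct function, built on Node's Transform.
function ctor (options, transform) {
  function Through (override) {
    if (!(this instanceof Through)) return new Through(override)
    this.options = Object.assign({}, options, override)
    Transform.call(this, this.options)
  }
  inherits(Through, Transform)
  Through.prototype._transform = transform
  return Through
}

const Passthrough = ctor({ highWaterMark: 8 }, (chunk, enc, cb) => cb(null, chunk))

const a = new Passthrough()                  // uses the options given to ctor
const b = Passthrough({ highWaterMark: 32 }) // no `new`, with an override

console.log(a.options.highWaterMark, b.options.highWaterMark)
console.log(b instanceof Passthrough)
```

Since _transform lives on the prototype, every instance shares one function, which is the reason to prefer this form when many streams of the same kind are created.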
Refactoring through2 with TypeScript
This project is impressive: it is only a thin wrapper around Transform, yet there is a lot to learn from it. Although it adds just two extra APIs, once you are familiar with the source you can build further extensions of your own, which says a lot about how powerful Transform streams are.
After studying the source, I rewrote it in TypeScript. The result is clearer and deepened my understanding, so it was a worthwhile exercise.
Source address: through2-ts