Pipeline design pattern

The water pipe is too long, as long as there is a broken, it will leak, and is not conducive to the use of bending and turning in complex environment. So we divide the pipes into very short sections, and then we maximize the size of the pipes and make them fit together to meet all kinds of different needs.

Therefore, the design pattern of Pipeline is to cut complex and lengthy processes into small processes and tasks. Each minimal task can then be reused, forming complex and diverse processes by assembling different small tasks.

Finally, the “input” is introduced into the pipeline, according to each small task to operate on the input (processing, filtering), and finally output to meet the needs of the results.

Today the main study “Pipeline”, incidentally recommended a PHP plug-in: League/Pipeline.

gulp

The first time I knew the concept of “pipe” was through the use of gulp.

Gulp is an automatic task runner based on NodeJS. It can automatically test, check, merge, compress, format, automatically refresh the browser, deploy file generation, and listen for files to repeat the specified steps after changes. In implementation, she borrowed the pipe idea of the Unix operating system, the output of the previous level, directly into the input of the next level, making the operation very simple.

var gulp = require('gulp');
var less = require('gulp-less');
var minifyCSS = require('gulp-csso');
var concat = require('gulp-concat');
var sourcemaps = require('gulp-sourcemaps');

gulp.task('css'.function(){
  return gulp.src('client/templates/*.less')
    .pipe(less())
    .pipe(minifyCSS())
    .pipe(gulp.dest('build/css'))}); gulp.task('js'.function(){
  return gulp.src('client/javascript/*.js')
    .pipe(sourcemaps.init())
    .pipe(concat('app.min.js'))
    .pipe(sourcemaps.write())
    .pipe(gulp.dest('build/js'))}); gulp.task('default'['html'.'css'.'js' ]);
Copy the code

The above two tasks are mainly to parse, compress, output and other process operations of LESS and all JS files, and then save them in the corresponding folder. The output of each operation is the input of the next operation, like water in a pipe.

Illuminate\Pipeline

The middleware in the Laravel framework, which uses the Illuminate\Pipeline, was supposed to write my interpretation of the source code for The Laravel Middleware, but it was already being posted online, So this article will briefly explain how to use Illuminate\Pipeline.

Write a demo

public function demo(Request $request)
{
    $pipe1 = function ($payload, Closure $next) {
        $payload = $payload + 1;
        return $next($payload);
    };

    $pipe2 = function ($payload, Closure $next) {
        $payload = $payload * 3;
        return $next($payload);
    };

    $data = $request->input('data'.0);

    $pipeline = new Pipeline();

    return $pipeline
        ->send($data)
        ->through([$pipe1, $pipe2])
        ->then(function ($data) {
            return $data;
        });
}
Copy the code

For the analysis of the source code, you can recommend to see this article, the analysis is very thorough:

Laravel Pipeline component implementation www.insp.top/article/rea…

League\Pipeline

The simple use of gulp and Illuminate Pipeline above only tells us that “Pipeline” is widely used. If let us also write a similar plug-in out, I think it should not be very difficult.

Below I take League\Pipeline plug-in to scratch a scratch of its source code, see how to achieve.

The paper

This package provides a plug and play implementation of the Pipeline Pattern. It’s an architectural pattern which encapsulates sequential processes. When used, it allows you to mix and match operation, and pipelines, to create new execution chains. The pipeline pattern is often compared to a production line, where each stage performs a certain operation on a given payload/subject. Stages can act on, manipulate, decorate, or even replace the payload.

If you find yourself passing results from one function to another to complete a series of tasks on a given subject, you might want to convert it into a pipeline.

pipeline.thephpleague.com/

Installing a plug-in

composer require league/pipeline
Copy the code

Write a demo

use League\Pipeline\Pipeline;

// Create two closure functions
$pipe1 = function ($payload) {
    return $payload + 1;
};

$pipe2 = function ($payload) {
    return $payload * 3;
};

$route->map(
    'GET'.'/demo'.function (ServerRequestInterface $request, ResponseInterface $response ) use ($service, $pipe1, $pipe2) {
        $params = $request->getQueryParams();

        // Normal use
        $pipeline1 = (new Pipeline)
            ->pipe($pipe1)
            ->pipe($pipe2);

        $callback1 = $pipeline1->process($params['data']);

        $response->getBody()->write("

Normal use

"
); $response->getBody()->write($callback1

); // Use the magic method $pipeline2 = (new Pipeline()) ->pipe($pipe1) ->pipe($pipe2); $callback2 = $pipeline2($params['data']); $response->getBody()->write("< H1 > Use magic method "); $response->getBody()->write($callback2

); / / use the Builder $builder = new PipelineBuilder(); $pipeline3 = $builder ->add($pipe1) ->add($pipe2) ->build(); $callback3 = $pipeline3($params['data']); $response->getBody()->write("< h1 > using the Builder < / h1 >"); $response->getBody()->write($callback3

); return$response; });Copy the code

The results

Interpreting source code

The entire plugin consists of these files:

PipelineInterface


      
declare(strict_types=1);

namespace League\Pipeline;

interface PipelineInterface extends StageInterface
{
    /**
     * Create a new pipeline with an appended stage.
     *
     * @return static
     */
    public function pipe(callable $operation): PipelineInterface;
}

interface StageInterface
{
    /**
     * Process the payload.
     *
     * @param mixed $payload
     *
     * @return mixed
     */
    public function __invoke($payload);
}
Copy the code

The interface takes the idea of chained programming, adding pipes over and over, and then adding a magic method to make the incoming parameters work.

Let’s take a look at this magic method in action:

mixed __invoke ([ $… ] When you try to call an object as a function, the __invoke() method is automatically called.

Reference: php.net/manual/zh/l… Such as:


      
class CallableClass 
{
    function __invoke($x) {
        var_dump($x);
    }
}
$obj = new CallableClass;
$obj(5);
var_dump(is_callable($obj));
? >
Copy the code

Return result:

int(5)
bool(true)
Copy the code

Pipeline


      
declare(strict_types=1);

namespace League\Pipeline;

class Pipeline implements PipelineInterface
{
    / * * *@var callable[]
     */
    private $stages = [];

    / * * *@var ProcessorInterface
     */
    private $processor;

    public function __construct(ProcessorInterface $processor = null, callable ... $stages)
    {
        $this->processor = $processor ?? new FingersCrossedProcessor;
        $this->stages = $stages;
    }

    public function pipe(callable $stage): PipelineInterface
    {
        $pipeline = clone $this;
        $pipeline->stages[] = $stage;

        return $pipeline;
    }

    public function process($payload)
    {
        return $this->processor->process($payload, ...$this->stages);
    }

    public function __invoke($payload)
    {
        return $this->process($payload); }}Copy the code

The role of core Pipeline is mainly two:

  1. Add and assemble each pipe “pipe”;
  2. After the assembly, the water flows, executes process($payload), and outputs the result.

Processor

After connecting all kinds of pipes, it is necessary to “divert water into the canal”. The plug-in provides two basic execution classes that are relatively simple and can be understood directly from the code.

// Smoothly follow the $stages array to perform the pipe method and pass the result to the next pipe to make the "water" flow one layer at a time
class FingersCrossedProcessor implements ProcessorInterface
{
    public function process($payload, callable ... $stages)
    {
        foreach ($stages as $stage) {
            $payload = $stage($payload);
        }

        return$payload; }}// Add an extra "filter", check the result after each pipe, once satisfied, stop, direct output result.
class InterruptibleProcessor implements ProcessorInterface
{
    / * * *@var callable
     */
    private $check;

    public function __construct(callable $check)
    {
        $this->check = $check;
    }

    public function process($payload, callable ... $stages)
    {
        $check = $this->check;

        foreach ($stages as $stage) {
            $payload = $stage($payload);

            if (true! == $check($payload)) {return$payload; }}return$payload; }}interface ProcessorInterface
{
    /**
     * Process the payload using multiple stages.
     *
     * @param mixed $payload
     *
     * @return mixed
     */
    public function process($payload, callable ... $stages);
}
Copy the code

We can also implement our approach to assembling pipes and filters using this interface.

PipelineBuilder

Finally, a Builder is provided, which is also easy to understand:

class PipelineBuilder implements PipelineBuilderInterface
{
    / * * *@var callable[]
     */
    private $stages = [];

    / * * *@return self
     */
    public function add(callable $stage): PipelineBuilderInterface
    {
        $this->stages[] = $stage;

        return $this;
    }

    public function build(ProcessorInterface $processor = null): PipelineInterface
    {
        return new Pipeline($processor, ...$this->stages); }}interface PipelineBuilderInterface
{
    /**
     * Add an stage.
     *
     * @return self
     */
    public function add(callable $stage): PipelineBuilderInterface;

    /** * Build a new Pipeline object. */
    public function build(ProcessorInterface $processor = null): PipelineInterface;
}
Copy the code

conclusion

Whether it’s a horizontal understanding of different technologies, or based on Laravel or some open source plug-in, we can learn general principles and methods on top of the technology. These principles and methods are then applied to our actual code development.

Recently, I have nothing to do. I refer to Laravel to write a simple framework, and also introduce League\Pipeline into the framework.

“To be continued”