One, foreword

During development, the need to control the number of concurrent tasks is sometimes encountered.

A crawler, for example, can reduce the frequency of requests by limiting the number of concurrent tasks, thus avoiding the problem of requests being blocked too often.

Next, this article describes how to implement a concurrency controller.

Second, the sample

  const task = timeout= > new Promise((resolve) = > setTimeout((a)= > {

    resolve(timeout);

  }, timeout))



  const taskList = [1000.3000.200.1300.800.2000];



  async function startNoConcurrentControl({

    console.time(NO_CONCURRENT_CONTROL_LOG);

    await Promise.all(taskList.map(item= > task(item)));

    console.timeEnd(NO_CONCURRENT_CONTROL_LOG);

  }



  startNoConcurrentControl();

Copy the code

The sample code above uses the promise.all method to simulate a scenario where six tasks execute concurrently, with a total time of 3000 milliseconds to complete all tasks.

This example will be used below to verify the correctness of the implementation method.

Three, implementation,

Since the number of concurrent tasks is limited, a data structure is needed to manage the continuously generated tasks.

The “first in, first out” feature of the queue ensures the order of concurrent execution of tasks. In JavaScript, we can simulate queues by “arrays” :

  class Queue {

    constructor() {

      this._queue = [];

    }



    push(value) {

      return this._queue.push(value);

    }



    shift() {

      return this._queue.shift();

    }



    isEmpty() {

      return this._queue.length === 0;

    }

  }

Copy the code

For each task, you need to manage its execution functions and parameters:

  class DelayedTask {

    constructor(resolve, fn, args) {

      this.resolve = resolve;

      this.fn = fn;

      this.args = args;

    }

  }

Copy the code

Next, implement the core TaskPool class, which is used to control the execution of tasks:

  class TaskPool {

    constructor(size) {

      this.size = size;

      this.queue = new Queue();

    }



    addTask(fn, args) {

      return new Promise((resolve) = > {

        this.queue.push(new DelayedTask(resolve, fn, args));

        if (this.size) {

          this.size--;

          const { resolve: taskResole, fn, args } = this.queue.shift();

          taskResole(this.runTask(fn, args));

        }

      })

    }



    pullTask() {

      if (this.queue.isEmpty()) {

        return;

      }



      if (this.size === 0) {

        return;

      }



      this.size++;

      const { resolve, fn, args } = this.queue.shift();

      resolve(this.runTask(fn, args));

    }



    runTask(fn, args) {

      const result = Promise.resolve(fn(... args));



      result.then((a)= > {

        this.size--;

        this.pullTask();

      }).catch((a)= > {

        this.size--;

        this.pullTask();

      })



      return result;

    }

  }

Copy the code

TaskPool contains three key methods:

  • AddTask: Puts a new task into the queue and triggers the task pool status detection. If the current task pool is not fully loaded, the task is removed from the queue and put into the task pool for execution.
  • RunTask: Executes the current task. After the task is completed, the status of the task pool is updated and new tasks are proactively pulled.
  • PullTask: If the current queue is not empty and the task pool is not full, the task in the queue is proactively fetched and executed.

Next, limit the concurrency of the previous example to two:

  const cc = new ConcurrentControl(2);



  async function startConcurrentControl({

    console.time(CONCURRENT_CONTROL_LOG);

    await Promise.all(taskList.map(item= > cc.addTask(task, [item])))

    console.timeEnd(CONCURRENT_CONTROL_LOG);

  }



  startConcurrentControl();

Copy the code

The execution process is as follows:

The total execution time of the final task was 5000 milliseconds.

Fourth, the higher order function optimization parameter transfer

  await Promise.all(taskList.map(item= > cc.addTask(task, [item])))

Copy the code

It is very tedious to manually pass the parameters of each task. Here we can use “high-order functions to realize automatic transparent transmission of parameters” :

  addTask(fn) {

    return (. args) = > {

      return new Promise((resolve) = > {

        this.queue.push(new DelayedTask(resolve, fn, args));



        if (this.size) {

          this.size--;

          const { resolve: taskResole, fn: taskFn, args: taskArgs } = this.queue.shift();

          taskResole(this.runTask(taskFn, taskArgs));

        }

      })

    }

  }

Copy the code

The code is much cleaner:

  await Promise.all(taskList.map(cc.addTask(task)))

Copy the code

Fifth, optimize the team out operation

Arrays are generally stored in a block of “contiguous memory”. When the shift method is called, the header element is first removed (O(1)), and the remaining elements need to be moved to the left (O(n)), so the shift operation is O(n).

Due to the nature of the JavaScript language, V8 implements JSArray with a spatial and temporal tradeoff, switching between FixedArray and HashTable modes in different scenarios.

In hashTable mode, the shift operation eliminates the time complexity of the left shift, which can be reduced to O(1). Even so, shift is still a time-consuming operation.

If the array has a large number of elements and the shift operation is required frequently, you can use the Reverse + pop mode to optimize the array.

  const Benchmark = require('benchmark');

  const suite = new Benchmark.Suite;



  suite.add('shift'.function({

    let count = 10;

    const arr = generateArray(count);

    while (count--) {

      arr.shift();

    }

  })

  .add('reverse + pop'.function({

    let count = 10;

    const arr = generateArray(count);

    arr.reverse();

    while (count--) {

      arr.pop();

    }

  })

  .on('cycle'.function(event{

    console.log(String(event.target));

  })

  .on('complete'.function({

    console.log('Fastest is ' + this.filter('fastest').map('name'));

    console.log('\n')

  })

  .run({

    asynctrue

  })

Copy the code

With benchmark data run by benchmark.js, it’s easy to see which approach is more efficient:

Recalling the previous implementation of the Queue class, since there is only one array to store tasks, using reverse + POP directly affects the order in which tasks are executed.

Here we need to introduce the design of double arrays, one array for the in-queue operation, one array for the out-of-queue operation.

  class HighPerformanceQueue {

    constructor() {

      this.q1 = []; // For push data

      this.q2 = []; // For shift data

    }



    push(value) {

      return this.q1.push(value);

    }



    shift() {

      let q2 = this.q2;

      if (q2.length === 0) {

        const q1 = this.q1;

        if (q1.length === 0) {

          return;

        }

        this.q1 = q2; // Thank you @shaonialife for correcting me

        q2 = this.q2 = q1.reverse();

      }



      return q2.pop();

    }



    isEmpty() {

      if (this.q1.length === 0 && this.q2.length === 0) {

        return true;

      }

      return false;

    }

  }

Copy the code

Finally, the optimization effect was verified by benchmarking:

Write in the last

Finally, “if this article is helpful to you, welcome to pay attention to (public number [digitise big front]), like, forward ε=ε= idolidolat (foremost ロ idolidolat).”