preface

In some scenarios we face high-frequency, high-volume data requests and CPU-intensive operations. To avoid exhausting resources with too many simultaneous requests, the number of parallel requests needs to be capped at a constant limit.

plan

  • Use Promise.race together with Promise.all to limit the number of parallel requests and keep the in-flight count at the limit (see the refresher sketch after this list).
  • Use the first-in-first-out (FIFO) property of a queue, combined with Promise, to build an asynchronous task queue that limits the number of parallel requests and keeps the in-flight count at the limit.
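
The first approach leans on two Promise primitives, Promise.race and Promise.all. Here is a minimal standalone refresher of how they behave; this is only a sketch and is not part of either implementation below:

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms, ms));

(async () => {
  // Promise.race settles as soon as the fastest promise settles,
  // which is how a free "slot" is detected.
  const fastest = await Promise.race([delay(300), delay(100), delay(200)]);
  console.log("fastest =>", fastest); // 100

  // Promise.all waits for every promise and preserves the original order,
  // which is how the final result list is collected.
  const all = await Promise.all([delay(300), delay(100), delay(200)]);
  console.log("all =>", all); // [300, 100, 200]
})();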

Implement asynchronous concurrency limits based on Promise.race and Promise.all

Using the JS event loop mechanism to handle the asynchronous processing, the steps are as follows:

  1. Create an asynchronous function that takes three parameters (limit number, data array, handler function);
  2. The processing logic is executed inside the function;
  3. Initialize the result array and the executing array variables;
  4. Loop over the data array and wrap the handler function in a Promise object;
  5. Add the Promise object to the result array (after the Promise object resolves, its result is still held by the result array);
  6. Determine whether the limit is less than or equal to the length of the data array;
    1. If so, continue with limited execution;
    2. Chain a then handler on the current Promise object that removes it from the executing array (cleans up the executing array);
    3. Add the current Promise object to the executing array;
    4. Determine whether the executing array length is greater than or equal to the limit;
      1. If true, run Promise.race(executing array) and await it via the microtask queue;
      2. If false, skip;
    5. If not, skip;
  7. Use Promise.all to run the result array to completion and return the result;
  8. Return the run results;

Implementation code:

const asyncPool = async (poolLimit, array, iteratorFn) => {
  const resultList = [];
  const executing = [];
  for (const item of array) {
    console.log("Loop", item);
    // Wrap the handler call in a Promise and collect it for the final result.
    const p = Promise.resolve().then(() => {
      console.log("Initialize", item);
      return iteratorFn(item, array);
    });
    resultList.push(p);
    // Only apply the limit when there are at least poolLimit items.
    if (poolLimit <= array.length) {
      // When the promise settles, remove it from the executing array.
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      // Once the limit is reached, wait for the fastest in-flight task to settle.
      if (executing.length >= poolLimit) {
        console.log("Running Promise.race");
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(resultList);
};

Example:

const timeout = (i) =>
  new Promise((resolve) => {
    setTimeout(resolve, i, i);
  });

const main = async () => {
  const aa = await asyncPool(
    3,
    [10, 20, 30, 40, 50, 60, 60, 70, 80, 1000],
    timeout
  );
  console.log("aa=>", aa);
};

main();

Running results:

As the output shows, the function keeps at most 3 tasks in flight at any time.
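
If you want to verify the limit without reading the logs, a small counter wrapper works. This is a hedged sketch; activeCount, maxActive, and countedTimeout are illustrative names, not part of the original example:

let activeCount = 0;
let maxActive = 0;

// Wrap the original timeout task so it records how many tasks run at once.
const countedTimeout = async (i) => {
  activeCount++;
  maxActive = Math.max(maxActive, activeCount);
  const result = await timeout(i);
  activeCount--;
  return result;
};

asyncPool(3, [10, 20, 30, 40, 50, 60], countedTimeout).then(() => {
  console.log("max concurrency =>", maxActive); // expected to stay at 3
});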

Implement asynchronous concurrency limits based on queues and promises

The basic logic is a task pool whose asynchronous concurrency limit is set when the pool is initialized. Asynchronous tasks are added to the pool, and the Promises they return are collected into an array and passed to Promise.all, which runs them and returns the results.

The flow chart is as follows:

Implementation code:

// Queue
class Queue {
  constructor() {
    this._queue = [];
  }
  push(value) {
    return this._queue.push(value);
  }
  shift() {
    // TODO: Array#shift is O(n); a two-array (reverse + pop) design can reduce
    // the amortized cost to roughly O(1) — see the sketch after this implementation.
    return this._queue.shift();
  }
  isEmpty() {
    return this._queue.length === 0;
  }
}

// Delayed task
class DelayedTask {
  constructor(resolve, fn, args) {
    this.resolve = resolve;
    this.fn = fn;
    this.args = args;
  }
}

// Task pool
class TaskPool {
  constructor(size) {
    this.size = size;
    this.queue = new Queue();
  }

  addTask(fn) {
    return (...args) => {
      return new Promise((resolve) => {
        // Queue the task, then start the next queued task if a slot is free.
        this.queue.push(new DelayedTask(resolve, fn, args));
        if (this.size) {
          this.size--;
          const {
            resolve: taskResolve,
            fn: taskFn,
            args: taskArgs,
          } = this.queue.shift();
          taskResolve(this.runTask(taskFn, taskArgs));
        }
      });
    };
  }

  pullTask() {
    if (this.queue.isEmpty()) {
      return;
    }
    if (this.size === 0) {
      return;
    }
    this.size--;
    const { resolve, fn, args } = this.queue.shift();
    resolve(this.runTask(fn, args));
  }

  runTask(fn, args) {
    const result = Promise.resolve(fn(...args));
    // When the task settles, free its slot and pull the next queued task.
    result.finally(() => {
      this.size++;
      this.pullTask();
    });
    return result;
  }
}
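
As the TODO comment in Queue.shift() notes, Array#shift is O(n). A common way to get amortized O(1) dequeues is a two-array queue. The sketch below illustrates that idea under assumed names (FastQueue and its fields are mine, not from the article); it could serve as a drop-in replacement for Queue in TaskPool:

// Two-array queue: push into _in, pop from _out; items are moved from _in
// to _out only when _out runs empty, so each element moves at most twice.
class FastQueue {
  constructor() {
    this._in = [];
    this._out = [];
  }
  push(value) {
    return this._in.push(value);
  }
  shift() {
    if (this._out.length === 0) {
      while (this._in.length) {
        this._out.push(this._in.pop());
      }
    }
    return this._out.pop();
  }
  isEmpty() {
    return this._in.length === 0 && this._out.length === 0;
  }
}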

Example:

const cc = new TaskPool(4);
const taskList = [1000, 3000, 200, 1300, 800, 2000];

const task = (timeout) =>
  new Promise((resolve) =>
    setTimeout(() => {
      resolve(timeout);
    }, timeout)
  );

async function startConcurrentControl() {
  console.time("xxx");
  await Promise.all(taskList.map(cc.addTask(task)));
  console.timeEnd("xxx");
}
startConcurrentControl();
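
One design note: because addTask(fn) returns a wrapped function rather than running fn directly, a single pool can throttle several different async functions at the same time. A small hedged sketch of that usage follows; the extra task slowDouble is illustrative and not part of the article:

const sharedPool = new TaskPool(2);

// An extra illustrative task: doubles a value after a short delay.
const slowDouble = (n) =>
  new Promise((resolve) => setTimeout(() => resolve(n * 2), 100));

const limitedTask = sharedPool.addTask(task);
const limitedDouble = sharedPool.addTask(slowDouble);

// All five calls share the same limit of 2 in-flight tasks.
Promise.all([
  limitedTask(500),
  limitedTask(300),
  limitedDouble(7),
  limitedDouble(21),
  limitedTask(100),
]).then((res) => console.log("res =>", res));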

reference

Use JavaScript for concurrency control

async-pool