This post was originally posted on my blog

Suppose we need to write a download tool or a crawler.

To keep the download fast, we send several requests at the same time. But most websites limit the number of requests a client can make. To avoid triggering those restrictions, we need to control the number of concurrent requests.

So how do you control concurrent requests?

Use third-party libraries

Existing libraries already provide methods for this kind of asynchronous control. Examples are RxJS and Async.

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

let observables = [] // fill with observables
from(observables)
    .pipe(mergeMap(observable => observable, 3))
    .subscribe();

The above example uses the RxJS operator mergeMap.

RxJS – mergeMap

The second parameter to mergeMap controls the number of concurrent Observables.

async.eachLimit can do the same, so I won’t go into details here.

async – Documentation

Implement concurrent request control

Without further ado, let’s start implementing one of these functions.

// iteratorFn is a function that returns promises
function eachLimit(limit, arr, iteratorFn) {
  const res = []
  for (const item of arr) {
    res.push(iteratorFn(item))
  }
  return Promise.all(res)
}


The code above applies no restriction at all: every request is sent immediately.
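To see this concretely, here is a minimal sketch that counts how many requests have been started by the time the unrestricted function returns. `fakeRequest` is a hypothetical stand-in for a real network call (a timer, nothing more), and the durations are arbitrary.

```javascript
// Unrestricted version: the limit argument is accepted but never used.
function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  for (const item of arr) {
    res.push(iteratorFn(item));
  }
  return Promise.all(res);
}

let started = 0;

// Hypothetical stand-in for a request: counts the call,
// then resolves with ms after ms milliseconds.
const fakeRequest = (ms) => {
  started += 1;
  return new Promise((resolve) => setTimeout(() => resolve(ms), ms));
};

const pending = eachLimit(2, [30, 10, 20, 40, 10], fakeRequest);

// eachLimit has returned, but no timer has fired yet,
// so every request counted here is still in flight.
const startedBeforeAnyFinished = started;
console.log('requests in flight:', startedBeforeAnyFinished); // 5, not 2
```

Even though a limit of 2 was passed, all five requests are in flight at once.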

The first idea that comes to mind is to put the promises returned by the asynchronous calls into a queue, and to cap the length of that queue at limit.

function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    activeList.push(p)
    while (activeList.length >= limit) {
      // the request is restricted
    }
  }
  return Promise.all(res);
}


Control of queues

With this queue in place, the next step is to keep it up to date. The queue is full of Promise objects, so how do we tell which one has completed, find it, and remove it from the queue so that new ones can join?

The answer is Promise.race.

But there is a catch: Promise.race resolves with the fulfilled value of whichever Promise instance in the queue settles first, not with the instance itself.
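A small sketch of that behaviour, assuming a hypothetical `delay` helper that stands in for a request: the value returned by the race cannot be used to look up the winning promise in the queue.

```javascript
// Hypothetical stand-in for a request: resolves with label after ms.
const delay = (label, ms) =>
  new Promise((resolve) => setTimeout(() => resolve(label), ms));

async function demoRace() {
  const slow = delay('slow', 50);
  const fast = delay('fast', 10);
  const active = [slow, fast];

  const winner = await Promise.race(active);

  // winner is the fulfilled value 'fast', not the promise `fast`,
  // so searching the queue of promise objects for it finds nothing.
  return { winner, index: active.indexOf(winner) };
}

demoRace().then(({ winner, index }) => console.log(winner, index)); // fast -1
```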

async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    activeList.push(p)
    while (activeList.length >= limit) {
      const fastestResult = await Promise.race(activeList)
    }
  }
  return Promise.all(res);
}

How do you tell which Promise instance is completed using fastestResult?

Er… Actually, I didn’t know either, until I found the explanation below.

The “remove from queue” step should happen by the completed promise itself (using then) instead of relying on the returned promise from Promise.race. It seems this is the only way around it.

In other words, according to that Stack Overflow answer, the only option is to have each promise remove itself from the queue when it completes.

async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    const e = p.then(() =>
      activeList.splice(activeList.indexOf(e), 1));
    activeList.push(e);
    while (activeList.length >= limit) {
      await Promise.race(activeList)
    }
  }
  return Promise.all(res);
}

The complete implementation is shown above.

The test code

async function test() {
  const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
  const results = await eachLimit(2, [1000, 5000, 3000, 2000], timeout);
  console.log(results);
}
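The test above only prints the results. To check that the limit is actually respected, one option is to count the calls in flight. The sketch below repeats the final eachLimit so that it runs on its own; `trackPeak` and `fakeRequest` are hypothetical helpers introduced for this illustration, and the short durations are arbitrary.

```javascript
// Final version from this post, with the arrow functions written out.
async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    // e removes itself from the active list once p has fulfilled.
    const e = p.then(() => activeList.splice(activeList.indexOf(e), 1));
    activeList.push(e);
    while (activeList.length >= limit) {
      await Promise.race(activeList);
    }
  }
  return Promise.all(res);
}

// Hypothetical helper: wraps fn and records the highest number of
// calls that were running at the same time.
function trackPeak(fn) {
  const stats = { active: 0, peak: 0 };
  const wrapped = async (item) => {
    stats.active += 1;
    stats.peak = Math.max(stats.peak, stats.active);
    try {
      return await fn(item);
    } finally {
      stats.active -= 1;
    }
  };
  return { wrapped, stats };
}

// Hypothetical stand-in for a request: resolves with ms after ms.
const fakeRequest = (ms) =>
  new Promise((resolve) => setTimeout(() => resolve(ms), ms));

async function demoLimited() {
  const { wrapped, stats } = trackPeak(fakeRequest);
  const results = await eachLimit(2, [30, 10, 20, 40, 10], wrapped);
  return { results, peak: stats.peak };
}

demoLimited().then(({ results, peak }) => console.log(results, 'peak:', peak));
```

With a limit of 2 the peak never exceeds 2, and the results still come back in input order because Promise.all preserves the order of the array it is given.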

Another way to think about it

The approach above is controlled by a single queue. The next idea is to split the work into limit sequences, each of which runs its requests one after another.

Suppose there are five requests to execute: A, B, C, D, and E, with at most three running concurrently.

[A then D], [B then E], [C]
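The grouping can be reproduced with a few lines of code. `assignChains` is a hypothetical helper written only for this illustration; it uses the same index rule as the implementation that follows (advance the index, then wrap around), which is why the first item lands in chain 1 rather than chain 0.

```javascript
// Hypothetical helper: distributes items over `limit` chains in
// round-robin order, advancing the index before each assignment.
function assignChains(items, limit) {
  const chains = Array.from({ length: limit }, () => []);
  let index = 0;
  for (const item of items) {
    index = (index + 1) % limit;
    chains[index].push(item);
  }
  return chains;
}

const chains = assignChains(['A', 'B', 'C', 'D', 'E'], 3);
console.log(chains); // [ [ 'C' ], [ 'A', 'D' ], [ 'B', 'E' ] ]
```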

async function asyncLoop(limit, arr, iteratorFn) {
  const queues = new Array(limit).fill(0).map(() => Promise.resolve());
  let index = 0;
  const add = cb => {
    index = (index + 1) % limit;
    return queues[index] = queues[index].then(() => cb());
  };
  let results = [];
  for (let v of arr) {
    results.push(add(() => iteratorFn(v)));
  }
  }
  return await Promise.all(results);
};
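One consequence of this design is worth noting: items are assigned to chains up front, so a slow request delays everything queued behind it in the same chain, even while another chain sits idle. The sketch below illustrates this by recording the order in which requests start. `fakeRequest` is a hypothetical stand-in for a network call, and the durations are chosen only to make the effect visible.

```javascript
// Second approach from this post, with the arrow functions written out.
async function asyncLoop(limit, arr, iteratorFn) {
  const queues = new Array(limit).fill(0).map(() => Promise.resolve());
  let index = 0;
  const add = (cb) => {
    index = (index + 1) % limit;
    return (queues[index] = queues[index].then(() => cb()));
  };
  const results = [];
  for (const v of arr) {
    results.push(add(() => iteratorFn(v)));
  }
  return Promise.all(results);
}

async function demoStartOrder() {
  const startOrder = [];
  // Hypothetical stand-in for a request: records when it starts,
  // then resolves with its own duration.
  const fakeRequest = (ms) => {
    startOrder.push(ms);
    return new Promise((resolve) => setTimeout(() => resolve(ms), ms));
  };
  // Chain 1 gets 150 then 60; chain 0 gets 50 then 40.
  const results = await asyncLoop(2, [150, 50, 60, 40], fakeRequest);
  return { results, startOrder };
}

demoStartOrder().then(({ results, startOrder }) => {
  console.log('results:', results);
  console.log('start order:', startOrder);
});
```

The results come back in input order, but the fourth item (40) starts before the third (60), because the third is stuck behind the 150 ms request in its chain. The first approach, which refills the queue as soon as any request finishes, would start the third item first.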

Conclusion

The first approach uses Promise.race and wraps each original Promise instance in a new one whose then callback removes it from the queue once it completes.

The second approach uses limit sequences, each of which is a chain of then calls.

References

rxaviers/async-pool: Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7

javascript – Get which promise completed in Promise.race – Stack Overflow