This post was originally posted on my blog
Suppose we need to write a download tool or a crawler.
To keep things fast, we want to send many requests at the same time. But most websites limit the number of requests a client can make. To avoid triggering those restrictions, we need to control the number of concurrent requests.
So how do you control concurrent requests?
Use third-party libraries
Existing libraries already cover this kind of asynchronous control. Examples are RxJS and Async.
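import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const observables = []; // fill with observables
from(observables)
  .pipe(mergeMap(observable => observable, 3)) // at most 3 inner subscriptions at a time
  .subscribe();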
The above example uses the RxJS operator mergeMap
RxJS – mergeMap
The second parameter to mergeMap controls the number of concurrent Observables.
async.eachLimit can do the same thing, so I won’t go into details here.
async – Documentation
Implement concurrent request control
Without further ado, let’s start implementing one of these functions.
// iteratorFn is a function that returns promises
function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  for (const item of arr) {
    res.push(iteratorFn(item));
  }
  return Promise.all(res);
}
The code above has no restrictions at all: the limit parameter is ignored and every request is fired at once.
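To see what "no restrictions" means in practice, here is a small sketch that counts how many calls are in flight at once. The trackPeak helper and the fake request function are assumptions made up for this illustration; they are not part of the original code.

```javascript
// Unrestricted version from above: every call starts immediately.
function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  for (const item of arr) {
    res.push(iteratorFn(item));
  }
  return Promise.all(res);
}

// Hypothetical helper: wraps a promise-returning function and records
// the highest number of calls that were pending at the same time.
function trackPeak(fn) {
  const stats = { active: 0, peak: 0 };
  const wrapped = async (item) => {
    stats.active += 1;
    stats.peak = Math.max(stats.peak, stats.active);
    try {
      return await fn(item);
    } finally {
      stats.active -= 1;
    }
  };
  return { wrapped, stats };
}

// Fake request (stand-in for a real HTTP call): resolves after `ms` milliseconds.
const request = (ms) => new Promise((resolve) => setTimeout(() => resolve(ms), ms));

async function demo() {
  const { wrapped, stats } = trackPeak(request);
  await eachLimit(2, [30, 10, 20, 40], wrapped);
  return stats.peak; // the limit of 2 is ignored, so all four requests overlap
}

demo().then((peak) => console.log('peak concurrency:', peak)); // peak concurrency: 4
```

Even though 2 is passed as the limit, the peak equals the length of the array, which is exactly what the next steps set out to fix.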
The first thing that comes to mind is to put the promises of the in-flight calls into a queue. The maximum length of that queue, limit, caps the number of concurrent requests.
function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    activeList.push(p);
    while (activeList.length >= limit) {
      // the limit is reached: wait here until a request finishes
      // (not implemented yet; nothing leaves activeList so far)
    }
  }
  return Promise.all(res);
}
Control of queues
With this queue in place, the next step is to keep it up to date. The queue holds Promise objects, so how do we tell when one of them has completed, find it, and remove it from the queue so that new ones can join?
The answer is Promise.race.
But there is a problem: Promise.race resolves with the fulfilled value of the fastest Promise in the queue, not with the Promise instance itself.
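A quick sketch of that problem: the value that comes back from Promise.race says nothing about which entry of the array produced it. The delay helper and the two sample promises are made up for this illustration.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

async function raceDemo() {
  const slow = delay(50, 'done');
  const fast = delay(10, 'done');
  const activeList = [slow, fast];

  const fastestResult = await Promise.race(activeList);

  // fastestResult is the fulfilled value, not the promise that produced it,
  // so it cannot be used to locate the finished entry in activeList.
  return {
    fastestResult,
    isPromiseInList: activeList.includes(fastestResult),
  };
}

raceDemo().then(console.log); // { fastestResult: 'done', isPromiseInList: false }
```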
async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    activeList.push(p);
    while (activeList.length >= limit) {
      const fastestResult = await Promise.race(activeList);
    }
  }
  return Promise.all(res);
}
How do you tell which Promise instance is completed using fastestResult?
Er… Actually, I didn’t know, until I found the explanation below.
The “remove from queue” step should happen by the completed promise itself (using then) instead of relying on the returned promise from Promise.race. It seems this is the only way around it.
In other words, according to Stack Overflow, the only way is to let each Promise remove itself from the queue in its own then callback once it completes.
async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    // e wraps p and removes itself from the queue once p has fulfilled
    const e = p.then(() => activeList.splice(activeList.indexOf(e), 1));
    activeList.push(e);
    // the queue is full: wait until at least one request has finished
    while (activeList.length >= limit) {
      await Promise.race(activeList);
    }
  }
  return Promise.all(res);
}
The complete implementation is shown above
The test code
async function test() {
  // resolves with i after i milliseconds
  const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
  const results = await eachLimit(2, [1000, 5000, 3000, 2000], timeout);
  console.log(results);
}
test();
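The test above only prints the results. To check that the limit is actually respected, one option is to count the calls that are in flight. The following is a sketch under that assumption; the counter logic, the checkLimit name, and the shortened delays are my own additions.

```javascript
// eachLimit as in the complete implementation above.
async function eachLimit(limit, arr, iteratorFn) {
  const res = [];
  const activeList = [];
  for (const item of arr) {
    const p = iteratorFn(item);
    res.push(p);
    const e = p.then(() => activeList.splice(activeList.indexOf(e), 1));
    activeList.push(e);
    while (activeList.length >= limit) {
      await Promise.race(activeList);
    }
  }
  return Promise.all(res);
}

// Hypothetical check: runs four timers with a limit of 2 and records
// the highest number of timers that were pending at the same time.
async function checkLimit() {
  let active = 0;
  let peak = 0;
  const timeout = (ms) => {
    active += 1;
    peak = Math.max(peak, active);
    return new Promise((resolve) =>
      setTimeout(() => {
        active -= 1;
        resolve(ms);
      }, ms)
    );
  };
  const results = await eachLimit(2, [100, 500, 300, 200], timeout);
  return { results, peak };
}

checkLimit().then(console.log); // { results: [ 100, 500, 300, 200 ], peak: 2 }
```

The results come back in input order because Promise.all preserves the order of res. Note that this sketch assumes iteratorFn never rejects: a rejected promise makes eachLimit itself reject, and its entry is not removed from activeList.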
Another way to think about it
The method above is controlled by a single queue. The next idea is to use limit separate sequences, each of which processes its own requests one after another.
Suppose there are five requests to be executed: A, B, C, D, and E. Three are required to be executed concurrently.
[A then D], [B then E], [C]
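The grouping above is a plain round-robin distribution. As a minimal sketch of just that step (the assignToChains helper is hypothetical and only groups the items, it does not run anything):

```javascript
// Distribute items over `limit` sequences in round-robin order.
function assignToChains(limit, arr) {
  const chains = Array.from({ length: limit }, () => []);
  arr.forEach((item, i) => chains[i % limit].push(item));
  return chains;
}

console.log(assignToChains(3, ['A', 'B', 'C', 'D', 'E']));
// [ [ 'A', 'D' ], [ 'B', 'E' ], [ 'C' ] ]
```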
async function asyncLoop(limit, arr, iteratorFn) {
  const queues = new Array(limit).fill(0).map(() => Promise.resolve());
  let index = 0;
  const add = cb => {
    index = (index + 1) % limit;
    return queues[index] = queues[index].then(() => cb());
  };
  let results = [];
  for (let v of arr) {
    results.push(add(() => iteratorFn(v)));
  }
  return await Promise.all(results);
};
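One consequence of this design is worth a quick experiment: every item is assigned to a sequence up front, so a slow request holds up the items queued behind it even when another sequence is already idle. The sketch below records the order in which tasks start; the task objects, their durations, and the startOrderDemo name are assumptions chosen for the illustration.

```javascript
// asyncLoop as shown above.
async function asyncLoop(limit, arr, iteratorFn) {
  const queues = new Array(limit).fill(0).map(() => Promise.resolve());
  let index = 0;
  const add = (cb) => {
    index = (index + 1) % limit;
    return (queues[index] = queues[index].then(() => cb()));
  };
  const results = [];
  for (const v of arr) {
    results.push(add(() => iteratorFn(v)));
  }
  return Promise.all(results);
}

// Hypothetical experiment: A is slow, B, C and D are fast.
async function startOrderDemo() {
  const started = [];
  const task = ({ name, ms }) => {
    started.push(name); // remember when each task actually starts
    return new Promise((resolve) => setTimeout(() => resolve(name), ms));
  };
  const results = await asyncLoop(
    2,
    [
      { name: 'A', ms: 200 },
      { name: 'B', ms: 20 },
      { name: 'C', ms: 20 },
      { name: 'D', ms: 20 },
    ],
    task
  );
  return { started, results };
}

startOrderDemo().then(console.log);
// { started: [ 'A', 'B', 'D', 'C' ], results: [ 'A', 'B', 'C', 'D' ] }
```

C shares a sequence with A, so it starts only after A has finished, while D runs right after B. With the Promise.race version, C would have started as soon as B finished. Which behaviour is preferable depends on how uneven the request durations are.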
Conclusion
The first approach uses Promise.race: each original Promise is wrapped in a new Promise (via then) that removes itself from the queue when it completes.
The second approach uses N sequences (N = limit), each of which is a chain of Promise then calls.
References
rxaviers/async-pool: Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7
javascript – Get which promise completed in Promise.race – Stack Overflow