JavaScript makes it easy to fire requests in parallel, but doing so can create problems of its own, such as putting too much pressure on the target server. This article therefore introduces a "request scheduler" to control concurrency.

TL;DR: jump directly to the "Abstraction and reuse" section.

To fetch a batch of independent resources, we usually execute the requests concurrently with Promise.all(arrayOfPromises) for performance. For example, given 100 application ids and the requirement to aggregate the PV (page views) of all applications, we would typically write:

const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://opensearch.example.com/api/apps';

// The fetch function sends the HTTP request and returns a Promise
const appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);

Promise.all(appPromises)
  // Sum the PVs with reduce
  .then(apps => apps.reduce((initial, current) => initial + current.pv, 0))
  .catch(error => console.log(error));

The above code works fine with a small number of applications. But when the number of applications reaches tens of thousands, against a system that does not handle concurrency well, this accidental "stress test" can bring the target server down, leaving it temporarily unable to respond:

<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.1</center>
</body>
</html>

How to solve it?

A natural idea is to break the requests into chunks: requests within a chunk still run concurrently, but chunkSize is capped at the maximum concurrency the target system supports, and the chunks themselves run serially. The idea is simple, but writing it takes care. It boils down to three operations: chunk, serialize, and aggregate.

The difficulty is how to run promises serially. Promises only ship a parallel primitive (Promise.all), not a serial one. Let's start with three simple requests and work toward a solution heuristically.

// task1, task2, task3 are three factory functions that return promises,
// simulating our asynchronous requests
const task1 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(1);
    console.log('task1 executed');
  }, 1000);
});

const task2 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(2);
    console.log('task2 executed');
  }, 1000);
});

const task3 = () => new Promise((resolve) => {
  setTimeout(() => {
    resolve(3);
    console.log('task3 executed');
  }, 1000);
});

// Aggregate result
let result = 0;

const resultPromise = [task1, task2, task3].reduce((current, next) =>
  current.then((number) => {
    console.log('resolved with number', number); // task2's and task3's promises are kicked off here
    result += number;

    return next();
  }),

  Promise.resolve(0)) // initial value of the aggregation

  .then(function (last) {
    console.log('The last promise resolved with number', last); // task3's promise resolves here

    result += last;

    console.log('all executed with result', result);

    return Promise.resolve(result);
  });

Running results are shown in Figure 1:

Code parsing: the effect we want, written out visually, is fn1().then(() => fn2()).then(() => fn3()). The key to getting a set of promises to execute sequentially in the code above is reduce, the "engine" that drives the promise factory functions step by step.

With that out of the way, let’s look at the final code:

/**
 * Emulates an HTTP request
 * @param  {String} url
 * @return {Promise}
 */
function fetch(url) {
  console.log(`Fetching ${url}`);
  return new Promise((resolve) => {
    setTimeout(() => resolve({ pv: Number(url.match(/\d+$/)) }), 2000);
  });
}

const urlPrefix = 'http://opensearch.example.com/api/apps';

const aggregator = {
  /**
   * Entry method that kicks off the scheduled tasks
   * @return {Promise}
   */
  start() {
    return this.fetchAppIds()
      .then(ids => this.fetchAppsSerially(ids, 2))
      .then(apps => this.sumPv(apps))
      .catch(error => console.error(error));
  },

  /**
   * Get all application ids
   * @private
   * @return {Promise}
   */
  fetchAppIds() {
    return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
  },

  promiseFactory(ids) {
    return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch));
  },

  /**
   * Request all applications with a concurrency limit.
   * Each group of `concurrency` concurrent requests is called a chunk;
   * the next chunk starts only after the previous one has finished,
   * until all applications are done.
   * @private
   * @param  {[Number]} ids
   * @param  {Number}   concurrency Number of concurrent requests
   * @return {[Object]} Information for all applications
   */
  fetchAppsSerially(ids, concurrency = 100) {
    // Chunk
    let chunkOfIds = ids.splice(0, concurrency);
    const tasks = [];

    while (chunkOfIds.length !== 0) {
      tasks.push(this.promiseFactory(chunkOfIds));
      chunkOfIds = ids.splice(0, concurrency);
    }

    // Execute in chunk order
    const result = [];
    return tasks.reduce((current, next) => current.then((chunkOfApps) => {
      console.info('Chunk of', chunkOfApps.length, 'concurrent requests has finished with result:', chunkOfApps, '\n\n');
      result.push(...chunkOfApps); // flatten the array
      return next();
    }), Promise.resolve([]))
      .then((lastChunkOfApps) => {
        console.info('Chunk of', lastChunkOfApps.length, 'concurrent requests has finished with result:', lastChunkOfApps, '\n\n');
        result.push(...lastChunkOfApps); // flatten it again
        console.info('All chunks have been executed with result', result);
        return result;
      });
  },

  /**
   * Aggregate the PV of all applications
   * @private
   * @param  {[Object]} apps
   * @return {Object}
   */
  sumPv(apps) {
    const initial = { pv: 0 };

    return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial);
  }
};

// Start running
aggregator.start().then(console.log);

Running results are shown in Figure 2:

Abstraction and reuse

Goal achieved. Since the approach is general, let's abstract it into a reusable pattern.

Serial

Start by simulating an HTTP GET request.

/**
 * Mocked HTTP GET.
 * @param {string} url
 * @return {Promise<{ url: string, delay: number, at: number }>}
 */
function httpGet(url) {
  const delay = Math.random() * 1000;

  console.info('GET', url);

  return new Promise((resolve) => {
    setTimeout(() => {
      resolve({
        url,
        delay,
        at: Date.now()
      });
    }, delay);
  });
}

Execute a batch of requests sequentially.

const ids = [1, 2, 3, 4, 5, 6, 7];

// Wrap each request in a factory function so it does not fire immediately
const httpGetters = ids.map(id =>
  () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`));

// Execute sequentially
const tasks = httpGetters.reduce((acc, cur) => {
  return acc.then(cur);

  // Short form, equivalent to
  // return acc.then(() => cur());
}, Promise.resolve());

tasks.then(() => {
  console.log('done');
});

Notice the console output, which should print the following sequentially:

GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
GET https://jsonplaceholder.typicode.com/posts/7

Segment serial, segment parallel

Here's the main point: this article's request scheduler implementation.

/**
 * Schedule promises.
 * @param {Array<(...args: any[]) => Promise<any>>} factories
 * @param {number} concurrency
 */
function schedulePromises(factories, concurrency) {
  /**
   * Split an array into chunks of `size`.
   * @param {any[]} arr
   * @param {number} size
   * @return {Array<any[]>}
   */
  const chunk = (arr, size = 1) => {
    return arr.reduce((acc, cur, idx) => {
      const modulo = idx % size;

      if (modulo === 0) {
        acc[acc.length] = [cur];
      } else {
        acc[acc.length - 1].push(cur);
      }

      return acc;
    }, []);
  };

  const chunks = chunk(factories, concurrency);

  let resps = [];

  return chunks.reduce(
    (acc, cur) => {
      return acc
        .then(() => {
          console.log('---');
          return Promise.all(cur.map(f => f()));
        })
        .then((intermediateResponses) => {
          resps.push(...intermediateResponses);
          return resps;
        });
    },
    Promise.resolve()
  );
}

To test, execute the scheduler:

// segment serial, segment parallel
schedulePromises(httpGetters, 3).then((resps) => {
  console.log('resps:', resps);
});

Console output:

---
GET https://jsonplaceholder.typicode.com/posts/1
GET https://jsonplaceholder.typicode.com/posts/2
GET https://jsonplaceholder.typicode.com/posts/3
---
GET https://jsonplaceholder.typicode.com/posts/4
GET https://jsonplaceholder.typicode.com/posts/5
GET https://jsonplaceholder.typicode.com/posts/6
---
GET https://jsonplaceholder.typicode.com/posts/7
resps: [
  { "url": "https://jsonplaceholder.typicode.com/posts/1", "delay": 733.010980640727, "at": 1615131322163 },
  { "url": "https://jsonplaceholder.typicode.com/posts/2", "delay": 594.5056229848931, "at": 1615131322024 },
  { "url": "https://jsonplaceholder.typicode.com/posts/3", "delay": 738.8230109146299, "at": 1615131322168 },
  { "url": "https://jsonplaceholder.typicode.com/posts/4", "delay": 525.4604386109747, "at": 1615131322698 },
  { "url": "https://jsonplaceholder.typicode.com/posts/5", "delay": 29.086379722201183, "at": 1615131322201 },
  { "url": "https://jsonplaceholder.typicode.com/posts/6", "delay": 592.2345027398272, "at": 1615131322765 },
  { "url": "https://jsonplaceholder.typicode.com/posts/7", "delay": 513.04467560949, "at": 1615131323284 }
]

Verification passes: three requests run concurrently, the next three start only after the current three have finished, and the aggregated result is correct.

Conclusion

  1. If the number of concurrent requests is too large, consider splitting them into chunks: chunks run serially, while requests within a chunk run concurrently.
  2. If a problem looks complex, simplify it first, work out the key points step by step, then abstract the solution.
  3. The essence of this article is using reduce as the serial-driving "engine"; mastering it can provide new ideas for puzzles we meet in daily development. For more on mastering reduce, see the earlier article "You finally use Reduce" 🎉.