The original address: www.jianshu.com/p/1ed50e6d4…

Bull is a Redis-based task queue library for Node.js. It supports delayed jobs, prioritized jobs, repeatable jobs, and atomic operations.

This article walks through Bull's source code starting from basic usage. It does not cover repeatable jobs, separate processes, and similar features.

Bull: Premium Queue package for handling jobs and messages in NodeJS.

The relevant information is as follows:

  • Source code address: github.com/OptimalBits…
  • Branch: develop
  • Last commit: 4f5744a

Basic usage

The use of Bull is divided into three steps:

  1. Create a queue
  2. Bind the task handler function
  3. Add tasks

The following is an example:

const Bull = require('bull');

// 1. Create a queue
const myFirstQueue = new Bull('my-first-queue');

// 2. Bind the task handler function (the payload is available as job.data)
myFirstQueue.process(async (job) => {
  return doSomething(job.data);
});

// 3. Add a task
const job = await myFirstQueue.add({ foo: 'bar' });

Create a queue

A queue is created by calling require and then new, so first find the entry point that require resolves to. Open package.json:

{
  "name": "bull",
  "version": "3.7.0",
  "description": "Job manager",
  "main": "./index.js",
  ...
}

The entry point is index.js, so open it:

module.exports = require('./lib/queue');
module.exports.Job = require('./lib/job');

Next, open ./lib/queue:

module.exports = Queue;

The module exports Queue, so look at the Queue function next:

const Queue = function Queue(name, url, opts) {
  ...
  // Default settings
  this.settings = _.defaults(opts.settings, {
    lockDuration: 30000,
    stalledInterval: 30000,
    maxStalledCount: 1,
    ...
    backoffStrategies: {}
  });
  ...
  // Bind these methods to avoid constant rebinding and/or creating closures
  // in processJobs etc.
  this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this);
  this.processJob = this.processJob.bind(this);
  this.getJobFromId = Job.fromId.bind(null, this);
  ...
};

This mainly involves initialization of parameters and binding of functions.
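The two ideas in the constructor, filling in default settings and binding methods once, can be sketched in isolation. This is a minimal illustration, not Bull's code: withDefaults is a hypothetical stand-in for lodash's _.defaults, and DemoQueue is an invented constructor used only for the example.

```javascript
// Hypothetical stand-in for _.defaults: copy a default only when the
// caller left that setting undefined. Unlike lodash, it does not mutate
// its first argument.
function withDefaults(settings, defaults) {
  const result = Object.assign({}, settings);
  for (const key of Object.keys(defaults)) {
    if (result[key] === undefined) {
      result[key] = defaults[key];
    }
  }
  return result;
}

// Hypothetical constructor that pre-binds a method, as Queue does.
function DemoQueue(name, settings) {
  this.name = name;
  this.settings = withDefaults(settings, {
    lockDuration: 30000,
    stalledInterval: 30000
  });
  // Bind once so the method can be passed around as a bare callback.
  this.describe = this.describe.bind(this);
}

DemoQueue.prototype.describe = function () {
  return `${this.name} locks jobs for ${this.settings.lockDuration} ms`;
};

const demoQueue = new DemoQueue('demo', { lockDuration: 60000 });
const detached = demoQueue.describe; // called without a receiver below
console.log(detached());
console.log(demoQueue.settings.stalledInterval);
```

Because describe is bound in the constructor, it can be handed to setInterval or a promise chain without wrapping it in a new closure each time, which is the reason given in the source comment.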

Bind the task handler function

This step is performed by myFirstQueue.process, so start with the process function:

Queue.prototype.process = function (name, concurrency, handler) {
  ...
  this.setHandler(name, handler); // 1. Bind the handler

  return this._initProcess().then(() => {
    return this.start(concurrency); // 2. Start the queue
  });
};

This function does two things:

  1. Binding handler
  2. Start the queue

First, look at how the handler is bound:

Queue.prototype.setHandler = function (name, handler) {
  ...
  if (this.handlers[name]) {
    throw new Error('Cannot define the same handler twice ' + name);
  }
  ...
  if (typeof handler === 'string') {
    ...
  } else {
    handler = handler.bind(this);

    // Save the handler under its name
    if (handler.length > 1) {
      this.handlers[name] = promisify(handler);
    } else {
      this.handlers[name] = function () {
        ...
      };
    }
  }
};
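The handler.length check above distinguishes callback-style handlers, which declare a second parameter, from handlers that return a value or a promise. The sketch below illustrates that idea under stated assumptions: wrapHandler is a hypothetical helper, Node's util.promisify stands in for the promisify used by Bull, and the body of the else branch is a guess because the source elides it.

```javascript
const { promisify } = require('util');

// Hypothetical helper mirroring the arity check in setHandler.
function wrapHandler(handler) {
  if (handler.length > 1) {
    // Two declared parameters: treat it as (job, done) and promisify it.
    return promisify(handler);
  }
  // One parameter: normalise a returned value, promise, or thrown error
  // into a promise (assumed behaviour, the original body is elided).
  return function (job) {
    try {
      return Promise.resolve(handler(job));
    } catch (err) {
      return Promise.reject(err);
    }
  };
}

const callbackStyle = wrapHandler((job, done) => done(null, job.data.n * 2));
const promiseStyle = wrapHandler(async job => job.data.n + 1);

callbackStyle({ data: { n: 2 } }).then(result => console.log('callback:', result));
promiseStyle({ data: { n: 2 } }).then(result => console.log('promise:', result));
```

Either way the queue ends up with a function that takes a job and returns a promise, so the rest of the processing code does not need to care which style the user chose.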

Now look at the queue startup:

Queue.prototype.start = function (concurrency) {
  return this.run(concurrency).catch(err => {
    this.emit('error', err, 'error running queue');
    throw err;
  });
};

Look at the run function:

Queue.prototype.run = function (concurrency) {
  const promises = [];

  return this.isReady()
    .then(() => {
      return this.moveUnlockedJobsToWait(); // Move unlocked jobs to the wait queue
    })
    .then(() => {
      return utils.isRedisReady(this.bclient);
    })
    .then(() => {
      while (concurrency--) {
        promises.push(
          new Promise(resolve => {
            this.processJobs(concurrency, resolve); // Process jobs
          })
        );
      }

      this.startMoveUnlockedJobsToWait(); // Schedule the periodic check for unlocked jobs

      return Promise.all(promises);
    });
};
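The while (concurrency--) loop in run starts one processing loop per unit of concurrency and passes each loop its own index. A minimal sketch of just that pattern follows; startLoops and the inline callback are hypothetical names, and the callback resolves immediately instead of processing jobs.

```javascript
// Hypothetical reduction of run(): start `concurrency` loops, each with its
// own index and its own resolve function.
function startLoops(concurrency, processJobs) {
  const promises = [];
  while (concurrency--) {
    promises.push(
      new Promise(resolve => {
        // The executor runs synchronously, so it sees the current index.
        processJobs(concurrency, resolve);
      })
    );
  }
  return Promise.all(promises);
}

const started = [];
const allDone = startLoops(3, (index, resolve) => {
  started.push(index);
  resolve(index); // a real loop would resolve only when the queue closes
});

console.log(started); // [ 2, 1, 0 ]
```

The indexes count down from concurrency - 1 to 0 because the post-decrement happens before the loop body runs. Each index later serves as the slot in this.processing for that loop.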

Unlocked job (stalled job): normally a job acquires a lock when it becomes active (the lock lasts for lockDuration and is renewed periodically) and releases the lock when it completes. If a job is active but no longer holds a lock, its worker process was blocked or crashed and the lock expired.
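The relationship between locks and stalled jobs can be illustrated with a small in-memory model. This is only a sketch: Bull keeps its locks in Redis, and createLocks, findStalled, and the injected clock are hypothetical names introduced for the example.

```javascript
// Hypothetical in-memory lock table with expiring locks.
// `now` is injected so the example can control time.
function createLocks(now) {
  const expiresAt = new Map();
  return {
    acquire(jobId, durationMs) {
      expiresAt.set(jobId, now() + durationMs);
    },
    // Renewal only succeeds while the lock is still held.
    extend(jobId, durationMs) {
      if (!this.isLocked(jobId)) return false;
      expiresAt.set(jobId, now() + durationMs);
      return true;
    },
    release(jobId) {
      expiresAt.delete(jobId);
    },
    isLocked(jobId) {
      return expiresAt.has(jobId) && expiresAt.get(jobId) > now();
    }
  };
}

// An active job without a live lock is considered stalled.
function findStalled(activeJobIds, locks) {
  return activeJobIds.filter(jobId => !locks.isLocked(jobId));
}

let clock = 0;
const locks = createLocks(() => clock);
locks.acquire('a', 30000);
locks.acquire('b', 30000);

clock = 20000;
locks.extend('a', 30000); // a healthy worker renews its lock in time

clock = 40000; // the worker holding b never renewed
const stalledJobs = findStalled(['a', 'b'], locks);
console.log(stalledJobs); // [ 'b' ]
```

Job a stays locked because its worker renewed the lock before it expired, while job b is reported as stalled once its original lock runs out.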

See processJobs:

Queue.prototype.processJobs = function (index, resolve, job) {
  const processJobs = this.processJobs.bind(this, index, resolve);
  process.nextTick(() => {
    this._processJobOnNextTick(processJobs, index, resolve, job);
  });
};

Then look at _processJobOnNextTick:

...
// Use the job passed in, or fetch the next one
const gettingNextJob = job ? Promise.resolve(job) : this.getNextJob();

return (this.processing[index] = gettingNextJob
  .then(this.processJob)
  .then(processJobs, err => {
    ...
  }));

The above code can be described as follows:

  1. If no job was passed in, call the getNextJob function to get one
  2. Run the processJob function
  3. Run the processJobs function again

Look at getNextJob:

if (this.drained) {
  //
  // Waiting for new jobs to arrive
  //
  console.log('bclient start get new job');
  return this.bclient
    .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
    .then(
      jobId => {
        if (jobId) {
          return this.moveToActive(jobId);
        }
      },
      err => {
        ...
      }
    );
} else {
  return this.moveToActive();
}

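When the queue is drained, the worker waits for the next job with Redis' blocking BRPOPLPUSH command, which pops a job ID from the wait list and pushes it onto the active list. The drainDelay setting is the timeout in seconds.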
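To make the blocking pop-and-push behaviour concrete, here is a small in-memory emulation. It is a sketch, not Redis: TinyLists and its methods are hypothetical, only the semantics needed here are modelled (pop from the tail of the source, push to the head of the destination, resolve to null on timeout), and the Redis convention that a timeout of 0 blocks forever is not reproduced.

```javascript
// Hypothetical in-memory model of two Redis list commands.
class TinyLists {
  constructor() {
    this.lists = new Map();   // key -> array, index 0 is the head
    this.waiters = new Map(); // key -> callbacks of blocked consumers
  }

  list(key) {
    if (!this.lists.has(key)) this.lists.set(key, []);
    return this.lists.get(key);
  }

  lpush(key, value) {
    const queue = this.waiters.get(key) || [];
    const waiter = queue.shift();
    if (waiter) {
      waiter(value); // hand the value straight to a blocked consumer
      return;
    }
    this.list(key).unshift(value);
  }

  brpoplpush(source, destination, timeoutSeconds) {
    const move = value => {
      this.list(destination).unshift(value);
      return value;
    };
    const src = this.list(source);
    if (src.length > 0) {
      return Promise.resolve(move(src.pop()));
    }
    return new Promise(resolve => {
      const queue = this.waiters.get(source) || [];
      this.waiters.set(source, queue);
      const waiter = value => {
        clearTimeout(timer);
        resolve(move(value));
      };
      const timer = setTimeout(() => {
        queue.splice(queue.indexOf(waiter), 1);
        resolve(null); // nothing arrived before the timeout
      }, timeoutSeconds * 1000);
      queue.push(waiter);
    });
  }
}

const db = new TinyLists();
db.lpush('wait', 'job-1');
db.brpoplpush('wait', 'active', 1).then(jobId => {
  console.log(jobId, db.list('active'));
});
```

In getNextJob the resolved job ID is then passed to moveToActive, and a null result simply means that no job arrived within drainDelay seconds.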

Moving on to processJob:

Queue.prototype.processJob = function (job) {
  ...
  const handleCompleted = result => {
    return job.moveToCompleted(result).then(jobData => {
      ...
      return jobData ? this.nextJobFromJobData(jobData[0], jobData[1]) : null;
    });
  };

  // Extend the lock duration
  lockExtender();
  const handler = this.handlers[job.name] || this.handlers['*'];

  if (!handler) {
    ...
  } else {
    let jobPromise = handler(job);
    ...
    return jobPromise
      .then(handleCompleted)
      .catch(handleFailed)
      .finally(() => {
        stopTimer();
      });
  }
};

After a task is processed successfully, handleCompleted is called, which calls the job's moveToCompleted. Through a few intermediate calls, this ends up running the Lua script moveToFinished:

...
  -- Try to get next job to avoid an extra roundtrip if the queue is not closing,
  -- and not rate limited.
  ...

The script moves the job to the completed or failed set and then fetches the next job.

After processJob finishes, processJobs is called again. This repetition is the core of the processing loop.
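The loop formed by processJobs, getNextJob, and processJob can be sketched with an in-memory array in place of Redis. All names below are hypothetical stand-ins, there is no error handling, and unlike Bull, which keeps waiting for new jobs, this sketch stops once the list is empty.

```javascript
// Hypothetical in-memory version of the processJobs / processJob cycle.
function runLoop(waiting, handler) {
  const completed = [];

  // Stand-in for getNextJob: resolves to undefined when nothing is waiting.
  const getNextJob = () => Promise.resolve(waiting.shift());

  // Stand-in for processJob: run the handler, record the result, and
  // resolve to the next waiting job, the way moveToFinished hands back
  // the next job to save a round trip.
  const processJob = job =>
    Promise.resolve(handler(job)).then(result => {
      completed.push(result);
      return waiting.shift();
    });

  return new Promise(resolve => {
    const processJobs = job => {
      process.nextTick(() => {
        const gettingNextJob = job ? Promise.resolve(job) : getNextJob();
        gettingNextJob.then(next => {
          if (!next) return resolve(completed); // sketch only: stop when empty
          return processJob(next).then(processJobs);
        });
      });
    };
    processJobs();
  });
}

runLoop([{ id: 1 }, { id: 2 }, { id: 3 }], job => job.id * 10).then(results => {
  console.log(results); // [ 10, 20, 30 ]
});
```

Each pass through processJobs either reuses the job handed back by the previous processJob call or falls back to getNextJob, which matches the three steps listed earlier.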

Add tasks

Look directly at the add function:

Queue.prototype.add = function (name, data, opts) {
  ...
  if (opts.repeat) {
    ...
  } else {
    return Job.create(this, name, data, opts);
  }
};

The create function in Job is called:

Job.create = function (queue, name, data, opts) {
  const job = new Job(queue, name, data, opts); // 1. Create a job

  return queue
    .isReady()
    .then(() => {
      return addJob(queue, job); // 2. Add the job to the queue
    })
    ...
};

Following addJob further, it ends up calling the Lua script addJob, which stores the job in Redis according to the job's settings.

Problems

1. Why does the error "job stalled more than allowable limit" occur?

The run function calls this.startMoveUnlockedJobsToWait(). Take a look at that function:

Queue.prototype.startMoveUnlockedJobsToWait = function () {
  clearInterval(this.moveUnlockedJobsToWaitInterval);
  if (this.settings.stalledInterval > 0 && !this.closing) {
    this.moveUnlockedJobsToWaitInterval = setInterval(
      this.moveUnlockedJobsToWait,
      this.settings.stalledInterval
    );
  }
};

This function runs moveUnlockedJobsToWait every stalledInterval milliseconds, so look at moveUnlockedJobsToWait next:

Queue.prototype.moveUnlockedJobsToWait = function () {
  ...
  return scripts
    .moveUnlockedJobsToWait(this)
    .then(([failed, stalled]) => {
      const handleFailedJobs = failed.map(jobId => {
        return this.getJobFromId(jobId).then(job => {
          this.emit(
            'failed',
            job,
            new Error('job stalled more than allowable limit'),
            'active'
          );
          return null;
        });
      });
      ...
    })
    ...;
};

This function eventually runs the Lua script moveUnlockedJobsToWait via scripts.moveUnlockedJobsToWait:

...
local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])
...
  if(stalledCount > MAX_STALLED_JOB_COUNT) then
    rcall("ZADD", KEYS[4], ARGV[3], jobId)
    rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit")
    table.insert(failed, jobId)
  else
    -- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
    rcall("RPUSH", dst, jobId)
    rcall('PUBLISH', KEYS[1] .. '@', jobId)
    table.insert(stalled, jobId)
  end
...
return {failed, stalled}

  • MAX_STALLED_JOB_COUNT: The default value is 1

The script collects the jobs that have stalled more times than the limit allows and returns them as failed, and the queue then emits the error in question for each of them.
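The decision made by the script can be mirrored in plain JavaScript to see when a job ends up in the failed list. This is a simplified sketch and not a port of the Lua script: the state object and its fields are hypothetical, lock checks are reduced to a set lookup, and it assumes the stall counter is incremented each time a job is found stalled, before it is compared with the limit.

```javascript
// Hypothetical in-memory version of the stalled-job check.
function moveUnlockedJobsToWait(state, maxStalledCount) {
  const failed = [];
  const stalled = [];

  for (const jobId of [...state.active]) {
    if (state.locked.has(jobId)) continue; // still owned by a live worker

    state.active.splice(state.active.indexOf(jobId), 1);
    // Assumption: the counter goes up by one on every detected stall.
    const stalledCount = (state.stalledCounts.get(jobId) || 0) + 1;
    state.stalledCounts.set(jobId, stalledCount);

    if (stalledCount > maxStalledCount) {
      state.failed.push(jobId);
      failed.push(jobId);
    } else {
      state.wait.push(jobId); // give the job another chance
      stalled.push(jobId);
    }
  }
  return [failed, stalled];
}

const state = {
  active: ['a', 'b'],
  wait: [],
  failed: [],
  locked: new Set(['a']), // only job a still holds its lock
  stalledCounts: new Map()
};

const firstCheck = moveUnlockedJobsToWait(state, 1);  // b goes back to wait
state.active.push(state.wait.pop());                  // b is picked up and stalls again
const secondCheck = moveUnlockedJobsToWait(state, 1); // b now exceeds the limit

for (const jobId of secondCheck[0]) {
  console.log(`${jobId}: job stalled more than allowable limit`);
}
```

Under these assumptions and the default limit of 1, a job is retried after its first stall and is marked as failed only when it stalls a second time.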

References

  • OptimalBits/bull
  • Bull’s Guide