History
RQ (Redis Queue) is a compact, lightweight Python library for processing background task queues. Its biggest advantage over Celery is ease of use. RQ is built directly on Redis data structures, and Redis is its only broker. The first version, 0.1.0, was finished in 2013; the latest, 1.8.0, was released in 2021 at github.com/rq/rq. This source code analysis covers version 0.1.0. The RQ components are:
- rq: github.com/rq/rq
- rq-scheduler (scheduling): github.com/rq/rq-sched…
- rq-dashboard (web console): github.com/Parallels/r…
Core components and code structure
- The core components of RQ are Queue, Job and Worker; Redis serves as the storage engine.
- The code structure is as follows:
```
├── __init__.py
├── connections.py
├── dummy.py
├── exceptions.py
├── job.py
├── queue.py
├── timeouts.py
├── utils.py
├── version.py
└── worker.py
```
job.py, queue.py and worker.py contain essentially all of the core logic: how a job is created, persisted and enqueued, and how the worker executes it.
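Before reading the source, it helps to see how the three components fit together. Below is a minimal usage sketch in the spirit of RQ's classic README example; the module name `wordcount` is illustrative, and the connection-setup API shown is the modern one (0.1.0 used `rq.use_connection()` instead):

```python
# --- wordcount.py: the task function must live in an importable module,
# --- not in __main__ (enqueue() rejects __main__ functions, see below)
import requests

def count_words_at_url(url):
    return len(requests.get(url).text.split())

# --- producer side (another module or a shell session) ---
from redis import Redis
from rq import Queue
from wordcount import count_words_at_url

q = Queue(connection=Redis())   # modern API; 0.1.0 used rq.use_connection()
job = q.enqueue(count_words_at_url, 'http://nvie.com')
# a separate worker process (the rqworker script) pops and runs the job;
# job.result stays None until the worker has finished
print(job.result)
```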
Goals
- Become familiar with the basic RQ flow, from task enqueueing to final processing.
- Understand the Linux facilities RQ relies on, such as process management and signal handling.
- Understand all the Redis operations RQ performs and how they are implemented.
- Learn the higher-level Python constructs and elegant idioms used in RQ.
- Implement a simple version of a task queue system.
The basic structure
- Enqueueing a task
```python
## Queue
def enqueue(self, f, *args, **kwargs):
    """Creates a job to represent the delayed function call and enqueues it.

    Expects the function to call, along with the arguments and keyword
    arguments.

    The special keyword `timeout` is reserved for `enqueue()` itself and
    it won't be passed to the actual job function.
    """
    if f.__module__ == '__main__':
        raise ValueError(
            'Functions from the __main__ module cannot be processed '
            'by workers.')

    timeout = kwargs.pop('timeout', None)
    job = Job.create(f, *args, connection=self.connection, **kwargs)
    return self.enqueue_job(job, timeout=timeout)

def enqueue_job(self, job, timeout=None, set_meta_data=True):
    """Enqueues a job for delayed execution.

    When the `timeout` argument is sent, it will overrides the default
    timeout value of 180 seconds.  `timeout` may either be a string or
    integer.

    If the `set_meta_data` argument is `True` (default), it will update
    the properties `origin` and `enqueued_at`.
    """
    if set_meta_data:
        job.origin = self.name
        job.enqueued_at = times.now()
    if timeout:
        job.timeout = timeout  # _timeout_in_seconds(timeout)
    else:
        job.timeout = 180  # default
    job.save()  # hmset: persist the job information
    self.push_job_id(job.id)  # rpush the job id onto the queue's list
    return job
```
enqueue() takes a function plus its positional and keyword arguments; functions defined in the __main__ module are not supported. The call is ultimately turned into a Job, the job is stored in a Redis hash, and the job id is RPUSHed onto the list associated with the queue.
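To make the enqueue path concrete, here is a toy re-implementation using redis-py. `toy_enqueue` and the hard-coded `default` queue are illustrative, though the key names follow RQ's `rq:job:` / `rq:queue:` scheme:

```python
import pickle
import uuid

from redis import Redis

r = Redis()

def toy_enqueue(func, *args, **kwargs):
    """Toy version of Queue.enqueue(): persist the job as a hash,
    then push its id onto the queue's list."""
    job_id = str(uuid.uuid4())
    data = pickle.dumps(('%s.%s' % (func.__module__, func.__name__),
                         args, kwargs))
    r.hset('rq:job:' + job_id, mapping={'data': data, 'origin': 'default'})
    r.rpush('rq:queue:default', job_id)
    return job_id
```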
- Executing a task
```python
## Worker
def work(self, burst=False):  # noqa
    """Starts the work loop.

    Pops and performs all jobs on the current list of queues.  When all
    queues are empty, block and wait for new jobs to arrive on any of the
    queues, unless `burst` mode is enabled.

    The return value indicates whether any jobs were processed.
    """
    self._install_signal_handlers()

    did_perform_work = False
    self.register_birth()
    self.state = 'starting'
    try:
        while True:
            if self.stopped:
                self.log.info('Stopping on request.')
                break
            self.state = 'idle'
            qnames = self.queue_names()
            self.procline('Listening on %s' % ','.join(qnames))
            self.log.info('')
            self.log.info('*** Listening on %s...' %
                          green(', '.join(qnames)))
            wait_for_job = not burst
            try:
                result = Queue.dequeue_any(self.queues, wait_for_job,
                                           connection=self.connection)
                if result is None:
                    break
            except UnpickleError as e:
                msg = '*** Ignoring unpickleable data on %s.' % \
                      green(e.queue.name)
                self.log.warning(msg)
                self.log.debug('Data follows:')
                self.log.debug(e.raw_data)
                self.log.debug('End of unreadable data.')
                self.failed_queue.push_job_id(e.job_id)
                continue

            job, queue = result
            self.log.info('%s: %s (%s)' % (green(queue.name),
                                           blue(job.description), job.id))

            self.state = 'busy'
            self.fork_and_perform_job(job)

            did_perform_work = True
    finally:
        if not self.is_horse:
            self.register_death()
    return did_perform_work

def fork_and_perform_job(self, job):
    """Spawns a work horse to perform the actual work and passes it a job.
    The worker will wait for the work horse and make sure it executes
    within the given timeout bounds, or will end the work horse with
    SIGALRM.
    """
    child_pid = os.fork()
    if child_pid == 0:
        self.main_work_horse(job)
    else:
        self._horse_pid = child_pid
        self.procline('Forked %d at %d' % (child_pid, time.time()))
        while True:
            try:
                os.waitpid(child_pid, 0)
                break
            except OSError as e:
                # In case we encountered an OSError due to EINTR (which is
                # caused by a SIGINT or SIGTERM signal during
                # os.waitpid()), we simply ignore it and enter the next
                # iteration of the loop, waiting for the child to end.  In
                # any other case, this is some other unexpected OS error,
                # which we don't want to catch, so we re-raise those ones.
                if e.errno != errno.EINTR:
                    raise
```
Worker.work() is a `while True` loop: it dequeues a job from the queues, forks a child process to execute it, and waits for that child to finish (or exit abnormally) before handling the next job.
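The consuming side can be sketched the same way. This toy loop matches the `toy_enqueue` sketch above; error handling, timeouts and signal handling are omitted, and it assumes the task function lives in a top-level module:

```python
import os
import pickle

from redis import Redis

r = Redis()

def toy_worker():
    """Toy version of Worker.work(): block on the queue list,
    load the job hash, fork a 'horse' to run the job."""
    while True:
        _, job_id = r.blpop('rq:queue:default')       # blocks; no polling
        data = r.hget('rq:job:%s' % job_id.decode(), 'data')
        func_path, args, kwargs = pickle.loads(data)
        if os.fork() == 0:                            # child: the work horse
            module_name, func_name = func_path.rsplit('.', 1)
            module = __import__(module_name)          # top-level modules only
            getattr(module, func_name)(*args, **kwargs)
            os._exit(0)
        else:                                         # parent: wait for horse
            os.wait()
```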
- Job module: loads and dumps from cPickle are used to serialize objects to strings and back.
```python
class Job(object):
    """A Job is just a convenient datastructure to pass around job (meta) data."""

    # Job construction
    @classmethod
    def create(cls, func, *args, **kwargs):
        """Creates a new Job instance for the given function, arguments, and
        keyword arguments.
        """
        connection = kwargs.pop('connection', None)
        job = Job(connection=connection)
        job._func_name = '%s.%s' % (func.__module__, func.__name__)
        job._args = args
        job._kwargs = kwargs
        job.description = job.get_call_string()
        return job

    def __init__(self, id=None, connection=None):
        if connection is None:
            connection = get_current_connection()
        self.connection = connection
        self._id = id
        self.created_at = times.now()
        self._func_name = None
        self._args = None
        self._kwargs = None
        self.description = None
        self.origin = None
        self.enqueued_at = None
        self.ended_at = None
        self._result = None
        self.exc_info = None
        self.timeout = None
```
A Job carries the function name (module name + function name), the argument list (positional and keyword arguments), a description, timestamps (created, enqueued, ended), the execution result (_result), a timeout, exception info (exc_info), and the queue name (origin).
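The 'data' field is simply the pickled (func_name, args, kwargs) tuple. A quick sketch of the round trip (RQ 0.1.0 imports dumps/loads from cPickle on Python 2; pickle is the Python 3 equivalent, and the tuple contents here are illustrative):

```python
import pickle

job_tuple = ('wordcount.count_words_at_url', ('http://nvie.com',), {})
data = pickle.dumps(job_tuple)                 # stored in the job hash's 'data' field
func_name, args, kwargs = pickle.loads(data)   # what unpickle()/refresh() recovers
```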
```python
## Redis
def save(self):
    """Persists the current job instance to its corresponding Redis key."""
    key = self.key
    obj = {}
    obj['created_at'] = times.format(self.created_at, 'UTC')
    if self.func_name is not None:
        obj['data'] = dumps(self.job_tuple)
    if self.origin is not None:
        obj['origin'] = self.origin
    if self.description is not None:
        obj['description'] = self.description
    if self.enqueued_at is not None:
        obj['enqueued_at'] = times.format(self.enqueued_at, 'UTC')
    if self.ended_at is not None:
        obj['ended_at'] = times.format(self.ended_at, 'UTC')
    if self._result is not None:
        obj['result'] = self._result
    if self.exc_info is not None:
        obj['exc_info'] = self.exc_info
    if self.timeout is not None:
        obj['timeout'] = self.timeout

    self.connection.hmset(key, obj)

def refresh(self):  # noqa
    """Overwrite the current instance's properties with the values in the
    corresponding Redis key.

    Will raise a NoSuchJobError if no corresponding Redis key exists.
    """
    key = self.key
    properties = ['data', 'created_at', 'origin', 'description',
                  'enqueued_at', 'ended_at', 'result', 'exc_info',
                  'timeout']
    data, created_at, origin, description, \
        enqueued_at, ended_at, result, \
        exc_info, timeout = self.connection.hmget(key, properties)
    if data is None:
        raise NoSuchJobError('No such job: %s' % (key,))

    def to_date(date_str):
        if date_str is None:
            return None
        else:
            return times.to_universal(date_str)

    self._func_name, self._args, self._kwargs = unpickle(data)
    self.created_at = to_date(created_at)
    self.origin = origin
    self.description = description
    self.enqueued_at = to_date(enqueued_at)
    self.ended_at = to_date(ended_at)
    self._result = result
    self.exc_info = exc_info
    if timeout is None:
        self.timeout = None
    else:
        self.timeout = int(timeout)
```
Related Redis commands
- Hash: jobs are stored as structured hashes, using HMSET and HMGET.
- List: each queue is essentially a Redis list, entered and exited with RPUSH, BLPOP and LPOP. BLPOP is the blocking version of LPOP: when none of the given lists has an element to pop, the connection blocks until an element appears or the timeout expires. BLPOP exists to avoid polling.
- Set: SADD and SREM are used by workers for register_birth and register_death; all workers are recorded under a single key. The resulting key layout looks like this:
```
queue1:  job1 + job2 + job3                            [list]
job1:    id:xxxx, args:xxxx, origin:xxxx, result:xxxx  [hash]
workers: worker1 + worker2                             [set]
worker1: birth:xxxx, queues:x1,x2                      [hash]
```
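A small redis-py sketch of the three structures above (the key names are illustrative):

```python
from redis import Redis

r = Redis()

# hash: structured job storage (HMSET/HMGET in 0.1.0; HSET in modern redis-py)
r.hset('rq:job:job1', mapping={'origin': 'default', 'result': '42'})
print(r.hmget('rq:job:job1', ['origin', 'result']))

# list: the queue itself -- RPUSH to enqueue, LPOP/BLPOP to dequeue
r.rpush('rq:queue:default', 'job1')
print(r.blpop(['rq:queue:default'], timeout=5))  # blocks until an element arrives

# set: worker registration (register_birth / register_death)
r.sadd('rq:workers', 'rq:worker:worker1')
r.srem('rq:workers', 'rq:worker:worker1')
```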
Related Linux calls
- Process handling
```python
def fork_and_perform_job(self, job):
    """Spawns a work horse to perform the actual work and passes it a job.
    The worker will wait for the work horse and make sure it executes
    within the given timeout bounds, or will end the work horse with
    SIGALRM.
    """
    child_pid = os.fork()
    if child_pid == 0:
        self.main_work_horse(job)
    else:
        self._horse_pid = child_pid
        self.procline('Forked %d at %d' % (child_pid, time.time()))
        while True:
            try:
                os.waitpid(child_pid, 0)
                break
            except OSError as e:
                # In case we encountered an OSError due to EINTR (which is
                # caused by a SIGINT or SIGTERM signal during
                # os.waitpid()), we simply ignore it and enter the next
                # iteration of the loop, waiting for the child to end.  In
                # any other case, this is some other unexpected OS error,
                # which we don't want to catch, so we re-raise those ones.
                if e.errno != errno.EINTR:
                    raise
```
fork() creates a child process (the "work horse") that executes the job; the parent blocks in waitpid() until the child completes or is killed on timeout. Note that while the parent is waiting, a SIGINT or SIGTERM interrupts waitpid() with EINTR; the parent simply ignores that error and retries the wait.
Signal handling
```python
def _install_signal_handlers(self):
    """Installs signal handlers for handling SIGINT and SIGTERM
    gracefully.
    """

    def request_force_stop(signum, frame):
        """Terminates the application (cold shutdown)."""
        self.log.warning('Cold shut down.')

        # Take down the horse with the worker
        if self.horse_pid:
            msg = 'Taking down horse %d with me.' % self.horse_pid
            self.log.debug(msg)
            try:
                os.kill(self.horse_pid, signal.SIGKILL)
            except OSError as e:
                # ESRCH ("No such process") is fine with us
                if e.errno != errno.ESRCH:
                    self.log.debug('Horse already down.')
                    raise
        raise SystemExit()

    def request_stop(signum, frame):
        """Stops the current worker loop but waits for child processes to
        end gracefully (warm shutdown).
        """
        self.log.debug('Got %s signal.' % signal_name(signum))

        signal.signal(signal.SIGINT, request_force_stop)
        signal.signal(signal.SIGTERM, request_force_stop)

        if self.is_horse:
            self.log.debug('Ignoring signal %s.' % signal_name(signum))
            return

        msg = 'Warm shut down. Press Ctrl+C again for a cold shutdown.'
        self.log.warning(msg)
        self._stopped = True
        self.log.debug('Stopping after current horse is finished.')

    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)
```
When the worker receives the first SIGINT, request_stop registers request_force_stop as the new handler and marks the worker as stopped; only if the signal is sent a second time does the kill (cold shutdown) actually happen.
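The two-stage shutdown can be demonstrated standalone. A minimal sketch (Unix only; this is not RQ code):

```python
import signal
import sys

def force_stop(signum, frame):
    print('Cold shutdown.')
    sys.exit(1)

def request_stop(signum, frame):
    # first Ctrl+C: swap in the hard handler; the next signal kills us
    signal.signal(signal.SIGINT, force_stop)
    print('Warm shutdown requested; press Ctrl+C again to force.')

signal.signal(signal.SIGINT, request_stop)
while True:
    signal.pause()  # sleep until a signal arrives
```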
```python
def setup_death_penalty(self):
    """Sets up an alarm signal and a signal handler that raises
    a JobTimeoutException after the timeout amount (expressed in
    seconds).
    """
    signal.signal(signal.SIGALRM, self.handle_death_penalty)
    signal.alarm(self._timeout)
```
This sets up a timer signal (SIGALRM) and its handler; alarm(0) clears a pending timer.
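A minimal standalone sketch of the same SIGALRM timeout technique (the `JobTimeoutException` class here mirrors the exception RQ raises):

```python
import signal
import time

class JobTimeoutException(Exception):
    pass

def handle_death_penalty(signum, frame):
    raise JobTimeoutException('Job exceeded maximum timeout.')

signal.signal(signal.SIGALRM, handle_death_penalty)
signal.alarm(3)            # deliver SIGALRM in 3 seconds
try:
    time.sleep(10)         # stand-in for a long-running job
except JobTimeoutException as e:
    print(e)
finally:
    signal.alarm(0)        # alarm(0) cancels any pending alarm
```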
References
- Chinese documentation: codercharm.github.io/python-rq-d…