In-depth practice

Practice is a necessary step in learning anything, so this time I tried to implement a simple version of a task queue system. The three elements of the task system are the job, the queue, and the worker. A job is created through the queue and persisted in Redis. The worker reads the job from Redis and then executes it.

The code structure

-- job.py
-- queue.py
-- worker.py
-- dummmy.py 
-- exception.py
Copy the code

The source code

  • job.py
import time
import json
import uuid
import importlib
import logging
import signal

from .exception import *

logger = logging.getLogger(__name__)


def _to_text(value):
    """Return *value* decoded as UTF-8 if it is ``bytes``, otherwise unchanged."""
    return value.decode("utf-8") if isinstance(value, bytes) else value


class JobStatus:
    """String constants written to the ``status`` field of a job hash."""

    pending = "pending"
    running = "running"
    failed = "failed"
    finished = "finished"
    timeout = "timeout"


class Job:
    """A function call persisted in a Redis hash so another process can run it.

    The hash lives under ``myrq:job:<id>`` and holds the JSON-encoded call
    (dotted function name, args, kwargs), the status, the timeout in seconds
    and the creation timestamp.
    """

    redis_job_prefix = "myrq:job:"

    def __init__(self, connection) -> None:
        self.connection = connection
        self._func_name = None
        self._args = None
        self._kwargs = None
        self._status = None
        self._timeout = 60  # seconds; used as the SIGALRM delay
        self.id = None

    @classmethod
    def create(cls, func, *args, **kwargs):
        """Build a new, unsaved job that will call ``func(*args, **kwargs)``.

        The keyword arguments ``connection`` and ``timeout`` are consumed here
        and are NOT forwarded to ``func``. A falsy ``timeout`` keeps the
        default of 60 seconds.
        """
        connection = kwargs.pop('connection', None)
        timeout = kwargs.pop('timeout', None)
        job = cls(connection=connection)
        # Only the dotted path is stored; the callable is re-imported by `func`.
        job._func_name = '%s.%s' % (func.__module__, func.__name__)
        job._args = args
        job._kwargs = kwargs
        job._status = JobStatus.pending
        job.id = uuid.uuid4()
        if timeout:
            job._timeout = timeout
        return job

    @classmethod
    def fetch(cls, connection, id):
        """Load the job stored under ``id``; return ``None`` if the hash is empty.

        ``id`` and the hash contents may be ``bytes`` (a client created without
        ``decode_responses=True`` returns bytes) or ``str``; both are accepted.
        """
        job_id = _to_text(id)
        raw_info = connection.hgetall(f'{cls.redis_job_prefix}{job_id}')
        if not raw_info:
            return None
        # Normalise to text so the field lookups below work for either client mode.
        job_info = {_to_text(k): _to_text(v) for k, v in raw_info.items()}
        job = cls(connection)
        job.id = job_id
        job.loads(job_info['func'])
        job._timeout = int(job_info['timeout'])
        job._status = job_info.get('status')
        return job

    @property
    def status(self):
        """The last status known to this object (not re-read from Redis)."""
        return self._status

    @status.setter
    def status(self, status):
        # Assigning the status also writes it to the job hash immediately.
        self._status = status
        self.connection.hset(f'{self.redis_job_prefix}{self.id}', "status", status)

    @property
    def func(self):
        """Import and return the callable named by the stored dotted path.

        Returns ``None`` when no function name has been set.
        """
        func_name = self._func_name
        if func_name is None:
            return None
        module_name, func_name = func_name.rsplit('.', 1)
        module = importlib.import_module(module_name)
        return getattr(module, func_name)

    def perform_job(self):
        """Run the job under a SIGALRM time limit and record the outcome.

        Sets the status to ``running``, then to ``finished``, ``timeout`` or
        ``failed``. Returns the function's result, or ``None`` when it timed
        out or raised.
        """
        self.status = JobStatus.running
        result = None
        args = self._args or ()
        kwargs = self._kwargs or {}
        self.register_signal_handlers()
        try:
            try:
                result = self.func(*args, **kwargs)
            finally:
                # Cancel the pending alarm before anything else runs, so it
                # cannot fire during the status writes or in the caller's loop.
                signal.alarm(0)
        except TimeoutException:
            self.status = JobStatus.timeout
        except Exception:
            # Keep a trace of the failure; only the status is stored in Redis.
            logger.exception("job %s failed", self.id)
            self.status = JobStatus.failed
        else:
            self.status = JobStatus.finished
        return result

    def dumps(self):
        """Return the call (function name, args, kwargs) as a JSON string."""
        obj = {}
        obj['func_name'] = self._func_name
        obj['args'] = self._args
        obj['kwargs'] = self._kwargs
        return json.dumps(obj)

    def loads(self, func_json):
        """Restore function name, args and kwargs from a ``dumps`` string."""
        obj = json.loads(func_json)
        self._func_name = obj.pop("func_name", None)
        self._args = obj.pop("args", None)
        self._kwargs = obj.pop("kwargs", None)

    def save(self):
        """Write the job hash to Redis and give it a one-hour expiry."""
        obj = {}
        obj['created_at'] = int(time.time())
        obj['func'] = self.dumps()
        obj['status'] = self._status
        obj['timeout'] = self._timeout
        key = f'{self.redis_job_prefix}{self.id}'
        pipe = self.connection.pipeline(transaction=False)
        # hset(mapping=...) replaces the deprecated hmset.
        pipe.hset(key, mapping=obj)
        pipe.expire(key, 3600)
        pipe.execute()

    def register_signal_handlers(self):
        """Install a SIGALRM handler that raises ``TimeoutException`` and arm it.

        The alarm is scheduled ``self._timeout`` seconds from now.
        """
        def handle_alarm_signal(signum, frame):
            # Signal handlers are always called with (signum, frame).
            raise TimeoutException()

        signal.signal(signal.SIGALRM, handle_alarm_signal)
        signal.alarm(self._timeout)
  • queue.py
import redis

from .job import Job


class Queue:
    """A named job queue backed by the Redis list ``myrq:queue:<name>``.

    Job ids are pushed on the right and popped from the left; the job data
    itself is stored separately by ``Job.save``.
    """

    redis_queue_prefix = "myrq:queue:"

    def __init__(self, name="default", connection=None):
        """Bind the queue to ``connection``.

        When no connection is given, a client for 127.0.0.1:6379, db 0 is
        created.
        """
        if connection is None:
            connection = redis.Redis(host="127.0.0.1", port=6379, db=0)
        self.name = name
        self.connection = connection

    @property
    def _key(self):
        """Redis key of the list that holds this queue's job ids."""
        return f"{self.redis_queue_prefix}{self.name}"

    def enqueue_job(self, func, *args, **kwargs):
        """Create a job for ``func(*args, **kwargs)``, save it and queue its id.

        A ``timeout`` keyword, if present, is consumed by ``Job.create`` rather
        than passed to ``func``. Returns the created job so the caller can
        keep its id.
        """
        kwargs["connection"] = self.connection
        job = Job.create(func, *args, **kwargs)
        job.save()
        self.connection.rpush(self._key, f"{job.id}")
        return job

    def dequeue_job(self):
        """Block until a job id is available, then load and return that job.

        Returns ``None`` when the pop yields nothing or when ``Job.fetch``
        finds no data for the id.
        """
        popped = self.connection.blpop(self._key)
        if popped is None:
            return None
        _queue_key, job_id = popped
        # Without decode_responses=True the id arrives as bytes; formatting
        # bytes into the job key would produce "myrq:job:b'...'".
        if isinstance(job_id, bytes):
            job_id = job_id.decode("utf-8")
        return Job.fetch(connection=self.connection, id=job_id)
Copy the code
  • worker.py
from redis import connection
from .job import *
from .queue import *

class Worker:
    """Consumes jobs from a single named queue and runs them one at a time."""

    def __init__(self, connection, queue_name="default"):
        """Remember the connection and open the queue called ``queue_name``."""
        self.connection = connection
        self.queue_name = queue_name
        self.queue: Queue = Queue(name=queue_name, connection=connection)

    def run_forever(self):
        """Loop without end: take the next job from the queue and perform it.

        A dequeue that yields ``None`` is skipped. The value returned by the
        job is not used.
        """
        while True:
            next_job = self.queue.dequeue_job()
            if next_job is not None:
                next_job.perform_job()
Copy the code

New techniques learned

  • `property` can be used to encapsulate a single attribute. To intercept assignment to all attributes, use the `__setattr__` method.
  • Use `raise` to raise exceptions when necessary.
  • Load a module dynamically with `importlib.import_module`, using the function's `__module__` attribute to get the module name.