Advanced use of Python
Python is a high-level language with a rich syntax, and many of its features are unique to the language. RQ, a popular distributed task queue framework, makes heavy use of Python's syntactic sugar. Reading its source code is a good way to learn how to use this syntactic sugar and these magic methods well.
property
- Description: the @property decorator is a cleaner, more intuitive way to use the built-in property() function. The method decorated with @property is the getter, and its name becomes the property name. A method decorated with @<property name>.setter sets the property's value, and a method decorated with @<property name>.deleter deletes it.
    ## Job
    @property
    def func_name(self):
        return self._func_name

    @property
    def func(self):
        func_name = self.func_name
        if func_name is None:
            return None
        module_name, func_name = func_name.rsplit('.', 1)
        module = importlib.import_module(module_name)
        return getattr(module, func_name)

    @property
    def args(self):
        return self._args

    @property
    def kwargs(self):
        return self._kwargs
- Summary: @property encapsulates complex details and lets a class customize what happens when an attribute is read, set, or deleted.
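The RQ excerpt only defines getters. To round out the description, here is a minimal sketch of a getter, setter, and deleter working together. The Temperature class and its attribute names are hypothetical and are not part of RQ.

```python
class Temperature:
    """Hypothetical example class; not part of RQ."""

    def __init__(self, celsius):
        self._celsius = celsius

    @property
    def celsius(self):
        # Getter: runs when `t.celsius` is read
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        # Setter: runs on `t.celsius = value`; validate before storing
        if value < -273.15:
            raise ValueError("temperature below absolute zero")
        self._celsius = value

    @celsius.deleter
    def celsius(self):
        # Deleter: runs on `del t.celsius`
        del self._celsius

    @property
    def fahrenheit(self):
        # Read-only computed property: no setter is defined
        return self._celsius * 9 / 5 + 32


t = Temperature(25)
t.celsius = 30          # goes through the setter
print(t.fahrenheit)     # 86.0
```

Because fahrenheit has no setter, assigning to it raises AttributeError, which is a simple way to expose a derived value as read-only.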
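Decorators
Decorators can be applied to classes as well as to functions and methods. RQ, for example, decorates its Queue class with total_ordering from the standard library's functools module, which fills in the missing comparison methods: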
    # RQ's Queue class
    @total_ordering
    class Queue(object):
        redis_queue_namespace_prefix = 'rq:queue:'

    # functools.total_ordering (Python standard library)
    def total_ordering(cls):
        """Class decorator that fills in missing ordering methods"""
        # Find user-defined comparisons (not those inherited from object).
        roots = {op for op in _convert if getattr(cls, op, None) is not getattr(object, op, None)}
        if not roots:
            raise ValueError('must define at least one ordering operation: < > <= >=')
        root = max(roots)       # prefer __lt__ to __le__ to __gt__ to __ge__
        for opname, opfunc in _convert[root]:
            if opname not in roots:
                opfunc.__name__ = opname
                setattr(cls, opname, opfunc)
        return cls
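As a small illustration of what the class decorator provides, the sketch below defines only __eq__ and __lt__ and lets total_ordering derive the rest. The Version class is hypothetical and is not taken from RQ.

```python
from functools import total_ordering


@total_ordering
class Version:
    """Hypothetical example class; not part of RQ."""

    def __init__(self, major, minor):
        self.major = major
        self.minor = minor

    def __eq__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return (self.major, self.minor) == (other.major, other.minor)

    def __lt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return (self.major, self.minor) < (other.major, other.minor)


# __le__, __gt__ and __ge__ were filled in by the decorator
print(Version(1, 2) <= Version(1, 3))   # True
print(Version(2, 0) > Version(1, 9))    # True
```

Only one ordering method plus __eq__ has to be written by hand; the trade-off is that the derived methods are somewhat slower than hand-written ones.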
Context manager
Context managers, used through the with statement, are designed to simplify setup and cleanup code. They make code shorter and easier to maintain, cut down on repeated try/finally boilerplate, and make that logic reusable.
    try:
        VAR = XXXXX
    except XXX as e:
        pass
    finally:
        DO SOMETHING

    with EXPR as VAR:
        BLOCK
The with statement above is the standard way to use a context manager. At runtime it works as follows:
(1) Evaluate EXPR to obtain the context manager.
(2) Call the context manager's __enter__ method, which does any setup.
(3) The "as VAR" clause is optional; if it is present, the return value of __enter__ is assigned to VAR.
(4) Execute BLOCK, where VAR can be used like any other variable.
(5) Finally, call the context manager's __exit__ method, whether or not BLOCK raised an exception.
The __exit__ method takes three parameters: exc_type, exc_val, and exc_tb. If BLOCK raises an exception, they hold that exception's type, value, and traceback; otherwise all three are None. If __exit__ returns a false value, the exception is re-raised; if it returns a true value, the exception is considered handled and execution continues after the with statement.
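The steps can be observed with a small sketch. The Tracker class below is hypothetical; it records each protocol call so the effect of the __exit__ return value is visible.

```python
class Tracker:
    """Hypothetical context manager that records the protocol steps."""

    def __init__(self, suppress):
        self.suppress = suppress
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self                      # bound to VAR by `as VAR`

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when BLOCK finished without an exception
        self.events.append(("exit", exc_type))
        return self.suppress             # true value -> swallow the exception


with Tracker(suppress=True) as t:
    raise KeyError("lost")               # swallowed by __exit__
print(t.events)

try:
    with Tracker(suppress=False) as t2:
        raise KeyError("lost")           # re-raised after __exit__ runs
except KeyError:
    print("re-raised")
```

In both cases __exit__ runs and receives the exception type; only the return value decides whether the exception propagates.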
    class death_pentalty_after(object):
        def __init__(self, timeout):
            self._timeout = timeout

        def __enter__(self):
            self.setup_death_penalty()

        def __exit__(self, type, value, traceback):
            # Always cancel immediately, since we're done
            try:
                self.cancel_death_penalty()
            except JobTimeoutException:
                pass
            return False

        def handle_death_penalty(self, signum, frame):
            raise JobTimeoutException('Job exceeded maximum timeout '
                                      'value (%d seconds).' % self._timeout)

        def setup_death_penalty(self):
            signal.signal(signal.SIGALRM, self.handle_death_penalty)
            signal.alarm(self._timeout)

        def cancel_death_penalty(self):
            signal.alarm(0)
            signal.signal(signal.SIGALRM, signal.SIG_DFL)
RQ uses this context manager to enforce job timeouts. It relies on the signal module: on entry it registers a SIGALRM handler and arms the alarm, and on exit it cancels the alarm and restores the default signal handler (SIG_DFL).
    def perform_job(self, job):
        """Performs the actual work of a job.  Will/should only be called
        inside the work horse's process.
        """
        self.procline('Processing %s from %s since %s' % (
            job.func_name,
            job.origin, time.time()))

        try:
            with death_pentalty_after(job.timeout or 180):
                rv = job.perform()
        except Exception as e:
            fq = self.failed_queue
            self.log.exception(red(str(e)))
            self.log.warning('Moving job to %s queue.' % fq.name)
            fq.quarantine(job, exc_info=traceback.format_exc())
            return False

        if rv is None:
            self.log.info('Job OK')
        else:
            self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))

        if rv is not None:
            p = self.connection.pipeline()
            p.hset(job.key, 'result', dumps(rv))
            p.expire(job.key, self.rv_ttl)
            p.execute()
        else:
            # Cleanup immediately
            job.delete()
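The timeout pattern that perform_job relies on can be tried in isolation. The following is a minimal sketch, not RQ's implementation: time_limit, BlockTimeout, and run_with_limit are hypothetical names, it uses signal.setitimer so that sub-second limits work, and it restores the previously installed handler instead of SIG_DFL. It assumes a Unix system and code running in the main thread, since SIGALRM handlers can only be installed there.

```python
import signal
import time
from contextlib import contextmanager


class BlockTimeout(Exception):
    """Hypothetical exception, standing in for RQ's JobTimeoutException."""


@contextmanager
def time_limit(seconds):
    def handler(signum, frame):
        raise BlockTimeout("block exceeded %.2f seconds" % seconds)

    previous = signal.signal(signal.SIGALRM, handler)   # register handler
    signal.setitimer(signal.ITIMER_REAL, seconds)       # arm the timer
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)         # disarm the timer
        signal.signal(signal.SIGALRM, previous)         # restore old handler


def run_with_limit(seconds, work_seconds):
    # Mirrors the try/with/except shape of perform_job
    try:
        with time_limit(seconds):
            time.sleep(work_seconds)
    except BlockTimeout:
        return "timed out"
    return "finished"


print(run_with_limit(0.05, 1.0))    # timed out
print(run_with_limit(1.0, 0.01))    # finished
```

As in perform_job, the exception raised by the signal handler surfaces inside the with block, so the caller handles a timeout like any other failure.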