0x00 Introduction
Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages. It focuses on asynchronous task queues for real-time processing, while also supporting task scheduling. Concretely, the actual task processing in Celery is done by its Worker component.
$ celery --app=proj worker -l INFO
$ celery -A proj worker -l INFO -Q hipri,lopri
$ celery -A proj worker --concurrency=4
$ celery -A proj worker --concurrency=1000 -P eventlet
$ celery worker --autoscale=10,0
This article explains the startup process of the worker.
0x01 Structure of Celery
Let us review the structure of Celery. The structure diagram of Celery is shown below:
+-----------+        +--------------+
| Producer  |        | Celery Beat  |
+-------+---+        +----+---------+
        |                 |
        |                 |
        v                 v
    +-------------------------+
    |         Broker          |
    +------------+------------+
                 |
                 |
  +-------------------------------+
  |              |                |
  v              v                v
+----+-----+ +----+------+ +-----+----+
| Exchange | | Exchange  | | Exchange |
+----+-----+ +----+------+ +----+-----+
     |            |             |
     v            v             v
  +-----+     +-------+     +-------+
  |queue|     | queue |     | queue |
  +--+--+     +---+---+     +---+---+
     |            |             |
     |            |             |
     v            v             v
+---------+  +--------+  +----------+
| worker  |  | Worker |  | Worker   |
+-----+---+  +---+----+  +----+-----+
      |          |             |
      |          |             |
      +-----------------------------+
                 |
                 v
            +---+-----+
            | backend |
            +---------+
0x02 Sample code
Actually, it is hard to find material online about how to debug a Celery worker. But we can look in its source code and find the following commented-out test:
# def test_worker_main(self):
#     from celery.bin import worker as worker_bin
#
#     class worker(worker_bin.worker):
#
#         def execute_from_commandline(self, argv):
#             return argv
#
#     prev, worker_bin.worker = worker_bin.worker, worker
#     try:
#         ret = self.app.worker_main(argv=['--version'])
#         assert ret == ['--version']
#     finally:
#         worker_bin.worker = prev
Therefore, we can imitate it by starting the worker as follows for debugging.
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task()
def add(x, y):
    return x + y

if __name__ == '__main__':
    app.worker_main(argv=['worker'])
0x03 Logic Overview
When a worker is started, it establishes a connection (a long-lived TCP connection) with the broker; whenever data needs to be transferred, a corresponding channel is created, and one connection can carry multiple channels. The worker then fetches tasks from the broker's queues for consumption, which is a typical producer-consumer pattern.
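Conceptually, this loop can be imitated with Python's standard library. The sketch below is an analogy only, not Celery's code: a plain queue stands in for the broker, and a thread stands in for the worker consuming messages.

```python
# A conceptual sketch (not Celery's code) of the producer-consumer
# pattern: a queue stands in for the broker, a thread for the worker.
import queue
import threading

broker_queue = queue.Queue()  # stands in for a broker queue

def producer():
    # stands in for a client sending task messages to the broker
    for i in range(3):
        broker_queue.put({'task': 'add', 'args': (i, i)})
    broker_queue.put(None)  # sentinel: no more messages

results = []

def worker():
    # stands in for a worker consuming messages from the broker
    while True:
        message = broker_queue.get()
        if message is None:
            break
        x, y = message['args']
        results.append(x + y)

t = threading.Thread(target=worker)
t.start()
producer()
t.join()
print(results)  # [0, 2, 4]
```

The real worker differs in that it speaks AMQP (or the Redis protocol) over the connection and dispatches each message into a pool of processes rather than running it inline.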
The worker is mainly composed of four parts: task_pool, consumer, scheduler, and mediator. task_pool stores the worker processes: when the worker is started with a concurrency argument, that many processes are placed in this pool.
Celery uses the prefork pool by default, that is, worker processes are forked in advance (multiprocessing-style). The difference between this pool and an ordinary multi-process pool is that the task_pool only holds running worker processes.
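The prefork idea resembles the standard multiprocessing pool: processes are created up front and tasks are distributed to them. A rough analogy (not Celery's implementation, which is built on the billiard library):

```python
# A rough analogy for prefork concurrency: a pre-created pool of
# processes executes tasks, like `celery worker --concurrency=2`.
from multiprocessing import Pool

def add(args):
    # unpack one task message's arguments
    x, y = args
    return x + y

if __name__ == '__main__':
    with Pool(processes=2) as pool:  # two pre-forked worker processes
        results = pool.map(add, [(1, 2), (3, 4), (5, 6)])
    print(results)  # [3, 7, 11]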
The consumer mainly receives messages from the broker and converts each message into an instance of celery.worker.request.Request.
Tasks are generated from functions decorated with @app.task(); when a task executes, you can use this Request in your custom task function to obtain key information about the invocation.
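The mechanism can be imitated in plain Python. The sketch below is a simplified imitation, not Celery's code: a `Task` wrapper whose `request` attribute is set by the "worker" before each invocation, the way a bound Celery task reads `self.request`.

```python
# A simplified imitation (not Celery's code) of how a task gets a
# per-invocation request object carrying metadata such as the task id.
class Request:
    def __init__(self, task_id, args):
        self.id = task_id
        self.args = args

class Task:
    def __init__(self, fn):
        self.fn = fn
        self.request = None

    def run(self, request):
        # the worker sets the current request before calling the function
        self.request = request
        return self.fn(self, *request.args)

def add(self, x, y):
    # like a bound Celery task: `self.request` holds invocation metadata
    print(self.request.id)  # prints the hypothetical id 'abc-123'
    return x + y

task = Task(add)
result = task.run(Request(task_id='abc-123', args=(2, 3)))
print(result)  # 5
```

In real Celery the equivalent is declaring the task with `@app.task(bind=True)` so the task instance is passed as the first argument.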
Let’s see how to start Celery next.
0x04 Celery application
Let's start with the Celery class, that is, the Celery application.
The class defines the class paths of its components, thread-local storage, and various signals, which are set up after initialization.
It is located in celery/app/base.py and is defined as follows:
class Celery:
    """Celery application."""

    amqp_cls = 'celery.app.amqp:AMQP'
    backend_cls = None
    events_cls = 'celery.app.events:Events'
    loader_cls = None
    log_cls = 'celery.app.log:Logging'
    control_cls = 'celery.app.control:Control'
    task_cls = 'celery.app.task:Task'
    registry_cls = 'celery.app.registry:TaskRegistry'

    #: Thread local storage.
    _local = None
    _fixups = None
    _pool = None
    _conf = None
    _after_fork_registered = False

    #: Signal sent when app is loading configuration.
    on_configure = None

    #: Signal sent after app has prepared the configuration.
    on_after_configure = None

    #: Signal sent after app has been finalized.
    on_after_finalize = None

    #: Signal sent by every new process after fork.
    on_after_fork = None
For our sample code, the entry is:
def worker_main(self, argv=None):
    if argv is None:
        argv = sys.argv

    if 'worker' not in argv:
        raise ValueError(
            "The worker sub-command must be specified in argv.\n"
            "Use app.start() to programmatically start other commands."
        )

    self.start(argv=argv)
4.1 Adding a Sub-Command
celery/bin/celery.py adds the sub-commands. These commands can then be used directly as subcommands on the command line.
celery.add_command(purge)
celery.add_command(call)
celery.add_command(beat)
celery.add_command(list_)
celery.add_command(result)
celery.add_command(migrate)
celery.add_command(status)
celery.add_command(worker)
celery.add_command(events)
celery.add_command(inspect)
celery.add_command(control)
celery.add_command(graph)
celery.add_command(upgrade)
celery.add_command(logtool)
celery.add_command(amqp)
celery.add_command(shell)
celery.add_command(multi)
Each of these is a command. Take the worker command as an example:
worker = {CeleryDaemonCommand} <CeleryDaemonCommand worker>
add_help_option = {bool} True
allow_extra_args = {bool} False
allow_interspersed_args = {bool} True
context_settings = {dict: 1} {'allow_extra_args': True}
epilog = {NoneType} None
name = {str} 'worker'
options_metavar = {str} '[OPTIONS]'
params = {list: 32} [<CeleryOption hostname>, ...... , <CeleryOption executable>]
4.2 The entry point
The start method then imports the celery command from celery.bin.celery and invokes it:
def start(self, argv=None):
    from celery.bin.celery import celery

    celery.params[0].default = self

    try:
        celery.main(args=argv, standalone_mode=False)
    except Exit as e:
        return e.exit_code
    finally:
        celery.params[0].default = None
4.3 Cached properties: cached_property
In Celery, a large number of member variables are decorated with cached_property.
A function decorated with cached_property becomes a property of the object: the first time the property is accessed, the function is called and its return value cached; subsequent accesses read the value directly from the instance dictionary.
Many well-known Python projects, such as Werkzeug and Django, have implemented cached_property themselves.
It proved so useful that Python 3.8 added a cached_property class to the functools module, so it now has an official implementation.
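A minimal demonstration of the behavior, using the official functools implementation (Python 3.8+):

```python
# functools.cached_property: the decorated method runs once; later
# accesses read the cached value from the instance __dict__.
from functools import cached_property

calls = []

class App:
    @cached_property
    def pool(self):
        calls.append('created')  # record how many times the body runs
        return object()

app = App()
first = app.pool
second = app.pool
print(first is second)  # True: the same cached object is returned
print(len(calls))       # 1: the method body ran only once
```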
The code for Celery is shown as follows:
@cached_property
def Worker(self):
    """Worker application."""
    return self.subclass_with_self('celery.apps.worker:Worker')

@cached_property
def Task(self):
    """Base task class for this app."""
    return self.create_task_cls()

@property
def pool(self):
    """Broker connection pool: :class:`~@pool`."""
    if self._pool is None:
        self._ensure_after_fork()
        limit = self.conf.broker_pool_limit
        pools.set_limit(limit)
        self._pool = pools.connections[self.connection_for_write()]
    return self._pool
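Note that pool uses a plain @property with a None check rather than cached_property. A stripped-down sketch of that lazy-initialization pattern (an illustration, not Celery's code):

```python
# Lazy initialization via a plain property: build the expensive
# resource on first access, then reuse it on every later access.
class App:
    _pool = None

    @property
    def pool(self):
        if self._pool is None:
            # stands in for building the broker connection pool
            self._pool = {'connections': []}
        return self._pool

app = App()
print(app.pool is app.pool)  # True: built once, then reused
```

Unlike cached_property, this form lets the owner reset `_pool` to None (for example after a fork) to force the resource to be rebuilt.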
So, eventually, Celery should be like this:
app = {Celery} <Celery tasks at 0x7fb8e1538400>
AsyncResult = {type} <class 'celery.result.AsyncResult'>
Beat = {type} <class 'celery.apps.beat.Beat'>
GroupResult = {type} <class 'celery.result.GroupResult'>
Pickler = {type} <class 'celery.app.utils.AppPickler'>
ResultSet = {type} <class 'celery.result.ResultSet'>
Task = {type} <class 'celery.app.task.Task'>
WorkController = {type} <class 'celery.worker.worker.WorkController'>
Worker = {type} <class 'celery.apps.worker.Worker'>
amqp = {AMQP} <celery.app.amqp.AMQP object at 0x7fb8e2444860>
annotations = {tuple: 0} ()
autofinalize = {bool} True
backend = {DisabledBackend} <celery.backends.base.DisabledBackend object at 0x7fb8e25fd668>
builtin_fixups = {set: 1} {'celery.fixups.django:fixup'}
clock = {LamportClock} 1
 conf = {Settings: 163} Settings({'broker_url': 'redis://localhost:6379', 'deprecated_settings': set(), ...})
 configured = {bool} True
 control = {Control}
 current_task = {NoneType} None
 current_worker_task = {NoneType} None
 events = {Events}
 loader = {AppLoader}
 main = {str} 'tasks'
 on_after_configure = {Signal}
 on_after_finalize = {Signal}
 on_after_fork = {Signal}
 on_configure = {Signal}
 pool = {ConnectionPool}
 producer_pool = {ProducerPool}
 registry_cls = {type} <class 'celery.app.registry.TaskRegistry'>
 set_as_current = {bool} True
 steps = {defaultdict: 2} defaultdict(<class 'set'>, {'worker': set(), 'consumer': set()})
 tasks = {TaskRegistry: 10} {'celery.chain': <@task: celery.chain of tasks at 0x7fb8e1538400>, 'celery.starmap': <@task: celery.starmap of tasks at 0x7fb8e1538400>, 'celery.chord': <@task: celery.chord of tasks at 0x7fb8e1538400>, 'celery.backend_cleanup': ...}
 user_options = {defaultdict: 0} defaultdict(<class 'set'>, {})
Examples of specific member variables are shown below:
+---------------------------------------+
| Celery |
| |
| Beat+-----------> celery.apps.beat.Beat
| |
| Task+-----------> celery.app.task.Task
| |
| WorkController+----------> celery.worker.worker.WorkController
| |
| Worker+-----------> celery.apps.worker.Worker
| |
| amqp +----------> celery.app.amqp.AMQP
| |
| control +----------> celery.app.control.Control
| |
| events +---------> celery.app.events.Events
| |
| loader +----------> celery.loaders.app.AppLoader
| |
| pool +----------> kombu.connection.ConnectionPool
| |
| producer_pool +----------> kombu.pools.ProducerPool
| |
| tasks +----------> TaskRegistry
| |
| |
+---------------------------------------+
0x05 Celery command
The celery command is located in celery/bin/celery.py.
The code reduced version is as follows:
@click.pass_context
def celery(ctx, app, broker, result_backend, loader, config, workdir,
           no_color, quiet, version):
    """Celery command entrypoint."""
    if loader:
        # Default app takes loader from this env (Issue #1066).
        os.environ['CELERY_LOADER'] = loader
    if broker:
        os.environ['CELERY_BROKER_URL'] = broker
    if result_backend:
        os.environ['CELERY_RESULT_BACKEND'] = result_backend
    if config:
        os.environ['CELERY_CONFIG_MODULE'] = config

    ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir,
                         quiet=quiet)

    # User options
    worker.params.extend(ctx.obj.app.user_options.get('worker', []))
    beat.params.extend(ctx.obj.app.user_options.get('beat', []))
    events.params.extend(ctx.obj.app.user_options.get('events', []))

    for command in celery.commands.values():
        command.params.extend(ctx.obj.app.user_options.get('preload', []))
In this method, celery.commands is iterated over and each command's params list is extended. These commands are the subcommands mentioned earlier:
celery.commands =
'report' = {CeleryCommand} <CeleryCommand report>
'purge' = {CeleryCommand} <CeleryCommand purge>
'call' = {CeleryCommand} <CeleryCommand call>
'beat' = {CeleryDaemonCommand} <CeleryDaemonCommand beat>
'list' = {Group} <Group list>
'result' = {CeleryCommand} <CeleryCommand result>
'migrate' = {CeleryCommand} <CeleryCommand migrate>
'status' = {CeleryCommand} <CeleryCommand status>
'worker' = {CeleryDaemonCommand} <CeleryDaemonCommand worker>
'events' = {CeleryDaemonCommand} <CeleryDaemonCommand events>
'inspect' = {CeleryCommand} <CeleryCommand inspect>
'control' = {CeleryCommand} <CeleryCommand control>
'graph' = {Group} <Group graph>
'upgrade' = {Group} <Group upgrade>
'logtool' = {Group} <Group logtool>
'amqp' = {Group} <Group amqp>
'shell' = {CeleryCommand} <CeleryCommand shell>
'multi' = {CeleryCommand} <CeleryCommand multi>
0x06 Worker subcommand
The worker subcommand is one of these commands; it is invoked when we pass the worker argument directly on the command line:
$ celery -A proj worker -l INFO -Q hipri,lopri
The worker subcommand inherits from click.BaseCommand and is defined in celery/bin/worker.py.
So the following code indirectly calls the worker command:
celery.main(args=argv, standalone_mode=False)
The definition is as follows:
def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
           loglevel=None, logfile=None, pidfile=None, statedb=None,
           **kwargs):
    """Start worker instance.

    Examples
    --------
    $ celery --app=proj worker -l INFO
    $ celery -A proj worker -l INFO -Q hipri,lopri
    $ celery -A proj worker --concurrency=4
    $ celery -A proj worker --concurrency=1000 -P eventlet
    $ celery worker --autoscale=10,0
    """
    app = ctx.obj.app
    maybe_drop_privileges(uid=uid, gid=gid)
    worker = app.Worker(
        hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
        logfile=logfile,  # node format handled by celery.app.log.setup
        pidfile=node_format(pidfile, hostname),
        statedb=node_format(statedb, hostname),
        no_color=ctx.obj.no_color,
        **kwargs)
    worker.start()
    return worker.exitcode
Starting from the Celery application, execution then enters the worker command:
+----------+
|   User   |
+----+-----+
     |
     |  worker_main
     v
+---------+------------+
|        Celery        |
|                      |
|  Celery application  |
|  celery/app/base.py  |
+---------+------------+
     |
     |  celery.main
     v
+---------+------------+
| @click.pass_context  |
|        celery        |
|                      |
|    CeleryCommand     |
| celery/bin/celery.py |
+---------+------------+
     |
     v
+---------+------------+
| @click.pass_context  |
|        worker        |
|                      |
|    WorkerCommand     |
| celery/bin/worker.py |
+----------------------+
0x07 Worker application
At this point, app.Worker is instantiated in this function; the resulting Worker application is the worker instance. Here, app is the Celery instance defined earlier.
Defined in: celery/app/base.py.
@cached_property
def Worker(self):
    """Worker application. See Also: :class:`~@Worker`."""
    return self.subclass_with_self('celery.apps.worker:Worker')
Here, subclass_with_self uses Python's type() to dynamically generate a subclass bound to this app instance.
def subclass_with_self(self, Class, name=None, attribute='app',
                       reverse=None, keep_reduce=False, **kw):
    """Subclass an app-compatible class."""
    Class = symbol_by_name(Class)  # import the class
    # use the passed-in name if given, otherwise the class name
    reverse = reverse if reverse else Class.__name__

    def __reduce__(self):
        # called during pickling
        return _unpickle_appattr, (reverse, self.__reduce_args__())

    attrs = dict(
        {attribute: self},  # set the `app` attribute to self by default
        __module__=Class.__module__,
        __doc__=Class.__doc__,
        **kw)  # fill in extra attributes

    if not keep_reduce:
        attrs['__reduce__'] = __reduce__  # set __reduce__ on the generated class

    # use type() to dynamically create the subclass
    return type(bytes_if_py2(name or Class.__name__), (Class,), attrs)
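The type-based subclassing can be demonstrated in isolation. The sketch below uses stand-in classes (FakeApp and Worker are hypothetical, not Celery's): type(name, bases, attrs) builds a new class at runtime, binding an `app` attribute onto the subclass just as subclass_with_self does.

```python
# type(name, bases, attrs) creates a class dynamically; here the new
# subclass carries a reference back to the app, like subclass_with_self.
class Worker:
    """A stand-in base class (not Celery's real Worker)."""

class FakeApp:
    def subclass_with_self(self, Class, attribute='app'):
        attrs = {attribute: self,
                 '__module__': Class.__module__,
                 '__doc__': Class.__doc__}
        return type(Class.__name__, (Class,), attrs)

app = FakeApp()
BoundWorker = app.subclass_with_self(Worker)
print(BoundWorker.__name__)             # 'Worker'
print(BoundWorker.app is app)           # True: the subclass carries the app
print(issubclass(BoundWorker, Worker))  # True
```

This is why components such as celery.apps.worker:Worker can reach the application through self.app without it being passed explicitly.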
The worker command has now obtained a celery.apps.worker:Worker instance from app.Worker, and it then calls the instance's start method. Let's first analyze the Worker class's instantiation process.
Let’s review:
Our execution goes from worker_main to the Celery application, then enters the celery command, and then the worker sub-command, as shown below:
+----------------------+
+----------+ | @cached_property |
| User | | Worker |
+----+-----+ +---> | |
| | | |
| worker_main | | Worker application |
| | | celery/app/base.py |
v | +----------------------+
+---------+------------+ |
| Celery | |
| | |
| Celery application | |
| celery/app/base.py | |
| | |
+---------+------------+ |
| |
| celery.main |
| |
v |
+---------+------------+ |
| @click.pass_context | |
| celery | |
| | |
| | |
| CeleryCommand | |
| celery/bin/celery.py | |
| | |
+---------+------------+ |
| |
| |
| |
v |
+----------+------------+ |
| @click.pass_context | |
| worker | |
| | |
| | |
| WorkerCommand | |
| celery/bin/worker.py | |
+-----------+-----------+ |
| |
+-----------------+
At this point, execution has moved from the Celery application into the Worker application, and the Worker starts working as a program.
We will continue with the subsequent startup process of the Worker application in the next article.
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★★
Wechat official account: Rosie’s Thoughts
If you want timely notifications of new articles, or recommended technical material, please follow the account.