0x00 Abstract
Celery is a simple, flexible and reliable distributed system for processing large amounts of messages, focusing on asynchronous task queues for real-time processing, while also supporting task scheduling. In this article we discuss the startup of the Celery Worker's Consumer component.
We'll start with an overview of the startup process, and then take a closer look at three characteristic sub-components of the Consumer (configuring the network, starting Tasks, and starting the consumption of tasks). This should give you a deeper understanding of the ins and outs of the Consumer.
0x01 Review
Celery Worker is the consumer of tasks; normally multiple workers run on multiple servers to improve execution efficiency. So how does the Worker get messages from the broker? This requires a consumer.
Let's start with a diagram of the consumer to give you an overall idea.
As we already know, Kombu realizes the concepts of Producer and Consumer. Therefore it can be inferred that Producer and Consumer of Kombu must be used in the implementation of Celery.
1.1 kombu.Consumer
Let's recall what kombu.Consumer does. The functions of kombu.Consumer and its related classes are as follows:
- Exchange: the MQ router. Message senders send messages to an Exchange, which distributes them to queues.
- Queue: the message store abstraction. It holds the messages that are about to be consumed by applications. The Exchange distributes messages to queues, from which consumers receive them.
- Consumer: the abstraction of a message receiver. A Consumer needs to declare a queue, bind it to a specified Exchange, and then receive messages from that queue. From the user's point of view: if you know an exchange, you can read messages from it, and concretely you read them from a queue.
In the concrete Consumer implementation, it associates a queue with a channel: the queue has a channel for accessing redis, and an Exchange that knows which redis key to access.
Messages are consumed through the Queue, which in turn goes through the Channel.
In other words, once the exchange and queue are configured, the channel knows which redis key corresponds to which consumer. When a message arrives on the connection, the consumer's callback is invoked.
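To make the exchange/queue/consumer relationship concrete, here is a toy, self-contained sketch of the routing flow described above (plain Python, not kombu's real API; all class names and the sample message are illustrative):

```python
# Toy model of the Exchange / Queue / Consumer relationship described above.
# This is an illustrative sketch, NOT kombu's real API.

class Exchange:
    """Routes a message to the queues bound under its routing key."""
    def __init__(self, name):
        self.name = name
        self.bindings = {}              # routing_key -> list of Queue

    def publish(self, routing_key, message):
        for queue in self.bindings.get(routing_key, []):
            queue.put(message)

class Queue:
    """Stores messages until a consumer receives them."""
    def __init__(self, name, exchange, routing_key):
        self.name, self.messages = name, []
        exchange.bindings.setdefault(routing_key, []).append(self)

    def put(self, message):
        self.messages.append(message)

class Consumer:
    """Declares a queue and drains messages from it via a callback."""
    def __init__(self, queue, callback):
        self.queue, self.callback = queue, callback

    def drain(self):
        while self.queue.messages:
            self.callback(self.queue.messages.pop(0))

received = []
ex = Exchange('celery')
q = Queue('celery', ex, routing_key='celery')
consumer = Consumer(q, received.append)
ex.publish('celery', {'task': 'myTest.add', 'args': (2, 2)})
consumer.drain()
print(received)
```

The point of the sketch: the publisher only knows the exchange, the consumer only knows the queue, and the binding is what connects the two.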
1.2 Celery Consumer
The Celery Consumer component is not the Kombu Consumer; it uses a Kombu Consumer to get messages from the broker.
The Celery Consumer component is a much larger concept than the Kombu Consumer: besides fetching messages from the broker, it also covers message consumption, distribution, monitoring, heartbeats and more.
It is fair to say that, apart from the message loop engine (handled by the hub), multi-processing (handled by Pool and Autoscaler) and scheduled tasks (handled by timer and beat), all the other major functions are carried out by the Consumer.
0x02 start in worker
As a reminder, when the Worker initialization is complete, the Worker continues to call start.
worker.start()
The code is as follows:
def start(self):
try:
self.blueprint.start(self)
......
Hence the call to Blueprint.start (blueprint is responsible for determining the execution order of the individual submodules). Since Consumer is one of the worker’s components, the start of Consumer is called here.
The stack is as follows:
start, consumer.py:300
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
0x03 start in consumer
Now we come to the Consumer section.
The code is located in celery/worker/consumer/consumer.py.
As we know, Consumer has its own Steps as follows:
class Consumer:
    """Consumer blueprint."""

    Strategies = dict

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    class Blueprint(bootsteps.Blueprint):
        """Consumer blueprint."""

        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer.connection:Connection',
            'celery.worker.consumer.mingle:Mingle',
            'celery.worker.consumer.events:Events',
            'celery.worker.consumer.gossip:Gossip',
            'celery.worker.consumer.heart:Heart',
            'celery.worker.consumer.control:Control',
            'celery.worker.consumer.tasks:Tasks',
            'celery.worker.consumer.consumer:Evloop',
            'celery.worker.consumer.agent:Agent',
        ]
Therefore, when the Worker starts the Consumer, it calls Consumer's start.
Details are as follows:
def start(self):
    blueprint = self.blueprint
    while blueprint.state not in STOP_CONDITIONS:
        maybe_shutdown()
        if self.restart_count:
            try:
                self._restart_state.step()
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1
        try:
            blueprint.start(self)  # Here is the key
        except self.connection_errors as exc:
The following code is called to blueprint.start.
blueprint.start(self)
3.1 Starting the Consumer blueprint
The code is located in celery-master/celery/bootsteps.py.
def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self.started = i + 1
        step.start(parent)
So it goes through every step in order and starts it.
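The idea of steps that declare dependencies and are started in order can be sketched as follows (a simplified illustration of the bootsteps pattern, not Celery's actual implementation; the step classes here are illustrative):

```python
# Simplified sketch of the bootsteps idea: steps declare dependencies via
# `requires`, the blueprint orders them, then calls start() on each in turn.
# Illustrative only -- not Celery's actual bootsteps implementation.

class Step:
    requires = ()
    def start(self, parent):
        parent.started_order.append(type(self).__name__)

class Connection(Step): pass
class Tasks(Step): requires = (Connection,)
class Evloop(Step): requires = (Tasks,)

def order_steps(steps):
    """Topologically sort steps so dependencies start first."""
    ordered, seen = [], set()
    def visit(cls):
        if cls in seen:
            return
        seen.add(cls)
        for dep in cls.requires:
            visit(dep)
        ordered.append(cls)
    for cls in steps:
        visit(cls)
    return ordered

class Parent:
    def __init__(self):
        self.started_order = []

parent = Parent()
for cls in order_steps([Evloop, Tasks, Connection]):
    cls().start(parent)
print(parent.started_order)   # dependencies start before dependents
```

This is why, in the real blueprint, Connection always comes up before Tasks, and Evloop is always last.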
So let’s talk a little bit about what these steps do.
- [1] Connection: manage the connection to the broker
- [3] Events: send monitoring Events
- [2] Agent: cell actor
- [2] Mingle: exchange state with other workers at startup
- [1] Tasks: Start the message Consumer
- [3] Gossip: Consume events from other workers
- [1] Heart: send a heartbeat event (consumer’s heartbeat)
- [3] Control: Remote command management service
As mentioned in Reference article 1: Overview of Worker startup process:
I've labeled all the bootsteps here; the number indicates how important each service is for reading the code, with 1 the most important and 3 the least. For the consumer:
1 marks the basic functionality, which by itself makes up a simple, non-robust message queue framework;
2 marks generally important features that implement some advanced functions;
3 marks added features, including some distributed features.
Since the components corresponding to each step are quite complex, we will explain them in detail in future articles. This article has only outlined a few of the most important steps, which are basically message loop components. Such as the Connection component needed to read the broker and the Task component needed to process the message.
3.2 Connection Step child component
This sub-component deals primarily with network interactions.
Oddly enough, Connection has no logic of its own, leaving the Consumer class to do all the work.
The start argument c is the consumer. So the start method calls the consumer's connect method and sets the consumer's connection member variable.
So now the connection is made. This eventually creates a celery.app.amqp.Connection instance; under the hood it actually uses the kombu library's Connection to connect to the queue. After the Connection is established, it is registered in the kombu Transport's event loop.
The Consumer is thus associated with the broker.
class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def start(self, c):
        c.connection = c.connect()
3.2.1 connect in Consumer
The code is in celery/worker/consumer/consumer.py.
It can be seen that connect does the following:

- Using the heartbeat as a parameter, create a celery.app.amqp.Connection instance, i.e. get a kombu Connection, establishing the connection if there is none yet.
- Register the resulting Connection with the event loop for asynchronous use.
- Return the resulting Connection.
The code is as follows:
def connect(self):
    """Establish the broker connection used for consuming tasks."""
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)  # with heartbeat
    if self.hub:
        conn.transport.register_with_event_loop(conn.connection, self.hub)  # use asynchronous calls
    return conn  # return conn

def connection_for_read(self, heartbeat=None):
    return self.ensure_connected(
        self.app.connection_for_read(heartbeat=heartbeat))  # ensure connection
3.2.2 connect in the Celery application
The purpose of this section is to get a Connection.
Here app is the Celery application, so now we move to the Celery application.
The code is in celery/app/base.py.
def connection_for_read(self, url=None, **kwargs):
    """Establish connection used for consuming."""
    return self._connection(url or self.conf.broker_read_url, **kwargs)
Execution then reaches _connection:
def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    conf = self.conf
    return self.amqp.Connection(
        url,
        userid or conf.broker_user,
        password or conf.broker_password,
        virtual_host or conf.broker_vhost,
        port or conf.broker_port,
        transport=transport or conf.broker_transport,
        ssl=self.either('broker_use_ssl', ssl),
        heartbeat=heartbeat,
        login_method=login_method or conf.broker_login_method,
        failover_strategy=(
            failover_strategy or conf.broker_failover_strategy
        ),
        transport_options=dict(
            conf.broker_transport_options, **transport_options or {}
        ),
        connect_timeout=self.either(
            'broker_connection_timeout', connect_timeout
        ),
    )
As you can see, in the end, whether it is the Connection used by the Celery application or the Connection seen by the Consumer, they are one and the same: a kombu.connection.Connection.
Here is the self.amqp variable; you can see that it is all kombu related.
self.amqp = {AMQP} <celery.app.amqp.AMQP object at 0x7ffd556db7f0>
BrokerConnection = {type} <class 'kombu.connection.Connection'>
Connection = {type} <class 'kombu.connection.Connection'>
Consumer = {type} <class 'kombu.messaging.Consumer'>
Producer = {type} <class 'kombu.messaging.Producer'>
app = {Celery} <Celery tasks at 0x7ffd557f3da0>
argsrepr_maxsize = {int} 1024
autoexchange = {NoneType} None
default_exchange = {Exchange} Exchange celery(direct)
default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
kwargsrepr_maxsize = {int} 1024
producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7ffd56788748>
publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7ffd56788748>
queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
queues_cls = {type} <class 'celery.app.amqp.Queues'>
router = {Router} <celery.app.routes.Router object at 0x7ffd56799898>
routes = {tuple: 0} ()
task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x7ffd556db7f0> >,2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x7ffd556db7f0>>}
utc = {bool} True
We get a kombu.connection.Connection:
<Connection: redis://localhost:6379// at 0x7ffd567827b8>
Then it connects via ensure_connected:
def ensure_connected(self, conn):
    # Callback called for each retry while the connection
    # can't be established.
    def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
        if getattr(conn, 'alt', None) and interval == 0:
            next_step = CONNECTION_FAILOVER
        next_step = next_step.format(
            when=humanize_seconds(interval, 'in', ' '),
            retries=int(interval / 2),
            max_retries=self.app.conf.broker_connection_max_retries)
        error(CONNECTION_ERROR, conn.as_uri(), exc, next_step)

    # remember that the connection is lazy, it won't establish
    # until needed.
    if not self.app.conf.broker_connection_retry:
        # retry disabled, just call connect directly.
        conn.connect()
        return conn

    conn = conn.ensure_connection(
        _error_handler, self.app.conf.broker_connection_max_retries,
        callback=maybe_shutdown,
    )
    return conn
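The retry pattern behind ensure_connection can be sketched generically: attempt the connection, hand each failure to an error callback together with the retry interval, and give up after max_retries (a simplified sketch under those assumptions, not kombu's actual code; flaky_connect is a stand-in for a real broker connection):

```python
# Simplified sketch of the ensure_connection retry pattern: attempt the
# connection, invoke an error callback on each failure, and stop after
# max_retries. Not kombu's real implementation.
import itertools

def ensure_connection(connect, errback, max_retries=3, interval=1):
    for retries in itertools.count():
        try:
            return connect()
        except ConnectionError as exc:
            if retries >= max_retries:
                raise
            errback(exc, interval)   # e.g. log "retrying in N seconds..."
            # time.sleep(interval) would go here in real code

attempts, log = [], []

def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:            # fail twice, then succeed
        raise ConnectionError('broker unreachable')
    return 'connection established'

result = ensure_connection(flaky_connect, lambda exc, i: log.append(str(exc)))
print(result, len(log))              # succeeded after two logged failures
```

Note how the error handler only observes failures; the loop itself decides when to give up, which mirrors the division of labor between _error_handler and conn.ensure_connection above.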
The stack is as follows:
ensure_connected, consumer.py:414
connection_for_read, consumer.py:405
connect, consumer.py:398
start, connection.py:21
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
<module>, myTest.py:26
You end up with a valid connection.
For Read
+------------------------------------------+ +------------------------------------------------------------------------+
+--------+ | [Consumer] Connection+-------> |Connection: redis://localhost:6379// class 'kombu.connection.Connection'|
| Gossip +<-----+ | +------------------------------------------------------------------------+
+--------+ | | +----------+
| +--------> | | ^
| | | Events | |
+-------+ | | | | |
| Tasks | <-----+ Timer Strategies Blueprint | +----------+ |
+-------+ | | |
| | |
+-------+ | pool hub loop app | +-------+ |
| Heart | <-----+ + + + + +--------> | Agent | |
+-------+ | | | | | | +-------+ |
| | | | | | +---------+ |
+------------------------------------------+--------> | Mingle | |
| | | | +---------+ |
| | | | |
v v v v |
|
+-----------------+ +-----+ +----------------+ +---------------+ +--------------+ |
| prefork.TaskPool| | Hub | | loops.asynloop | | Celery | | AMQP | |
+-----------------+ +-----+ +----------------+ | | | | |
| amqp +-----------> | Connection+------+
+---------------+ +--------------+
3.2.3 Using asynchronous Invocation
The following code uses the kombu library's Connection to connect to the queue. After the Connection is established, it is registered in the kombu Transport's event loop.
if self.hub:
    conn.transport.register_with_event_loop(conn.connection, self.hub)  # use asynchronous calls
So the end result is as follows:
(Diagram: same structure as the previous figure, with the kombu.connection.Connection for redis://localhost:6379 now also registered with the Hub's event loop.)
3.3 Tasks Step sub-component
Since the network connection is already configured, this section introduces the various tasks; the event loop that drives their consumption is covered in section 3.4.
c = <Consumer: celery (running)>
Let's first introduce Celery's tasks.
When Celery starts, it looks for the classes or functions decorated with @task in the code and registers a callback for each into a global collection: _on_app_finalizers. This set is used to collect all tasks.
So far Celery knows which tasks exist and has collected them in _on_app_finalizers, but it is not done yet. Put another way, Celery only knows which task classes there are; instances of those classes do not exist yet, and the associations still need to be established.
So Celery runs the callbacks in the global _on_app_finalizers collection to obtain task instances, and then adds them to the app's task registry.
This registry is used for subsequent consumption of messages: the specific task instance is looked up by the task name provided by the client and then processed.
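The two-phase registration described above can be sketched with a small decorator (the names mirror the text, e.g. _on_app_finalizers, but this is an illustration, not Celery's code):

```python
# Illustrative sketch of two-phase task registration: a @task decorator
# collects "finalizer" callbacks in a global set, and a later finalize()
# pass runs them to fill the actual task registry.
# Names mirror the article (_on_app_finalizers) but this is NOT Celery's code.

_on_app_finalizers = set()
tasks = {}                       # name -> task instance, filled on finalize

def task(fn):
    def register(app):           # deferred: runs only when the app finalizes
        tasks[f'{app}.{fn.__name__}'] = fn
    _on_app_finalizers.add(register)
    return fn

@task
def add(x, y):
    return x + y

assert not tasks                 # collected, but not yet registered

def finalize(app):
    for cb in _on_app_finalizers:
        cb(app)

finalize('myTest')
print(sorted(tasks))
print(tasks['myTest.add'](2, 2))
```

The deferral is the key design point: at decoration time the app may not exist yet, so only a callback is recorded; the registry is populated later, once the app is known.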
self._tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fb652da5fd0>
'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fb652da5fd0>
'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fb652da5fd0>
'celery.group' = {group} <@task: celery.group of myTest at 0x7fb652da5fd0>
'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fb652da5fd0>
'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fb652da5fd0>
'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fb652da5fd0>
'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fb652da5fd0>
'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fb652da5fd0>
'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fb652da5fd0>
__len__ = {int} 10
So let’s next look at how to start the Task component.
3.3.1 start
Task starts as follows:

- Update the strategies for known tasks;
- Obtain the kombu.Consumer, which is c.task_consumer;
- Start consuming.
Details are as follows:
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        c.task_consumer = c.qos = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()  # update known tasks

        # - RabbitMQ 3.3 completely redefines how basic_qos works...
        # This will detect if the new qos semantics is in effect,
        # and if so make sure the 'apply_global' flag is set on qos updates.
        qos_global = not c.connection.qos_semantics_matches_spec

        # set initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )  # set count

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )  # start consuming

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)  # set count
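The QoS wrapper created above can be sketched minimally: it keeps the current prefetch value plus a callback that pushes changes to the channel (an illustrative sketch, not Celery's actual QoS class; the semantics here are simplified):

```python
# Minimal sketch of the QoS wrapper pattern: hold the current prefetch
# value and a callback that pushes changes to the underlying channel.
# Illustrative only, not celery's QoS implementation.

class QoS:
    def __init__(self, callback, initial_value):
        self.callback = callback
        self.value = initial_value

    def increment_eventually(self, n=1):
        """Record a pending increase (applied on the next update)."""
        self.value += n
        return self.value

    def update(self):
        """Push the current prefetch count through the channel callback."""
        return self.callback(self.value)

applied = []
qos = QoS(lambda v: applied.append(v) or v, 4)   # initial prefetch of 4
qos.increment_eventually()                       # e.g. an ETA task arrived
qos.update()
print(applied)                                   # prefetch pushed as 5
```

This separation is why the default strategy below can call consumer.qos.increment_eventually() for ETA tasks without touching the channel directly.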
3.3.2 Strategy
There is a strategy for running tasks, which can also be considered a form of load balancing. The strategies are as follows:
SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4
SCHED_STRATEGIES = {
None: SCHED_STRATEGY_FAIR,
'default': SCHED_STRATEGY_FAIR,
'fast': SCHED_STRATEGY_FCFS,
'fcfs': SCHED_STRATEGY_FCFS,
'fair': SCHED_STRATEGY_FAIR,
}
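The alias table can be exercised directly; here is a small sketch of resolving a configured strategy name to its constant (resolve_strategy is an illustrative helper, not Celery's API):

```python
# Resolving a scheduling-strategy alias to its constant, using the same
# table as above. resolve_strategy is an illustrative helper.

SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4

SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_FAIR,
    'default': SCHED_STRATEGY_FAIR,
    'fast': SCHED_STRATEGY_FCFS,
    'fcfs': SCHED_STRATEGY_FCFS,
    'fair': SCHED_STRATEGY_FAIR,
}

def resolve_strategy(name=None):
    """Map a strategy alias (or None) to its scheduling constant."""
    try:
        return SCHED_STRATEGIES[name]
    except KeyError:
        raise ValueError(f'unknown scheduling strategy: {name!r}')

print(resolve_strategy())          # unset -> fair
print(resolve_strategy('fcfs'))    # first-come-first-served
```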
Celery configures a callback strategy and callback method for each task, e.g.:

'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
3.3.3 Basic Default Policy
Let's take the basic default policy as an example to see what it does. In it, a Request is built from the Task instance to link broker messages, the consumer, and the worker processes. execute_using_pool later associates the request with multi-process handling, i.e. the consumer's pool.
The code is:
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note: Strategies are here as an optimization, so sadly
    it's not very easy to override.
    """
    hostname = consumer.hostname                    # set up relevant consumer information
    connection_errors = consumer.connection_errors  # set error value
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher
    events = eventer and eventer.enabled
    send_event = eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    body_can_be_buffer = consumer.pool.body_can_be_buffer
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # the request class
    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None:
            body, headers, decoded, utc = (
                message.body, message.headers, False, True,
            )
            if not body_can_be_buffer:
                body = bytes(body) if isinstance(body, buffer_t) else body
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)  # parse the received data

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )  # instantiate the request
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )  # send the 'task-received' event if needed

        if req.eta:  # ETA / time related processing
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, timezone.local)
            except (OverflowError, ValueError) as exc:
                req.reject(requeue=False)
            else:
                consumer.qos.increment_eventually()
                call_at(eta, apply_eta_task, (req,), priority=6)
        else:
            if rate_limits_enabled:  # rate limit
                bucket = get_bucket(task.name)
                if bucket:
                    return limit_task(req, bucket, 1)
            task_reserved(req)
            if callbacks:
                [callback(req) for callback in callbacks]
            handle(req)  # process accepted requests
    return task_message_handler
The handler being invoked is the w._process_task passed in when the consumer was initialized:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    try:
        req.execute_using_pool(self.pool)
    except TaskRevokedError:
        try:
            self._quick_release()   # Issue 877
        except AttributeError:
            pass
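The hand-off from _process_task to the pool can be sketched with a thread pool standing in for Celery's prefork pool (Request here is a toy class, not Celery's; only the submit-to-pool pattern is illustrated):

```python
# Sketch of "process task by sending it to the pool": the request knows how
# to hand itself to a worker pool. A ThreadPoolExecutor stands in for
# Celery's prefork pool; this Request class is illustrative, not Celery's.
from concurrent.futures import ThreadPoolExecutor

class Request:
    def __init__(self, task, args):
        self.task, self.args = task, args

    def execute_using_pool(self, pool):
        """Submit the underlying task callable to the given pool."""
        return pool.submit(self.task, *self.args)

def add(x, y):
    return x + y

with ThreadPoolExecutor(max_workers=2) as pool:
    req = Request(add, (2, 2))
    future = req.execute_using_pool(pool)
    print(future.result())
```

The inversion is deliberate: the consumer does not know how tasks execute; the request carries that knowledge, so the same consumer code works with different pool implementations.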
Request.execute_using_pool associates the request with multi-process handling, i.e. the consumer's pool.
3.3.4 Updating a Known Task Policy
At startup, update_strategies is called to update the known task strategies.
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()
The code is as follows:
def update_strategies(self):
    loader = self.app.loader
    for name, task in self.app.tasks.items():
        self.strategies[name] = task.start_strategy(self.app, self)
        task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                      app=self.app)
self.app.tasks holds the tasks collected when the application started. At this point they are traversed again to see whether each strategy needs updating.
The variables are as follows:
self.app.tasks = {TaskRegistry: 10}
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7ffe3ff08198>
'myTest.add' = {add} <@task: myTest.add of myTest at 0x7ffe3ff08198>
'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7ffe3ff08198>
'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7ffe3ff08198>
'celery.group' = {group} <@task: celery.group of myTest at 0x7ffe3ff08198>
'celery.map' = {xmap} <@task: celery.map of myTest at 0x7ffe3ff08198>
'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7ffe3ff08198>
'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7ffe3ff08198>
'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7ffe3ff08198>
'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7ffe3ff08198>
__len__ = {int} 10
self = {Consumer} <Consumer: celery (running)>
At this point we continue with the task.start_strategy function:
def start_strategy(self, app, consumer, **kwargs):
    return instantiate(self.Strategy, self, app, consumer, **kwargs)  # instantiate the strategy
After this runs, we obtain the following strategies, which contain the callback method for each task, currently task_message_handler. This is where a Request is constructed from the task instance, and broker messages, the Consumer, and the worker processes are linked together.
strategies = {dict: 10}
'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878400>
'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878598>
'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878840>
'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878ae8>
'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878d90>
'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b0d0>
'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b378>
'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b620>
'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b8c8>
__len__ = {int} 10
The logic is as follows (the Consumer has many member variables, so some are omitted for clarity):
+-----------------------+ +---------------------------+
| Celery | | Consumer |
| | | |
| consumer +---------------------> | | +---------------+
| | | task_consumer +---------------> | amqp.Consumer |
| _tasks | | | +---------------+
| + | | |
| | | | strategies +----------------+
+-----------------------+ | | |
| | | |
| +---------------------------+ |
| v
v
+------------------------------------------------------+-------------------------------------+ +-----------------------------------------------------------------------------+
| | | strategies = {dict: 10} |
| TaskRegistry | | 'celery.chunks' = function default.<locals>.task_message_handler |
| | | 'celery.backend_cleanup' = function default.<locals>.task_message_handler |
| NotRegistered = {type} <class 'celery.exceptions.NotRegistered'> | |'celery.chord_unlock' = function default.^locals>.task_message_handler |
| 'celery.chunks' = {chunks} <@task: celery.chunks of myTest> | | 'celery.group' = function default.<localsv.task_message_handler |
| 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest > | | 'celery.map' = function default.<locals>.task_message_handler |
| 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest> | | 'celery.chain' = function default.<locals>.task_message_handler |
| 'celery.group' = {group} <@task: celery.group of myTest> | | 'celery.starmap' = function default.<locals>.task_message_handler |
| 'celery.map' = {xmap} <@task: celery.map of myTest> | | 'celery.chord' = function default.<locals>.task_message_handler |
| 'celery.chain' = {chain} <@task: celery.chain of myTest> | | 'myTest.add' = function default.<locals^.task_message_handler |
| 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest> | | 'celery.accumulate' = function default.vlocals>.task_message_handler |
| 'celery.chord' = {chord} <@task: celery.chord of myTest> | | |
| 'myTest.add' = {add} <@task: myTest.add of myTest> | +-----------------------------------------------------------------------------+
| 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest> |
| |
+--------------------------------------------------------------------------------------------+
The main result, as shown in the diagram above, is the mapping from each task to its strategy.
3.3.5 consumption
The following code associates the tasks with the corresponding kombu consumer.
c.task_consumer = c.app.amqp.TaskConsumer(
c.connection, on_decode_error=c.on_decode_error,
)
c.app.amqp.TaskConsumer returns a kombu Consumer, configured with c.connection:
- <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
- <Connection: redis://localhost:6379>
Thus, the Consumer component of Celery is associated with the Consumer of kombu: a message arriving from the broker is eventually delivered through this task_consumer, which calls back into kombu.Consumer. We'll see how it is used below.
The code is as follows:
from kombu import Connection, Consumer, Exchange, Producer, Queue, pools

class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection
    Consumer = Consumer
    Producer = Producer

    def TaskConsumer(self, channel, queues=None, accept=None, **kw):
        if accept is None:
            accept = self.app.conf.accept_content
        return self.Consumer(
            channel, accept=accept,
            queues=queues or list(self.queues.consume_from.values()),
            **kw
        )
The logic is as follows (some variables are omitted for clarity because the Consumer has many members):
(Diagram: the Consumer's task_consumer points to a kombu.messaging.Consumer, i.e. the Consumer class exposed by celery.app.amqp.AMQP, and its Connection points to the kombu.connection.Connection for redis://localhost:6379.)
3.4 Event Loop subcomponent
This sub-component enables the consumption of the tasks configured in the previous sub-component. The code is located in celery/worker/consumer/consumer.py.
It corresponds to the 'celery.worker.consumer.consumer:Evloop' step.
[<step: Connection>, <step: Events>, <step: Mingle>, <step: Gossip>, <step: Tasks>, <step: Control>, <step: Heart>, <step: event loop>]
class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note: This is always started last.
    """

    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())

    def patch_all(self, c):
        c.qos._mutex = DummyLock()
In fact, the loop function in consumer is called.
3.4.1 loop in Consumer
The code is located in celery/worker/loops.py.
The consumer's loop function is the asynloop function from celery/worker/loops.py.
__init__ contains the following code to configure the loop function:
if not hasattr(self, 'loop'):
    self.loop = loops.asynloop if hub else loops.synloop
The details are as follows (the Consumer has many member variables, so some are omitted for clarity):
(Diagram: the Consumer's Hub registers on_tick via Transport.register_with_event_loop and uses kombu.utils.eventio._poll as its poller; Hub.create_loop supplies the loop run by loops.asynloop, and task_consumer still points to the kombu.messaging.Consumer exposed by celery.app.amqp.AMQP.)
Mobile phone is:
3.4.2 Configuring kombu.Consumer
Evloop.start contains the following call:

c.loop(*c.loop_args())
Note that self.task_consumer (the kombu.Consumer) is passed to the loop together with self.connection:
def loop_args(self):
    return (self, self.connection, self.task_consumer,
            self.blueprint, self.hub, self.qos, self.amqheartbeat,
            self.app.clock, self.amqheartbeat_rate)
The logic is as follows (again, some Consumer members are omitted for clarity):
- Consumer -> Evloop
- Consumer.hub -> Hub
  - hub.on_tick -> Transport.register_with_event_loop and
    AsynPool._create_write_handlers.<locals>.on_poll_start
    (applied via iterate_file_descriptors_safely over the registered fds)
  - hub.poller -> kombu.utils.eventio._poll, which watches the connection's
    file descriptors
- Consumer.loop -> loops.asynloop (obtained via hub.create_loop), fed by
  create_task_handler
- Consumer.connection -> <Connection: redis://localhost:6379>
- Consumer.task_consumer -> kombu.Consumer, whose channel refers back to the
  same <Connection: redis://localhost:6379>
- Consumer.app -> Celery
3.4.3 Starting consumption
In asynloop, the function will:

- set up the message-handling function, create_task_handler; this is where the actual message processing (parsing the message and executing it) lives;
- install it as the message callback of the kombu.Consumer, so on_task_received is the function that ultimately receives each message;
- call hub.create_loop() to obtain the execution engine;
- drive the engine with next(loop).
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""
    RUN = bootsteps.RUN
    update_qos = qos.update
    errors = connection.connection_errors

    # Set up the message handling (parsing the message and executing it) function.
    on_task_received = obj.create_task_handler()

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = on_task_received
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)
    consumer.consume()
    obj.on_ready()

    loop = hub.create_loop()

    try:
        while blueprint.state == RUN and obj.connection:
            state.maybe_shutdown()

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)
            except StopIteration:
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
At this point, the asynchronous loop is running and the worker begins waiting for events from the broker.
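The next(loop) pattern is easy to see in isolation: hub.create_loop() returns a generator, and the worker advances it one tick at a time, rebuilding it whenever it is exhausted. A self-contained sketch of that pattern (the real Hub loop polls file descriptors and fires callbacks; this toy merely counts ticks):

```python
def create_loop(ticks=3):
    """Toy stand-in for hub.create_loop(): a generator that yields
    once per event-loop tick and eventually exhausts."""
    for tick in range(ticks):
        # A real loop would poll fds and dispatch callbacks here.
        yield tick


running = True
processed = 0
loop = create_loop()

while running:
    try:
        next(loop)           # advance the engine by one tick
        processed += 1
    except StopIteration:    # engine exhausted: rebuild it, as asynloop does
        loop = create_loop()
        if processed >= 6:   # toy shutdown condition
            running = False

print(processed)  # → 6
```

Driving the engine from outside like this lets the caller interleave its own checks (shutdown state, QoS updates) between ticks, exactly as asynloop does inside its while loop.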
0xEE Personal information
★★★★★ Thoughts on life and technology ★★★★★
WeChat official account: Rosie's Thoughts
If you want timely notifications of new articles, or to see the technical material I recommend, please follow this account.