0x00 Overview
Celery is a simple, flexible and reliable distributed system for processing large amounts of messages, focusing on asynchronous task queues for real-time processing, while also supporting task scheduling.
In the previous article we analyzed tasks; in this article we focus on how tasks are sent on the client side and how Celery's AMQP object is used.
Before we start reading, we still need to ask a few questions to guide us as we read:
- How are Celery applications and user defined tasks generated when the client starts?
- What does the Task decorator do?
- How is the message assembled when sending a Task?
- Which medium (module) is used to send tasks?
- How are tasks stored in Redis after they are sent?
Note: while organizing this series I found that one article was missing, which may affect your reading; I have added it to the list below, please bear with me.
The basic architecture of message queue Kombu
Startup process of message queue Kombu
Consumer of message queue Kombu
Producer of message queue Kombu
Hub of message queue Kombu
Mailbox of message queue Kombu
Built-ins of Kombu (1)
Built-ins of Kombu (2)
Worker startup of the parallel distributed task queue Celery (1)
Worker startup of the parallel distributed task queue Celery (2)
Starting the Consumer of the parallel distributed task queue Celery
What is a Task in the parallel distributed task queue Celery
Celery tasks from the client's perspective
How tasks are consumed in the parallel distributed task queue Celery
Multi-process architecture and model of the parallel distributed task queue Celery
0x01 Sample code
Let’s start with sample code.
1.1 Server side
The server-side sample code is shown below; a decorator wraps the task to be executed.
from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379')

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    app.worker_main(argv=['worker'])
1.2 Client side
The client code is shown below; it calls the add task to do an addition:
from myTest import add

re = add.apply_async((2, 17))
Now let’s walk through the details, following the client’s execution sequence.
0x02 The system starts
We will first introduce how the Celery system and task (instances) are started on the client side.
2.1 Creating the Celery application
The following code first creates the Celery application named myTest:
app = Celery('myTest', broker='redis://localhost:6379')
2.2 Task Decorator
Celery uses decorators to wrap the tasks to be executed (since various languages have similar concepts, the terms decorator and annotation may be used interchangeably in this article).
@app.task
def add(x, y):
    return x + y
Executing the task decorator returns the result of the inner function _create_task_cls.
This function returns a Proxy that executes _task_from_fun when actually used.
When _task_from_fun is called, the task is added to the app's task list, so that all tasks can be shared and the client knows about them.
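To make the lazy behavior concrete, here is a minimal sketch of the proxy idea in plain Python (an illustration only; LazyProxy and make_task are hypothetical names, and Celery's real PromiseProxy is considerably more involved):

```python
class LazyProxy:
    """Evaluate a factory on first real use, then cache the result.

    Assumes the factory result is not None (illustration only).
    """
    def __init__(self, factory, *args, **kwargs):
        self._factory = factory
        self._args = args
        self._kwargs = kwargs
        self._obj = None

    def _evaluate(self):
        if self._obj is None:
            self._obj = self._factory(*self._args, **self._kwargs)
        return self._obj

    def __getattr__(self, name):
        # only called for attributes not found normally,
        # i.e. attributes of the real (wrapped) object
        return getattr(self._evaluate(), name)

calls = []

def make_task():
    calls.append('evaluated')
    class T:
        name = 'myTest.add'
        def run(self, x, y):
            return x + y
    return T()

proxy = LazyProxy(make_task)
assert calls == []                   # nothing evaluated yet
assert proxy.name == 'myTest.add'    # first use triggers evaluation
assert calls == ['evaluated']
assert proxy.run(2, 17) == 19        # later uses hit the cached object
assert calls == ['evaluated']
```
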
def task(self, *args, **opts):
    """Decorator to create a task class out of any callable."""
    if USING_EXECV and opts.get('lazy', True):
        from . import shared_task
        return shared_task(*args, lazy=False, **opts)

    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        _filt = filter

        def _create_task_cls(fun):
            if shared:
                def cons(app):
                    # when _task_from_fun is called, the task is added to the
                    # app task list so that all tasks can be shared
                    return app._task_from_fun(fun, **opts)
                cons.__name__ = fun.__name__
                connect_on_app_finalize(cons)
            if not lazy or self.finalized:
                ret = self._task_from_fun(fun, **opts)
            else:
                # return a proxy object that evaluates on first use
                ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                   __doc__=fun.__doc__)
                self._pending.append(ret)
            if _filt:
                return _filt(ret)
            return ret

        return _create_task_cls

    if len(args) == 1:
        if callable(args[0]):
            return inner_create_task_cls(**opts)(*args)  # execution reaches here
    return inner_create_task_cls(**opts)
Let’s take a look at this decorator.
2.2.1 Adding a Task
app._task_from_fun(fun, **options) is called to add the task to the app during initialization.
Its specific work is to:
- validate the various parameter configurations;
- dynamically create the task class;
- add the task to the _tasks registry;
- bind attributes to the instance via the task's bind method.
The code is as follows:
def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    name = name or self.gen_task_name(fun.__name__, fun.__module__)  # use the passed-in name, otherwise derive one from the module name
    base = base or self.Task  # base class, celery.app.task:Task by default

    if name not in self._tasks:  # if the task name is not yet in _tasks
        run = fun if bind else staticmethod(fun)  # if bind, use the function directly; otherwise wrap it as a static method
        task = type(fun.__name__, (base,), dict({
            'app': self,            # create the Task class dynamically
            'name': name,           # the name of the task
            'run': run,             # the task's run method
            '_decorated': True,     # whether it was decorated
            '__doc__': fun.__doc__,
            '__module__': fun.__module__,
            '__header__': staticmethod(head_from_fun(fun, bound=bind)),
            '__wrapped__': run}, **options))()
        # for some reason __qualname__ cannot be set in type()
        # so we have to set it here.
        try:
            task.__qualname__ = fun.__qualname__
        except AttributeError:
            pass
        self._tasks[task.name] = task  # add the task to _tasks
        task.bind(self)  # call the task's bind method to attach the related attributes to the instance
        add_autoretry_behaviour(task, **options)
    else:
        task = self._tasks[name]
    return task
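The dynamic class creation done by type() above can be demonstrated with a standalone snippet (BaseTask and AddTask are hypothetical stand-ins, not Celery classes):

```python
# A stand-in base class playing the role of celery.app.task.Task.
class BaseTask:
    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

def add(x, y):
    return x + y

# Build the task class dynamically, the same way _task_from_fun uses type().
AddTask = type(add.__name__, (BaseTask,), {
    'name': 'myTest.add',
    'run': staticmethod(add),   # mirrors the bind=False branch
    '_decorated': True,
    '__doc__': add.__doc__,
    '__module__': add.__module__,
})
task = AddTask()

assert task.name == 'myTest.add'
assert task(2, 17) == 19   # __call__ delegates to run
```
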
2.2.2 binding
The bind method binds attributes to the instance; knowing the task's name or code alone is not enough, you also need the task instance at runtime.
@classmethod
def bind(cls, app):
    was_bound, cls.__bound__ = cls.__bound__, True
    cls._app = app          # set the class's _app attribute
    conf = app.conf         # get the app configuration
    cls._exec_options = None  # clear option cache

    if cls.typing is None:
        cls.typing = app.strict_typing

    for attr_name, config_name in cls.from_config:  # set the default values on the class
        if getattr(cls, attr_name, None) is None:   # if the attribute is empty
            setattr(cls, attr_name, conf[config_name])  # use the default from the app configuration

    # decorate with annotations from config.
    if not was_bound:
        cls.annotate()

        from celery.utils.threads import LocalStack
        cls.request_stack = LocalStack()  # use a thread-local stack to store data

    # PeriodicTask uses this to add itself to the PeriodicTask schedule.
    cls.on_bound(app)
    return app
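The default-filling loop over from_config can be illustrated with a minimal stand-in (all names here are hypothetical, not Celery's actual configuration keys):

```python
class Task:
    # (attribute name on the class, key in the app configuration)
    from_config = (
        ('serializer', 'task_serializer'),
        ('rate_limit', 'task_default_rate_limit'),
    )
    serializer = None        # not set -> should be filled from config
    rate_limit = '10/m'      # already set -> must not be overwritten

    @classmethod
    def bind(cls, conf):
        for attr_name, config_name in cls.from_config:
            if getattr(cls, attr_name, None) is None:  # only fill empty attributes
                setattr(cls, attr_name, conf[config_name])
        return cls

conf = {'task_serializer': 'json', 'task_default_rate_limit': '100/m'}
Task.bind(conf)

assert Task.serializer == 'json'   # filled in from the configuration
assert Task.rate_limit == '10/m'   # explicit value preserved
```
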
2.3 summary
At this point on the client side (user side), the Celery application has been started and a task instance has been generated with attributes bound to the instance.
0x03 The AMQP class
When the client calls apply_async, app.send_task is called to send the task, and amqp is used in the process, so let’s first talk about the AMQP class.
3.1 Generation
send_task contains the following code:
def send_task(self, ...):
    """Send task by name."""
    parent = have_parent = None
    amqp = self.amqp  # generated at this point
self is the Celery application itself; printing it out, we can see from below what the Celery application looks like.
self = {Celery} <Celery myTest at 0x1eeb5590488>
AsyncResult = {type} <class 'celery.result.AsyncResult'>
Beat = {type} <class 'celery.apps.beat.Beat'>
GroupResult = {type} <class 'celery.result.GroupResult'>
Pickler = {type} <class 'celery.app.utils.AppPickler'>
ResultSet = {type} <class 'celery.result.ResultSet'>
Task = {type} <class 'celery.app.task.Task'>
WorkController = {type} <class 'celery.worker.worker.WorkController'>
Worker = {type} <class 'celery.apps.worker.Worker'>
amqp = {AMQP} <celery.app.amqp.AMQP object at 0x000001EEB5884188>
 amqp_cls = {str} 'celery.app.amqp:AMQP'
 backend = {DisabledBackend}
 clock = {LamportClock} 0
 control = {Control}
 events = {Events}
 loader = {AppLoader}
 main = {str} 'myTest'
 pool = {ConnectionPool}
 producer_pool = {ProducerPool}
 registry_cls = {type} <class 'celery.app.registry.TaskRegistry'>
 tasks = {TaskRegistry: 10} {'myTest.add': <@task: myTest.add of myTest at 0x1eeb5590488>, 'celery.accumulate': <@task: celery.accumulate of myTest at 0x1eeb5590488>, 'celery.chord_unlock': <@task: celery.chord_unlock of myTest at 0x1eeb5590488>, 'celery.chunks': <@task: celery.chunks of myTest at 0x1eeb5590488>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of myTest at 0x1eeb5590488>, 'celery.group': <@task: celery.group of myTest at 0x1eeb5590488>, 'celery.map': <@task: celery.map of myTest at 0x1eeb5590488>, 'celery.chain': <@task: celery.chain of myTest at 0x1eeb5590488>, 'celery.starmap': <@task: celery.starmap of myTest at 0x1eeb5590488>, 'celery.chord': <@task: celery.chord of myTest at 0x1eeb5590488>}
The stack is:
amqp, base.py:1205
__get__, objects.py:43
send_task, base.py:705
apply_async, task.py:565
<module>, myclient.py:4
Why does a simple attribute access generate an AMQP instance? Because amqp is decorated with cached_property.
A function decorated with cached_property becomes a property of the object. On the first access the function is called and its return value cached; subsequent accesses read the value directly from the instance dictionary.
@cached_property
def amqp(self):
    """AMQP related functionality: :class:`~@amqp`."""
    return instantiate(self.amqp_cls, app=self)
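Celery historically ships its own cached_property (the stack above shows objects.py), but Python's stdlib functools.cached_property demonstrates the same behavior: the getter runs once and the result lands in the instance dict.

```python
from functools import cached_property

class App:
    calls = 0   # counts how many times the getter actually runs

    @cached_property
    def amqp(self):
        # runs only on first access; afterwards the value is read
        # directly from the instance __dict__
        App.calls += 1
        return object()

app = App()
first = app.amqp
second = app.amqp

assert first is second            # same cached object
assert App.calls == 1             # getter executed exactly once
assert 'amqp' in app.__dict__     # cached in the instance dict
```
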
3.2 Definition
The AMQP class is a further encapsulation of the AMQP protocol implementation, in this case provided by the kombu library.
class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection
    Consumer = Consumer
    Producer = Producer

    #: compat alias to Connection
    BrokerConnection = Connection

    queues_cls = Queues

    #: Cached and prepared routing table.
    _rtable = None

    #: Underlying producer pool instance automatically
    #: set by the :attr:`producer_pool`.
    _producer_pool = None

    # Exchange class/function used when defining automatic queues.
    # For example, you can use ``autoexchange = lambda n: None`` to use the
    # AMQP default exchange: a shortcut to bypass routing
    # and instead send directly to the queue named in the routing key.
    autoexchange = None
Let’s print it out, and we can see what the amqp object looks like.
amqp = {AMQP}
BrokerConnection = {type} <class 'kombu.connection.Connection'>
Connection = {type} <class 'kombu.connection.Connection'>
Consumer = {type} <class 'kombu.messaging.Consumer'>
Producer = {type} <class 'kombu.messaging.Producer'>
app = {Celery} <Celery myTest at 0x252bd2903c8>
argsrepr_maxsize = {int} 1024
autoexchange = {NoneType} None
default_exchange = {Exchange} Exchange celery(direct)
default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
kwargsrepr_maxsize = {int} 1024
producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
queues_cls = {type} <class 'celery.app.amqp.Queues'>
router = {Router} <celery.app.routes.Router object at 0x00000252BDC6B248>
routes = {tuple: 0} ()
task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>, 2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>}
utc = {bool} True
_event_dispatcher = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x00000252BE750348>
_producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
_rtable = {tuple: 0} ()
The specific logic is as follows:
+---------+
| Celery | +----------------------------+
| | | celery.app.amqp.AMQP |
| | | |
| | | |
| | | BrokerConnection +-----> kombu.connection.Connection
| | | |
| amqp+----->+ Connection +-----> kombu.connection.Connection
| | | |
+---------+ | Consumer +-----> kombu.messaging.Consumer
| |
| Producer +-----> kombu.messaging.Producer
| |
| producer_pool +-----> kombu.pools.ProducerPool
| |
| queues +-----> celery.app.amqp.Queues
| |
| router +-----> celery.app.routes.Router
+----------------------------+
0x04 Sending a task
Let’s look at how the client sends tasks.
from myTest import add

re = add.apply_async((2, 17))
To summarize the logic:
- The Producer initialization completes the connection-related setup, such as calling the self.connect method to attach to the predetermined Transport class and initializing the channel (self.channel = self.connection);
- Message is called to encapsulate the message;
- Exchange converts the routing_key into a queue;
- AMQP is called to send the message;
- Channel is responsible for the final publishing of the message.
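The role of a direct exchange in the steps above can be modeled without any broker: it simply maps a routing_key to the queues bound under that key (a hypothetical mini-model, not kombu's implementation):

```python
class DirectExchange:
    """Minimal model of a direct exchange: routing_key -> bound queues."""
    def __init__(self, name):
        self.name = name
        self.bindings = {}   # routing_key -> list of queue names

    def bind_queue(self, queue_name, routing_key):
        self.bindings.setdefault(routing_key, []).append(queue_name)

    def route(self, routing_key):
        # a direct exchange delivers only to queues bound with exactly this key
        return self.bindings.get(routing_key, [])

exchange = DirectExchange('celery')
exchange.bind_queue('celery', routing_key='celery')

assert exchange.route('celery') == ['celery']
assert exchange.route('unknown') == []
```
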
Let’s go into more detail.
4.1 apply_async in task
Two things matter here:
- if task_always_eager is set, the arguments are round-tripped through the serializer (via a kombu producer) and the task is executed locally;
- otherwise, amqp is called to send the task (this is our focus).
The abridged code is as follows:
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                link=None, link_error=None, shadow=None, **options):
    """Apply tasks asynchronously by sending a message."""
    preopts = self._get_exec_options()
    options = dict(preopts, **options) if options else preopts
    app = self._get_app()
    if app.conf.task_always_eager:
        # eager mode: round-trip through the serializer, then run locally
        with app.producer_or_acquire(producer) as eager_producer:
            serializer = options.get('serializer')
            body = args, kwargs
            content_type, content_encoding, data = serialization.dumps(
                body, serializer,
            )
            args, kwargs = serialization.loads(
                data, content_type, content_encoding,
                accept=[content_type]
            )
        with denied_join_result():
            return self.apply(args, kwargs, task_id=task_id or uuid(),
                              link=link, link_error=link_error, **options)
    else:
        return app.send_task(  # called here
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )
Here it is:
1 apply_async +-------------------+
| |
User +---------------------> | task: myTest.add |
| |
+-------------------+
4.2 send_task
Call amqp to send the task:
- obtain the amqp instance;
- set the task ID; if none is passed in, generate one;
- obtain the router; if none is given, use the amqp router;
- generate the routing information;
- generate the task message;
- generate a producer if there is a connection;
- send the task message;
- generate an asynchronous result instance;
- return the result.
Details are as follows:
def send_task(self, name, ...):
    """Send task by name."""
    parent = have_parent = None
    amqp = self.amqp  # get the amqp instance
    task_id = task_id or uuid()  # if no task id is passed in, generate one
    producer = producer or publisher  # XXX compat
    router = router or amqp.router  # router; if none is given, use the amqp router
    options = router.route(
        options, route_name or name, args, kwargs, task_type)  # generate the routing information

    message = amqp.create_task_message(  # generate the task message
        task_id, name, args, kwargs, countdown, eta, group_id, group_index,
        expires, retries, chord,
        maybe_list(link), maybe_list(link_error),
        reply_to or self.thread_oid, time_limit, soft_time_limit,
        self.conf.task_send_sent_event,
        root_id, parent_id, shadow, chain,
        argsrepr=options.get('argsrepr'),
        kwargsrepr=options.get('kwargsrepr'),
    )

    if connection:
        producer = amqp.Producer(connection)  # generate a producer if there is a connection

    with self.producer_or_acquire(producer) as P:
        with P.connection._reraise_as_library_errors():
            self.backend.on_task_call(P, task_id)
            amqp.send_task_message(P, name, message, **options)  # send the task message
    result = (result_cls or self.AsyncResult)(task_id)  # create an asynchronous result instance
    if add_to_parent:
        if not have_parent:
            parent, have_parent = self.current_worker_task, True
        if parent:
            parent.add_trail(result)
    return result  # return the result
Here it is:
1 apply_async +-------------------+
| |
User +---------------------> | task: myTest.add |
| |
+--------+----------+
|
|
2 send_task |
|
v
+------+--------+
| Celery myTest |
| |
+------+--------+
|
|
3 send_task_message |
|
v
+-------+---------+
| amqp |
| |
| |
+-----------------+
4.3 Generating Message Content
as_task_v2 generates the message content in detail. As you can see, a message consists of several major parts:
- headers: task name, task id, expires, and so on;
- message type and encoding: content-encoding, content-type;
- Celery parameters: these are Celery-specific and used to distinguish between different queues, e.g. exchange, routing_key, etc.;
- body: the message body.
The following is an example of the final message:
{
    "body": "W1syLCA4XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "headers": {
        "lang": "py",
        "task": "myTest.add",
        "id": "243aac4a-361b-4408-9e0c-856e2655b7b5",
        "shadow": null,
        "eta": null,
        "expires": null,
        "group": null,
        "group_index": null,
        "retries": 0,
        "timelimit": [null, null],
        "root_id": "243aac4a-361b-4408-9e0c-856e2655b7b5",
        "parent_id": null,
        "argsrepr": "(2, 8)",
        "kwargsrepr": "{}",
        "origin": "gen33652@DESKTOP-0GO3RPO"
    },
    "properties": {
        "correlation_id": "243aac4a-361b-4408-9e0c-856e2655b7b5",
        "reply_to": "b34fcf3d-da9a-3717-a76f-44b6a6362da1",
        "delivery_mode": 2,
        "delivery_info": {
            "exchange": "",
            "routing_key": "celery"
        },
        "priority": 0,
        "body_encoding": "base64",
        "delivery_tag": "fa1bc9c8-3709-4c02-9543-8d0fe3cf4e6c"
    }
}
The specific code is as follows. Note that sent_event is used by the subsequent sending step and does not show up in the message content itself:
def as_task_v2(self, task_id, name, args=None, kwargs=None, ...):
    ...
    return task_message(
        headers={
            'lang': 'py',
            'task': name,
            'id': task_id,
            'shadow': shadow,
            'eta': eta,
            'expires': expires,
            'group': group_id,
            'group_index': group_index,
            'retries': retries,
            'timelimit': [time_limit, soft_time_limit],
            'root_id': root_id,
            'parent_id': parent_id,
            'argsrepr': argsrepr,
            'kwargsrepr': kwargsrepr,
            'origin': origin or anon_nodename()
        },
        properties={
            'correlation_id': task_id,
            'reply_to': reply_to or '',
        },
        body=(
            args, kwargs, {
                'callbacks': callbacks,
                'errbacks': errbacks,
                'chain': chain,
                'chord': chord,
            },
        ),
        sent_event={
            'uuid': task_id,
            'root_id': root_id,
            'parent_id': parent_id,
            'name': name,
            'args': argsrepr,
            'kwargs': kwargsrepr,
            'retries': retries,
            'eta': eta,
            'expires': expires,
        } if create_sent_event else None,
    )
4.4 send_task_message in amqp
amqp.send_task_message(P, name, message, **options) sends the task via amqp.
The method mainly assembles the parameters needed to send the task, such as connection, queue, exchange, routing_key, and then invokes the producer's publish.
The basic steps are:
- obtain the queue;
- obtain delivery_mode;
- obtain the exchange;
- obtain the retry policy, etc.;
- call the producer to publish the message.
def send_task_message(producer, name, message,
                      exchange=None, routing_key=None, queue=None,
                      event_dispatcher=None,
                      retry=None, retry_policy=None,
                      serializer=None, delivery_mode=None,
                      compression=None, declare=None,
                      headers=None, exchange_type=None, **kwargs):
    # obtain queue, delivery_mode, exchange, retry policy, etc.
    ...
    if before_receivers:
        send_before_publish(
            sender=name, body=body,
            exchange=exchange, routing_key=routing_key,
            declare=declare, headers=headers2,
            properties=properties, retry_policy=retry_policy,
        )
    ret = producer.publish(
        body,
        exchange=exchange,
        routing_key=routing_key,
        serializer=serializer or default_serializer,
        compression=compression or default_compressor,
        retry=retry, retry_policy=_rp,
        delivery_mode=delivery_mode, declare=declare,
        headers=headers2,
        **properties
    )
    if after_receivers:
        send_after_publish(sender=name, body=body, headers=headers2,
                           exchange=exchange, routing_key=routing_key)
    ...
    if sent_event:  # here we handle sent_event
        evd = event_dispatcher or default_evd
        exname = exchange
        if isinstance(exname, Exchange):
            exname = exname.name
        sent_event.update({
            'queue': qname,
            'exchange': exname,
            'routing_key': routing_key,
        })
        evd.publish('task-sent', sent_event,
                    producer, retry=retry, retry_policy=retry_policy)
    return ret

return send_task_message  # returned by the enclosing factory
The stack is:
send_task_message, amqp.py:473
send_task, base.py:749
apply_async, task.py:565
<module>, myclient.py:4
At this point, the variables are:
qname = {str} 'celery'
queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
 ContentDisallowed = {type} <class 'kombu.exceptions.ContentDisallowed'>
 alias = {NoneType} None
 attrs = {tuple: 18} (('name', None), ('exchange', None), ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), ('consumer_arguments', None), ('durable', <class 'bool'>), ('exclusive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('no_ack', None), ('alias', None), ('bindings', <class 'list'>), ('no_declare', <class 'bool'>), ('expires', <class 'float'>), ('message_ttl', <class 'float'>), ('max_length', <class 'int'>), ('max_length_bytes', <class 'int'>), ('max_priority', <class 'int'>))
 auto_delete = {bool} False
 binding_arguments = {NoneType} None
 bindings = {set: 0} set()
 can_cache_declaration = {bool} True
 consumer_arguments = {NoneType} None
 durable = {bool} True
 exchange = {Exchange} Exchange celery(direct)
 exclusive = {bool} False
 expires = {NoneType} None
 is_bound = {bool} False
 max_length = {NoneType} None
 max_length_bytes = {NoneType} None
 max_priority = {NoneType} None
 message_ttl = {NoneType} None
 name = {str} 'celery'
 no_ack = {bool} False
 no_declare = {NoneType} None
 on_declared = {NoneType} None
 queue_arguments = {NoneType} None
 routing_key = {str} 'celery'
 _channel = {NoneType} None
 _is_bound = {bool} False
queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
The logic is as follows:
1 apply_async +-------------------+
| |
User +---------------------> | task: myTest.add |
| |
+--------+----------+
|
|
2 send_task |
|
v
+------+--------+
| Celery myTest |
| |
+------+--------+
|
|
3 send_task_message |
|
v
+-------+---------+
| amqp |
+-------+---------+
|
|
4 publish |
|
v
+----+------+
| producer |
| |
+-----------+
4.5 publish in Producer
In Producer, the channel is called to send the message.
def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        maybe_declare = self.maybe_declare
        [maybe_declare(entity) for entity in declare]

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(  # send the message
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )
Variables are as follows:
body = {str} '[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
compression = {NoneType} None
content_encoding = {str} 'utf-8'
content_type = {str} 'application/json'
declare = {list: 1} [<unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>]
delivery_mode = {int} 2
exchange = {str} ''
exchange_name = {str} ''
expiration = {NoneType} None
headers = {dict: 15} {'lang': 'py', 'task': 'myTest.add', 'id': 'af0e4c14-a618-41b4-9340-1479cb7cde4f', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'af0e4c14-a618-41b4-9340-1479cb7cde4f', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen11468@DESKTOP-0GO3RPO'}
immediate = {bool} False
mandatory = {bool} False
priority = {int} 0
properties = {dict: 3} {'correlation_id': 'af0e4c14-a618-41b4-9340-1479cb7cde4f', 'reply_to': '2c938063-64b8-35f5-ac9f-a1c0915b6f71', 'delivery_mode': 2}
retry = {bool} True
retry_policy = {dict: 4} {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}
routing_key = {str} 'celery'
self = {Producer} <Producer: <promise: 0x1eeb62c44c8>>
serializer = {str} 'json'
The logic is as follows:
1 apply_async +-------------------+
| |
User +---------------------> | task: myTest.add |
| |
+--------+----------+
|
2 send_task |
|
v
+------+--------+
| Celery myTest |
| |
+------+--------+
|
3 send_task_message |
|
v
+-------+---------+
| amqp |
+-------+---------+
|
4 publish |
|
v
+----+------+
| producer |
| |
+----+------+
|
|
5 basic_publish |
v
+----+------+
| channel |
| |
+-----------+
At this point a task is sent out, waiting for the consumer to consume the task.
4.6 Content in Redis
Once sent, the task is stored in a Redis queue. The content in Redis is:
127.0.0.1:6379> keys *
1) "_kombu.binding.reply.testMailbox.pidbox"
2) "_kombu.binding.testMailbox.pidbox"
3) "celery"
4) "_kombu.binding.celeryev"
5) "_kombu.binding.celery"
6) "_kombu.binding.reply.celery.pidbox"
127.0.0.1:6379> lrange celery 0 -1
1) "{\"body\": \"W1syLCA4XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"myTest.add\", \"id\": \"243aac4a-361b-4408-9e0c-856e2655b7b5\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"243aac4a-361b-4408-9e0c-856e2655b7b5\", \"parent_id\": null, \"argsrepr\": \"(2, 8)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen33652@DESKTOP-0GO3RPO\"}, \"properties\": {\"correlation_id\": \"243aac4a-361b-4408-9e0c-856e2655b7b5\", \"reply_to\": \"b34fcf3d-da9a-3717-a76f-44b6a6362da1\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"fa1bc9c8-3709-4c02-9543-8d0fe3cf4e6c\"}}"
4.6.1 The role of delivery_tag
As you can see, the final message contains a delivery_tag variable, which deserves special attention here.
Think of delivery_tag as the unique identifier for a message in Redis, in UUID format.
A specific example:
"delivery_tag": "fa1bc9c8-3709-4c02-9543-8d0fe3cf4e6c"
QoS then uses delivery_tag for various processing, such as ack and nack.
with self.pipe_or_acquire() as pipe:
    pipe.zadd(self.unacked_index_key, *zadd_args) \
        .hset(self.unacked_key, delivery_tag,
              dumps([message._raw, EX, RK])) \
        .execute()
    super().append(message, delivery_tag)
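The bookkeeping idea behind the snippet above can be sketched with a plain dict keyed by delivery_tag (an illustration of the QoS concept only, not kombu's Redis code; QoSSketch is a hypothetical name):

```python
import json

class QoSSketch:
    """Track unacknowledged messages by delivery_tag (illustration only)."""
    def __init__(self):
        self.unacked = {}   # delivery_tag -> serialized raw message

    def append(self, raw_message, delivery_tag):
        # park the raw message until the consumer acks it
        self.unacked[delivery_tag] = json.dumps(raw_message)

    def ack(self, delivery_tag):
        # on ack the message is dropped for good
        self.unacked.pop(delivery_tag, None)

    def restore_unacked(self):
        # on consumer failure, unacked messages can be restored to the queue
        restored = [json.loads(v) for v in self.unacked.values()]
        self.unacked.clear()
        return restored

qos = QoSSketch()
qos.append({'body': 'payload-1'}, 'tag-1')
qos.append({'body': 'payload-2'}, 'tag-2')
qos.ack('tag-1')

assert list(qos.unacked) == ['tag-2']
assert qos.restore_unacked() == [{'body': 'payload-2'}]
assert qos.unacked == {}
```
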
4.6.2 When delivery_tag is generated
What we care about is when delivery_tag is generated during sending.
It turns out that, before the message is sent, it is further augmented in the Channel, whose _next_delivery_tag function produces the tag.
def _next_delivery_tag(self):
    return uuid()
The specific stack is as follows:
_next_delivery_tag, base.py:595
_inplace_augment_message, base.py:614
basic_publish, base.py:599
_publish, messaging.py:200
_ensured, connection.py:525
publish, messaging.py:178
send_task_message, amqp.py:532
send_task, base.py:749
apply_async, task.py:565
<module>, myclient.py:4
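Since kombu's uuid() helper is essentially str(uuid4()), the delivery_tag behavior can be reproduced with the stdlib (next_delivery_tag here is a hypothetical stand-in, not kombu's function):

```python
import uuid

def next_delivery_tag():
    # equivalent behavior: a fresh uuid4 string per published message
    return str(uuid.uuid4())

tag1 = next_delivery_tag()
tag2 = next_delivery_tag()

assert tag1 != tag2                   # unique per message
assert uuid.UUID(tag1).version == 4   # well-formed uuid4
assert len(tag1) == 36                # canonical 8-4-4-4-12 format
```
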
By now, the client-side process of sending a task has finished. If you are interested, you can go on to look at the dynamic process on the server side as it receives and consumes tasks.
0xEE Personal information
★★★★ Thoughts on life and technology ★★★★
WeChat official account: Rosie's Thoughts
If you want timely updates on my articles, or my recommended technical reading, please follow this account.