0x00 Abstract

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It focuses on asynchronous task queues for real-time processing and also supports task scheduling. The previous article introduced Celery's multi-threaded model; this article describes how a child process handles messages.

After reading this article, you should be able to trace the following flows:

  • How the parent process sends messages to the child process;
  • How the child process receives messages from the parent process;
  • How the child process unpacks the message layer by layer to extract the information needed to run a task;
  • How the child process runs the task once it has the task information;
  • Why Celery needs so many layers of seemingly cumbersome wrapping.

0x01 Origin

Let us start with a review. As the previous article showed, when the Celery worker receives a task message, it ends up calling the pool's apply_async, which hands the task over to the pool:

def apply_async(self, func, args=(), kwds={}, ...):
    # ... (creation of `result` elided in this excerpt)
    if self.threads:
        self._taskqueue.put(([(TASK, (result._job, None,
                            func, args, kwds))], None))
    else:
        self._quick_put((TASK, (result._job, None, func, args, kwds)))
    return result

Then, as seen in billiard/pool.py, the pool passes the message to the TaskHandler through self._taskqueue, and the TaskHandler forwards it to a child process.

class Pool(object):
    '''Class which supports an async version of applying functions to arguments.'''
    Worker = Worker
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler

    def __init__(self, processes=None, initializer=None, initargs=(), ...):
        # ... (other setup elided in this excerpt)
        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool,
                                              self._cache)
        if threads:
            self._task_handler.start()

The logic is shown in the diagram below:

                         +
  Consumer               |
                 message |
                         v         strategy  +------------------------------------+
            +------------+------+            | strategies                         |
            | on_task_received  | <--------+ |                                    |
            |                   |            |[myTest.add : task_message_handler] |
            +------------+------+            +------------------------------------+
                         |
                         |
 +------------------------------------------------------------------------------------+
 strategy                |
                         |
                         |
                         v                Request [myTest.add]
            +------------+-------------+                       +---------------------+
            | task_message_handler     | <-------------------+ | create_request_cls  |
            |                          |                       |                     |
            +------------+-------------+                       +---------------------+
                         | _process_task_sem
                         |
+--------------------------------------------------------------------------------------+
 Worker                  | req[{Request} myTest.add]
                         v
                +--------+-----------+
                | WorkController     |
                |                    |       apply_async
                |            pool +-------------------------+
                +--------+-----------+                      |
                         |                                  |
                         |                                  v
             +-----------+----------+                   +---+-------------------+
             |{Request} myTest.add  | +---------------> | TaskPool              |
             +----------------------+                   +----+------------------+
                                        myTest.add           |
                                                             |
+--------------------------------------------------------------------------------------+
                                                             |
                                                             v
                                                        +----+------------------+
                                                        |  billiard.pool.Pool   |
                                                        +-------+---------------+
                                                                |
                                                                |
 Pool              +---------------------------+                |
                   | TaskHandler               |                |
                   |                           |                |  self._taskqueue.put
                   |              _taskqueue   |  <-------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
+--------------------------------------------------------------------------------------+
                                |
 Sub process                    |
                                v
                          self._inqueue

So we follow _taskqueue to the TaskHandler.

0x02 Parent process TaskHandler

This section describes how a parent process passes a task message to a child process.

We are still in the parent process. The code is located in billiard\pool.py, and the call stack is:

_send_bytes, connection.py:314
send, connection.py:233
body, pool.py:596
run, pool.py:504
_bootstrap_inner, threading.py:926
_bootstrap, threading.py:890

Variables are as follows:

self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>
 additional_info = {PyDBAdditionalThreadInfo} State:2 Stop:None Cmd: 107 Kill:False
 cache = {dict: 1} {0: <%s: 0 ack:False ready:False>}
 daemon = {bool} True
 name = {str} 'Thread-16'
 outqueue = {SimpleQueue} <billiard.queues.SimpleQueue object at 0x000001E2C07DD6C8>
 pool = {list: 8} [<SpawnProcess(SpawnPoolWorker-1, started daemon)>, <SpawnProcess(SpawnPoolWorker-2, started daemon)>, <SpawnProcess(SpawnPoolWorker-3, started daemon)>, <SpawnProcess(SpawnPoolWorker-4, started daemon)>, <SpawnProcess(SpawnPoolWorker-5, started daemon)>, <SpawnProcess(SpawnPoolWorker-6, started daemon)>, <SpawnProcess(SpawnPoolWorker-7, started daemon)>, <SpawnProcess(SpawnPoolWorker-8, started daemon)>]
 taskqueue = {Queue} <queue.Queue object at 0x000001E2C07DD208>
  _args = {tuple: 0} ()
  _children = {WeakKeyDictionary: 0} <WeakKeyDictionary at 0x1e2c0883448>
  _daemonic = {bool} True
  _kwargs = {dict: 0} {}
  _name = {str} 'Thread-16'
  _parent = {_MainThread} <_MainThread(MainThread, started 13408)>
  _pid = {NoneType} None
  _start_called = {bool} True
  _started = {Event} <threading.Event object at 0x000001E2C0883D88>
  _state = {int} 0
  _stderr = {LoggingProxy} <celery.utils.log.LoggingProxy object at 0x000001E2C07DD188>
  _target = {NoneType} None
  _tstate_lock = {lock} <locked _thread.lock object at 0x000001E2C081FDB0>
  _was_started = {bool} True

2.1 Sending Messages

When the parent process receives the task message, it calls put(task) to write the message into the pipe between the parent and child processes.

Note that the earlier assignments were:

self._taskqueue = Queue()

def _setup_queues(self):
    self._inqueue = Queue()
    self._outqueue = Queue()
    self._quick_put = self._inqueue.put
    self._quick_get = self._outqueue.get

That is, when the TaskHandler receives a message, it sends it to a child process through the pipe function self._inqueue.put. self._taskqueue is only an intermediate buffer.

So the variables are as follows:

put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001E2C07DD2C8>>

self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>

task = {tuple: 2} 
 0 = {int} 2
 1 = {tuple: 5} (0, None, <function _trace_task_ret at 0x000001E2BFCA3438>, ('myTest.add', 'dee72291-5614-4106-a7bf-007023286e9e', {'lang': 'py', 'task': 'myTest.add', 'id': 'dee72291-5614-4106-a7bf-007023286e9e', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen17456@DESKTOP-0GO3RPO', 'reply_to': '21660796-c7e7-3736-9d42-e1be6ff7eaa8', 'correlation_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})
 __len__ = {int} 2
    
taskqueue = {Queue} <queue.Queue object at 0x000001E2C07DD208>

Send a message to the pipe and notify the Result handler and other workers:

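class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool, cache):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        self.cache = cache
        super(TaskHandler, self).__init__()

    def body(self):
        cache = self.cache
        taskqueue = self.taskqueue
        put = self.put

        # Each queue item is a (taskseq, set_length) pair; None ends the loop.
        for taskseq, set_length in iter(taskqueue.get, None):
            task = None
            i = -1
            try:
                for i, task in enumerate(taskseq):
                    try:
                        put(task)  # write the task into the pipe to the child
                    # ... (exception handling elided in this excerpt)
                # ... (remaining per-sequence handling elided in this excerpt)
                break
            # ... (exception handling elided in this excerpt)

        self.tell_others()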
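The forwarding loop above can be pictured with a small stand-alone sketch. This is not billiard's code: forward_tasks, the in-process Pipe, and the sample task tuple are assumptions made for illustration. It only shows the shape of the mechanism, a thread that drains a queue.Queue with iter(get, None) and writes each task into a pipe.

```python
import queue
import threading
from multiprocessing import Pipe

TASK = 2  # message type tag, mirroring the value seen in the debugger dumps


def forward_tasks(taskqueue, put):
    """Drain (taskseq, set_length) items until a None sentinel arrives."""
    for taskseq, _set_length in iter(taskqueue.get, None):
        for task in taskseq:
            put(task)  # in billiard this is the write end of the pipe


# Both pipe ends live in one process here, purely to keep the sketch runnable.
reader, writer = Pipe(duplex=False)
taskqueue = queue.Queue()

handler = threading.Thread(target=forward_tasks, args=(taskqueue, writer.send))
handler.start()

# Hypothetical task: the function slot holds a name instead of a callable.
taskqueue.put(([(TASK, (0, None, "myTest.add", (2, 8), {}))], None))
taskqueue.put(None)  # sentinel: stop the handler thread
handler.join()

received = reader.recv()
```

In the real pool the read end of the pipe belongs to a child process; the sketch keeps both ends in one process so that it can be run directly.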

2.2 Notifying others

tell_others notifies the result handler and the other workers.

def tell_others(self):
    outqueue = self.outqueue
    put = self.put
    pool = self.pool

    try:
        # tell result handler to finish when cache is empty
        outqueue.put(None)

        # tell workers there is no more work
        for p in pool:
            put(None)
    # ... (exception handling elided in this excerpt)
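The "one None per worker" idea can be tried out in isolation. The sketch below is an illustration under simplifying assumptions: threads stand in for the pool's child processes, and the worker function and the doubling of each item are made up for the example.

```python
import queue
import threading


def worker(inqueue, results):
    """Consume items until this worker receives its own None sentinel."""
    for item in iter(inqueue.get, None):
        results.append(item * 2)


inqueue = queue.Queue()
results = []
workers = [threading.Thread(target=worker, args=(inqueue, results))
           for _ in range(3)]
for t in workers:
    t.start()

for n in range(5):
    inqueue.put(n)

# One sentinel per worker, like the `for p in pool: put(None)` loop above.
for _ in workers:
    inqueue.put(None)

for t in workers:
    t.join()
```

Because each worker stops after reading exactly one sentinel, the number of sentinels has to match the number of workers; otherwise some workers would wait forever.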

0x03 Child process Worker

This section describes how the Worker child process accepts and executes tasks.

Now that the task message has been piped to the child process, execution continues in the child process. Note that self is now a billiard.pool.Worker.

3.1 Subprocess loop

In the Worker, the message loop (which unpacks the message in several steps) works as follows:

  • Call wait_for_job to wait for the parent process to write a message into the pipe;
  • After getting the message req, unpack it: type_, args_ = req;
  • Send an ACK to the parent process;
  • Unpack args_ again: job, i, fun, args, kwargs = args_, which yields the job, the function the child must execute, the arguments of that function, and so on;
  • If wait_for_syn is required, handle it;
  • Call the user-defined function indirectly through fun: result = (True, prepare_result(fun(*args, **kwargs))). Note that fun here is _trace_task_ret; the user-defined function is called inside _trace_task_ret;
  • Do the follow-up processing, such as sending READY to the parent process.

The code is as follows:

def workloop(self, debug=debug, now=monotonic, pid=None):
    pid = pid or os.getpid()
    put = self.outq.put
    inqW_fd = self.inqW_fd
    synqW_fd = self.synqW_fd
    maxtasks = self.maxtasks
    prepare_result = self.prepare_result

    wait_for_job = self.wait_for_job
    _wait_for_syn = self.wait_for_syn

    def wait_for_syn(jid):
        i = 0
        while 1:
            if i > 60:
                error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                      jid, self.synq._reader.fileno(), exc_info=1)
            req = _wait_for_syn()
            if req:
                type_, args = req  # parse the message req
                if type_ == NACK:
                    return False
                assert type_ == ACK
                return True
            i += 1

    completed = 0
    try:
        while maxtasks is None or (maxtasks and completed < maxtasks):
            req = wait_for_job()
            if req:
                type_, args_ = req  # first layer: message type and payload
                assert type_ == TASK
                # Second layer. fun is _trace_task_ret; the user-defined
                # function is called inside _trace_task_ret.
                job, i, fun, args, kwargs = args_
                put((ACK, (job, i, now(), pid, synqW_fd)))
                if _wait_for_syn:
                    confirm = wait_for_syn(job)
                    if not confirm:
                        continue  # received NACK

                # ... (try/except around the next two calls elided)
                result = (True, prepare_result(fun(*args, **kwargs)))
                put((READY, (job, i, result, inqW_fd)))

                completed += 1
                if max_memory_per_child > 0:
                    used_kb = mem_rss()
                    if used_kb > 0 and used_kb > max_memory_per_child:
                        warning(MAXMEM_USED_FMT.format(
                            used_kb, max_memory_per_child))
                        return EX_RECYCLE

        if maxtasks:
            return EX_RECYCLE if completed == maxtasks else EX_FAILURE
        return EX_OK
    finally:
        self._ensure_messages_consumed(completed=completed)
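Stripped of timeouts, memory limits, and the SYN handshake, the loop above reduces to "read, unpack twice, ACK, run, READY". The following is a minimal sketch of that core under stated assumptions: in-memory queues replace the pipes, the ACK and READY values are chosen for illustration (only TASK = 2 appears in the source), and the add function stands in for _trace_task_ret.

```python
import queue

# Message type tags. TASK = 2 matches the dump above; the other two
# values are illustrative assumptions.
ACK, READY, TASK = 0, 1, 2


def workloop(inq, outq, maxtasks=None):
    """Simplified child-side loop: unpack, acknowledge, run, report."""
    completed = 0
    while maxtasks is None or completed < maxtasks:
        req = inq.get()
        if req is None:  # sentinel: no more work
            break
        type_, args_ = req
        assert type_ == TASK
        job, i, fun, args, kwargs = args_
        outq.put((ACK, (job, i)))
        try:
            result = (True, fun(*args, **kwargs))
        except Exception as exc:
            result = (False, exc)
        outq.put((READY, (job, i, result)))
        completed += 1
    return completed


def add(x, y):
    return x + y


inq, outq = queue.Queue(), queue.Queue()
inq.put((TASK, (6, None, add, (2, 8), {})))
inq.put(None)
done = workloop(inq, outq)
messages = [outq.get() for _ in range(2)]
```

The loop never needs to know what fun does; it only needs the five-element payload to have the agreed layout.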

The req variable is the message piped in by the parent process; the first unpacking step yields type_ and args_:

prepare_result = {method} <bound method Worker.prepare_result of <billiard.pool.Worker object at 0x000001BFAE5AE308>>
    
put = {method} <bound method _SimpleQueue.put of <billiard.queues.SimpleQueue object at 0x000001BFAE1BE7C8>>
    
type_ = 2  # TASK = 2 is defined in pool.py

req = {tuple: 2} (2, (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {}))

self = {Worker} <billiard.pool.Worker object at 0x000001BFAE5AE308>
    
kwargs = {dict: 0} {}

args_ = (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})

For the previous logic diagram, we extend the logic down as follows:

                                                               +
                                                               |
                                                               |
                                                               v
                                                          +----+------------------+
                                                          | billiard.pool.Pool    |
                                                          +-------+---------------+
                                                                  |
                                                                  |
 Pool              +---------------------------+                  |
                   | TaskHandler               |                  |
                   |                           |                  |  self._taskqueue.put
                   |              _taskqueue   |  <---------------+
                   |                           |
                   +------------+--------------+
                                |
                                |  put(task)
                                |
+--------------------------------------------------------------------------------------+
                                |
 billiard.pool.Worker           |  get                             Sub process
                                v
                     +----------+-----------------------------+
                     |  workloop                              |
                     |                                        |
                     |                                        |
                     |          wait_for_job                  |
                     |                                        |
                     +----------------------------------------+

3.2 Getting the parent process message

The wait_for_job function ends up calling the function built by _make_recv_method, which reads from the pipe connection conn.

What it reads is the message req passed from the parent process, as described above.

To review, this is what the parent process wrote:

put = {method} <bound method _ConnectionBase.send of <billiard.connection.PipeConnection object at 0x000001E2C07DD2C8>>

self = {TaskHandler} <TaskHandler(Thread-16, started daemon 14980)>

task = {tuple: 2} 
 0 = {int} 2
 1 = {tuple: 5} (0, None, <function _trace_task_ret at 0x000001E2BFCA3438>, ('myTest.add', 'dee72291-5614-4106-a7bf-007023286e9e', {'lang': 'py', 'task': 'myTest.add', 'id': 'dee72291-5614-4106-a7bf-007023286e9e', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen17456@DESKTOP-0GO3RPO', 'reply_to': '21660796-c7e7-3736-9d42-e1be6ff7eaa8', 'correlation_id': 'dee72291-5614-4106-a7bf-007023286e9e', 'hostname': 'celery@DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})
 __len__ = {int} 2

As you can see, what the parent writes is exactly what the child reads. The child process reads the message through the function built by _make_recv_method, which reads from the pipe connection conn.

The following code runs in the child process.

    def _make_recv_method(self, conn):
        get = conn.get

        if hasattr(conn, '_reader'):
            _poll = conn._reader.poll
            if hasattr(conn, 'get_payload') and conn.get_payload:
                get_payload = conn.get_payload

                def _recv(timeout, loads=pickle_loads):
                    return True, loads(get_payload())
            else:
                def _recv(timeout):  # noqa
                    if _poll(timeout):
                        return True, get()
                    return False, None
        else:
            def _recv(timeout):  # noqa
                try:
                    return True, get(timeout=timeout)
                except Queue.Empty:
                    return False, None
        return _recv
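The poll-then-read branch of this function is easy to reproduce with the standard library. The sketch below is a simplified stand-in, not billiard's implementation: make_recv is a hypothetical helper built on multiprocessing.Pipe, and both pipe ends are kept in one process so the example can be run directly.

```python
from multiprocessing import Pipe


def make_recv(conn):
    """Return a function that polls `conn` and reports (got_message, message)."""
    def _recv(timeout):
        if conn.poll(timeout):
            return True, conn.recv()
        return False, None
    return _recv


reader, writer = Pipe(duplex=False)
recv = make_recv(reader)

empty = recv(0.01)  # nothing has been written yet
# Hypothetical message: the function slot holds a name instead of a callable.
writer.send((2, (6, None, "myTest.add", (2, 8), {})))
ready = recv(1.0)
```

Returning a (flag, value) pair instead of raising on timeout lets the caller treat "no message yet" as an ordinary result and simply loop again.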

3.3 Parsing Messages

The child process reads the message and unpacks it with job, i, fun, args, kwargs = args_.

In other words, it unpacks the contents of args_ shown earlier.

args_ = (6, None, <function _trace_task_ret at 0x000001BFAE53EA68>, ('myTest.add', '2c6d431f-a86a-4972-886b-472662401d20', {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', 'parent_id': None, 'argsrepr': '(2, 8)', 'kwargsrepr': '{}', 'origin': 'gen14656@DESKTOP-0GO3RPO', 'reply_to': '3c9cc3a7-65d6-349b-ba66-399dc47b7cad', 'correlation_id': '2c6d431f-a86a-4972-886b-472662401d20', 'hostname': 'DESKTOP-0GO3RPO', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 8], 'kwargs': {}, 'is_eager': False, 'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}, b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8'), {})

So we get:

job = {int} 6

i = {NoneType} None

fun = {function} <function _trace_task_ret at 0x000001BFAE53EA68>

kwargs = {dict: 0} {}

args = {tuple: 6} 
 0 = {str} 'myTest.add'
 1 = {str} '2c6d431f-a86a-4972-886b-472662401d20'
 2 = {dict: 26} {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', ...
 3 = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
 4 = {str} 'application/json'
 5 = {str} 'utf-8'
 __len__ = {int} 6

This way, the child process knows which function it needs to call (here, myTest.add) and with which arguments (here, (2, 8)).

To recap the read-and-unpack process:

  • The parent process writes task;
  • The child process reads req;
  • The child process unpacks req into type_, args_;
  • The child process unpacks args_ into job, i, fun, args, kwargs. Here fun is _trace_task_ret, and the user-defined function is called inside _trace_task_ret;
  • args contains the name of the user-defined function and its parameters.

3.3.1 Configuration of the callback function in the parent process

As mentioned, the first fun parsed is _trace_task_ret, and the user-defined function is called internally by _trace_task_ret.

We need to see where the callback function fun is configured in the parent process.

When task_message_handler receives a task, it uses the Request class to hand the task over to the process pool.

Note that the Worker scope in the diagram below refers to celery/apps/worker.py. Celery contains several classes with the same name, which is confusing.

                         +
  Consumer               |
                 message |
                         v         strategy  +------------------------------------+
            +------------+------+            | strategies                         |
            | on_task_received  | <--------+ |                                    |
            |                   |            |[myTest.add : task_message_handler] |
            +------------+------+            +------------------------------------+
                         |
                         |
 +------------------------------------------------------------------------------------+
 strategy                |
                         |
                         |
                         v                Request [myTest.add]
            +------------+-------------+                       +---------------------+
            | task_message_handler     | <-------------------+ | create_request_cls  |
            |                          |                       |                     |
            +------------+-------------+                       +---------------------+
                         | _process_task_sem
                         |
+--------------------------------------------------------------------------------------+
 Worker                  | req[{Request} myTest.add]
                         v
                +--------+-----------+
                | WorkController     |
                |                    |       apply_async
                |            pool +-------------------------+
                +--------+-----------+                      |
                         |                                  |
                         |                                  v
             +-----------+----------+                   +---+-------+
             |{Request} myTest.add  | +---------------> | TaskPool  |
             +----------------------+                   +-----------+
                                        myTest.add

Here apply_async is pool.apply_async.

In Request.execute_using_pool, trace_task_ret is passed as the first argument to pool.apply_async, so trace_task_ret is the function the parent process hands to the child.

class Request:
    """A request for task execution."""

    def execute_using_pool(self, pool, **kwargs):
        """Used by the worker to send this task to the pool."""
        # ... (setup of task_id, task, and the time limits elided)
        result = pool.apply_async(
            trace_task_ret,  # the callback is set right here
            # These arguments carry the user-defined function (by task name).
            args=(self._type, task_id, self._request_dict, self._body,
                  self._content_type, self._content_encoding),
            accept_callback=self.on_accepted,
            timeout_callback=self.on_timeout,
            callback=self.on_success,
            error_callback=self.on_failure,
            soft_timeout=soft_time_limit or task.soft_time_limit,
            timeout=time_limit or task.time_limit,
            correlation_id=task_id,
        )
        # cannot create weakref to None
        self._apply_result = maybe(ref, result)
        return result

3.4 Calling a Function

As shown above, the function the pool receives is _trace_task_ret; that is, _trace_task_ret is a uniform wrapper around user functions. The pool only needs to call _trace_task_ret, and the user function is called inside _trace_task_ret.

Why not call the user function myTest.add directly instead of wrapping another layer with _trace_task_ret? As the word trace in the name suggests, this is a trade-off among extensibility, debugging, tracing, and speed.

The core code is in two places:

3.4.1 Getting the Celery application

The first key point is obtaining the Celery application inside the child process, which is done by the following code:

app = app or current_app._get_current_object()

This raises a question: the Celery application lives in the parent process, so how does the child process get hold of it?

Although some multi-process mechanisms copy the parent's variables into the child, this is not always the case, so there must be a mechanism by which the parent process configures the Celery application for the child process.

Please refer to the previous article for details on how the parent process configures the Celery application for its children and how the children obtain it.

3.4.2 Obtaining the task

The second key point is how to look up and run the registered task. The code is as follows:

R, I, T, Rstr = trace_task(app.tasks[name], uuid, args, kwargs, request, app=app)

Here app.tasks is the registry filled in advance, i.e. all tasks in Celery, including built-in tasks and user tasks.

So app.tasks[name] looks up the corresponding task by its task name.

app.tasks = {TaskRegistry: 9} 
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x1bfae596d48>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x1bfae596d48>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x1bfae596d48>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x1bfae596d48>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x1bfae596d48>
 'celery.group' = {group} <@task: celery.group of myTest at 0x1bfae596d48>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x1bfae596d48>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x1bfae596d48>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x1bfae596d48>
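A name-to-task registry of this kind can be sketched in a few lines. The TaskRegistry class, its register decorator, and run_by_name below are hypothetical simplifications written for this example; they are not Celery's own classes, and only mirror the app.tasks[name] lookup.

```python
class TaskRegistry(dict):
    """A name -> callable mapping; a stand-in for the real registry."""

    def register(self, name):
        def decorator(fun):
            self[name] = fun
            return fun
        return decorator


tasks = TaskRegistry()


@tasks.register("myTest.add")
def add(x, y):
    return x + y


def run_by_name(name, args, kwargs):
    """Look the task up by name, as app.tasks[name] does, then call it."""
    return tasks[name](*args, **kwargs)


result = run_by_name("myTest.add", [2, 8], {})
```

Sending only the task name across the pipe keeps the message small and means the child never has to unpickle the user function itself; it only needs the same registry.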

The logic is as follows:

                                      +
                                      |
                                      |
                                      v
                              +-------+---------------+
                              | billiard.pool.Pool    |
                              +-------+---------------+
                                      |
                                      |
    +---------------------------+     |
    | TaskHandler               |     |
    |                           |     |  self._taskqueue.put
    |              _taskqueue   | <---+
    |                           |
    +------------+--------------+
                 |
                 |  put(task)                                                  Pool
                 |
+--------------------------------------------------------------------------------------+
                 |
                 |  get          billiard.pool.Worker                    Sub process
                 v
+----------------+------+           +--------------------------------------------------+
| workloop              |           | app.tasks                                        |
|                       |           |                                                  |
|     wait_for_job      |           |'celery.chord' =  @task: celery.chord of myTest   |
|                       |           |'celery.chunks' =  @task: celery.chunks of myTest |
|     app.tasks[name] <-------------+'celery.group' =   @task: celery.group of myTest  |
|                       |           | ......                                           |
|                       |           |                                                  |
+-----------------------+           +--------------------------------------------------+

3.4.3 Invoking the task

Now that we know which task to call, let us see how it is called.

3.4.3.1 Obtaining the task

As shown above, the callback function is passed in from the parent process, i.e.:

fun = {function} <function _trace_task_ret at 0x000001BFAE53EA68>

_trace_task_ret is defined in celery\app\trace.py.

The logic is:

  • Get the Celery application and store it in app.

  • Extract the message content and update the request, for example:

    • request = {dict: 26} 
       'lang' = {str} 'py'
       'task' = {str} 'myTest.add'
       'id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
       'shadow' = {NoneType} None
       'eta' = {NoneType} None
       'expires' = {NoneType} None
       'group' = {NoneType} None
       'group_index' = {NoneType} None
       'retries' = {int} 0
       'timelimit' = {list: 2} [None, None]
       'root_id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
       'parent_id' = {NoneType} None
       'argsrepr' = {str} '(2, 8)'
       'kwargsrepr' = {str} '{}'
       'origin' = {str} 'gen17060@DESKTOP-0GO3RPO'
       'reply_to' = {str} '5a520373-7712-3326-9ce8-325df14aa2ad'
       'correlation_id' = {str} 'a8928c1e-1e56-4502-9929-80a01b1bbfd8'
       'hostname' = {str} 'DESKTOP-0GO3RPO'
       'delivery_info' = {dict: 4} {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}
       'args' = {list: 2} [2, 8]
       'kwargs' = {dict: 0} {}
       'is_eager' = {bool} False
       'callbacks' = {NoneType} None
       'errbacks' = {NoneType} None
       'chain' = {NoneType} None
       'chord' = {NoneType} None
       __len__ = {int} 26
  • Look up the user task by its task name.

  • Invoke the user task with the request.

The specific code is as follows:

def trace_task(task, uuid, args, kwargs, request={}, **opts):
    """Trace task execution."""
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        # Call the tracer that was installed during the strategy update.
        return task.__trace__(uuid, args, kwargs, request)
    # ... (exception handling elided in this excerpt)


def _trace_task_ret(name, uuid, request, body, content_type,
                    content_encoding, loads=loads_message, app=None,
                    **extra_request):

    app = app or current_app._get_current_object()  # get the Celery application

    embed = None
    if content_type:
        accept = prepare_accept_content(app.conf.accept_content)
        args, kwargs, embed = loads(
            body, content_type, content_encoding, accept=accept,
        )
    else:
        args, kwargs, embed = body

    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **embed or {})

    # Call trace_task to execute the task.
    R, I, T, Rstr = trace_task(app.tasks[name],
                               uuid, args, kwargs, request, app=app)

    return (1, R, T) if I else (0, Rstr, T)

trace_task_ret = _trace_task_ret
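The body-decoding step can be tried out on the exact bytes from the dumps above. The sketch below is an approximation: decode_body is a hypothetical helper that uses the standard json module in place of the loads function Celery passes in, and the starting request dict is a shortened, made-up version of the real one.

```python
import json


def decode_body(body, content_type, content_encoding):
    """Turn the raw message body into (args, kwargs, embed)."""
    if content_type != "application/json":
        raise ValueError(f"this sketch only handles JSON, got {content_type!r}")
    args, kwargs, embed = json.loads(body.decode(content_encoding))
    return args, kwargs, embed


body = (b'[[2, 8], {}, {"callbacks": null, "errbacks": null, '
        b'"chain": null, "chord": null}]')
request = {"task": "myTest.add", "retries": 0}  # shortened for the example

args, kwargs, embed = decode_body(body, "application/json", "utf-8")

# Merge the decoded pieces into the request, as _trace_task_ret does.
request.update({"args": args, "kwargs": kwargs, "is_eager": False},
               **(embed or {}))
```

This also explains why the request dict grows between the parent-side dump and the child-side dump: the keys args, kwargs, is_eager, and the four embed entries are added in the child.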

At this point, the variables are:

accept = {set: 1} {'application/json'}
app = {Celery} <Celery myTest at 0x1bfae596d48>
args = {list: 2} [2, 8]
body = {bytes: 81} b'[[2, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]'
content_encoding = {str} 'utf-8'
content_type = {str} 'application/json'
embed = {dict: 4} {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
extra_request = {dict: 0} {}
kwargs = {dict: 0} {}
loads = {method} <bound method SerializerRegistry.loads of <kombu.serialization.SerializerRegistry object at 0x000001BFAE329408>>
name = {str} 'myTest.add'
request = {dict: 26} {'lang': 'py', 'task': 'myTest.add', 'id': '2c6d431f-a86a-4972-886b-472662401d20', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '2c6d431f-a86a-4972-886b-472662401d20', ...
uuid = {str} '2c6d431f-a86a-4972-886b-472662401d20'

3.4.3.2 Calling the task

trace_task is then called. It is defined as follows:

def trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Trace task execution."""
    request = {} if not request else request
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request)
    # ... (exception handling elided in this excerpt)
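The "build the tracer once, then reuse it" pattern in trace_task can be isolated as follows. Everything here is a simplified stand-in written for illustration: the Task class, the two-argument build_tracer, and the build_calls list are assumptions and do not reproduce Celery's real signatures.

```python
class Task:
    """Minimal stand-in for a task: a name, a run callable, a tracer slot."""
    __trace__ = None

    def __init__(self, name, run):
        self.name = name
        self.run = run


build_calls = []


def build_tracer(name, task):
    build_calls.append(name)  # record how often a tracer is built

    def tracer(uuid, args, kwargs, request):
        return task.run(*args, **kwargs)
    return tracer


def trace_task(task, uuid, args, kwargs, request=None):
    request = {} if not request else request
    if task.__trace__ is None:  # build lazily, on first use only
        task.__trace__ = build_tracer(task.name, task)
    return task.__trace__(uuid, args, kwargs, request)


add_task = Task("myTest.add", lambda x, y: x + y)
first = trace_task(add_task, "id-1", [2, 8], {})
second = trace_task(add_task, "id-2", [3, 4], {})
```

Caching the tracer on the task object means the cost of building it is paid at most once per task, not once per message.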

The __trace__ method was installed earlier, in update_strategies:

task.__trace__ = build_tracer(name, task, loader, self.hostname,
                              app=self.app)

Part of the build_tracer function is shown below:

def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 Info=TraceInfo, eager=False, propagate=False, app=None,
                 monotonic=monotonic, truncate=truncate,
                 trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES):

    # Get the callable to run: the task itself or its run method.
    fun = task if task_has_custom(task, '__call__') else task.run
    ...

    def trace_task(uuid, args, kwargs, request=None):
        # R - is the possibly prepared return value.
        # I - is the Info object.
        # T - runtime
        # Rstr - textual representation of return value
        # retval - is the always unmodified return value.
        # state - is the resulting task state.

        # This function is very long because we've unrolled all the calls
        # for performance reasons, and because the function is so long
        # we want the main variables (I, and R) to stand out visually from the
        # the rest of the variables, so breaking PEP8 is worth it ;)

        R = I = T = Rstr = retval = state = None
        task_request = None
        time_start = monotonic()
        ...
        # -*- TRACE -*-
            try:
                R = retval = fun(*args, **kwargs)  # execute the corresponding function
                state = SUCCESS
            except Reject as exc:
                ...
    return trace_task
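The closure structure of build_tracer can be shown with a much smaller sketch. It is an illustration only: this build_tracer takes just a name and a function, treats every exception the same way, and stores the exception itself in I, whereas the excerpt above describes I as an Info object.

```python
from time import monotonic


def build_tracer(name, fun):
    """Return a closure that runs `fun` and reports (R, I, T, Rstr)."""
    def trace_task(uuid, args, kwargs, request=None):
        R = I = T = Rstr = None
        time_start = monotonic()
        try:
            R = fun(*args, **kwargs)
        except Exception as exc:
            # Simplification: keep the raw exception in place of an Info object.
            I = exc
        T = monotonic() - time_start
        Rstr = repr(R)
        return R, I, T, Rstr
    return trace_task


tracer = build_tracer("myTest.add", lambda x, y: x + y)
R, I, T, Rstr = tracer("id-1", [2, 8], {})
bad_R, bad_I, _, _ = tracer("id-2", [2], {})  # missing argument raises TypeError
```

Because the wrapper always returns the same four-element shape, the caller can report success and failure through one code path, which is part of what the extra layer buys.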

Here fun is the function the task was meant to execute (myTest.add). The tracer runs it and returns the result of the call.

At this point, one full consume cycle is complete.

Starting with the next article, we will introduce some of Celery's auxiliary features, such as load balancing and fault tolerance.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

To get timely updates on my articles, or to see the technical material I recommend, please follow the account.
