0x00 Abstract

Celery is a simple, flexible and reliable distributed system for processing large amounts of messages, focusing on asynchronous task queues for real-time processing, while also supporting task scheduling.

This series takes you through Celery in depth via source code analysis. This is the second article in the series, continuing with the structure of Celery.

0x01 Review

In the previous article, we briefly introduced the concepts, usage and structure of Celery. Let’s recall the structure diagram of Celery:

+-----------+     +--------------+
| Producer  |     | Celery Beat  |
+-------+---+     +----+---------+
        |              |
        v              v
   +-------------------------+
   |         Broker          |
   +------------+------------+
                |
     +----------+-------------------+
     |          |                   |
     v          v                   v
+----+-----+  +----+------+  +-----+----+
| Exchange |  | Exchange  |  | Exchange |
+----+-----+  +----+------+  +----+-----+
     |             |              |
     v             v              v
  +-----+      +-------+      +-------+
  |queue|      | queue |      | queue |
  +--+--+      +---+---+      +---+---+
     |             |              |
     v             v              v
+----+----+   +----+---+   +-----+----+
| Worker  |   | Worker |   |  Worker  |
+----+----+   +---+----+   +----+-----+
     |            |             |
     +------------+-------------+
                  |
                  v
             +----+----+
             | backend |
             +---------+

Let’s continue our analysis from several aspects.

0x02 Worker’s Thinking

When a worker is started, it establishes a connection (a long-lived TCP connection) with the broker; then, whenever there is data to transfer, a corresponding channel is created. One connection can have multiple channels. The worker then fetches tasks from the broker’s queue for consumption, which is a typical producer-consumer pattern.

2.1 Worker’s mode

First of all, let’s think about the working mode of workers: what is the relationship between these concurrent workers? Is a master needed for unified control? For a better comparison, let’s look at the implementation of Nginx.

2.1.1 Nginx mode

Nginx background processes include a master process and multiple worker processes.

  • The master process is mainly used to manage worker processes, including: receiving signals from the outside, sending signals to all worker processes, monitoring the running status of worker processes. When the worker process exits (in abnormal cases), it will automatically restart the new worker process.
  • Worker processes handle basic network events. Multiple worker processes are peer. They compete equally for requests from clients and are independent of each other. A request can only be processed in one worker process. A worker process cannot handle requests from other processes.

Worker processes are equal, and each process has the same opportunity to process requests. A connection request comes in, and each process has the potential to process the connection. How does that work?

  • First, each worker process is forked from the master process. The master process establishes the socket (listenfd) that needs to be listened on, and then forks out multiple worker processes.
  • Second, the listenfd of all worker processes becomes readable upon the arrival of a new connection. To ensure that only one process handles the connection, all worker processes compete for accept_mutex before registering the listenfd read event; only the process that obtains the mutex registers the read event, and it calls accept in the read-event callback to accept the connection.
  • Finally, after a worker process accepts the connection, it reads the request, parses it, processes it, generates the response and returns it to the client, and finally closes the connection. That is what a complete request looks like.

We can see that a request is handled entirely by the worker process, and only within one worker process.
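The accept_mutex competition can be sketched with a toy model. Note the assumptions: nginx uses processes, while this sketch uses threads for brevity, and all names here (worker, accept_mutex, connections) are illustrative, not nginx code. The point it shows is that peer workers compete for a mutex before taking the next connection, so each request is handled by exactly one worker:

```python
import threading
import queue

def worker(name, accept_mutex, connections, handled):
    """A peer worker: compete for accept_mutex, then 'accept' one connection."""
    while True:
        with accept_mutex:              # only the mutex holder may accept
            conn = connections.get()    # take the next pending connection
        if conn is None:                # sentinel: time to shut down
            break
        handled.put((name, conn))       # the whole request stays in one worker

accept_mutex = threading.Lock()
connections = queue.Queue()
handled = queue.Queue()

workers = [threading.Thread(target=worker,
                            args=(f"w{i}", accept_mutex, connections, handled))
           for i in range(3)]
for t in workers:
    t.start()

for conn in range(6):                   # six incoming connections
    connections.put(conn)
for _ in workers:                       # one sentinel per worker
    connections.put(None)
for t in workers:
    t.join()

results = [handled.get() for _ in range(6)]
print(sorted(c for _, c in results))    # every connection handled exactly once
```

Which worker wins each connection is nondeterministic, but no connection is ever handled twice, which is exactly the property accept_mutex provides.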

2.1.2 Celery mode

2.1.2.1 mode

Unlike Nginx, there is no global master process in Celery. All worker instances are peers, each waiting on the broker (such as Redis) for new tasks.

However, inside each worker, the relationship between the parent process and its child processes is what is often referred to as a “master-slave” mode.

  • Master (the parent process) is responsible for the acquisition, distribution of tasks, the management of Slaves (create, add, close, restart, discard, etc.), the maintenance of other auxiliary modules, etc.
  • Slave is responsible for consuming tasks passed from the scheduler.

The specific internal process of worker is as follows:

  • The scheduler first preforks worker subprocesses as a multiprocessing pool, and then listens for kernel events (reads, writes, exceptions, and so on) in an event-driven way (select/poll/epoll).
  • When the master detects an event, it executes the corresponding callback, continuously pulls tasks from the broker, and distributes them to the slaves through pipes (the means of inter-process communication) according to a routing policy (round-robin, weighted, etc.).
  • The slave worker processes consume (and ack) tasks, and then synchronize state with the scheduler through the pipe, handle inter-process communication, and so on.
  • The workloop essentially listens on the read end of the pipe (the main process writes from the other end) and then executes the corresponding callback; it calls the put method to synchronize state onto the write pipe (which the main process reads from the other end).

The details are as follows:

                                       +-------------+
                                       |             |
                                       |    Redis    |
                                       |             |
                                       +---+------+--+
                                           ^      ^
                                           |      |
                                           |      |
                   +-----------------------+      +--------------+
                   |                                             |
                   |                                             |
                   |                                             |
+------------------+--------------------+            +-----------+--------+
|  Worker 1                             |            | Worker n           |
|                                       |            |                    |
|                                       |            |                    |
|             Parent process            |            |   Parent process   |
|                  +                    |            |          +         |
|                  |                    |            |          |         |
|                  |                    |            |          |         |
|         +--------+------------+       |            |          |         |
|         |                     |       |            |          |         |
|         |                     |       |            |          |         |
|         v                     v       |            |          v         |
|   subprocess 1   ...   subprocess n   |            |     subprocess     |
+---------------------------------------+            +--------------------+

2.1.2.2 interaction

In Celery, a distributed management approach is adopted, with broadcast/unicast communication between nodes to achieve coordination.

When dealing with specific control and management work, worker processes communicate with each other, which can be divided into two types:

  • Use the Mingle module at startup to exchange information with each other.
  • In the running state, status is shared through the Gossip protocol. However, this state sharing is not necessarily related to task allocation and worker scheduling, but only used to monitor and respond to console messages. Because if there are several workers, only one worker should respond to a console message, so the Gossip protocol is used to elect a leader, who responds.

There is no communication between workers when dealing with specific business.

As mentioned earlier, when a worker starts, it establishes a long-lived TCP connection with the broker and creates channels on that connection as needed (one connection can have multiple channels). The worker then fetches tasks from the broker’s queue for consumption, a typical producer-consumer pattern.

In the case of Redis, the underlying Kombu actually uses Redis’s BRPOP command to read messages from specific queues.

If multiple workers use BRPOP to fetch broker messages at the same time, which worker gets a given message is decided by competition: because Redis processes commands in a single process, only one worker can read each message.
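BRPOP’s key-order semantics can be illustrated with a small stand-in. This is not redis-py or Kombu code: the dict below imitates Redis lists, and real BRPOP additionally blocks until a message arrives, which is omitted here. What the sketch shows is that BRPOP scans the given keys in order and pops from the tail of the first non-empty list:

```python
from collections import deque

# Stand-in for Redis lists keyed by queue name (illustrative only).
redis_lists = {
    "queue1": deque(),
    "queue2": deque(["msg-a", "msg-b"]),
    "queue3": deque(["msg-c"]),
}

def brpop(keys):
    """Mimic BRPOP: check keys in order, pop from the tail of the first
    non-empty list. The blocking behavior of the real command is omitted."""
    for key in keys:
        if redis_lists[key]:
            return key, redis_lists[key].pop()   # BRPOP pops from the right
    return None

print(brpop(["queue1", "queue2", "queue3"]))     # -> ('queue2', 'msg-b')
```

In the real system, each worker issues one BRPOP over the keys of all the queues it consumes, and Redis hands each message to exactly one of the blocked workers.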

2.2 Composition of the worker

It is mentioned in the Worker documentation that a worker is mainly composed of four parts: task_pool, consumer, scheduler and mediator.

These four parts work on two sets of data structures.

  • Ready queue: tasks that need to run immediately are placed in the ready queue when they arrive at the worker, for the consumer to execute.
  • ETA: tasks that have an eta or rate_limit parameter are put into the timer queue, and the timer is responsible for moving them into the execution pool when their conditions are met.

However, the mediator is not actually found in the code; perhaps it has become default behavior rather than a separate component.

2.2.1 task_pool

task_pool is used to hold the worker subprocesses: when the worker is started with a concurrency parameter, that many subprocesses are placed into the pool.

Celery’s default concurrency model is prefork, i.e. multi-process; here Celery has simply made light modifications to multiprocessing.Pool and given it a new name: prefork.

The difference between this pool and an ordinary multi-process pool is that task_pool only houses the running worker subprocesses.

2.2.2 consumer

The consumer receives messages from the broker, and then converts each message into an instance of celery.worker.request.Request. The Request is wrapped into a Task when appropriate.

Task is the class generated from a function by the @app.task() decorator, so you can use the request parameter in your custom task function to obtain some key information.

2.2.3 the Scheduler

Scheduler can be explained from two aspects: Beat and Timer.

2.2.3.1 Beat

The Beat process reads the contents of the configuration file and periodically sends tasks that are due in the configuration and need to be executed to the task queue.

The backbone is the Scheduler, the Service is the driver, and the final hosting entity is the SchedulerEntry.

Its main internal data structure is a min-heap, which carries all the periodic tasks we have set; a min-heap keeps the smallest element at the top, and here the entries are sorted by time difference. Celery first computes, for each periodic task, the difference between its next run timestamp and the current timestamp, then sorts by that difference; the smallest difference is, without doubt, the task to execute next.

In the start function of the Service, scheduler.tick() is called to fetch the next task that needs to be executed in the internal minimum heap. SchedulerEntry is converted into a Task and sent to a queue in Redis.
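The min-heap scheduling idea can be sketched with heapq. This is a toy model of the mechanism, not Celery’s actual Scheduler.tick() (which lives in celery/beat.py and is more involved); the entry layout and function names are assumptions made for the illustration:

```python
import heapq

def make_entry(name, interval, now):
    # (next_run_time, name, interval): the heap orders by the first field
    return (now + interval, name, interval)

def tick(heap, now):
    """Pop the soonest entry; if due, 'send' it and push its next occurrence."""
    next_run, name, interval = heap[0]
    if next_run <= now:
        heapq.heappop(heap)
        heapq.heappush(heap, (next_run + interval, name, interval))
        return name                 # this task should be sent to the queue
    return None                     # nothing due; caller sleeps until next_run

now = 100.0
heap = [make_entry("every-5s", 5, now), make_entry("every-2s", 2, now)]
heapq.heapify(heap)

print(tick(heap, now + 2))   # -> 'every-2s' (smallest time difference fires first)
print(tick(heap, now + 2))   # -> None      (its next occurrence is at now + 4)
```

Each fired entry is immediately re-pushed with its next timestamp, which is why the heap always has the next-due task on top.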

Specific definitions are as follows:

class Scheduler:
    """Scheduler for periodic tasks. """

    Entry = ScheduleEntry

    #: The schedule dict/shelve.
    schedule = None

    #: Maximum time to sleep between re-checking the schedule.
    max_interval = DEFAULT_MAX_INTERVAL

    #: How often to sync the schedule (3 minutes by default)
    sync_every = 3 * 60

    #: How many tasks can be called before a sync is forced.
    sync_every_tasks = None

    _last_sync = None
    _tasks_since_sync = 0

Persistence

In Celery, the execution of periodic tasks does not become invalid because we restart Celery. Instead, after a restart, the new execution cycle is recalculated according to the execution state before the last shutdown. The premise of this calculation is that the old execution information can be obtained; in Scheduler, this information is stored in a file by default.

The default storage in Celery is implemented via Python’s standard shelve module, a dictionary-like persistent object database; we can synchronize data between disk and memory by calling sync.
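The survive-a-restart behavior can be sketched directly with shelve. The keys and the stored dict below are assumptions for the illustration; Celery’s PersistentScheduler stores its own structures:

```python
import os
import shelve
import tempfile

# A throwaway path standing in for the celerybeat-schedule file.
path = os.path.join(tempfile.mkdtemp(), "celerybeat-schedule")

db = shelve.open(path)
db["hello_task"] = {"last_run_at": 1234567890.0, "total_run_count": 3}
db.sync()                      # flush the in-memory dict to disk
db.close()

db = shelve.open(path)         # "restart": reopen and recover the old state
state = db["hello_task"]
db.close()

print(state["total_run_count"])    # -> 3
```

On startup, Beat reads this recovered state and computes the next execution cycle from the last_run_at recorded before shutdown.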

2.2.3.2 Timer

The document describes the Timer as follows:

The timer schedules internal functions, like cleanup and internal monitoring, but also it schedules ETA tasks and rate limited tasks. If the scheduled tasks ETA has passed it is moved to the execution pool.

As you can see, its function is to schedule internal functions, such as cleanup and monitoring, as well as ETA tasks and rate-limited tasks.

For cleanup, there are examples such as backend.process_cleanup and loader.on_process_cleanup.

2.3 Initialization Process

During worker initialization, the execution order of each module is defined by a BluePrint class, and the execution is sorted according to the dependencies between each module.

The Worker’s start method builds a self.blueprint data structure (a DAG), initializes the different components (steps) according to the parameters passed in from the command line, and executes the initialization methods of those components. In effect, it is an object-oriented encapsulation of process control.

The specific functions of each Step are as follows:

  • Timer: the timer used to execute scheduled tasks;
  • Hub: the encapsulating object of the event loop;
  • Pool: constructs the various execution pools (thread/process/coroutine);
  • Autoscaler: used to automatically grow or shrink the number of work units in the pool;
  • StateDB: persists worker data across restarts (used only between restarts);
  • Autoreloader: used to automatically reload modified code;
  • Beat: creates the Beat process, but runs it as a child process (corresponding to the Beat argument on the command line);

0x03 Consumer’s thinking

Celery uses Consumer to get messages from the broker.

3.1 components

The components of Consumer are as follows:

  • [1] Connection: manage the connection to the broker;
  • [3] Events: send monitoring events;
  • [2] Agent: cell actor;
  • [2] Mingle: exchange information between different workers;
  • [1] Tasks: start the message Consumer;
  • [3] Gossip: consume events from other workers;
  • [1] Heart: send heartbeat events (the consumer’s heartbeat);
  • [3] Control: the remote command management service;

As mentioned in Reference article 1: Overview of Worker startup process:

I’ve labeled all the bootsteps here, and the label indicates how important each service is to our code reading, with 1 being the most important and 3 the least important. For the Consumer:

1 is the basic functionality that makes up a simple, non-robust message queue framework;

2 is generally important and implements some advanced features;

3 is an add-on feature, and also a somewhat distributed feature.

3.2 role

Therefore, we can see that the concept of the Celery Consumer component is much larger than Kombu’s Consumer: it not only fetches messages from the broker, but also covers message consumption, distribution, monitoring, heartbeat, and so on.

It can be said that, except for the message loop engine (carried out by the hub), multi-processing (carried out by Pool and Autoscaler), and scheduled tasks (carried out by timer and beat), the other major functions are carried out by Consumer.

0x04 High Performance Thinking

The high performance of celery is mainly ensured by two aspects, one is multi-process and the other is event-driven. In addition, the realization of some specific functions also ensures the implementation of high performance.

4.1 multiple processes

Multiple processes make good use of the computing power of each core. It can improve the concurrency of the program to a certain extent and relieve the IO pressure.

The Celery scheme is called prefork i.e. pre-generation. Pre-generation means that the main process forks out a bunch of child processes before executing specific business logic, and stores them for centralized management, forming a process pool. Usually, these sub-processes are asleep. Only when the main process dispatches a task, one of them wakes up and transmits the corresponding task data to the sub-process through inter-process communication.

As mentioned above, inside each worker, parent process and child process, is the master-slave mode.

  • Master (the parent process) is responsible for task acquisition and distribution, the management of slaves (create, add, close, restart, discard, etc.), the maintenance of other auxiliary modules, and so on.
  • The scheduler preforks slaves as a multiprocessing pool and listens for kernel events (reads, writes, exceptions, and so on) in an event-driven way (select/poll/epoll).
  • Slaves are responsible for consuming tasks passed from the scheduler, then synchronizing state with the scheduler through the pipe, handling inter-process communication, and so on.

4.2 Event-driven

Kombu is event-driven internally.

The Master scheduler is an event-driven model; the point of being event-driven is that it eliminates blocking.

In the normal single-threaded model, only one message can be fetched at a time, each fetch requires a network round-trip, and a while-True loop is needed to poll continuously, which is undoubtedly inefficient and expensive.

Event-driven code, on the other hand, can issue multiple watch requests at the same time and then suspend, waiting for the kernel’s notification before executing the corresponding callback. This elegantly eliminates the single-threaded while-True polling and reduces repeated connections through concurrent requests.

Take epoll for example:

  • epoll supports both level-triggered and edge-triggered modes (edge-triggered tells the process which file descriptors have just become ready, and only once; if we take no action, it will not tell us again). Edge-triggered is theoretically more performant.

  • epoll also only tells us about ready file descriptors: when we call epoll_wait(), the return value is the number of ready file descriptors, and we just fetch that many descriptors from an array specified by epoll. Memory mapping (mmap) is also used, which eliminates the overhead of copying these file descriptors during system calls.

  • Another essential improvement is epoll’s use of event-based readiness notification. With select/poll, the kernel scans all monitored file descriptors only after the method is called; epoll instead registers a file descriptor in advance via epoll_ctl(), and once that descriptor becomes ready, the kernel activates it through a callback-like mechanism, so the process is notified promptly when it calls epoll_wait().
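The register-then-wait pattern can be sketched with the stdlib selectors module, which picks epoll on Linux. This is a sketch of the pattern, not Kombu’s hub; the socketpair stands in for the broker connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()      # epoll on Linux, kqueue on BSD/macOS
r, w = socket.socketpair()             # stand-in for the broker connection

def on_readable(sock):
    return sock.recv(1024)             # callback: the descriptor is ready, read it

# Register the descriptor once, up front (the epoll_ctl step).
sel.register(r, selectors.EVENT_READ, on_readable)

w.send(b"task payload")                # simulate the broker pushing data

events = sel.select(timeout=1)         # suspend until the kernel reports readiness
for key, _mask in events:
    data = key.data(key.fileobj)       # run the registered callback
    print(data)                        # -> b'task payload'

sel.close(); r.close(); w.close()
```

One select() call can watch many descriptors at once, which is exactly what replaces the single-threaded while-True polling described above.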

4.3 Implementation of Task

The Task carries the function of starting the corresponding message consumer in the Celery application.

With regard to Task implementation, this raises the question of whether to distribute code or data.

Because Celery is a general-purpose framework rather than one specific to big data, distributing data is inevitable.

The remaining question is do I need to distribute the code?

The most basic form of a Task is a function. The most direct idea of Task publishing is to package the relevant function code that the client will execute and publish it to the broker. Spark, a distributed computing framework, uses this approach.

4.3.1 Distributing code

The industry’s poster child for code distribution is Spark. The idea behind Spark is simple: move computations rather than data. So how do you describe this calculation? Spark encapsulates a data mapping record using RDD, and records calculations on top of this encapsulation. This leads to the two most important questions:

  • How to split computational logic;
  • How to distribute computing logic;

Spark then divides all computing logic into these two types:

  • Capable of being distributed to nodes for parallel execution;
  • Logic that must wait for a certain number of results to be combined before execution can continue;

Then a huge problem is divided into relatively independent sub-problems distributed to each machine to solve.

At actual commit time, Spark submits the computation code to each worker node, which then performs the computation.

4.3.2 Celery mode

Celery before 2.0 also supported this mode of task publishing. One obvious downside of this approach is that the amount of data passed to the broker may be large. The solution is also easy to think of: inform the worker of the task-related code in advance.

This is where global collections and decorator-based registration come in.

@app.task(name='hello_task')
def hello():
  print('hello')

App is the application in worker, which registers the task function by means of decorator.

The app maintains a dictionary where the key is the task name, hello_task in this case, and the value is the memory address of the function. The task name must be unique, but the name parameter is not required; if it is not given, Celery automatically generates a task name based on the module path and function name.

In this way, the client only needs to provide the task name and related parameters, and does not need to provide task-related code:

# client side
app.send_task('hello_task')

Note here: After the client posts a task, the task is written to the broker queue in the form of a message, with the task name and other related parameters, waiting for the worker to obtain it. Here, the task release is completely independent of the worker. Even if the worker does not start, the message will be written into the queue.

This approach also has an obvious disadvantage: all task code to be executed needs to be registered on the worker side in advance, and the coupling between the client side and the worker side becomes stronger.
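The name-to-function dictionary can be sketched with a toy registry. Everything here (TinyApp, its methods) is illustrative, not Celery’s actual app or task registry; in particular, real send_task serializes a message to the broker, while this toy calls the function directly to show the lookup:

```python
class TinyApp:
    """Toy task registry: maps a unique task name to a function."""

    def __init__(self):
        self.tasks = {}               # name -> function, like app.tasks

    def task(self, name=None):
        def decorator(func):
            # If no name is given, derive one from module path + function name
            task_name = name or f"{func.__module__}.{func.__qualname__}"
            self.tasks[task_name] = func
            return func
        return decorator

    def send_task(self, name, args=()):
        # The client ships only the name and arguments, never the code.
        # (A real worker does this lookup when the message arrives.)
        return self.tasks[name](*args)

app = TinyApp()

@app.task(name="hello_task")
def hello(who):
    return f"hello {who}"

print(app.send_task("hello_task", ("world",)))   # -> hello world
```

This is why the code must be registered on the worker side in advance: the message only carries the dictionary key, and the worker resolves it locally.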

4.4 Prefetch

Currently, QoS in Kombu only supports prefetch_count.

The purposes of setting prefetch_count are:

  • Prefetch here means that a Celery worker node can acquire in advance some tasks not taken by other nodes, so as to improve the worker node’s running efficiency.
  • At the same time, the prefetch count of QoS can be used to throttle consumers, preventing them from pulling all messages off the queue at once, which could overwhelm the service and cause crashes or exceptions.

The prefetch_count in Kombu’s QoS is an integer value N, indicating that a consumer can have at most N unacknowledged messages at a time; once N messages are outstanding, no new messages are fetched from the queue until one of them is acked.

Kombu records the value of prefetch_count and the number of dirty (acked/rejected) messages for the channel.
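That bookkeeping can be sketched as follows. This is a simplified model of the idea; Kombu’s real QoS class (in its virtual transport layer) differs in detail, and the class and method names here are assumptions:

```python
class SimpleQoS:
    """Track unacked deliveries against prefetch_count (simplified model)."""

    def __init__(self, prefetch_count):
        self.prefetch_count = prefetch_count
        self._delivered = set()      # outstanding delivery tags
        self._dirty = set()          # tags already acked/rejected

    def can_consume(self):
        # 0 means "no limit"; otherwise stop once N messages are outstanding
        if not self.prefetch_count:
            return True
        return len(self._delivered) - len(self._dirty) < self.prefetch_count

    def append(self, tag):
        self._delivered.add(tag)     # a message was delivered, unacked so far

    def ack(self, tag):
        self._dirty.add(tag)         # acked: frees one prefetch slot

qos = SimpleQoS(prefetch_count=2)
qos.append("m1"); qos.append("m2")
print(qos.can_consume())   # -> False: two messages outstanding, limit reached
qos.ack("m1")
print(qos.can_consume())   # -> True: one slot freed by the ack
```

The consumer checks can_consume() before each fetch, which is how the flow control described above is enforced.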

4.5 Celery function

Celery also provides a number of workflow features, some of which allow us to improve performance. Such as Chunks.

The task-chunking feature allows you to divide a large number of objects to be processed into several chunks. If you have one million objects, you can create 10 chunks, each processing 100,000 objects. Some might worry that chunking decreases parallelism; in fact, by avoiding the overhead of messaging, it greatly improves performance.

add_chunks_sig = add.chunks(zip(range(100), range(100)), 10)
result = add_chunks_sig.delay()
result.get()

0x05 Distributed Thinking

Let’s see how Celery implements distributed tasks from the perspectives of load balancing, disaster recovery and interaction between workers.

5.1 Load Balancing

The load balancing of Celery can actually be divided into three levels, and it is highly coupled with Kombu (taking Redis as an example).

  • When the worker decides which queues to interact with, there is a load balancing;
  • There is a load balancing when the worker decides to interact with the broker and retrieve messages using BRPOP;
  • After the worker obtains the broker message, there is a load balancing when the task is specifically invoked internally and multi-process allocation is carried out within the worker.

Also, Celery has an Autoscaler component which adjusts the pool size online. This is also related to relieving load.

The main logic looks like this (more on that in a future article):

                                                                  +
                                                  Kombu           |   |Redis
                                                                  |
                                               BRPOP(keys)        |
+------------------------------------------+                      |
|  Worker 1                                | ------------------+  |
|                                          |                   |  |
+------------------------------------------+                   |  |        queue 1 key
                                                               |  |
                                                               |  |
+------------------------------------------+     BRPOP(keys)   |  |
| Worker 2                                 | +---------------------------> queue 2 key
|                                          |                   |  |
+------------------------------------------+                   |  |
                                                               |  |
+------------------------------------------+                   |  |        queue 3 key
| Worker 3                                 |                   |  |
|                                          |                   |  |
|     +-----------+                        |                   |  |
|     | queue 1   |                        |     BRPOP(keys)   |  |
|     | queue 2   |                keys    |                   |  |
|     | ......    | +--------+---------------------------------+  |
|     | queue n   |          ^             |                      |
|     +-----------+          |             |                      |
|                            |             |                      |
|                            +             |                      |
|                                          |                      |
|             +      round_robin_cycle     |                      |
|             |                            |                      |
+------------------------------------------+                      |
              |                                                   |
              |  fair_strategy                                    |
              |                                                   |
      +-------+----------+----------------+                       |
      |                  |                |                       |
      v                  v                v                       |
+-----+--------+  +------+-------+  +-----+--------+              |
| subprocess 1 |  | subprocess 2 |  | subprocess 3 |              +
+--------------+  +--------------+  +--------------+


5.2 Failover and Disaster Recovery

5.2.1 Error types & Failure dimensions

There are three main types of errors:

  • User code errors: the error can be returned directly to the app, because Celery cannot know how to handle it;
  • Broker errors: Celery can try the next node according to the failover strategy;
  • Network timeout errors: Celery can retry the request;

From a system perspective, the most likely dimensions of failure are as follows:

  • The Broker may fail;
  • The Worker-to-Broker link may fail;
  • A Worker node may fail;
  • Among the multiple processes in a Worker, an individual process may fail;
  • Task processing inside a Worker process may fail;

In practice, brokers can use RabbitMQ for clustering and failover. But this is a dimension that involves overall system design, so this series will not analyze it.

5.2.2 Treatment methods

There are two options for error handling, retry and fallback, depending on the error level.

Let’s take the Worker-to-Broker dimension as an example. The main concerns in this dimension are:

  1. Broker a node fails.
  2. Network failure between worker and Broker;

In this dimension, both Celery and Kombu have made efforts, but fundamentally the work is done by Kombu.

5.2.2.1 retry

In Celery, for retries, there is a broker_connection_max_retries configuration, which is the maximum number of retries.

Celery will retry according to broker_connection_max_retries configuration when a network failure occurs.
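The retry-with-interval pattern can be sketched as follows. The function is inspired by Kombu’s retry_over_time helper, but the signature and behavior here are simplified assumptions, not the real API:

```python
import time

def retry_over_time(fun, catch, max_retries=3, interval=0.01, errback=None):
    """Call fun; on an expected error, invoke errback and retry up to max_retries."""
    for retries in range(max_retries + 1):
        try:
            return fun()
        except catch as exc:
            if retries >= max_retries:
                raise                      # out of retries: surface the error
            if errback is not None:
                errback(exc, interval)     # report the failure and the wait
            time.sleep(interval)           # back off before the next attempt

attempts = []

def flaky():
    """Simulated broker connection that succeeds on the third attempt."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker unreachable")
    return "connected"

print(retry_over_time(flaky, ConnectionError, max_retries=5))  # -> connected
print(len(attempts))                                           # -> 3
```

In Celery, broker_connection_max_retries plays the role of max_retries here, bounding how long the worker keeps trying before giving up.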

In Kombu, there are also various retry mechanisms, such as the retry parameters in connection.py:

  • max_retries: the maximum number of retries;
  • errback (Callable): the callback invoked on failure;
  • callback (Callable): the callback invoked during each retry interval;

5.2.2.2 Automatic Retry

Automatic retry is another approach in Kombu, such as autoretry in kombu/connection.py. The basic logic is:

  • When fun is called, autoretry acts as a wrapper, and the channel from the last successful call can be passed in;
  • If a call to fun fails, Kombu automatically retries it.

5.2.2.3 Fallback

If retry does not resolve the problem, fallback is used. For example, broker_failover_strategy is the strategy Celery sets for the broker Connection; it is automatically mapped to kombu.connection.failover_strategies.

When Kombu configures a Connection, it can set multiple Broker urls. When connecting to brokers, Kombu automatically selects the healthiest broker node for Connection.
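The round-robin failover idea can be sketched like this. This is an illustration, not Kombu’s implementation; the broker URLs and the fake connector are made up for the demo:

```python
from itertools import cycle

def connect_with_failover(broker_urls, try_connect):
    """Cycle through broker URLs until one connection attempt succeeds."""
    attempts = 0
    for url in cycle(broker_urls):
        attempts += 1
        if attempts > len(broker_urls):
            raise ConnectionError("all brokers are down")
        try:
            return try_connect(url)       # first healthy broker wins
        except ConnectionError:
            continue                      # round-robin on to the next URL

# Simulated connector: only the second broker is healthy (assumed for the demo).
def fake_connect(url):
    if url != "redis://broker2:6379/0":
        raise ConnectionError(url)
    return f"connected to {url}"

urls = ["redis://broker1:6379/0", "redis://broker2:6379/0"]
print(connect_with_failover(urls, fake_connect))
# -> connected to redis://broker2:6379/0
```

Kombu’s real strategies behave similarly in spirit: given a list of broker URLs, the connection moves on to the next candidate when the current one is unreachable.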

5.3 Interaction between workers

As mentioned above, in the running state, worker processes share the status through the Gossip protocol when dealing with specific control management work.

However, this state sharing is not necessarily related to task allocation and worker scheduling, but only used to monitor and respond to console messages. Because if there are several workers, only one worker should respond to a console message, so the Gossip protocol is used to elect a leader, who responds.

The Gossip protocol, like any other protocol, has some unavoidable drawbacks, mainly two:

1) Message delay

Because nodes in the Gossip protocol send messages to only a few random nodes, a message reaches the entire network only after multiple rounds of propagation, so using the Gossip protocol causes inevitable message delay. It is not suitable for scenarios requiring high real-time performance.

2) Message redundancy

According to the Gossip protocol, nodes periodically select neighboring nodes to send messages, and the receiving nodes also repeat this step. Therefore, messages are inevitably sent repeatedly to the same node, causing redundancy of messages and increasing the processing pressure of the receiving nodes. Moreover, because the message is sent periodically, even the nodes that receive the message will receive repeated messages, which aggravates the redundancy of the message.

Why Gossip? Possibly because Gossip is used for the management function: it selects a leader among the workers to respond to messages from the console, which eliminates the need for message immediacy.

0x06 Summary

From the above analysis, you should have a preliminary understanding of the structure of Celery. Starting from the next article, we will analyze various aspects of Celery one by one; please look forward to it.

0xEE Personal information

★★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want timely updates on personal articles, or want to see the technical material I recommend, please follow the account.

0xFF Reference

Basic concepts of Master and Worker in Nginx

1: Overview of Worker startup process

2: Worker’s execution engine

3: Implementation of Task objects

4: Implementation of scheduled tasks

5: Remote control management

6: Implementation of Events

7: Interaction between workers

8: the State and the Result

Application of Spark distributed computing engine