I have come into contact with Celery in my recent work, which is an open source Distributed Task Queue with 18K star available on Github and can mainly be used to implement asynchronous tasks and scheduled tasks in applications. Although it is written in Python, However, the protocol can be implemented in any language and gocelery, nodecelery and celery-php are available.
In this article, I summarize my understanding of Celery and its use in work. The general content of this paper is as follows:
- What the task queue is;
- Celery has done anything;
- Celery in working practice.
What is a task queue
Message Queue: RabbitMQ, RocketMQ, Kafka. While “Task Queue”, I have not heard of Task Queue before TOUCHING Celery. What is the task queue, and what is the relationship between the task queue and the message queue. With questions, let’s look at the structure of Celery:
In the Celery structure, it can be seen that there are multiple servers initiating Async tasks and sending tasks to the queue of the Broker where the Celery Beat process can initiate timed tasks. When the Task reaches the Broker, it is allocated to the corresponding Celery Worker for processing. After a Task is processed, the result is stored in Backend.
Broker and Backend, Celery are not implemented in the above process, instead existing open source implementations are used, such as RabbitMQ as Broker to provide message queuing services and Redis as Backend to provide result storage services. Celery like abstract implementation of Producer and Consumer in message queue architecture, abstract basic unit “messages” in message queue into “tasks” in task queue, and encapsulate asynchronous, timed task initiation and result storage operations. Allows developers to ignore implementation details such as AMQP and RabbitMQ to facilitate development.
In summary, Celery as task queue is based on further encapsulation of message queue and its implementation depends on message queue.
Next, find out specifically what Celery does through a simple application.
What did Celery do
In application development, operations that are time-consuming and do not affect the process are usually handled asynchronously to ensure fast response. For example during user registration there is usually an asynchronous email notification sent to the user, let’s see how Celery implements this asynchronous operation.
Declare the mail method send_mail in task.py and add the @app.task decorator provided by Celery to it. With this decorator you can turn the send_mail function into an include.app. Task :Task instance object. The Task instance provides two core functions:
- Sending a message to the queue;
- Declare the specific function that the Worker needs to execute after receiving the message.
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def send_mail(email) :
print("send mail to ", email)
import time
time.sleep(5)
return "success"
Copy the code
A Task has been defined. To initiate an asynchronous Task, you can invoke the delay method of the Task, which sends messages to the queue. For example, to initiate an asynchronous email Task when user registration is complete:
# user.py
from tasks import send_mail
def register() :
print("1. Insert record into database")
print("2. Email with celery asynchronously")
send_mail.delay("[email protected]")
print("3. Tell user successful registration")
if __name__ =='__main__':
register()
Copy the code
After running the program, the message has been sent to the RabbitMQ queue in the following format:
It can be seen that the message after Celery encapsulation contains task identity and running parameters, etc.
Next, start the Worker consuming the messages in RabbitMQ:
celery -A tasks worker --loglevel=info
Copy the code
When the Worker starts, you can see the following printed message:
The first is the Worker’s configuration information, followed by a list of tasks that the Worker executes, followed by a successful message from RabbitMQ and the execution of the corresponding Task.
With the above example, we can further understand the work done by Celery as task queue frame, while “distributed” in “distributed task queue” means that Producer and Consumer can have multiple tasks, i.e., multiple processes send tasks to the Broker. Multiple workers fetch tasks from the Broker and execute them.
The above is just a simple example, then let’s look at some practical experience about the use of Celery in my work.
Celery in working practice
Queues are divided according to service scenarios
In the project I worked on, Celery is used to handle asynchronous tasks and timed tasks such as placing orders, parsing tracks and pushing upstream. You can specify a queue for each Task based on the service scenario. For example:
DEFAULT_CELERY_ROUTES = {
'celery_task.pending_create': {'queue': 'create'},
'celery_task.multi_create': {'queue': 'create'},
'celery_task.pull_tracking': {'queue': 'pull'},
'celery_task.pull_branch': {'queue': 'pull'},
'celery_task.push_tracking': {'queue': 'push'},
'celery_task.push_weight': {'queue': 'push'},
}
CELERY_ROUTES = {
DEFAULT_CELERY_ROUTES
}
Copy the code
Based on service scenarios, specify queues corresponding to six tasks in DEFAULT_CELERY_ROUTES configuration, including create, pull, and push queues, and add the routing rule to CELERY_ROUTES to take effect. This is designed so that different scenarios do not affect each other; for example, parsing task blocking should not affect ordering tasks.
Further queuing
After rough division based on service scenarios, more detailed division may be required for a certain scenario. For example, to prevent the blocking of one upstream node from affecting the push to other upstream nodes, ensure that the upstream nodes do not affect each other. Therefore, different queues need to be used for different upstream streams, for example:
CLIENT_CELERY_ROUTES = {
# {0} is a placeholder for the client, formatted in ClientRouter
'celery_task.push_tracking_retry': {'queue': 'push_tracking_retry_{0}'},
'celery_task.push_weight_retry': {'queue': 'push_weight_retry_{0}'}},class ClientRouter(object) :
def route_for_task(self, task, args=None, kwargs=None) :
if task not in CLIENT_CELERY_ROUTES:
return None
client_id = kwargs('client_id')
Get queue name based on client_id
queue_name = CLIENT_CELERY_ROUTES[task]['queue'].format(client_id)
return {'queue': queue_name}
CELERY_ROUTES = {
'ClientRouter'
DEFAULT_CELERY_ROUTES,
}
Copy the code
CLIENT_CELERY_ROUTES specifies the Queue name format for tasks that need to be queued based on clients, with a placeholder in the Queue name to get different Queue names based on different clients.
Then a router ClientRouter is implemented, which defines the router_for_task method to specify the queue name for the task. If the task is in CLIENT_CELERY_ROUTES, the queue name will be formatted with the client_id in kwargs to get the queue name that will send the message. This isolates different clients from using different queues.
In addition to dividing queues on the Client dimension, you can also design routing rules by referring to this method if you need to further divide queues on other dimensions to achieve isolation.
Dynamic queue
Again, the dynamic queue is a preparatory queue in nature. Its purpose is to reduce the pressure of message accumulation in some queues in the online environment and play a role of fast support. For example, if the push queue is under heavy pressure, you can configure JSON as follows and route push_tracking and PUSH_weight tasks to the prepared dynamic queue.
{celery_dynamic_router configuration"celery_task.push_tracking": {
"dynamic_queue": [1.2]."dynamic_percentage": 0.7,},"celery_task.push_weight": {
"dynamic_queue": [3.4]."dynamic_percentage": 0.7,}}Copy the code
The above configuration routes 70% of celery_task.push_tracking tasks to dynamic queues 1 and 2, and 70% of celery_task.push_weight tasks to dynamic queues 3 and 4.
The dynamic queue router is implemented as follows:
class DynamicRouter(object) :
def route_for_task(self, task, args=None, kwargs=None) :
Get configuration
task_config = get_conf_dict('celery_dynamic_router').get(task, None)
If task is not configured, return task
if not task_config:
return None
Get task dynamic queue configuration
dynamic_queue = task_config.get('dynamic_queue', [])
dynamic_percentage = task_config.get('dynamic_percentage'.0.0)
Route a percentage of tasks to a dynamic queue
if random.random() <= dynamic_percentage:
# Decide which dynamic queue to use
queue_name = router_load_balance(dynamic_queue, task_name)
log.data('get_router| task_name:%s, queue:%s', task_name, queue_name)
return {'queue': queue_name}
else:
return None
Copy the code
A scheduled task can be configured
Not only tasks can be carried out asynchronously but also tasks can be carried out with Celery Beat. Let’s start with an example:
from celery.schedules import crontab
app.conf.beat_schedule = {
# Send emails every 30 seconds
'sendmail-every-30-seconds': {
'task': 'asks.send_mail'.'schedule': 30.0.'args': ['[email protected]']}}Copy the code
After completing the above configuration, run Celery Beat command:
That is, the send_email task is executed every 30 seconds.
The above example configures a scheduled task in code. In my work, the database scheduling model provided by Djcelery was used, which was set up dynamically in combination with the ORM function provided by Django. Celery configurator: Celery configurator
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
Copy the code
Set up a configuration table using DatabaseScheduler and then regenerate it into a scheduled task:
python manage.py migrate
Copy the code
You can see that the following tables are added to the database:
| celery_taskmeta |
| celery_tasksetmeta |
| djcelery_crontabschedule |
| djcelery_intervalschedule |
| djcelery_periodictask |
| djcelery_periodictasks |
| djcelery_taskstate |
| djcelery_workerstate |
Copy the code
If you complete the above tasks, only run the Celery Beat command at the end then the tasks will be loaded into the database to initiate the tasks. The advantage of this is that you can dynamically configure scheduled tasks by modifying records in the database, for example, adjusting the period or parameters of the task.
Above is what I gained when I came into contact with Celery in my work, if there are scenarios where asynchronous tasks and timed tasks need to be implemented, please consider using Celery.
I am Grass Niazi, a grass carp who loves technology and life. See you next time!
reference
-
Message Queue vs Task Queue difference
-
High performance asynchronous frame Celery into pits guide
-
Elongated tasks queue tasks