Celery
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, with support for asynchronous tasks and scheduled tasks. Celery lets you manage a distributed task queue quickly through a single interface. It is not a task queue itself, but an administrative tool that encapsulates the common operations of task management.
First you need to install Celery. The latest version is 4.3, but if you use Redis as the broker, 4.1 is recommended, as 4.3 may have compatibility issues with django-redis-cache. Install pinned versions along these lines:
```bash
pip install celery==4.1.0 redis==3.2.0
```
Components of Celery
Celery uses the producer-consumer design pattern. Its main components:
- Celery Beat: the task scheduler. The beat process reads the configuration file and periodically sends due tasks to the task queue.
- Celery Worker: the consumer that executes tasks. Several workers usually run on background servers to improve efficiency.
- Broker: the message middleware. It receives task messages from producers, stores them in a queue, and distributes them to consumers; it is usually implemented with a message queue or a database.
- Producer: creates tasks by calling the Celery API, functions, or decorators and places them on the task queue.
- Result Backend: stores status information and results after a task completes, so they can be queried later.
Celery architecture diagram:
```mermaid
graph TD
    A[Celery Beat] --> B[message Broker]
    C[Producer] --> B
    B --> D[task consumer Worker]
    B --> E[task consumer Worker]
    B --> F[task consumer Worker]
    D --> G[Result Backend]
    E --> G
    F --> G
```
How tasks are produced
- Publishers publish tasks (e.g. web applications)
- The task scheduler publishes tasks (scheduled tasks)
Celery depends on three libraries:
- billiard: an improved fork of Python 2.7's multiprocessing library, with better performance and stability
- librabbitmq: an AMQP client for Python implemented in C
- kombu: Celery's messaging library for sending and receiving messages; it provides a high-level Python interface on top of the AMQP protocol
Message Broker
RabbitMQ or Redis is commonly used as the broker.
Using RabbitMQ as the broker:
```bash
sudo apt-get install rabbitmq-server
```
Or run it inside Docker:
```bash
docker run -d -p 5672:5672 rabbitmq
```
Using Redis as the broker:
```bash
# Run it inside Docker
docker run -d -p 6379:6379 redis
```
The specific configuration for using Redis as the broker: start by installing the celery[redis] bundle
```bash
pip install -U "celery[redis]"
```
Then set the address of the message broker:
```python
app.conf.broker_url = 'redis://localhost:6379/0'

# The URL format is:
# redis://:password@hostname:port/db_number
```
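For example, connecting to a password-protected Redis instance (the host and password below are placeholders):

```python
# Placeholder credentials, shown only to illustrate the URL format
app.conf.broker_url = 'redis://:s3cret@redis.example.com:6379/0'
```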
Basic usage
We can create a simple tasks.py that defines a task for the message queue:
```python
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
```
This is a basic Celery message queue application. The basic flow for using Celery is as follows (a short usage sketch follows the list):
- Create a Celery app
- Specify the message broker
- Create a task
- Push the task onto the task queue
- Configure an execution policy (for periodic tasks)
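As a minimal sketch, assuming a worker has been started in another terminal with `celery -A tasks worker --loglevel=info`:

```python
from tasks import add

# Enqueue the task; this returns an AsyncResult immediately
result = add.delay(4, 4)

# get() blocks until the worker finishes the task; it requires a
# result backend to be configured on the app (not set in tasks.py above)
print(result.get(timeout=10))  # 8
```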
With Django
To use Celery in Django, you can put the Celery configuration into settings.py.
Note: Celery 4.0 supports Django 1.8 and above; use Celery 3.1 for earlier Django versions.
app
Start with the Celery app: create /proj/proj/celery.py in the Django project directory
```python
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# It's not necessary to have many instances of the library; one is enough.
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
```
register
Then register the app in the /proj/proj/__init__.py file to ensure that the Celery app is loaded when Django starts:
```python
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```
create
In each application's directory, create the file app/tasks.py and use the @shared_task decorator to create tasks:
```python
from celery import shared_task

@shared_task
def xsum(numbers):
    return sum(numbers)
```
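A shared task is enqueued like any other Celery task. As a sketch, a hypothetical Django view (the view name and arguments are illustrative) that hands the work to the queue instead of blocking the request:

```python
# views.py -- hypothetical example view
from django.http import JsonResponse

from .tasks import xsum

def start_sum(request):
    # .delay() enqueues the task and returns an AsyncResult immediately
    result = xsum.delay([1, 2, 3, 4])
    return JsonResponse({'task_id': result.id})
```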
results
Task run information is stored by default in the Django database (SQLite unless configured otherwise); the django-celery-results package provides this result backend. Install it first:

```bash
pip install django-celery-results
```

Then add it to INSTALLED_APPS in settings.py:

```python
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
```
Migrate the app so that it creates its database tables:
```bash
python manage.py migrate django_celery_results
```
Then set the result backend in settings.py to the database configured in Django:
```python
CELERY_RESULT_BACKEND = 'django-db'

# If using a cache-based backend instead:
CELERY_CACHE_BACKEND = 'django-cache'
```
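With the django-db backend, each finished task is saved as a row of the TaskResult model, so stored results can be inspected through the ORM. A minimal sketch (the task id below is a placeholder):

```python
from django_celery_results.models import TaskResult

# 'some-task-id' stands in for a real AsyncResult.id value
tr = TaskResult.objects.get(task_id='some-task-id')
print(tr.status, tr.result, tr.date_done)
```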
Broker
Configure the broker message queue in settings.py; Redis is used here.
```python
CELERY_BROKER_URL = 'redis://localhost:6379/3'

#: Only add pickle to this list if your broker is secured
#: from unwanted access (see userguide/security.html)
CELERY_ACCEPT_CONTENT = ['json']

# Save results to the Django database
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
```
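With the broker and result backend configured, start a worker from the Django project root (assuming the project is named proj as in the files above):

```bash
celery -A proj worker --loglevel=info
```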
Periodic tasks
You first define tasks in tasks.py, and then configure the execution schedule:
schedule
```python
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
}
app.conf.timezone = 'UTC'
```
There are several parameters:
- Task: the name of the task (function) to execute
- Schedule: the running frequency
- Args: positional arguments passed when the function runs, as a list or tuple
- Kwargs: keyword arguments, as key-value pairs (a dict)
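Scheduled tasks are only dispatched while the beat process is running, so start it alongside a worker; during development the worker's -B option embeds beat into the worker process:

```bash
celery -A tasks beat --loglevel=info
# or, for development, a worker with an embedded beat:
celery -A tasks worker -B --loglevel=info
```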
crontab
You can also use crontab to run tasks on a calendar schedule, for example daily, weekly, or monthly:
```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}
```
More examples:

| Example | Meaning |
| --- | --- |
| crontab() | Execute every minute |
| crontab(minute=0, hour=0) | Execute daily at midnight |
| crontab(minute=0, hour='*/3') | Execute every three hours |
| crontab(day_of_week='sunday') | Execute every minute on Sundays |
| crontab(0, 0, day_of_month='2') | Execute at midnight on the second day of each month |
| crontab(0, 0, month_of_year='*/3') | Execute daily at midnight during the first month of each quarter |
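Because the Django setup above loads configuration with namespace='CELERY', the same schedules can be declared in settings.py under a CELERY_-prefixed key. A sketch reusing the xsum task from earlier (the entry name, task path, and arguments are illustrative):

```python
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Hypothetical entry: run xsum every Sunday at midnight
    'weekly-xsum': {
        'task': 'app.tasks.xsum',
        'schedule': crontab(minute=0, hour=0, day_of_week='sunday'),
        'args': ([1, 2, 3],),
    },
}
```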