We have a requirement to add Celery 4.x tasks dynamically from a web interface. This article looks at how to add tasks dynamically with Django.
Let’s create a new Django project.
Install the latest Django:
pip install django
Create the project with django-admin startproject:
django-admin startproject django_con .
Install Celery:
pip3 install django-celery
pip3 install -U Celery
pip3 install "celery[librabbitmq,redis,auth,msgpack]"
pip3 install django-celery-results
pip3 install django-celery-tasks
The installed dependencies and their versions are:
amqp==2.5.2
anyjson==0.3.3
asgiref==3.2.7
billiard==3.6.3.0
celery==4.4.2
cffi==1.14.0
cryptography==2.9.2
Django==3.0.6
django-celery==3.3.1
django-celery-beat==2.0.0
django-celery-results==1.2.1
django-timezone-field==4.0
dnspython==1.16.0
eventlet==0.25.2
greenlet==0.4.15
importlib-metadata==1.6.0
kombu==4.6.8
monotonic==1.5
pycparser==2.20
python-crontab==2.4.2
python-dateutil==2.8.1
pytz==2020.1
redis==3.5.1
six==1.14.0
sqlparse==0.3.1
vine==1.3.0
zipp==3.1.0
Test Celery
Step 1: Create a celery_tasks folder in your Django project, then create a tasks.py file inside it.
Step 2: Write tasks.py as shown below:
from celery import Celery

# Create the Celery application object
app = Celery('celery_tasks.tasks', broker='redis://192.168.196.135:6379/8')

# Create the task function
@app.task
def my_task():
    print("task function is executing...")
The first argument to Celery sets a name and the second sets the broker; here I use Redis as the broker. my_task is a task function I wrote. Adding the app.task decorator registers it as a Celery task, so that calls to it can be sent to the broker queue.
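To make the broker URL format concrete, here is a small sketch that uses only the Python standard library, not Celery itself, to split the URL above into its parts. The helper name describe_redis_url is hypothetical, and reading the trailing path segment as the Redis database number is an assumption based on the usual redis://host:port/db convention.

```python
from urllib.parse import urlsplit


def describe_redis_url(url):
    """Split a redis:// URL into scheme, host, port and database number.

    Hypothetical helper for illustration only; Celery parses the URL itself.
    """
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        # Assumption: the path segment after "/" is the Redis database index.
        "db": int(parts.path.lstrip("/") or 0),
    }


broker = describe_redis_url("redis://192.168.196.135:6379/8")
print(broker)
```

Read this way, the broker in this article lives in database 8 of the Redis server at 192.168.196.135:6379.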
Step 3: Start a worker to execute the tasks:
celery -A celery_tasks.tasks worker -l info
Call the task
Next, we test the function: create a task, add it to the task queue, and let the worker execute it.
Enter the Python shell and execute the following code:
[root@python_env django_cron]# python3 manage.py shell
from celery_tasks.tasks import my_task
# Calling delay() returns an AsyncResult object, which can be used to check the status of the task or get its return value.
my_task.delay()
# Output: <AsyncResult: 647b2589-95d2-45c9-a9a7-0b5530caf249>
Then return to the worker terminal to check the task execution.
You can now see that the task has been received and the message has been printed.
Store the result
If we want to keep track of task status, Celery needs to save the results somewhere. Several result stores are available: SQLAlchemy, Django ORM, Memcached, Redis, and RPC (RabbitMQ/AMQP).
Here we again use Redis as the result store. The result store is configured through the backend parameter of Celery. Modify the tasks module as follows:
from celery import Celery

# Use Redis as the broker and as the result backend
app = Celery('celery_tasks.tasks',
             broker='redis://192.168.196.135:6379/8',
             backend='redis://192.168.196.135:6379/9')

# Create the task function
@app.task
def my_task(a, b):
    print("task function is executing....")
    return a + b
I added backend to Celery, specifying Redis as the result store, and changed the task function to take two parameters and return a value.
Now let’s call this task.
from celery_tasks.tasks import my_task
# Pass arguments to the task
ret = my_task.delay(10, 20)
# Query the result
ret.result
# Check whether the task failed; this displays False
ret.failed()
Then look at the worker output.
You can see that the Celery task has been executed successfully.
This is just a start. The next step is to see how to add scheduled tasks.
Optimize the Celery directory structure
Above, the Celery application, its configuration, and the tasks were all written in one file, which becomes inconvenient as the project grows. Let’s split them up and add some common parameters:
Create the Celery application file celery.py
from celery import Celery
from celery_tasks import celeryconfig
import os

# Point Django at the project's settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
# Create the Celery application
app = Celery('celery_tasks')
# Load the configuration from the config module
app.config_from_object(celeryconfig)
# Auto-discover tasks in the listed packages
app.autodiscover_tasks(['celery_tasks'])
Create the Celery parameter file celeryconfig.py
# Result store
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
# Broker
BROKER_URL = 'redis://192.168.196.135:6379/8'
# Number of concurrent workers Celery starts
CELERY_WORKER_CONCURRENCY = 20
# Task prefetching: each worker process/thread tries to fetch up to N tasks at a time to reduce communication overhead
CELERYD_PREFETCH_MULTIPLIER = 20
# Important in some cases to prevent deadlocks
CELERYD_FORCE_EXECV = True
# Restart a worker process after it has executed this many tasks
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# Disable all rate limits; running at full speed is not recommended if network resources are limited
CELERY_DISABLE_RATE_LIMITS = True
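For comparison, Celery 4.x also documents lowercase setting names. The fragment below is only my reading of how the uppercase values above might be written in that style; check the names against the Celery documentation for your version before relying on them. CELERYD_FORCE_EXECV is left out because I am not aware of a lowercase equivalent, and as far as I know the two naming styles should not be mixed in one configuration module.

```python
# Assumed Celery 4.x lowercase equivalents of the uppercase settings above.
# Result store
result_backend = 'redis://192.168.196.135:6379/9'
# Broker
broker_url = 'redis://192.168.196.135:6379/8'
# Number of concurrent worker processes
worker_concurrency = 20
# How many tasks each worker process prefetches at a time
worker_prefetch_multiplier = 20
# Restart a worker process after it has executed this many tasks
worker_max_tasks_per_child = 100
# Disable all rate limits
worker_disable_rate_limits = True
```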
Create the task file tasks.py
from celery_tasks.celery import app

@app.task
def my_task(a, b, c):
    print("task function is executing....")
    return a + b + c
Next, we install and use scheduled tasks.
Install django-celery-beat
pip3 install django-celery-beat
Configure django-celery-beat
# Register django_celery_beat in settings.py
INSTALLED_APPS = [
    'django_celery_beat',  # install django_celery_beat
    ...
]

LANGUAGE_CODE = 'zh-hans'  # use Chinese
TIME_ZONE = 'Asia/Shanghai'  # set Django to use China time
# If USE_TZ is set to True, Django stores datetimes in UTC.
# If USE_TZ is set to False and TIME_ZONE = 'Asia/Shanghai', Shanghai local time is used.
USE_TZ = False
Configure django_celery_beat in celeryconfig.py:
from django.conf import settings
import os

# Set the environment variable for the Django settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

# Celery beat configuration
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
Create the django-celery-beat tables
python3 manage.py migrate
Configure Celery to use django-celery-beat
Configure celery.py:
from celery import Celery
from celery_tasks import celeryconfig
from django.utils import timezone
import os

# Set the environment variable for the Django settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
# Create the Celery application
app = Celery('celery_tasks')
# Load the configuration from the config module
app.config_from_object(celeryconfig)
# Auto-discover tasks in the listed packages
app.autodiscover_tasks(['celery_tasks'])
# Use Django's timezone.now as the application clock
app.now = timezone.now
Configure celeryconfig.py:
from django.conf import settings
import os

# Set the environment variable for the Django settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

# Result store
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
# Broker
BROKER_URL = 'redis://192.168.196.135:6379/8'
# Number of concurrent workers Celery starts
CELERY_WORKER_CONCURRENCY = 20
# Task prefetching: each worker process/thread tries to fetch up to N tasks at a time to reduce communication overhead
CELERYD_PREFETCH_MULTIPLIER = 20
# Important in some cases to prevent deadlocks
CELERYD_FORCE_EXECV = True
# Restart a worker process after it has executed this many tasks
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# Disable all rate limits; running at full speed is not recommended if network resources are limited
CELERY_DISABLE_RATE_LIMITS = True

# Celery beat configuration
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
Write the task file tasks.py:
from celery_tasks.celery import app

@app.task
def my_task1(a, b, c):
    print("task1 function is executing....")
    return a + b + c

@app.task
def my_task2():
    print("task2 function is executing....")
Start the scheduled task services:
Before scheduled tasks can run, a worker must be running to execute asynchronous tasks, and then a beat process must be started to trigger the tasks on schedule.
Start the worker:
celery -A celery_tasks worker -l info
Start the beat scheduler:
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Creating a scheduled task
For how to create scheduled tasks, see the django_celery_beat project page: pypi.org/project/dja…
Let’s take a look at the examples on the official website and then put them into practice.
Official examples
Create periodic tasks based on interval time
Initialize the interval object
To create a periodic task that runs at a fixed interval, we first create an interval object that defines how often the task runs:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
# executes every 10 seconds.
schedule, created = IntervalSchedule.objects.get_or_create(
every=10,
period=IntervalSchedule.SECONDS,
)
Here period=IntervalSchedule.SECONDS sets the unit to seconds. To use another time unit, choose one of the following:
1. IntervalSchedule.DAYS: fixed interval in days
2. IntervalSchedule.HOURS: fixed interval in hours
3. IntervalSchedule.MINUTES: fixed interval in minutes
4. IntervalSchedule.SECONDS: fixed interval in seconds
5. IntervalSchedule.MICROSECONDS: fixed interval in microseconds
Note: If you have multiple periodic tasks that all run every 10 seconds, they should all use the same interval object. If you are not sure which time units are available, you can check them as follows:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
IntervalSchedule.PERIOD_CHOICES
# Output: (('days', 'Days'), ('hours', 'Hours'), ('minutes', 'Minutes'), ('seconds', 'Seconds'), ('microseconds', 'Microseconds'))
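To see what an every/period pair amounts to, here is a minimal standard-library sketch that turns such a pair into a datetime.timedelta. The helper interval_to_timedelta and the PERIODS tuple are hypothetical names for illustration; this is not django_celery_beat code, only a way to reason about the interval an object describes.

```python
from datetime import timedelta

# The period names match the first element of each PERIOD_CHOICES pair.
PERIODS = ("days", "hours", "minutes", "seconds", "microseconds")


def interval_to_timedelta(every, period):
    """Convert an (every, period) pair into a datetime.timedelta."""
    if period not in PERIODS:
        raise ValueError(f"unknown period: {period!r}")
    # timedelta accepts each of these period names as a keyword argument.
    return timedelta(**{period: every})


ten_seconds = interval_to_timedelta(10, "seconds")
two_hours = interval_to_timedelta(2, "hours")
print(ten_seconds.total_seconds(), two_hours.total_seconds())
```

Two tasks that both use every=10 and period seconds describe the same timedelta, which is one way to see why they can share a single interval object.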
Create periodic interval tasks
Create a task that takes no arguments:
PeriodicTask.objects.create(
interval=schedule, # we created this above.
name='Importing contacts', # simply describes this periodic task.
task='proj.tasks.import_contacts', # name of task.
)
Create a task that takes arguments:
import json
from datetime import datetime, timedelta
PeriodicTask.objects.create(
interval=schedule, # we created this above.
name='Importing contacts', # simply describes this periodic task.
task='proj.tasks.import_contacts', # name of task.
args=json.dumps(['arg1', 'arg2']),
kwargs=json.dumps({
'be_careful': True,
}),
expires=datetime.utcnow() + timedelta(seconds=30)
)
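The args and kwargs fields hold JSON strings rather than Python objects, which is why json.dumps appears in the example. The sketch below uses only the standard library to show that round trip; import_contacts here is a stand-in function I made up, and the idea that the stored strings are decoded again before the task is called is my assumption about how the scheduler uses them.

```python
import json


def import_contacts(*args, **kwargs):
    """Stand-in for proj.tasks.import_contacts; it just echoes what it received."""
    return args, kwargs


# These strings are what the args and kwargs fields would hold.
stored_args = json.dumps(['arg1', 'arg2'])
stored_kwargs = json.dumps({'be_careful': True})

# Decoding them gives back a list and a dict that can be unpacked into a call.
result = import_contacts(*json.loads(stored_args), **json.loads(stored_kwargs))
print(stored_args, stored_kwargs, result)
```

One consequence is that every argument must be JSON-serializable, so plain lists, dicts, strings, numbers, and booleans are the safe choices.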
Create periodic tasks based on crontab
Initialize the scheduling object for crontab
The preceding examples create scheduling objects with a fixed interval. A crontab schedule instead works like crontab scheduling on Linux. The crontab scheduling object has the following fields: minute, hour, day_of_week, day_of_month, and month_of_year. Together their values form a crontab timing expression such as 30 * * * *.
import pytz
from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create(
minute='30',
hour='*',
day_of_week='*',
day_of_month='*',
month_of_year='*',
timezone=pytz.timezone('Canada/Pacific')
)
To use China time instead, set timezone=pytz.timezone('Asia/Shanghai').
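To get a feel for what an expression like 30 * * * * selects, here is a deliberately small matcher written with the standard library only. It is a sketch under stated assumptions, not how Celery or django_celery_beat evaluate schedules: the function names are hypothetical, the fields are read in the usual Linux crontab order (minute, hour, day of month, month, day of week), only * and comma-separated numbers are supported, and all five fields are combined with a logical AND.

```python
from datetime import datetime


def field_matches(expr, value):
    """Check one crontab field; supports '*' and comma-separated numbers only."""
    if expr == "*":
        return True
    return value in {int(part) for part in expr.split(",")}


def crontab_matches(expr, moment):
    """Return True if the five-field expression matches the given datetime."""
    minute, hour, day_of_month, month_of_year, day_of_week = expr.split()
    # crontab counts Sunday as 0, while datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day_of_month, moment.day)
        and field_matches(month_of_year, moment.month)
        and field_matches(day_of_week, cron_weekday)
    )


print(crontab_matches("30 * * * *", datetime(2020, 9, 23, 14, 30)))
print(crontab_matches("30 * * * *", datetime(2020, 9, 23, 14, 31)))
```

With minute='30' and every other field set to *, the schedule matches once an hour, at half past.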
Create a scheduled task based on the crontab schedule
This works the same way as creating a periodic task at a fixed interval, except that interval=schedule becomes crontab=schedule.
PeriodicTask.objects.create(
crontab=schedule,
name='Importing contacts',
task='proj.tasks.import_contacts',
)
Temporarily pause a periodic task
periodic_task.enabled = False
periodic_task.save()
Example of starting periodic tasks
Periodic tasks need a worker to execute them, and the tasks themselves must already exist; we wrote the tasks above. As mentioned before, the Celery worker and the beat scheduling service both need to be running.
1. Start a Celery worker service:
$ celery -A [project-name] worker --loglevel=info
2. Start the beat service as a separate process:
$ celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Alternatively, you can use the -S (scheduler) flag; see celery beat --help for details:
$ celery -A [project-name] beat -l info -S django
You can also run both services (worker and beat) with a single command (recommended for development only):
$ celery -A [project-name] worker --beat --scheduler django --loglevel=info
3. Now that both services are running, you can start adding periodic tasks.
Hands-on practice
After reading the instructions from the official website above, let’s use the two tasks I wrote earlier to create periodic tasks.
Create periodic tasks based on interval time
Initialize the interval object
In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule
# Create an interval object
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
# Query the created interval objects
IntervalSchedule.objects.all()
# Output: <QuerySet [<IntervalSchedule: every 10 seconds>]>
Create periodic interval tasks
Create a periodic interval task with no parameters:
PeriodicTask.objects.create(
    interval=schedule,  # the 10-second interval object created above
    name='my_task2',  # name of this periodic task
    task='celery_tasks.tasks.my_task2',  # the task to execute periodically
)
# Output: <PeriodicTask: my_task2: every 10 seconds>
The beat service log is displayed as follows:
[2020-09-23 14:03:00,146: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
[2020-09-23 14:03:09,189: INFO/MainProcess] Sending due task my_task2 (celery_tasks.tasks.my_task2)
The worker service log displays as follows:
[2020-09-23 14:02:30,161: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd]
[2020-09-23 14:02:30,163: WARNING/ForkPoolWorker-1] Task 2 is executing....
[2020-09-23 14:02:30,166: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd] succeeded in 0.003136722996714525s: None
[2020-09-23 14:02:70,161: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9]
[2020-09-23 14:02:70,164: WARNING/ForkPoolWorker-1] Task 2 is executing....
[2020-09-23 14:02:70,167: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9] succeeded in 0.0030223939975257963s: None
[2020-09-23 14:03:20,184: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[…]
[2020-09-23 14:03:20,186: WARNING/ForkPoolWorker-1] Task 2 is executing....
Create a periodic interval task with parameters:
import json
from datetime import datetime, timedelta

PeriodicTask.objects.create(
    interval=schedule,  # the 10-second interval object created above
    name='my_task1',  # name of this periodic task
    task='celery_tasks.tasks.my_task1',  # the task to execute periodically
    args=json.dumps([10, 20, 30]),
    expires=datetime.now() + timedelta(seconds=30),
)
# Output: <PeriodicTask: my_task1: every 10 seconds>
Then check the beat service log and the worker service log.
Are periodic tasks executed serially or in parallel at the worker?
Here is a question: if there is only one worker and a task takes a long time to execute, do the tasks run serially or in parallel? To find out, make the two tasks above sleep for 10 seconds.
Add a sleep to the tasks
import time
from celery_tasks.celery import app

@app.task
def my_task1(a, b, c):
    print("task1 function is executing....")
    print("task1 function sleeps for 10 seconds....")
    time.sleep(10)
    return a + b + c

@app.task
def my_task2():
    print("task2 function is executing....")
    print("task2 function sleeps for 10 seconds....")
    time.sleep(10)
Delete the two periodic tasks, recreate them, and then view the beat and worker service logs
Delete the previous two periodic tasks:
# Pause the tasks; fetch each task once so the change is saved on the same object
task1 = PeriodicTask.objects.get(name="my_task1")
task1.enabled = False
task1.save()
task2 = PeriodicTask.objects.get(name="my_task2")
task2.enabled = False
task2.save()
# Delete the tasks
task1.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})
task2.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})
Restart the beat and worker services. The task code has been modified, so the services must be restarted to load the new version.
# Start the beat process
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Start a worker process
celery -A celery_tasks worker -l info
Recreate the two periodic tasks:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
# Create an interval object
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
# Create the periodic task without parameters
PeriodicTask.objects.create(
    interval=schedule,  # the 10-second interval object created above
    name='my_task2',
    task='celery_tasks.tasks.my_task2',  # the task to execute periodically
)
# Create the periodic task with parameters
import json
from datetime import datetime, timedelta
PeriodicTask.objects.create(
    interval=schedule,  # the 10-second interval object created above
    name='my_task1',
    task='celery_tasks.tasks.my_task1',
    args=json.dumps([10, 20, 30]),
)
# Output: <PeriodicTask: my_task1: every 10 seconds>
Check the beat log for the periodic tasks, then the execution log of the single worker.
The logs show that the single worker did not execute the tasks in parallel: after beat sends them, they run one after another in that worker. To execute tasks concurrently, enable multiple threads or processes.
Start 2 workers and view the execution logs.
So when tasks need to run in parallel, start multiple workers to execute them.
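The same effect can be reproduced without Celery. The sketch below is only an analogy: it uses a standard-library thread pool in place of Celery workers, a hypothetical slow_task that sleeps for 0.2 seconds instead of 10, and compares one pool slot with two.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(name, seconds=0.2):
    """Stand-in for a task that sleeps, like the two tasks above."""
    time.sleep(seconds)
    return name


def run_with_workers(worker_count):
    """Run two slow tasks on a pool of the given size and return the elapsed time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        results = list(pool.map(slow_task, ["my_task1", "my_task2"]))
    return results, time.perf_counter() - start


serial_results, serial_time = run_with_workers(1)
parallel_results, parallel_time = run_with_workers(2)
print(f"1 worker: {serial_time:.2f}s, 2 workers: {parallel_time:.2f}s")
```

With one slot the two sleeps add up; with two slots they overlap, which mirrors what the worker logs show when a second worker is started.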
Create periodic tasks based on crontab
Infinite-loop execution bug
When a crontab periodic task is used, the beat service keeps sending the task without stopping, which makes the beat service unusable. I have tried several approaches and still have no solution.
Initialize the scheduling object for crontab:
import pytz
from django_celery_beat.models import CrontabSchedule, PeriodicTask
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='*',
    hour='*',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
    timezone=pytz.timezone('Asia/Shanghai')
)
CrontabSchedule.objects.all()
# Output: <QuerySet [<CrontabSchedule: * * * * * (m/h/d/dM/MY) Asia/Shanghai>, <CrontabSchedule: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>
Create a scheduled task with no parameters:
PeriodicTask.objects.create(
    crontab=schedule,  # the * * * * * crontab object created above
    name='my_task2_crontab',  # name of this periodic task
    task='celery_tasks.tasks.my_task2',  # the task to execute periodically
)
# Output: <PeriodicTask: my_task2_crontab: * * * * * (m/h/d/dM/MY) Asia/Shanghai>
Then check the log of the beat service to see the tasks it sends.
Query and delete periodic tasks
Periodic tasks are just rows stored in the database, so they are managed with ordinary ORM operations.
Query periodic tasks
from django_celery_beat.models import PeriodicTask
# Query all periodic tasks
PeriodicTask.objects.all()
# Output: <ExtendedQuerySet [<PeriodicTask: Importing contacts: every 10 seconds>, <PeriodicTask: my_task: every 10 seconds>, <PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>
# Get a periodic task by id
task1 = PeriodicTask.objects.get(id=1)
task1
# Output: <PeriodicTask: Importing contacts: every 10 seconds>
# Get a periodic task by name
task2 = PeriodicTask.objects.get(name="my_task")
task2
# Output: <PeriodicTask: my_task: every 10 seconds>
Delete periodic tasks
After getting the periodic tasks, I tried deleting them directly and found that this led to an infinite loop:
task1.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})
task2.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})
If you want to delete a periodic task, first pause it and then delete it, as follows:
# Fetch the task once so that the change is saved on the same object
task = PeriodicTask.objects.get(name="my_task1")
# Pause the task
task.enabled = False
task.save()
# Delete the task
task.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})
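When pausing a task, it matters that enabled is changed and save() is called on the same object, because each call to objects.get() builds a new model instance from the database row. The sketch below imitates that behaviour with two hypothetical classes, FakeTask and FakeManager, using only the standard library; it is not the Django ORM, just a way to see why a change made on one fetched object is lost if a different fetched object is saved.

```python
import copy


class FakeTask:
    """Tiny stand-in for a PeriodicTask row; not the real model."""

    def __init__(self, name, enabled=True):
        self.name = name
        self.enabled = enabled


class FakeManager:
    """Hypothetical manager that, like an ORM query, builds a new object per get()."""

    def __init__(self):
        self._rows = {"my_task1": FakeTask("my_task1")}

    def get(self, name):
        # Each lookup returns a fresh copy of the stored row.
        return copy.copy(self._rows[name])

    def save(self, task):
        # Persist the state of the object that was passed in.
        self._rows[task.name] = copy.copy(task)


objects = FakeManager()

# Changing one fetched copy and saving another fetched copy loses the change.
objects.get("my_task1").enabled = False
objects.save(objects.get("my_task1"))
lost_update = objects.get("my_task1").enabled

# Fetching once and saving that same object keeps the change.
task = objects.get("my_task1")
task.enabled = False
objects.save(task)
kept_update = objects.get("my_task1").enabled
print(lost_update, kept_update)
```

In the first attempt the task is still enabled afterwards; in the second it is paused as intended.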
Restart a periodic task after pausing it
A paused task may later need to be started again, as follows:
# Set the task's enabled field back to True and save the same object
task = PeriodicTask.objects.get(name="my_task1")
task.enabled = True
task.save()