First, we have a requirement: to dynamically add tasks with Celery 4.x from the web side. Let's investigate how to do this with Django.

Let’s create a new Django project

Install the latest Django

pip install django

Create the project with django-admin startproject:

django-admin startproject django_con .

Install Celery and its related packages:

pip3 install django-celery
pip3 install -U Celery
pip3 install "celery[librabbitmq,redis,auth,msgpack]"
pip3 install django-celery-results
pip3 install django-celery-beat

Next, install the dependencies:

amqp==2.5.2
anyjson==0.3.3
asgiref==3.2.7
billiard==3.6.3.0
celery==4.4.2
cffi==1.14.0
cryptography==2.9.2
Django==3.0.6
django-celery==3.3.1
django-celery-beat==2.0.0
django-celery-results==1.2.1
django-timezone-field==4.0
dnspython==1.16.0
eventlet==0.25.2
greenlet==0.4.15
importlib-metadata==1.6.0
kombu==4.6.8
monotonic==1.5
pycparser==2.20
python-crontab==2.4.2
python-dateutil==2.8.1
pytz==2020.1
redis==3.5.1
six==1.14.0
sqlparse==0.3.1
vine==1.3.0
zipp==3.1.0

Test Celery

Step 1: Create a celery_tasks folder in your Django project, then create tasks.py inside it.
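Assuming the startproject command above was run with the trailing dot, the layout at this point looks like this:

django_con/          # project root (the current directory)
├── manage.py
├── django_con/      # project settings package
└── celery_tasks/
    └── tasks.py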

Step 2: Write tasks.py as shown below:

from celery import Celery

# Create a Celery application instance
app = Celery('celery_tasks.tasks', broker='redis://192.168.196.135:6379/8')

# Create a task function
@app.task
def my_task():
    print("The task function is executing...")

The first parameter of Celery sets the application name, and the broker parameter sets the broker; here I use Redis as the broker. my_task is a task function I wrote; the app.task decorator registers it so that it can be sent to the broker queue.

Start a worker to listen on the task queue:

celery -A celery_tasks.tasks worker -l info


Call the task

Next, let's test the function: create a task, add it to the task queue, and let the worker execute it.

Enter the Python terminal and execute the following code:

[root@python_env django_cron]# python3 manage.py shell
>>> from celery_tasks.tasks import my_task
>>> my_task.delay()  # returns an AsyncResult object, which can be used to check the task's status or get its return value
<AsyncResult: 647b2589-95d2-45c9-a9a7-0b5530caf249>

Then return to the worker terminal to check the task execution.


You can now see that the task has been received and the message has been printed.

Store the result

If we want to keep track of task status, Celery needs to save the results somewhere. There are several options for the result store: SQLAlchemy, Django ORM, Memcached, Redis, and RPC (RabbitMQ/AMQP).
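As an aside, since django-celery-results was installed earlier, here is a minimal sketch of what the Django ORM option would look like instead (this assumes 'django_celery_results' has been added to INSTALLED_APPS and its migrations applied; we do not use it in this article):

from celery import Celery

# A sketch only: store task results in the Django database
# via the django-celery-results backend instead of Redis.
app = Celery('celery_tasks.tasks',
             broker='redis://192.168.196.135:6379/8',
             backend='django-db')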

Here we will stick with Redis as the result store; the result store is configured through Celery's backend parameter. Let's modify the tasks module:

from celery import Celery

# Configure both a broker and a result backend
app = Celery('celery_tasks.tasks',
             broker='redis://192.168.196.135:6379/8',
             backend='redis://192.168.196.135:6379/9')

# Create a task function
@app.task
def my_task(a, b):
    print("The task function is executing...")
    return a + b

I added the backend parameter to Celery, specifying Redis as the result store, and changed the task function to take two parameters and return a value.

Let's call this task again.

from celery_tasks.tasks import my_task

# Pass parameters to the task
ret = my_task.delay(10, 20)

# Query the result
ret.result

# Check whether the task failed; displays False
ret.failed()
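Beyond .result and .failed(), a few other standard Celery AsyncResult methods are useful here; the values in the comments are what this call should produce:

from celery_tasks.tasks import my_task

ret = my_task.delay(10, 20)
ret.get(timeout=10)   # block until the result is ready; returns 30 here
ret.status            # task state, e.g. 'SUCCESS' once finished
ret.id                # the task id string, usable to look the result up later

# apply_async offers more control than delay(), e.g. run 5 seconds from now
my_task.apply_async(args=[10, 20], countdown=5)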

Then look at the worker's output for the execution.


You can see that the Celery task executed successfully.

This is just a start; next, let's see how to add timed tasks.

Optimize the Celery directory structure

Above, the Celery application creation, configuration, and tasks were all written in one file, which becomes inconvenient as the project grows. Let's split them up and add some common parameters:

celery_tasks/
├── __init__.py
├── celery.py
├── celeryconfig.py
└── tasks.py

Create the Celery application file celery.py:

import os
from celery import Celery
from celery_tasks import celeryconfig

# Set the default Django settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

# Create the Celery application
app = Celery('celery_tasks')

# Load configuration from the config module
app.config_from_object(celeryconfig)

# Auto-discover tasks in the listed packages
app.autodiscover_tasks(['celery_tasks', ])

Create the Celery configuration file celeryconfig.py:

# Result store
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'

# Broker
BROKER_URL = 'redis://192.168.196.135:6379/8'

# Number of concurrent worker processes/threads
CELERY_WORKER_CONCURRENCY = 20

# Task prefetching: each worker process/thread fetches up to N tasks
# at a time, to reduce communication overhead
CELERYD_PREFETCH_MULTIPLIER = 20

# Important in some cases to prevent deadlocks
CELERYD_FORCE_EXECV = True

# Restart a worker child process after it has executed this many tasks
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100

# Disable all rate limits; not recommended if network resources are limited
CELERY_DISABLE_RATE_LIMITS = True
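As a side note, Celery 4 also accepts new-style lowercase setting names. A sketch of the equivalents for a few of the options above (either style works, but avoid mixing the two styles in one config file):

broker_url = 'redis://192.168.196.135:6379/8'
result_backend = 'redis://192.168.196.135:6379/9'
worker_concurrency = 20
worker_prefetch_multiplier = 20
worker_max_tasks_per_child = 100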

Create the task file tasks.py:

# Import the Celery app created in celery.py
from celery_tasks.celery import app

@app.task
def my_task(a, b, c):
    print("The task function is executing...")
    return a + b + c
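The refactored task is called the same way as before, now with three arguments; a quick check from the Django shell:

from celery_tasks.tasks import my_task

ret = my_task.delay(1, 2, 3)
ret.result   # 6 once the worker has processed the task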

Next, let's install and use scheduled tasks.

Install django-celery-beat

pip3 install django-celery-beat

Configure django-celery-beat

# In the Django project's settings.py:
INSTALLED_APPS = [
    'django_celery_beat',  # install django_celery_beat
    ...
]

# Use Chinese
LANGUAGE_CODE = 'zh-hans'

# Set the Django time zone
TIME_ZONE = 'Asia/Shanghai'

# If USE_TZ is True, Django uses UTC internally; with USE_TZ = False and
# TIME_ZONE = 'Asia/Shanghai', Shanghai local time is used instead.
USE_TZ = False

Configure django_celery_beat in celeryconfig.py:

Setdefault ("DJANGO_SETTINGS_MODULE") from django.conf import Settings import OS # os.environ. Setdefault ("DJANGO_SETTINGS_MODULE") CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.time_zone DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'Copy the code

Create the django-celery-beat tables:

python3 manage.py migrate


Configure Celery using django-celery-beat

Configure celery.py:

import os
from celery import Celery
from celery_tasks import celeryconfig
from django.utils import timezone

# Set the environment variable
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

# Create the Celery application
app = Celery('celery_tasks')

# Load configuration from the config module
app.config_from_object(celeryconfig)

# Auto-discover tasks in the listed packages
app.autodiscover_tasks(['celery_tasks', ])

# Make beat use Django's now()
app.now = timezone.now

Configure celeryconfig.py:

Setdefault ("DJANGO_SETTINGS_MODULE") from django.conf import Settings import OS # os.environ. Setdefault ("DJANGO_SETTINGS_MODULE") "Django_con. Settings") # set results stored CELERY_RESULT_BACKEND = 'redis: / / 192.168.196.135:6379/9' # set agent broker BROKER_URL = 'redis: / / 192.168.196.135:6379/8' # celery start work number set CELERY_WORKER_CONCURRENCY = 20 # task prefetching function, is each work process/thread at the time of retrieval task, I will try to get as many as N to ensure that the communication cost can be compressed. CELERYD_PREFETCH_MULTIPLIER = 20 # this is important in some cases to prevent deadlock CELERYD_FORCE_EXECV = True # celery worker from executing many tasks after restart operation CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # Disable all speed limits and full power is not recommended if network resources are limited. CELERY_DISABLE_RATE_LIMITS = True # celery beat CELERY_ENABLE_UTC = False CELERY_TIMEZONE = settings.time_zone DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER ='django_celery_beat.schedulers:DatabaseScheduler'Copy the code

Write the tasks in tasks.py:

from celery_tasks.celery import app

@app.task
def my_task1(a, b, c):
    print("Task 1 is executing...")
    return a + b + c

@app.task
def my_task2():
    print("Task 2 is executing...")

Start the services for scheduled tasks:

Before scheduled tasks can run, there must be a worker to execute the asynchronous tasks; then start the beat timer to trigger them.

Start the task worker:

celery -A celery_tasks worker -l info

Start the beat timer:

celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Creating a scheduled task

To create a scheduled task, visit the django_celery_beat website: pypi.org/project/dja…

Let’s take a look at the examples on the official website and then put them into practice.

Examples from the official documentation

Create periodic tasks based on interval time

Initialize the interval object

To create a periodic task based on an interval, we first need an interval object that provides the schedule for the task:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

# executes every 10 seconds.
 schedule, created = IntervalSchedule.objects.get_or_create(
     every=10,
     period=IntervalSchedule.SECONDS,
 )

Here period=IntervalSchedule.SECONDS. If you want other time units, the following options are available:

1. IntervalSchedule.DAYS: fixed interval in days

2. IntervalSchedule.HOURS: fixed interval in hours

3. IntervalSchedule.MINUTES: fixed interval in minutes

4. IntervalSchedule.SECONDS: fixed interval in seconds

5. IntervalSchedule.MICROSECONDS: fixed interval in microseconds

Note: if you have multiple periodic tasks that all run every 10 seconds, they should share the same interval object. If you are unsure which time units are available, you can check them as follows:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

IntervalSchedule.PERIOD_CHOICES
# Output:
(('days', 'Days'),
 ('hours', 'Hours'),
 ('minutes', 'Minutes'),
 ('seconds', 'Seconds'),
 ('microseconds', 'Microseconds'))

Create periodic interval tasks

The following is a no-argument creation method:

 PeriodicTask.objects.create(
     interval=schedule,                  # we created this above.
     name='Importing contacts',          # simply describes this periodic task.
     task='proj.tasks.import_contacts',  # name of task.
 )

Create method with parameters as follows:

 import json
 from datetime import datetime, timedelta

 PeriodicTask.objects.create(
     interval=schedule,                  # we created this above.
     name='Importing contacts',          # simply describes this periodic task.
     task='proj.tasks.import_contacts',  # name of task.
     args=json.dumps(['arg1', 'arg2']),
     kwargs=json.dumps({
        'be_careful': True,
     }),
     expires=datetime.utcnow() + timedelta(seconds=30)
 )
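Since PeriodicTask rows are ordinary Django model instances, an existing task can also be edited after creation. A sketch (using the 'Importing contacts' task created above); saving the model is what lets the database scheduler notice the change:

import json
from django_celery_beat.models import PeriodicTask

task = PeriodicTask.objects.get(name='Importing contacts')
task.args = json.dumps(['new_arg1', 'new_arg2'])  # replace the positional args
task.save()  # saving notifies the DatabaseScheduler of the change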

Create periodic tasks based on crontab

Initialize the scheduling object for crontab

The preceding examples created schedules with fixed intervals; a crontab schedule instead works like the crontab scheduling mode in Linux. The crontab schedule object has the fields minute, hour, day_of_week, day_of_month, and month_of_year, so the example below corresponds to the crontab notation 30 * * * * (at minute 30 of every hour).

 import pytz
 from django_celery_beat.models import CrontabSchedule, PeriodicTask

 schedule, _ = CrontabSchedule.objects.get_or_create(
     minute='30',
     hour='*',
     day_of_week='*',
     day_of_month='*',
     month_of_year='*',
     timezone=pytz.timezone('Canada/Pacific')
 )

To use our local time zone instead, set timezone=pytz.timezone('Asia/Shanghai').
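For this article's setup, the same schedule in Shanghai time would be the following sketch:

import pytz
from django_celery_beat.models import CrontabSchedule

# Minute 30 of every hour, evaluated in Shanghai local time
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='30',
    hour='*',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
    timezone=pytz.timezone('Asia/Shanghai')
)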

Create a scheduled task based on the crontab schedule

Create the task in the same way as a fixed-interval periodic task, but replace interval=schedule with crontab=schedule:

 PeriodicTask.objects.create(
     crontab=schedule,
     name='Importing contacts',
     task='proj.tasks.import_contacts',
 )

Stop periodic tasks temporarily

 periodic_task.enabled = False 
 periodic_task.save()

Run the periodic tasks

Running periodic tasks requires a worker to execute them, and the tasks themselves must already be defined, which we did above. As mentioned before, the Celery worker and the beat timer service need to be started at the same time:

1. Start the worker as a separate process:

 $ celery -A [project-name] worker --loglevel=info

2. Start the Beat service as a separate process:

 $ celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Alternatively, you can use the -S (scheduler) flag; run celery beat --help for details:

$ celery -A [project-name] beat -l info -S django

Alternatively, you can run both steps (the worker and beat services) with a single command (recommended for development only):

$ celery -A [project-name] worker --beat --scheduler django --loglevel=info

3. Now that these two services are started, you can start adding periodic tasks.

Hands-on walkthrough

After reading the official examples above, let's use the two tasks written earlier to create periodic tasks.

Create periodic tasks based on interval time

Initialize the interval object

In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule

# Create the interval object
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

# Query the interval objects that have been created
IntervalSchedule.objects.all()
# Output:
<QuerySet [<IntervalSchedule: every 10 seconds>]>

Create periodic interval tasks

Create a periodic interval task with no parameters:

PeriodicTask.objects.create(
    interval=schedule,                     # the 10-second interval object created above
    name='my_task2',                       # the task name
    task='celery_tasks.tasks.my_task2',    # the task to execute periodically
)
# Output:
<PeriodicTask: my_task2: every 10 seconds>

The beat service log is displayed as follows:

[2020-09-23 14:03:00,146: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
[2020-09-23 14:03:09,189: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)

The worker service log displays as follows:

[2020-09-23 14:02:30,161: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd]
[2020-09-23 14:02:30,163: WARNING/ForkPoolWorker-1] Task 2 is executing...
[2020-09-23 14:02:30,166: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd] succeeded in 0.003136722996714525s: None
[2020-09-23 14:02:40,161: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9]
[2020-09-23 14:02:40,164: WARNING/ForkPoolWorker-1] Task 2 is executing...
[2020-09-23 14:02:40,167: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9] succeeded in 0.0030223939975257963s: None
[2020-09-23 14:03:20,184: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[4b87f2f2-c20b-0286-0130-d04f28cb8193]
[2020-09-23 14:03:20,186: WARNING/ForkPoolWorker-1] Task 2 is executing...

Create a periodic interval task with parameters:

import json
from datetime import datetime, timedelta

PeriodicTask.objects.create(
    interval=schedule,                     # the 10-second interval object created above
    name='my_task1',                       # the task name
    task='celery_tasks.tasks.my_task1',    # the task to execute periodically
    args=json.dumps([10, 20, 30]),         # positional arguments for the task
    expires=datetime.now() + timedelta(seconds=30)
)
# Output:
<PeriodicTask: my_task1: every 10 seconds>

View the beat service log and then the worker service log to confirm that the task is sent and executed with its arguments.

Are periodic tasks executed serially or in parallel at the worker?

Here is a question: if there is only one worker, what happens when a task takes a long time to execute? For example, let's make the two tasks above sleep for 10 seconds.

Make the task functions sleep

import time

from celery_tasks.celery import app

@app.task
def my_task1(a, b, c):
    print("Task 1 is executing...")
    print("Task 1 sleeps for 10 seconds...")
    time.sleep(10)
    return a + b + c

@app.task
def my_task2():
    print("Task 2 is executing...")
    print("Task 2 sleeps for 10 seconds...")
    time.sleep(10)

Delete the two periodic tasks, recreate them, and then check the beat and worker service logs

Delete the previous two periodic tasks:

Get (name="my_task1").enabled = False Periodictask.objects.get (name="my_task1").save() Get (name="my_task2").enabled = False Periodictask.objects.get (name="my_task2").save() # Delete task Get (name="my_task1").delete() {' django_celery_beat.periodicTask ': Get (name="my_task2").delete() {' django_celery_beat.periodicTask ': 1})Copy the code

Restart the beat and worker services: the task code has been modified, so the services must be restarted to reload it.

# Start the beat process
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Start the worker process
celery -A celery_tasks worker -l info

Recreate two periodic tasks:

import json
from datetime import datetime, timedelta
from django_celery_beat.models import PeriodicTask, IntervalSchedule

# Create the interval object
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

# Create the periodic task with no parameters
PeriodicTask.objects.create(
    interval=schedule,                     # the 10-second interval object created above
    name='my_task2',
    task='celery_tasks.tasks.my_task2',    # the task to execute periodically
)

# Create the periodic task with parameters
PeriodicTask.objects.create(
    interval=schedule,                     # the 10-second interval object created above
    name='my_task1',
    task='celery_tasks.tasks.my_task1',
    args=json.dumps([10, 20, 30]),
    expires=datetime.now() + timedelta(seconds=30)
)
# Output:
<PeriodicTask: my_task1: every 10 seconds>

Check the beat log, and then the execution log of the single worker.

As you can see, a single worker cannot execute these tasks in parallel: after beat sends them, they run serially in the one worker. Therefore, if you want tasks to execute concurrently, you can use multiple processes or threads, or start more workers.
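Concretely, a couple of stock Celery CLI options give you that parallelism; a sketch:

# Raise the number of child processes in a single worker:
celery -A celery_tasks worker -l info --concurrency=2

# Or start a second worker alongside the first, with its own node name:
celery -A celery_tasks worker -l info -n worker2@%h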

Now start 2 workers and view the execution logs again.


So when you need tasks to execute in parallel, you need multiple workers to carry them out.

Create periodic tasks based on crontab

Infinite-loop execution bug

When a crontab periodic task is used, the beat service never stops sending the task, which makes it unusable. I have tried various approaches and have not yet found a solution. Initialize the crontab schedule object:

import pytz
from django_celery_beat.models import CrontabSchedule, PeriodicTask

schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='*',
    hour='*',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
    timezone=pytz.timezone('Asia/Shanghai')
)

CrontabSchedule.objects.all()
# Output:
<QuerySet [<CrontabSchedule: * * * * * (m/h/d/dM/MY) Asia/Shanghai>, <CrontabSchedule: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>

Create a scheduled task with no parameters:

PeriodicTask.objects.create(
    crontab=schedule,                      # the * * * * * crontab object created above
    name='my_task2_crontab',               # the task name
    task='celery_tasks.tasks.my_task2',    # the task to execute periodically
)
# Output:
<PeriodicTask: my_task2_crontab: * * * * * (m/h/d/dM/MY) Asia/Shanghai>

The beat service log shows the task being sent over and over again.

Querying and deleting periodic tasks

Periodic tasks are just rows stored in the database, so they can be managed with ordinary ORM operations.

Query periodic tasks

from django_celery_beat.models import PeriodicTask

# Query all periodic tasks
PeriodicTask.objects.all()
# Output:
<ExtendedQuerySet [<PeriodicTask: Importing contacts: every 10 seconds>, <PeriodicTask: my_task: every 10 seconds>, <PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>

# Query a single task by id
task1 = PeriodicTask.objects.get(id=1)
task1
# Output:
<PeriodicTask: Importing contacts: every 10 seconds>

# Query a single task by name
task2 = PeriodicTask.objects.get(name="my_task")
task2
# Output:
<PeriodicTask: my_task: every 10 seconds>

Delete periodic tasks

After obtaining the periodic tasks, I tried deleting them directly, and ran into the infinite-loop problem again:

task1.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})
task2.delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})

If you want to delete a periodic task cleanly, you must first pause the task and then delete it, as follows:

# Pause the task first
task = PeriodicTask.objects.get(name="my_task1")
task.enabled = False
task.save()

# Then delete the task
PeriodicTask.objects.get(name="my_task1").delete()
# Output: (1, {'django_celery_beat.PeriodicTask': 1})

Restart a paused periodic task

Of course, you may need to restart a task after pausing it, as follows:

# Set the task's enabled field back to True
task = PeriodicTask.objects.get(name="my_task1")
task.enabled = True
task.save()
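If many tasks were paused, they can be re-enabled in bulk with a queryset update; a sketch, with one caveat: .update() bypasses the save() signal that notifies beat of schedule changes, so the change counter is bumped manually (PeriodicTasks.update_changed() is assumed to exist in your django-celery-beat version; it does in 2.0, the version used here):

from django_celery_beat.models import PeriodicTask, PeriodicTasks

# Re-enable every paused task in one query
PeriodicTask.objects.filter(enabled=False).update(enabled=True)

# .update() skips model save() signals, so tell beat the schedule changed
PeriodicTasks.update_changed()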
