This tutorial uses native Celery, not the djcelery module; all demonstrations are based on Django 2.0

Celery is a simple, flexible, and reliable Python-based distributed task queue framework that supports scheduling tasks on distributed machines, processes, and threads using task queues. It adopts a typical producer-consumer model and is mainly composed of three parts (a minimal sketch follows the list):

  1. Message queue (Broker): the broker is essentially an MQ queue service, such as Redis or RabbitMQ
  2. Task executor (Worker): the broker notifies workers that a task is in the queue; a worker takes the task off the queue and executes it, and each worker is a process
  3. Result store (Backend): execution results are stored in the backend, which by default is the same MQ service used by the broker; you can also configure a different service as the backend
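
For orientation, here is a minimal standalone Celery sketch of how the three parts map onto code; the connection URLs are assumptions to adjust for your environment:

from celery import Celery

# broker = message queue service, backend = result store (URLs are illustrative)
app = Celery('demo',
             broker='amqp://guest:guest@localhost//',
             backend='redis://localhost:6379/0')

@app.task
def hello():
    # runs in a worker process, not in the caller
    return 'hello from a worker'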

Asynchronous tasks

A typical asynchronous scenario from my own work is project deployment: the front-end web page has a deploy button; clicking it sends a request to the back end, and the deployment itself takes about 5 minutes. The back end receives the request, puts the task into a queue for asynchronous execution, and immediately returns a response telling the front end the task has been submitted. What happens without asynchronous execution? The front end blocks waiting for the back end to return the result, and the page spins until it times out.
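
The idea as a rough sketch (the view and task names here are hypothetical, not part of the project built below):

from django.http import JsonResponse
from deploy.tasks import deploy_project  # hypothetical long-running deploy task

def deploy_view(request):
    # enqueue the 5-minute job and answer the HTTP request immediately
    result = deploy_project.delay()
    return JsonResponse({'task_id': result.id})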

Asynchronous Task Configuration

1. Install RabbitMQ. We will use RabbitMQ as the broker; it starts automatically after installation and requires no extra configuration

# apt-get install rabbitmq-server

2. Install Celery

# pip3 install celery

3. The simplified Django project directory structure is as follows

website/
|-- deploy
|   |-- admin.py
|   |-- apps.py
|   |-- __init__.py
|   |-- models.py
|   |-- tasks.py
|   |-- tests.py
|   |-- urls.py
|   `-- views.py
|-- manage.py
|-- README
`-- website
    |-- celery.py
    |-- __init__.py
    |-- settings.py
    |-- urls.py
    `-- wsgi.py


4. Create the main file website/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'website.settings')

app = Celery('website')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Allow root to run celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

5. Add the following to the website/__init__.py file to make sure the app is loaded when Django starts

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

6. Create the tasks.py file for each application

from __future__ import absolute_import
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
  • Note that tasks.py must live in the root directory of each app and must be named exactly tasks.py

7. Call the task from views.py to process it asynchronously

from django.http import HttpResponse
from deploy.tasks import add

def post(request):
    # enqueue the task; this call returns immediately with an AsyncResult
    result = add.delay(2, 3)
    return HttpResponse('task {} submitted'.format(result.id))
  • Calling function_name.delay() executes the function asynchronously
  • result.ready() tells you whether the task has finished processing
  • If the task raised an exception, result.get(timeout=1) re-raises it
  • If the task raised an exception, result.traceback retrieves the original traceback (a short sketch of these calls follows)
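
Putting those AsyncResult calls together, a rough sketch using the standard Celery API:

from deploy.tasks import add

result = add.delay(2, 3)              # shorthand for add.apply_async((2, 3))
if result.ready():                    # True once a worker has finished the task
    try:
        print(result.get(timeout=1)) # re-raises any exception the task raised
    except Exception:
        print(result.traceback)      # traceback from the worker process
# apply_async also accepts execution options, e.g. run 10 seconds from now:
add.apply_async((2, 3), countdown=10)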

8. Start the Celery worker

# celery -A website worker -l info

9. Now when post is called, the add task is executed asynchronously

Scheduled tasks

Scheduled tasks are very common; for example, I may need to send a report to the boss on a regular basis

Scheduled Task Configuration

1. Add the following configuration to the website/celery.py file to support crontab-style scheduled tasks

from datetime import timedelta
from celery.schedules import crontab

app.conf.update(
    CELERYBEAT_SCHEDULE = {
        'sum-task': {
            'task': 'deploy.tasks.add',
            'schedule': timedelta(seconds=20),
            'args': (5, 6)
        },
        'send-report': {
            'task': 'deploy.tasks.report',
            'schedule': crontab(hour=4, minute=30, day_of_week=1),
        }
    }
)
  • Two tasks are defined:

    • The task named ‘sum-task’ executes the add function every 20 seconds and passes two arguments, 5 and 6
    • The task named ‘send-report’ executes the report function every Monday at 4:30 am
  • timedelta is a class in the datetime module and must be imported with from datetime import timedelta; its parameters are days, seconds, microseconds, milliseconds, minutes, and hours
  • crontab takes the parameters month_of_year, day_of_month, day_of_week, hour, and minute; a few illustrative schedules follow
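
Some illustrative schedule expressions (standard celery.schedules API; the values are examples only):

from datetime import timedelta
from celery.schedules import crontab

timedelta(seconds=20)                        # every 20 seconds
crontab()                                    # every minute
crontab(minute=0, hour='*/3')                # every 3 hours, on the hour
crontab(minute=30, hour=4, day_of_week=1)    # Mondays at 04:30
crontab(minute=0, hour=0, day_of_month='1')  # the 1st of each month at midnight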

2. Add the report method to the deploy/tasks.py file:

@shared_task
def report():
    return 5

3. Start the Celery beat process, which continuously checks whether any scheduled task is due for execution

# celery -A website beat -l info

Tips

1. If you use both asynchronous tasks and scheduled tasks, there is a simpler way to start the worker and beat together: celery -A website worker -B -l info
2. If you are not using RabbitMQ, you need to configure the broker and backend in the main configuration file website/celery.py

# Redis as the MQ configuration
app = Celery('website', backend='redis', broker='redis://localhost')
# RabbitMQ as the MQ configuration
app = Celery('website', backend='amqp', broker='amqp://admin:admin@localhost')
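
Because celery.py loads the Django settings with namespace='CELERY' (step 4 above), the same connection options can instead live in website/settings.py. A sketch, assuming the standard CELERY_-prefixed setting names:

# website/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'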

3. Long-running Celery workers can see memory usage grow over time; add CELERYD_MAX_TASKS_PER_CHILD = 10 to the configuration to specify how many tasks each worker process executes before it is recycled (a sketch of this setting follows)
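
A sketch of applying that setting in website/celery.py (the value 10 is illustrative):

# Recycle each worker process after it has executed 10 tasks
app.conf.update(CELERYD_MAX_TASKS_PER_CHILD=10)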

