This is the 21st day of my participation in the August More Text Challenge

Life is short, let's learn Python together

Celery official

Celery official website: www.celeryproject.org/

Celery official documents in English: docs.celeryproject.org/en/latest/i…

Celery official documents in Chinese: docs.jinkan.org/docs/celery…

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages.

It is an asynchronous task queue focused on real-time processing.

It also supports task scheduling.

Note:

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

Celery as an asynchronous task framework

Celery does not depend on any application server; it starts its own service from the command line (with built-in socket support). Two services then run side by side: the project service and the Celery service. The project hands any task that needs asynchronous handling over to the Celery service, which completes it asynchronously on the project's behalf.

An analogy: a person is a standalone service, and the hospital is an independently running service. Normally a person handles all daily activities without the hospital's involvement; but when the person falls ill, the hospital takes over, and solving the illness is handed to the hospital. When nobody is sick, the hospital still operates independently; when someone is sick, the hospital handles that need.

Celery architecture

The Celery architecture consists of three parts: the message broker, the task execution unit (worker), and the task result store.

Message middleware

Celery itself does not provide a messaging service, but it integrates easily with third-party message middleware such as RabbitMQ and Redis.

Task execution unit

The worker is the task execution unit provided by Celery; workers run concurrently on the nodes of a distributed system.

Task result store

The task result store holds the results of tasks executed by workers. Celery supports storing results in several backends, including AMQP and Redis.

Usage scenarios

Asynchronous execution: offload time-consuming operations by submitting them to Celery to run asynchronously, such as sending SMS/email, pushing notifications, or audio/video processing

Delayed execution: run a task after a specified delay

Scheduled execution: run periodic tasks, such as daily data statistics

Installing and configuring Celery

```shell
pip install celery
pip install eventlet  # eventlet is needed to run the worker on Windows
```

Message middleware: RabbitMQ/Redis

```python
app = Celery('tasks', broker='xxx', backend='xxx')
```

Two ways to structure Celery tasks

The basic structure

Write everything in a single py file, such as celery_task.py

```python
# celery_task.py -- the worker file
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # broker: the task queue
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(__name__, broker=broker, backend=backend)

# Add a task (decorate it with @app.task)
@app.task
def add(x, y):
    print(x, y)
    return x + y
```

Start the worker (switch to the directory containing the py file first):

```shell
celery worker -A celery_task -l info
# On Windows:
celery worker -A celery_task -l info -P eventlet
```

Submit tasks to the broker (a separate file):

```python
from celery_task import add

add(3, 4)              # runs directly; nothing is sent to the broker
ret = add.delay(1, 2)  # submits the task to the broker
print(ret)             # ret is the task id, used later to fetch the result
```

View the task execution result (a separate file):

```python
from celery_task import app
from celery.result import AsyncResult

id = '3e397fd7-e0c1-4c5c-999c-2655a96793bb'

if __name__ == '__main__':
    # 'async' is a reserved word in Python 3.7+, so use another name
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    elif res.failed():
        print('Task failed')
    elif res.status == 'PENDING':
        print('Task is waiting to be executed')
    elif res.status == 'RETRY':
        print('Task is being retried after an exception')
    elif res.status == 'STARTED':
        print('Task has started executing')
```

Package structure

  • Create a new package with an arbitrary name, such as celery_task

The package structure:

```
celery_task/        # the package
    __init__.py
    celery.py       # must be named celery.py
    task01.py       # write a separate py file for each task
    task02.py
add_task.py         # submits tasks to the worker
get_res.py          # fetches the task execution result
```

  • celery.py

The worker file:

```python
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # broker: the task queue
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.task01', 'celery_task.task02'])
```
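The task modules listed in `include` are not shown in the original. A minimal sketch of what they could contain, assuming each file imports the shared app from `celery_task.celery` and defines the `add`/`mul` functions that `add_task.py` imports below:

```python
# celery_task/task01.py
from celery_task.celery import app

@app.task
def add(x, y):
    return x + y


# celery_task/task02.py
from celery_task.celery import app

@app.task
def mul(x, y):
    return x * y
```

Each task file registers its functions with the same app object, which is why `celery.py` must list them in `include` so the worker loads them at startup.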
  • add_task.py
```python
from celery_task.task01 import add
from celery_task.task02 import mul

ret = add.delay(1, 2)
print(ret)

ret = mul.delay(10, 10)
print(ret)
```
  • get_res.py
```python
from celery_task.celery import app
from celery.result import AsyncResult

id = '3d3779be-fe75-4ad4-ab31-d7dbe13d3e63'

if __name__ == '__main__':
    # 'async' is a reserved word in Python 3.7+, so use another name
    res = AsyncResult(id=id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    elif res.failed():
        print('Task failed')
    elif res.status == 'PENDING':
        print('Task is waiting to be executed')
    elif res.status == 'RETRY':
        print('Task is being retried after an exception')
    elif res.status == 'STARTED':
        print('Task has started executing')
```

Executing a delayed task

  • add_task.py
```python
from datetime import datetime, timedelta

from celery_task.task01 import add
from celery_task.task02 import mul

# Non-delayed tasks
ret = add.delay(1, 2)
print(ret)
ret = mul.delay(10, 10)
print(ret)

# Delayed task
# The eta must be a UTC time; the timedelta is added to the current UTC time
eta = datetime.utcnow() + timedelta(seconds=10)
ret = add.apply_async(args=(200, 50), eta=eta)
print(ret)
```
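A quick sanity check of the eta calculation itself, using only the standard library (no Celery or broker required): the eta is simply a naive UTC datetime a fixed offset in the future.

```python
from datetime import datetime, timedelta

now = datetime.utcnow()
eta = now + timedelta(seconds=10)

# eta is exactly 10 seconds after the current UTC time
assert eta - now == timedelta(seconds=10)
# utcnow() returns a naive datetime (no tzinfo); in this setup it is treated as UTC
assert eta.tzinfo is None
print('eta is', eta - now, 'in the future')
```

Celery's `apply_async` also accepts a `countdown` argument in seconds (e.g. `add.apply_async(args=(200, 50), countdown=10)`), which delays relative to now without computing the eta by hand.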

Performing scheduled tasks

The scheduled-task configuration is written in celery.py (the worker file).

In addition to the worker, another "worker" is needed to deliver tasks to the worker on schedule; this component is beat.

To run scheduled tasks, both the worker and beat must be started: one executes the tasks, the other delivers them on schedule.

Start the worker and beat:

```shell
celery worker -A celery_task -l info -P eventlet
celery beat -A celery_task -l info
```

  • celery.py
```python
from datetime import timedelta

from celery import Celery
from celery.schedules import crontab

broker = 'redis://127.0.0.1:6379/1'   # broker: the task queue
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.task01', 'celery_task.task02'])

# Time zone
app.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
app.conf.enable_utc = False

# Scheduled-task configuration
app.conf.beat_schedule = {
    'add-task': {
        # The task to run on schedule
        'task': 'celery_task.task01.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),
        # Arguments for the task
        'args': (300, 150),
    }
}
```
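The commented-out `crontab` line above uses cron-style fields from `celery.schedules`. A few illustrative schedules (a sketch, not an exhaustive list of crontab's options):

```python
from celery.schedules import crontab

every_minute = crontab()                                # runs every minute
daily_8am = crontab(minute=0, hour=8)                   # every day at 8:00
monday_8am = crontab(minute=0, hour=8, day_of_week=1)   # every Monday at 8:00
```

Use `timedelta` for simple fixed intervals and `crontab` when the schedule is tied to clock times or days of the week.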

Conclusion

This article was first published on the WeChat public account Program Yuan Xiaozhuang, and simultaneously on Juejin (Nuggets).

Writing is not easy; please credit the source when reposting. Passing friends, please give a little like before you go (╹▽╹)