This is the 21st day of my participation in the August More Text Challenge
Life is short, so let's learn Python together
Celery official resources
Celery official website: www.celeryproject.org/
Celery official documentation (English): docs.celeryproject.org/en/latest/i…
Celery official documentation (Chinese): docs.jinkan.org/docs/celery…
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages.
It is an asynchronous task queue focused on real-time processing.
It also supports task scheduling.
Note:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
Celery asynchronous task framework
Celery can run without depending on any server: it starts its service with its own commands (sockets are supported internally). Two services run at the same time, the project service and the Celery service. The project service hands the tasks that need asynchronous handling to the Celery service, which completes them asynchronously when the project requires it.
An analogy: a person is an independently running service, and a hospital is also an independently running service. Under normal circumstances a person can take care of their own health without involving the hospital. When a person falls ill, the hospital takes them in, and the job of curing the illness is handed over to the hospital. When nobody is sick, the hospital keeps running on its own; when someone is sick, the hospital handles that need.
Celery architecture
Celery's architecture consists of three parts: the message broker (message middleware), the task execution unit (worker), and the task result store.
Message middleware
Celery does not provide a messaging service itself, but it integrates easily with third-party message middleware such as RabbitMQ and Redis.
Task execution unit
The worker is the task execution unit provided by Celery. Workers run concurrently on the nodes of a distributed system.
Task result store
The task result store holds the results of tasks executed by workers. Celery supports several storage backends for results, including AMQP and Redis.
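The way the three parts cooperate can be sketched with the standard library alone. This is a conceptual illustration only, not how Celery is implemented: `broker`, `result_store`, `submit`, and `worker` are hypothetical stand-ins, with a `queue.Queue` playing the message broker, a thread playing the worker, and a dict playing the result store.

```python
import queue
import threading
import uuid

# Illustrative stand-ins for the three parts; none of these are Celery APIs.
broker = queue.Queue()   # message broker: holds pending task messages
result_store = {}        # task result store: task id -> return value


def add(x, y):
    return x + y


def submit(func, *args):
    """Put a task message on the broker and return its id immediately."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Task execution unit: pull messages, run them, store the results."""
    while True:
        message = broker.get()
        if message is None:  # sentinel value: stop the worker
            break
        task_id, func, args = message
        result_store[task_id] = func(*args)


thread = threading.Thread(target=worker)
thread.start()
task_id = submit(add, 1, 2)  # returns at once; the worker does the work
broker.put(None)             # ask the worker to stop after the queued task
thread.join()
print(result_store[task_id])
```

The caller only ever touches the broker and the result store, which is why the project service and the worker can run as separate processes in a real deployment.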
Usage scenarios
Asynchronous execution: hand time-consuming operations to Celery to run asynchronously, such as sending SMS or email, pushing messages, and processing audio or video.
Delayed execution: run a task after a specified delay.
Scheduled execution: run periodic tasks, such as daily data statistics.
Installing and configuring Celery
pip install celery
pip install eventlet
Message middleware: RabbitMQ/Redis
app = Celery('tasks', broker='XXX', backend='XXX')
Two ways to structure Celery tasks
Basic structure
Write a single py file, such as celery_task.py
# worker file
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # broker (task queue)
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(__name__, broker=broker, backend=backend)

# Register a task by decorating a function with @app.task
@app.task
def add(x, y):
    print(x, y)
    return x + y

Start the worker: switch to the directory containing the py file, then run
- Non-Windows: celery -A celery_task worker -l info
- Windows: celery -A celery_task worker -l info -P eventlet
Submit tasks to the broker (in a separate file)
from celery_task import add

add(3, 4)              # Runs directly in this process; nothing is sent to the broker
ret = add.delay(1, 2)  # Sends a task to the broker
print(ret)             # ret is the task ID, used later to fetch the task result
View the task execution result (in a separate file)
from celery_task import app
from celery.result import AsyncResult

task_id = '3e397fd7-e0c1-4c5c-999c-2655a96793bb'
if __name__ == '__main__':
    # `async` is a reserved word since Python 3.7, so the variable is named async_result
    async_result = AsyncResult(id=task_id, app=app)
    if async_result.successful():
        result = async_result.get()
        print(result)
    elif async_result.failed():
        print('Task failed')
    elif async_result.status == 'PENDING':
        print('Task is waiting to be executed')
    elif async_result.status == 'RETRY':
        print('Task is being retried after an exception')
    elif async_result.status == 'STARTED':
        print('Task has started executing')
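The branching above can also be factored into a small pure function, which is easier to reuse and to test without a running worker. The sketch below is one possible shape; `describe_state` is a hypothetical helper, not part of Celery. It assumes the state strings `SUCCESS`, `FAILURE`, `PENDING`, `RETRY`, and `STARTED`, which are Celery's built-in state names. Note that a worker only reports `STARTED` when the `task_track_started` setting is enabled.

```python
def describe_state(state, result=None):
    """Map a Celery task state string to a human-readable message."""
    if state == 'SUCCESS':
        return f'Task succeeded with result {result!r}'
    messages = {
        'FAILURE': 'Task failed',
        'PENDING': 'Task is waiting to be executed',
        'RETRY': 'Task is being retried after an exception',
        'STARTED': 'Task has started executing',
    }
    # Custom or unexpected states fall through to a generic message
    return messages.get(state, f'Unknown state: {state}')


print(describe_state('SUCCESS', 3))
print(describe_state('PENDING'))
```

In the result-checking script, the helper could be called with the `status` attribute of the `AsyncResult`, fetching the result with `get()` only once the task has succeeded.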
Package structure
- Create a new package with any name you like, such as celery_task
The package structure
- celery_task
    - __init__.py
    - celery.py (the file must be named celery.py)
    - task01.py (task file; write a separate py file for each task)
    - task02.py
- add_task.py: submits tasks to the worker
- get_res.py: obtains the task execution results
- celery.py
# worker file
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # broker (task queue)
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task01', 'celery_task.task02'])
- add_task.py
from celery_task.task01 import add
from celery_task.task02 import mul

ret = add.delay(1, 2)
print(ret)
ret = mul.delay(10, 10)
print(ret)
- get_res.py
from celery_task.celery import app
from celery.result import AsyncResult

task_id = '3d3779be-fe75-4ad4-ab31-d7dbe13d3e63'
if __name__ == '__main__':
    # `async` is a reserved word since Python 3.7, so the variable is named async_result
    async_result = AsyncResult(id=task_id, app=app)
    if async_result.successful():
        result = async_result.get()
        print(result)
    elif async_result.failed():
        print('Task failed')
    elif async_result.status == 'PENDING':
        print('Task is waiting to be executed')
    elif async_result.status == 'RETRY':
        print('Task is being retried after an exception')
    elif async_result.status == 'STARTED':
        print('Task has started executing')
Executing delayed tasks
- add_task.py
from celery_task.task01 import add
from celery_task.task02 import mul

# Tasks without a delay
ret = add.delay(1, 2)
print(ret)
ret = mul.delay(10, 10)
print(ret)

# Delayed tasks
from datetime import datetime, timedelta
# The execution time must be given in UTC; timedelta pushes it later, relative to the current UTC time
eta = datetime.utcnow() + timedelta(seconds=10)
ret = add.apply_async(args=(200, 50), eta=eta)
print(ret)
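The eta arithmetic can be checked on its own, without a broker. The sketch below uses a timezone-aware UTC datetime, since `datetime.utcnow()` returns a naive value and is deprecated as of Python 3.12. `eta_after` is a hypothetical helper, and the fixed date is arbitrary, chosen only so that the printed value is reproducible.

```python
from datetime import datetime, timedelta, timezone


def eta_after(seconds, now=None):
    """Return a timezone-aware UTC datetime `seconds` after `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


# Arbitrary fixed starting point so the output is deterministic
fixed_now = datetime(2021, 8, 21, 12, 0, 0, tzinfo=timezone.utc)
eta = eta_after(10, now=fixed_now)
print(eta.isoformat())  # 2021-08-21T12:00:10+00:00
```

A value built this way could be passed as `eta=` to `apply_async`. For a simple "run N seconds from now" case, `apply_async` also accepts `countdown=10`, which avoids computing the timestamp by hand.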
Executing scheduled tasks
Scheduled tasks are configured in celery.py (the worker file).
Besides the worker, scheduled tasks need a second process that submits tasks to the worker on schedule. This process is beat.
To run scheduled tasks, start both the worker and beat: the worker executes the tasks, and beat submits them on schedule.
Start the worker and beat
- celery -A celery_task worker -l info -P eventlet
- celery -A celery_task beat -l info
- celery.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # broker (task queue)
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task01', 'celery_task.task02'])

# Time zone
app.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
app.conf.enable_utc = False

# Scheduled task configuration
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-task': {
        # The task to execute on a schedule
        'task': 'celery_task.task01.add',
        'schedule': timedelta(seconds=3),
        # 'schedule': crontab(hour=8, day_of_week=1),
        # Arguments passed to the task
        'args': (300, 150),
    }
}
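One detail of the commented-out `crontab(hour=8, day_of_week=1)` line is easy to miss: fields that are not specified default to `*`, so this schedule matches every minute from 08:00 to 08:59 on Mondays, not once at 08:00. Adding `minute=0` restricts it to a single run. The sketch below illustrates that matching rule with a hypothetical `matches` helper; it is a simplified model that handles only `'*'` or a single integer per field, not Celery's implementation.

```python
from datetime import datetime


def matches(dt, minute='*', hour='*', day_of_week='*'):
    """Return True if dt satisfies a simplified crontab-style spec.

    Each field is either '*' (any value) or a single integer.
    day_of_week follows the cron convention: 0 = Sunday, 1 = Monday.
    """
    cron_weekday = dt.isoweekday() % 7  # isoweekday(): Monday=1 .. Sunday=7
    fields = ((minute, dt.minute), (hour, dt.hour), (day_of_week, cron_weekday))
    return all(spec == '*' or spec == actual for spec, actual in fields)


monday_0800 = datetime(2021, 8, 23, 8, 0)   # 23 August 2021 was a Monday
monday_0830 = datetime(2021, 8, 23, 8, 30)

print(matches(monday_0830, hour=8, day_of_week=1))            # True
print(matches(monday_0830, minute=0, hour=8, day_of_week=1))  # False
print(matches(monday_0800, minute=0, hour=8, day_of_week=1))  # True
```

Under this reading, a weekly job intended to run once on Monday morning would be written as `crontab(minute=0, hour=8, day_of_week=1)`.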
Conclusion
This article was first published on the WeChat official account "Programmer Xiaozhuang" and is also published on Juejin.
Writing takes effort, so please credit the source when reposting. If you are passing by, a like before you go would be much appreciated (╹▽╹)