I. About Celery
Celery is a Python implementation of an asynchronous task queue that can be used for crawlers, web backend queries, computation, and more. With a task queue, the caller no longer has to block and wait for a task to finish.
Its structure is as follows:
- Broker
Our producer creates tasks, which are handed to the scheduling middleware, the broker. The broker dispatches messages (tasks) into message queues according to its routing rules. Celery relies on third-party message brokers such as RabbitMQ or Redis.
- Worker
Workers watch the message queue; when a message appears in the queue, they take it and process it.
- Backend
The backend stores the results processed by the workers, for example in a commonly used database.
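To make these roles concrete, here is a minimal end-to-end sketch. It assumes a locally running Redis acting as both broker and backend (the rest of this article uses RabbitMQ instead):

```python
from celery import Celery

# Broker: queues the task messages. Backend: stores the workers' results.
# The Redis URLs below are illustrative assumptions for a local setup.
app = Celery(
    "demo",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

@app.task
def add(x, y):
    # A worker process executes this when it pulls the message off the queue.
    return x + y

# Producer side: send the task, then fetch the result from the backend.
# async_result = add.delay(2, 3)
# print(async_result.get(timeout=10))  # -> 5
```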
II. Using Celery
In this article we use RabbitMQ (recommended by Celery) as the message broker middleware.
1. Create the project directory as follows:
```
learn_celery/
├── celery_env/
├── celery.py
├── my_task1.py
├── my_task2.py
├── tasks1_run.py
└── tasks2_run.py
```
Create a virtual environment and install celery and flower (Celery's web monitor), for example with `pip install celery flower`; the details are not covered here.
2. Install the message-queue middleware RabbitMQ
We run it as a Docker container, setting the hostname to rabbit (RabbitMQ is accessed by its hostname, so this must be set) and the container name to celery_rabbitmq:
```bash
docker run -d -p 5672:5672 -h rabbit --name celery_rabbitmq rabbitmq
```
Add a user for Celery and grant it configure, write, and read permissions. Below, rabbit_user is given full configure, write, and read permissions (the three ".*" patterns) on the virtual host rabbit_vhost:
```bash
docker exec -it celery_rabbitmq rabbitmqctl add_user rabbit_user rabbit_pass
docker exec -it celery_rabbitmq rabbitmqctl add_vhost rabbit_vhost
docker exec -it celery_rabbitmq rabbitmqctl set_user_tags rabbit_user celery
docker exec -it celery_rabbitmq rabbitmqctl set_permissions -p rabbit_vhost rabbit_user ".*" ".*" ".*"
```
3. Create the Celery application
```python
# celery.py
from celery import Celery

broker_rabbitmq = "amqp://rabbit_user:rabbit_pass@i-k9pwet2d/rabbit_vhost"
app = Celery("learn_celery",
             broker=broker_rabbitmq,
             backend="rpc://",
             include=["learn_celery.my_task1", "learn_celery.my_task2"])
```
We instantiate Celery as app, giving it the project package name learn_celery, and connect to RabbitMQ via broker_rabbitmq. The AMQP URL format for RabbitMQ is:
```
amqp://userid:password@hostname:port/virtual_host
```
Since we started RabbitMQ in Docker with its port published to the host, the hostname here should be the hostname of the host machine (i-k9pwet2d in this example).
The include argument lists the task modules the worker should import: learn_celery.my_task1 and learn_celery.my_task2.
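Other settings can also be applied after instantiation through app.conf. A small sketch with a few common options (these particular values are illustrative and not part of this project):

```python
app.conf.update(
    task_serializer="json",    # serialize task messages as JSON
    result_serializer="json",  # serialize results as JSON
    result_expires=3600,       # discard stored results after one hour
)
```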
4. Create two tasks (messages)
```python
# my_task1.py
from .celery import app
import time

@app.task
def args_add1(x, y):
    print("start task no.1 now!")
    time.sleep(10)
    print("task no.1 end!")
    return x + y
```

```python
# my_task2.py
from .celery import app
import time

@app.task
def args_add2(x, y):
    print("start task no.2 now!")
    time.sleep(20)
    print("task no.2 end!")
    return x + y
```
Here we import app from celery.py and use the @app.task decorator to turn args_add1 and args_add2 into Celery tasks. The sleep calls simulate processing times of 10s and 20s respectively before the sum is returned.
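As an aside, @app.task also accepts options. Below is a hedged sketch of a variant with automatic retries (this task is illustrative and not part of the project's code):

```python
@app.task(bind=True, max_retries=3)
def args_add_with_retry(self, x, y):
    try:
        return x + y
    except Exception as exc:
        # Retry up to 3 times, waiting 5 seconds between attempts.
        raise self.retry(exc=exc, countdown=5)
```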
5. Send tasks to Celery
```python
# tasks1_run.py
from .my_task1 import args_add1
import time

result = args_add1.delay(11, 22)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))
time.sleep(15)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))
```

```python
# tasks2_run.py
from .my_task2 import args_add2
import time

result = args_add2.delay(33, 44)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))
time.sleep(25)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))
```
Regarding delay(), the official documentation describes it as a shortcut for sending a task message, which I understand to mean that the task is submitted to Celery for asynchronous execution.
- result.ready() returns True or False depending on whether the task has finished.
- result.result returns the task's result: its return value once finished, None while it is still pending.
We check once when the task enters Celery and once again after it should have finished.
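For reference, delay() is shorthand for apply_async(), and instead of sleeping and polling you can block until the result arrives. A sketch using the same args_add1 task:

```python
# Equivalent to args_add1.delay(11, 22):
result = args_add1.apply_async(args=(11, 22))

# Block until the task finishes, or raise if it takes longer than 30s:
print(result.get(timeout=30))  # -> 33
```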
III. Viewing the results
1. Start the worker
Go to the parent directory of learn_celery, start a worker for the learn_celery application, and set the concurrency to 10:
```bash
celery -A learn_celery worker --loglevel=info --concurrency=10
```
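As an aside, a worker can also be started programmatically, which can be handy for debugging. A sketch assuming Celery 5 and the same app (the file name debug_worker.py is hypothetical):

```python
# debug_worker.py - start a worker in-process, equivalent to the CLI above
from learn_celery.celery import app

if __name__ == "__main__":
    app.worker_main(argv=["worker", "--loglevel=info", "--concurrency=10"])
```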
If Celery connects to RabbitMQ successfully, the worker's startup log shows the broker URL, the result backend, and the list of registered tasks.
2. Execute the task
For easier observation, open a second window, go to the parent directory of learn_celery, and run the tasks1_run module:
```bash
python -m learn_celery.tasks1_run
```
Open a third window, go to the parent directory of learn_celery, and run the tasks2_run module:
```bash
python -m learn_celery.tasks2_run
```
After each task's waiting time elapses, both tasks complete successfully and their results are obtained. Next, switch back to the worker window and check its log.
Because the worker runs tasks concurrently, each task starts executing as soon as it is received: task 1 takes 10s and returns 33, task 2 takes 20s and returns 77.
IV. Monitoring Celery with Flower
1. Start Flower
```bash
celery -A learn_celery flower
```
2. View the web monitor at http://ip:5555
On the Tasks page you can view each task's state and arguments, along with its received, started, and runtime times. On the Dashboard page you can view information about the current worker nodes.
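Flower also exposes a small REST API, so the same information can be queried from code. A sketch assuming Flower is running on localhost:5555 and the requests library is installed:

```python
import requests

# List recent tasks with their state and runtime.
resp = requests.get("http://localhost:5555/api/tasks")
for task_id, info in resp.json().items():
    print(task_id, info.get("state"), info.get("runtime"))
```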
You are welcome to point out any inadequacies in this article. Favorites, likes, and questions are also welcome. Follow the top water cooler manager and do something besides piping hot water.
NEXT
- In-depth understanding of Celery
- Celery in Django