
I. About Celery

Celery is an asynchronous task queue implemented in Python, usable for crawlers, web background jobs, computation, and more. With a task queue, the caller no longer has to block and wait for a task to finish once it has been submitted.

Its structure is as follows:

  • Broker

The producer creates tasks and sends them to the broker, the task-scheduling middleware, which routes messages (tasks) into message queues according to its scheduling rules. Celery relies on third-party message brokers such as RabbitMQ and Redis.

  • Worker

Workers watch the message queue; when a message appears in the queue, they take it out and process it.

  • Backend

The backend stores the results produced by workers; a database is commonly used. A minimal sketch tying the three roles together follows.
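
To make the three roles concrete, here is a minimal sketch (not from the original article) that maps them onto Celery settings, assuming a local Redis instance serves as both broker and backend:

from celery import Celery

app = Celery(
    "demo",
    broker="redis://localhost:6379/0",   # Broker: routes task messages into queues
    backend="redis://localhost:6379/1",  # Backend: stores results returned by workers
)

@app.task
def add(x, y):  # executed by a worker process
    return x + y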


II. Using Celery

In this article we use RabbitMQ (recommended by Celery) as the message broker middleware.

We create the celery directory as follows:

learn_celery/
    celery_env/
    celery.py
    my_task1.py
    my_task2.py
    tasks1_run.py
    tasks2_run.py
1. Create a virtual environment and install celery and flower (a web monitor). This is not covered in detail here; a sketch of typical commands follows.
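
A minimal sketch of this step (assuming Python 3 and pip are available; the environment name celery_env matches the directory listing above):

python3 -m venv celery_env
source celery_env/bin/activate
pip install celery flower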
2. Install the message queue middleware RabbitMQ

We run it in Docker, specifying the hostname as rabbit (RabbitMQ identifies its node by hostname, so it must be set) and the container name as celery_rabbitmq:

docker run -d -p 5672:5672 -h rabbit --name celery_rabbitmq rabbitmq

Add a user for Celery to access RabbitMQ and grant configure, write, and read permissions. Below we give rabbit_user full configure, write, and read permissions on the vhost.

docker exec -it celery_rabbitmq rabbitmqctl add_user rabbit_user rabbit_pass
docker exec -it celery_rabbitmq rabbitmqctl add_vhost rabbit_vhost
docker exec -it celery_rabbitmq rabbitmqctl set_user_tags rabbit_user celery
docker exec -it celery_rabbitmq rabbitmqctl set_permissions -p rabbit_vhost rabbit_user ".*" ".*" ".*"
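
To double-check the grant, rabbitmqctl can list the permissions on the vhost (a standard rabbitmqctl subcommand; the output should show rabbit_user with ".*" for all three permissions):

docker exec -it celery_rabbitmq rabbitmqctl list_permissions -p rabbit_vhost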
3. Create the Celery application
# celery.py
from celery import Celery

broker_rabbitmq = "amqp://rabbit_user:rabbit_pass@i-k9pwet2d/rabbit_vhost"
app = Celery("learn_celery",
             broker=broker_rabbitmq,
             backend="rpc://",
             include=["learn_celery.my_task1", "learn_celery.my_task2"])

We instantiate Celery as app with the project package name learn_celery and connect to RabbitMQ through broker_rabbitmq. The AMQP URL format for RabbitMQ is:

amqp://userid:password@hostname:port/virtual_host

Since we started RabbitMQ in Docker with its port published to the host, the hostname here should be the hostname of the host machine (i-k9pwet2d in this example).

The include argument specifies the task modules to load: learn_celery.my_task1 and learn_celery.my_task2.
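
As a side note (a sketch using standard Celery settings, not part of the original), the same configuration can also be applied after instantiation through app.conf:

# Equivalent configuration via app.conf (lowercase setting names, Celery >= 4)
app.conf.update(
    broker_url=broker_rabbitmq,
    result_backend="rpc://",
    include=["learn_celery.my_task1", "learn_celery.my_task2"],
)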

4. Create two tasks (messages)
# my_task1.py
from .celery import app
import time

@app.task
def args_add1(x, y):
    print("start task no.1 now!")
    time.sleep(10)
    print("task no.1 end!")
    return x + y

# my_task2.py
from .celery import app
import time

@app.task
def args_add2(x, y):
    print("start task no.2 now!")
    time.sleep(20)
    print("task no.2 end!")
    return x + y

Here we import app from celery.py and use @app.task to decorate our functions args_add1 and args_add2, which simulate processing times of 10s and 20s respectively and return the result of the addition.
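
A small optional variation (a sketch, not from the original): decorating with bind=True passes the task instance itself as the first argument, giving access to runtime metadata such as the task id:

@app.task(bind=True)
def args_add_verbose(self, x, y):
    # self.request carries the task's runtime context (id, retries, etc.)
    print("running task {} with args ({}, {})".format(self.request.id, x, y))
    return x + y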

5. Send tasks to Celery
# tasks1_run.py
from .my_task1 import args_add1
import time

result = args_add1.delay(11, 22)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))
time.sleep(15)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))

# tasks2_run.py
from .my_task2 import args_add2
import time

result = args_add2.delay(33, 44)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))
time.sleep(25)
print("task over? {}".format(result.ready()))
print("task result: {}".format(result.result))

Regarding delay, the official documentation describes it as a convenient shortcut that sends a task message; in other words, calling delay() hands the task over to Celery for asynchronous execution.
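
delay() is shorthand for apply_async(), which takes the arguments as a tuple and also accepts execution options; a brief sketch of the equivalent call (standard Celery API):

# same as args_add1.delay(11, 22), plus a 5-second delay before execution
result = args_add1.apply_async((11, 22), countdown=5)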

result.ready() returns True or False depending on whether the task has finished executing.

result.result returns the task's return value (None while the task is still running).

We check once when the task enters Celery and once again after it should have finished.
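
Instead of sleeping and then polling, the result can also be awaited directly with get() (a standard AsyncResult method; it raises a TimeoutError if the task does not finish in time):

value = result.get(timeout=15)  # blocks until the task finishes or the timeout expires
print("task result: {}".format(value))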


III. Looking at the results

1. Start the worker

Go to the parent directory of learn_celery, start a worker for the learn_celery application, and set the concurrency to 10:

celery -A learn_celery worker --loglevel=info --concurrency=10

If Celery connects to RabbitMQ successfully, we can see the connection info in the worker's startup log.

2. Execute the task

For easy observation, open another window (window 2), go to the learn_celery parent directory, and run the tasks1_run module:

python -m learn_celery.tasks1_run

Open window 3, go to the learn_celery parent directory, and run the tasks2_run module:

python -m learn_celery.tasks2_run

We can see that after each task's waiting time, both tasks execute successfully and return their results. Next, we go back to the worker to check its log.

Because the worker runs tasks concurrently, each task starts executing as soon as it is received: task 1 takes 10s and returns 33, task 2 takes 20s and returns 77.
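
As an aside (a sketch, not from the original), the two tasks can also be dispatched together with celery.group, which submits them as one unit and runs them in parallel on the worker pool:

from celery import group
from learn_celery.my_task1 import args_add1
from learn_celery.my_task2 import args_add2

# signatures (.s) bundle a task with its arguments without sending it yet
job = group(args_add1.s(11, 22), args_add2.s(33, 44))
results = job.apply_async()
print(results.get(timeout=30))  # expected: [33, 77]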


IV. Monitoring Celery with Flower

1. Start Flower
celery -A learn_celery flower
2. View the web monitor at http://ip:5555

On the Tasks page you can view the status, arguments, received time, start time, and runtime of tasks in the current queue. On the Dashboard page you can view information about the current worker nodes.


You are welcome to point out any inadequacies in this article.

Favorites, likes, and questions are all welcome. Follow the top water cooler manager and do something besides piping hot water.


NEXT

  • In-depth understanding of celery

  • Celery in Django