Distributed queue of Celery

Celery what is Celery? Celery is a simple, flexible and reliable distributed system written in Python for processing large amounts of information, which also provides the tools needed to operate and maintain distributed systems.

Celery focuses on real-time task processing and supports task scheduling.

To put it bluntly, it is a distributed queue management tool and we can quickly implement and manage a distributed queue using the include-provided interface.

1. Quick start (this article is based on Celery4.0)

Firstly, we need to understand that Celery itself is not a task queue, it is a tool for managing distributed task queues or, in another way, it encapsulates the operations that operate common task queues and we can use it to quickly manage task queues. You can also look at the rabbitMQ documentation and do it yourself without any problems.

Celery is language independent and although it is implemented in Python it provides interface support for other common languages. However, if you happen to be developing with Python then using Celery is natural.

To get Celery running we need to understand a few concepts:

Tasks with roles of producers and consumers. Brokers are places where the producers and consumers store/fetch products.

Common brokers include RabbitMQ, Redis, and Zookeeper

1.2 Result Stores/Backend as the name implies, results are stored. The results or status of tasks in queues need to be known to task senders. Therefore, Result Stores are required to store the results

Backend can be redis, Memcached, or even common data.

1.3 Workers are Workers in Celery, similar to consumers in the production/consumption model, who pick tasks out of queues and carry them out

1.4 Tasks are Tasks that we want to perform in queues. Generally, users, triggers or other operations will join the Tasks in queues and send them to workers for processing.

With these concepts in mind we can quickly implement a queue operation:

Here we use redis as celery broker and backend.

(See here for other brokers and Backend support)

Setup Celery and Redis and Python redis support:

Apt-get install celery apt-get install celery -server PIP install celery 2.10.4 or above, otherwise redis connection timeout error will occur, please refer to the details

Then, we need to write a task:

#tasks.py from celery import Celery

App = exfiltrate (‘tasks’, backend=’ exfiltrate ‘, exfiltrate = exfiltrate (‘ exfiltrate ‘, backend=’ exfiltrate ‘

Celery task def add(x, y): Return x + y OK, “broker”, “backend”, “task”, “worker”, “tasks.py”

Celery-a tasks worker –loglevel=info meaning that the worker running tasks set is working.

The last step is to trigger the task. The easiest way to do this is to write another script and call the function decorated as task:

#trigger. Py from tasks import add result = add.delay(4, 4) While not result.ready(): time.sleep(1) print ‘task done: {0}’.format(result.get())

Delay returns an AsyncResult object that holds an asynchronous result. Result.ready () is true when the task is completed and result.get() is used to fetch the result.

Now a simple celery application is complete.

  1. After a quick start we were able to use Celery to manage normal tasks but this is not enough for practical use scenarios so we need to dig deeper into more ways of using Celery.

Let’s start with the previous task:

Celery task def add(x, y): Task = app.task = tasks, app.task = tasks, app.task = tasks, app.task = tasks, app.task = tasks, app.task = tasks.

First, we can make the modified function a binding method of the Task object, which is equivalent to adding the modified function as a task instance method. We can call self to retrieve many of the state and properties of the current task instance.

Second, we can also clone the Task class ourselves and have the custom Task decorator add do some custom operations.

2.1 Performing Different Operations based on task Status After a task is executed, performing different operations based on task status requires copying tasks such as on_failure and on_success:

tasks.py

class MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print ‘task done: {0}’.format(retval) return super(MyTask, self).on_success(retval, task_id, args, kwargs)

def on_failure(self, exc, task_id, args, kwargs, einfo):
    print 'task fail, reason: {0}'.format(exc)
    return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
Copy the code

@app.task(base=MyTask) def add(x, y): return x + y

Celery -a tasks worker –loglevel=info

Modify tasks again:

}} @app.task # celery task def add(x, y): raise KeyError return x + y

As you can see, our custom on_failure and on_success are executed after the task succeeds or fails

2.2 Binding tasks to instance methods

tasks.py

from celery.utils.log import get_task_logger

logger = get_task_logger(name) @app.task(bind=True) def add(self, x, y): Logger.info (self.request.dict) return x + y then rerun:

A task in progress has all sorts of information about its execution and can use this information to do many other things, such as determine whether a chained task has reached the end, and so on.

Detailed data on celery.task.request object can be found here

It is very common to know task states in practical scenarios, for Celery, there are several built-in task states as follows:

Parameter Description PENDING Task Waiting STARTED Task STARTED SUCCESS Task execution succeeded FAILURE Task execution failed RETRY tasks will be RETRY REVOKED tasks When we have a task that takes a long time, we generally want to know its real-time progress. Here, we need to define a task state to illustrate the progress and manually update the status, so as to tell the callback the progress of the current task. Specific implementation:

tasks.py

from celery import Celery import time

@app.task(bind=True) def test_mes(self): for i in xrange(1, 11): Time.sleep (0.1) self.update_state(state=”PROGRESS”, meta={‘p’: I *10}) return ‘finish’

trigger.py

from task import add,test_mes import sys

Def PM (body): res = body.get(‘result’) if body.get(‘status’) == ‘PROGRESS’: sys.stdout.write(‘\r ‘) {0}%’.format(res.get(‘p’))) sys.stdout.flush() else: Print ‘\r’ print res r = test_mes. Delay () print r.thread (on_message= PM, propagate=False)

Cycle tasks are also very simple, just configure cycle tasks in configuration and then run a cycle task trigger (beat) :

Create Celery configuration file celery_config.py:

celery_config.py

from datetime import timedelta from celery.schedules import crontab

CELERYBEAT_SCHEDULE = { ‘ptask’: { ‘task’: ‘tasks.period_task’, ‘schedule’: timedelta(seconds=5), }, }

CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0’ You can use datetime.timedelta or crontab or even the latitude and longitude coordinates of the solar system to configure the interval time, see here for details

If a scheduled task involves datetime, you need to add the time zone information to the configuration. Otherwise, utc is used by default. For example, China can add:

CELERY_TIMEZONE = ‘Asia/Shanghai’ then add tasks to tasks.py to be executed periodically:

tasks.py

app = Celery(‘tasks’, backend=’redis://localhost:6379/0′, broker=’redis://localhost:6379/0′) app.config_from_object(‘celery_config’)

@app.task(bind=True) def period_task(self): print ‘period task done: {0}’.format(self.request.id) then re-run the worker, then re-run the beat:

celery -A task beat

The periodic tasks are running properly ~

Some tasks may be composed of several sub-tasks, so it becomes very important to invoke each sub-task. Try not to invoke the sub-task in synchronous blocking mode, but to invoke the chain task in asynchronous callback mode:

Error model

@app.task def update_page_info(url): page = fetch_page.delay(url).get() info = parse_page.delay(url, page).get() store_page_info.delay(url, info)

@app.task def fetch_page(url): return myhttplib.get(url)

@app.task def parse_page(url, page): return myparser.parse_document(page)

@app.task def store_page_info(url, info): return pageinfo.objects.create (url, info

def update_page_info(url): # fetch_page -> parse_page -> store_page chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url) chain()

@app.task() def fetch_page(url): return myhttplib.get(url)

@app.task() def parse_page(page): return myparser.parse_document(page)

@app.task(ignore_result=True) def store_page_info(info, URL): pageinfo.objects.create (url=url, info=info) Example 2

fetch_page.apply_async((url), link=[parse_page.s(), Store_page_info. s(url)]) In chained tasks, the return value of the previous task is one of the input values of the next task by default (si() or S (immutable=True) can be called without the return value being the default argument).

Celery.signature (), celery.signature(), celery.signature(), celery.signature(), celery.signature(), celery.signature(), celery.signature(), celery.signature(), celery.signature(), celery.signature() Instead of executing the task, save the task and its parameters for invocation elsewhere.

2.6 Calling tasks We mentioned earlier that we can not call tasks in the ordinary way, but in the way like Add.delay (2, 2), and the chain task uses apply_async method to call. Delay is just a shortcut to apply_async. It does the same thing, but apply_async can do more things like callbacks and errbacks, execution timeouts, retries, retry times, etc. Specific parameters can be referred to here

2.7 About AsyncResult AsyncResult is mainly used to store task execution information and execution results, which is similar to the Future object in Tornado, both of which can store asynchronous results and task execution status. For those who write JS, Celery4.0 = celery4.0 = celery4.0 = celery4.0 = celery4.0 = celery4.0 = celery4.0 = celery4.0 = celery4.0

import gevent.monkey monkey.patch_all()

import time from celery import Celery

app = Celery(broker=’amqp://’, backend=’rpc’)

@app.task def add(x, y): return x + y

def on_result_ready(result): print(‘Received result for id %r: %r’ % (result.id, result.result,))

Add.delay (2, 2). Then (on_result_ready) Note that this promise writing can only be used when Backend is RPC (AMQP) or Redis. And when used independently, monkey patches of GEvent need to be introduced, which may affect other code. The official documentation suggests that this feature is more suitable for use with asynchronous frameworks such as Tornado, Twisted, etc.

Delay and apply_async generate AsyncResult objects. Delay and apply_async generate AsyncResult objects based on task ID: AsyncResult(task_id= XXX)

For more details on AsyncResult, see here

Using Celery for distributed queue management and development will greatly improve development efficiency, you can refer to the detailed official documentation for more detailed use of Celery