Celery is a distributed task queue that supports both real-time processing and task scheduling. A task is delivered as a message, and the payload of that message contains all the data needed to perform the work.

Typical scenarios where it is used:

  1. Long-running work in a web application can be handed to Celery to run asynchronously, with the result returned to the user once it finishes.
  2. The site's scheduled (periodic) tasks.
  3. Other work performed asynchronously, such as clearing or warming up the cache.

It has the following characteristics:

  1. Task execution
  2. Management of background tasks
  3. Tasks tied to configuration management
  4. Concurrent execution with multiprocessing, Eventlet, and Gevent
  5. An error-handling mechanism
  6. Task primitives such as groups, chunks, and chains (see the sketch after this list)
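
To make the last item concrete, here is a minimal sketch of the group and chain primitives. The app, broker URL, and add task below are purely illustrative assumptions, not part of the project described in this article.

from celery import Celery, group, chain

# Hypothetical app and task, used only to demonstrate the primitives.
app = Celery('primitives_demo',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

# group: run several tasks in parallel and collect all of their results.
parallel = group(add.s(i, i) for i in range(5))()

# chain: feed the result of each task into the next one.
pipeline = chain(add.s(2, 2), add.s(4), add.s(8))()   # computes ((2 + 2) + 4) + 8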

Celery architecture

  1. Beat: the scheduler process reads the configuration and periodically sends tasks that are due to the task queue.
  2. Celery Worker: the consumer that actually executes tasks.
  3. Broker: receives task messages from producers, stores them in a queue, and delivers them to consumers in order.
  4. Producer: runs scheduled jobs or calls an API to generate tasks and send them to the task queue for processing.
  5. Result Backend: stores status information and results after tasks are processed.

The overall flow is: a task publisher (usually a web application) or the scheduler (Beat, for periodic tasks) sends tasks to a message broker (typically Redis or RabbitMQ). The broker delivers the tasks to workers for execution in order, and each worker stores its results in the result backend, which can also be Redis.
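
As a minimal sketch of that flow (assuming a local Redis instance as both broker and backend, and a hypothetical add task, neither of which comes from the project in this article):

from celery import Celery

# Producer and task definition in one module (illustrative only).
app = Celery('demo',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

# Producer side: publish a task message to the broker and keep a handle to it.
result = add.delay(2, 3)

# A worker started with `celery -A demo worker` picks the message up, runs the
# task, and stores the return value in the result backend.
print(result.get(timeout=10))   # -> 5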

Serialization of Celery

JSON, YAML, and MsgPack are commonly used; MsgPack is a binary JSON-like serialization format that is smaller and faster than JSON. Serialization is needed to encode task data as it travels between the client and the consumer, and to decode it again on arrival.
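
For example, using the older Celery 3 style setting names that appear later in this article, the serializers might be configured as follows (a sketch; the msgpack option additionally requires the msgpack library to be installed):

# Settings controlling serialization (Celery 3 naming)
CELERY_TASK_SERIALIZER = 'msgpack'           # how task arguments are encoded on the wire
CELERY_RESULT_SERIALIZER = 'json'            # how results are stored in the backend
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']  # content types a worker is willing to accept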

A worked example

  • Main program (instantiating Celery)

The simplest approach is to configure the broker and backend directly when creating the instance:

from celery import Celery
from config import REDISHOST, REDISDB, REDISPORT
celery = Celery('backend',
                broker='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB),
                backend='redis://%s:%d/%d' % (REDISHOST, REDISPORT, REDISDB))

Alternatively, load the settings from a configuration file, much like configuring a Flask application instance:

from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')
# celery.conf.update(app.config)
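
The snippet above assumes a proj package that provides proj.celeryconfig and proj.tasks. A minimal, hypothetical sketch of those two modules follows; the setting values and the add task are placeholders rather than code from the original project:

# proj/celeryconfig.py -- a plain module holding the settings
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# proj/tasks.py -- the module referenced by include=['proj.tasks']
from proj import app   # assumes the app created above lives in proj/__init__.py

@app.task
def add(x, y):
    return x + y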
  • Celery configuration file

If the configuration was already specified when the instance was created, a separate configuration file is not necessary. If you need to set additional parameters, put them in a configuration file. Here are some common settings:

from datetime import timedelta
from celery.schedules import crontab

# Import tasks
CELERY_IMPORTS = ("tasks", "graph_data_tasks")
BROKER_URL = ''
CELERY_RESULT_BACKEND = ''
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
# A value of None or 0 means results will never expire
CELERY_TASK_RESULT_EXPIRES = 60
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
# CELERYD_CONCURRENCY = 2
CELERYBEAT_SCHEDULE = {
    'graph_data': {
        'task': 'graph_data_tasks.sync_graph',
        'schedule': timedelta(minutes=60),
        'args': ()
    },
    'rank_for_guchong': {
        'task': 'backend.celerytasks.rank_for_guchong.calc_ability_schedule',
        'schedule': crontab(hour=11, minute=55),
        'args': ()
    }
}
  • The tasks of Celery

Tasks are registered with the @celery.task() decorator after importing the instantiated Celery application:

@celery.task()
def calc_capacity(strategy_id, start_date, end_date, assets, flows, stocks, output_path):
    c = CalcAbility(strategy_id, assets, flows, stocks, output_path)
    c.run(start_date, end_date)
    return {'current': 1, 'total': 1, 'status': 'success', 'reason': ''}
  • Using the tasks

@be_data.route('/data/graph', methods=['GET'])
# @token_required
def gen_megapool_from_summary():
    pie_tuple = cache.get_summary_data(name='summary_data', data_type=float)
    meta = {}
    if not pie_tuple:
        task = sync_graph.apply_async(args=[])
        return gen_response(data=meta, message="success", errorcode=0), 200
    elif pie_tuple[11] != db_clients.hsize('summary_ability'):
        sync_graph.apply_async(args=[])
        return gen_response(data=meta, message="success", errorcode=0), 200

In addition, you can check the running status of a task by its task id:

task = calc_ability.AsyncResult(task_id)
response = {
    'current': task.info.get('current', 0),
    'total': task.info.get('total', 1),
    'time': task.info.get('time', 0),
    'start_time': start_time,
    'fundid_nums': calc_task.fundid_nums,
    'status': task.info.get('status', ''),
    'reason': task.info.get('reason', '')
}
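
Note that task.info is only a plain dict like this while the task reports progress through update_state (or returns a dict); if the task failed, AsyncResult.info holds the raised exception instead, so it is worth checking the state first. A hedged sketch of that check (the response fields simply mirror the snippet above):

task = calc_ability.AsyncResult(task_id)
if task.state == 'FAILURE':
    response = {'status': 'failure', 'reason': str(task.info)}   # task.info is the exception here
elif task.state == 'PENDING':
    response = {'status': 'pending'}                             # not started yet (or unknown task id)
else:
    response = {'status': task.info.get('status', ''),
                'current': task.info.get('current', 0)}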
  • Running Celery

celery --app=backend.celery worker --loglevel=DEBUG --config=celery_settings --beat

  • Specify the queue

By default, Celery uses a single queue named celery to hold tasks; the name can be changed with CELERY_DEFAULT_QUEUE. Queues with different priorities can be used so that high-priority tasks do not have to wait behind low-priority ones.

from kombu import Queue

CELERY_QUEUES = (
    Queue('default', routing_key='task.#'),
    Queue('web_tasks', routing_key='web.#'),
)
# The default exchange name is tasks
CELERY_DEFAULT_EXCHANGE = 'tasks'
# The exchange type is topic
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
CELERY_ROUTES = {
    'projq.tasks.add': {
        'queue': 'web_tasks',
        'routing_key': 'web.add',
    }
}

Start a consumer process that only listens on the specified queue: celery -A projq worker -Q web_tasks -l info
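
On the producer side, a call can also be routed to a specific queue explicitly, overriding CELERY_ROUTES. A small sketch using the projq.tasks.add task from the routing table above:

from projq.tasks import add

# Send this particular call to the web_tasks queue rather than the default one.
add.apply_async(args=(2, 3), queue='web_tasks', routing_key='web.add')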

Bound tasks in Celery

Tasks are registered with the app.task decorator. Note that when a function has multiple decorators, app.task must be the outermost one for the task to work properly. The decorator also accepts a bind argument: with bind=True, the task instance is bound and passed in as the first argument, self, which gives access to all of the task's attributes. A bound task is typically used to retry itself (via self.retry()), to inspect information about the current request, and to call any methods you add on a custom task base class. You can also report progress with the self.update_state() method:

@celery.task(bind=True)
def calc_ability(self, fundIds, start_date, end_date, increment_end_date, maxsize):
    start_date = parse_date(start_date)
    end_date = parse_date(end_date)
    # ... inside the processing loop (index, total and used_time are defined in the omitted code):
    self.update_state(state='PROGRESS',
                      meta={'current': index, 'total': total,
                            'time': used_time.used_time() / 60,
                            'status': 'running', 'reason': ''})
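
The retry behaviour mentioned above is not shown in this project's code. As a hedged sketch of the usual pattern (the fetch helper, the exception handling, and the delay are illustrative assumptions):

@celery.task(bind=True, max_retries=3)
def flaky_task(self, url):
    try:
        return fetch(url)   # hypothetical helper that may fail transiently
    except Exception as exc:
        # Re-run this task after 60 seconds, up to max_retries times.
        raise self.retry(exc=exc, countdown=60)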

Stop the task

rs = add.delay(1, 2)
rs.revoke()                          # can only revoke a task that has not started yet
app.control.revoke(rs.task_id)       # revoke by task id
app.control.revoke(rs.task_id, terminate=True)                    # terminate a task that is already running
app.control.revoke(rs.task_id, terminate=True, signal='SIGKILL')  # terminate using the SIGKILL signal
# In recent Celery 3 releases, terminating is implemented on top of revoke:
#     return self.revoke(task_id, destination=destination, terminate=True, signal=signal, **kwargs)

The Celery monitoring tool Flower

pip install flower

Add CELERY_SEND_TASK_SENT_EVENT = True to celery_settings, then run flower -A backend.celery --port=5555 from the same directory you start Celery in. Open http://localhost:5555 to see the management interface.

Use autoscaling

celery -A proj worker -l info --autoscale=6,3 keeps 3 worker processes running under normal load and scales up to a maximum of 6 concurrent processes.

Using multiprocessing inside Celery tasks

from celery import Celery
import time
import multiprocessing as mp

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")

def test_func(i):
    print "beg... :", i
    time.sleep(5)
    print ".... end:", i
    return i * 5

@app.task
def fun_1(n):
    curr_proc = mp.current_process()
    curr_proc.daemon = False        # the worker process is daemonic; clear the flag so it can spawn children
    p = mp.Pool(mp.cpu_count())
    curr_proc.daemon = True         # restore the flag once the pool has been created
    for i in range(n):
        p.apply_async(test_func, args=(i,))
    p.close()
    p.join()
    return 1

if __name__ == "__main__":
    app.start()

Celery worker processes are daemonic, and a daemonic process is not allowed to create child processes, so curr_proc.daemon must be set to False before creating the multiprocessing pool (and can be set back to True afterwards, as in the snippet above).

Reference: Python Web Development in Action — Weiming Dong