Method for creating asynchronous tasks Task

Any method decorated with a task is created as a task object, which can be serialized and sent to a remote server. It can be decorated in various ways:

  • Use the default parameters
@celery.task
def function_name():
    pass
Copy the code
  • Specify related parameters
@celery. Task (bind=True, name='name') def function_name(): celery. Task (bind=True, name='name') The default is the name of this function in the module's namespace. Serializer: specifies the method for serializing this task. Bind: a bool that sets whether or not to bind an instance of a task. If it is bound, the task instance is passed as a parameter to the task method. All attributes of the task instance are accessible. You can also define your own Task class default_retry_delay to set the retry delay of the Task. If the Task fails, the system automatically retries the Task in seconds (3 minutes by default). Autoretry_for: sets the task toretry on specific exceptions. The default is False, that is, no retry. Retry_backoff: The default value is False. This parameter specifies the retry delay policy. Retry_backoff_max: Sets the maximum retry delay. The default value is 10 minutes. If the retry fails, the system does not retry. Retry_jitter: the default value is True. That is, jitter is introduced to avoid retry task execution in a centralized manner. # when bind = True, the first parameter is the self, the add function refers to the task instance @ task # (bind = True) the first parameter is the self, with self. The attributes related to the request access def add (self, x, y) : try: Logger.info (self.request.id) except: self.retry() # Retry when the task failsCopy the code
  • Custom Task base classes
Import tasks (self, exc, task_id, args, kwargs, einfo): if (self, exc, task_id, args, kwargs, einfo): if ('{0! r} failed: {1! Format (task_id, exc)) def on_success(self, retval, task_id, args, kwargs) Def on_retry(self, exc, task_id, args, kwargs, einfo): pass@task (base=MyTask) def add(x, y): Raise KeyError() # raise KeyError() # raise KeyError() Task_id: indicates the ID of a task. Args: parameters of the task function; Kwargs: key-value pair parameter; Einfo: exception details during failures or retries. Retval: return value of the successful execution of the task.Copy the code

General properties of Task

Task.name: indicates the Task name. Task.request: information about the current Task. Task.max_retries: Sets the maximum number of retries. Task.throws: An optional tuple of the expected error class. Task.rate_limit: sets the rate limit of this Task type task. time_limit: sets the hard time limit (in seconds) of this Task type. Task.ignore_result: Does not store Task status. The default False; Task.store_errors_even_if_ignored: If True, an error will be stored even if the Task is configured to ignore the result. Task.serializer: String that identifies the default serialization method to use. Task.compression: String that identifies the default compression scheme to use. The default is task_compression. Task.backend: Specifies that the result storage backend of the Task is used by the Task. Task.acks_late: If set to True, messages for this Task will be acknowledged after the Task is executed, not before (default behavior), i.e., before the Task is executed. Task.track_started: If the True Task reports its status as "started" when the worker executes the Task. The default is False;Copy the code

Calling an asynchronous task

There are three methods for calling an asynchronous task, as follows:

Task.delay (): This is an alias for the apply_async method, but accepts simpler arguments; Task.apply_async (args=[arg1, arg2], kwargs={key:value, key:value}) : task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value}) Can accept complex parameter send_task(): can send asynchronous tasks that are not registered i.e. tasks not trimmed with celery. Task;Copy the code

1. app.send_task

# tasks.py from celery import Celery app = Celery() def add(x,y): Return x+y app.send_task('tasks.add',args=[3,4]) # Celery will send success even if it is empty, so celery execution may not find the function error;Copy the code

2. Task.delay

The Delay method is a simplified version of the apply_async method, with no execution options and only the parameters of the task can be passed.

@app.task def add(x, y, z=0): return x + y add.delay(30,40,z=5) # app.task def add(x, y, z=0): return x + y add.delay(30,40,z=5) #Copy the code

3. Task.apply_async

Apply_async supports execution options that override the global default arguments and execution options specified when defining the task, essentially calling the send_task method.

Add. Apply_async (args=[30,40], kwargs={'z':5}) Countdown: Sets the task to wait for a period of time before execution, in seconds. Eta: Defines the start time of the task. eta=time.time()+10; Expires: Sets a task time. If a task is not executed after the expires time, it will be discarded. Retry: Specifies whether to retry a failed task. Use true or false, default is true shadow: respecify the task name STR, overwriting the task name used in the log. Retry_policy: {}, retry policy. The values are as follows: max_retries: maximum number of retries (three by default) Interval_start: number of seconds between retries (0 by default) interval_step: The number of seconds between retries increased by each retry, either numeric or floating point. The default is 0.2 interval_max: The maximum number of seconds between retries, which is no longer increased by interval_step, can be a number or a floating point number. The default value is 0.2. Queue: Specifies the queue to be sent to. Exchange: specifies which switch to send to; Priority: the priority of the task queue, ranging from 0 to 255, with 0 being the highest priority for RabbitMQ. Serializer: task serialization method. Usually not set; Compression: a compression scheme, usually zlib, bzip2 headers: adding additional messages to the task; Link: callback method after a task is successfully executed. Is a signature object; Can be used as associated tasks; Can be a list of signature objects; Link_error: a callback method after a failed task, which is a signature object; # add. Apply_async ((2, 2), retry=True, retry_policy={'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max: 0.2,})Copy the code
  • Custom publisher, switch, routing key, queue, priority, sequence scheme and compression method:
Task.apply_async ((2,2), compression='zlib', serialize='json', queue='priority. High ', routing_key='web.add', priority=0, exchange='web_exchange')Copy the code

Gets task results and status

Since tasks sent by celery are tasks to be performed by other processes, if you need to monitor the status of tasks on the client, there are methods like:

R = task.apply_async() r.ady () # check the status of the task, return a Boolean value, return True, return False. R.set (timeout=1) # return None; r.set (timeout=1) R.ult # Return None if the task is not completed. R.status # PENDING, START, SUCCESS, R.traceback # If the task threw an exception, the original traceback information can be retrievedCopy the code

However, it is rarely used in general business as there are blocks required to get results of tasks performed, and the use scenario of celery is generally not concerned with results.