Original link: blog.breezelin.cn/scheme-redi…

A gateway server, like a fast food restaurant, wants customers to come and go as quickly as possible so that more customers can be served in the same period. If one waiter accompanied a customer through ordering, waiting for the meal, and paying the bill, that waiter would spend most of the time standing idle. It is more efficient to have one waiter take orders, another deliver the food, and another handle payment. Similarly, a gateway server needs a clear division of labor. Here's an example:

Suppose the gateway exposes an interface that sends a password reset email, and sending an email can take several seconds. If the gateway server sends the email synchronously while the user's request is still open, it can easily cause congestion under high concurrency. In practice, however, the gateway server does not have to wait for the email to be sent before responding to the user. It can simply tell the user that the email will be sent and then send it offline (like the waiter in a fast food restaurant who tells the customer to find a table first and promises that someone will bring the meal over when it is ready).

So who sends the mail?

Task queue

So that the gateway interface can respond to user requests as quickly as possible, time-consuming operations that do not need an immediate result can be handed to a task queue. A task queue mechanism has two roles, the task producer and the task consumer, and the task queue is the link between them:

  • The producer puts the task into the queue;
  • The consumer fetches the task from the queue.

The task queue works as follows. The task producer abstracts the key information of the current operation, from which the operation can be restored later. For example, to send a password reset email, we only need the current user's email address and user name. When a task producer puts a task into the queue, it is actually storing this key information. A data store such as MySQL or Redis is used here, with Redis being the common choice. Task consumers continually fetch task information from the store and execute the tasks one by one.
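
As a minimal sketch of what this "key information" might look like, the hypothetical ResetPasswordEmailTask class below carries only the email address and user name. The class name, property names, and sample values are assumptions for illustration and do not come from the original. The task survives a serialize/unserialize round trip, which is all the consumer needs to restore the operation later.

```php
<?php
// Hypothetical task class: it holds only the key information needed
// to restore the "send password reset email" operation later.
class ResetPasswordEmailTask
{
    public $email;
    public $username;

    public function __construct(string $email, string $username)
    {
        $this->email = $email;
        $this->username = $username;
    }
}

// Producer side: turn the task into a string that can be stored in Redis or MySQL.
$payload = serialize(new ResetPasswordEmailTask('alice@example.com', 'alice'));

// Consumer side: restore the task from the stored string.
$restored = unserialize($payload);
echo $restored->username, ' <', $restored->email, ">\n"; // prints "alice <alice@example.com>"
```

Because only a short string is stored, the producer returns almost immediately, no matter how long the actual email delivery takes.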

The task producer's job is task distribution, which is generally performed by the online gateway service. The task consumer's job is task scheduling, typically performed by an offline program, so that no matter how long a task takes, it does not block the gateway service.

This article focuses on programming the task scheduling (task consumer) side.

Simple and direct

Use a Redis list named queues:default as the task queue. Task distribution appends the serialized task to the tail of the list:

// PHP pseudocode
    Redis::rpush('queues:default', serialize($task));

Task scheduling can then be implemented just as simply and directly:

// PHP pseudocode
class Worker {

    public function schedule() {
        while(1) {
            // Take the first task off the head of the list
            $seri = Redis::lpop('queues:default');
            if($seri) {
                $task = unserialize($seri);
                $this->handle($task);
                continue;
            }
            // The queue is empty: wait a second before polling again
            sleep(1);
        }
    }

    public function handle($task) {
        // do something time-consuming
    }
}

$worker = new Worker;
$worker->schedule();

Guarding against accidents

The code above pops the first task directly from the queues:default list (lpop). Because handle($task) is a time-consuming operation, if something causes the whole program to exit midway, the task may not have completed, yet its information is already lost. To be safe, change the schedule() function as follows:


    // ...
    public function schedule() {
        while(1) {
            // Peek at the first task without removing it
            $seri = Redis::lindex('queues:default', 0);
            if($seri) {
                $task = unserialize($seri);
                $this->handle($task);
                // Remove the task only after it has completed
                Redis::lpop('queues:default');
                continue;
            }
            sleep(1);
        }
    }
    // ...

The task information is removed from the list only after the task is complete.

Delayed execution

The queues:default list holds tasks that need to be executed immediately. Tasks that should run after an interval or at a specific point in time can be stored in a sorted set named queues:default:delayed, with the execution timestamp $time as the score:

// PHP pseudocode
    Redis::zadd('queues:default:delayed', $time, serialize($task));

When the queues:default list is empty, take the tasks that are due out of the queues:default:delayed set and put them into the queues:default list:

    // ...
    public function schedule() {
        while(1) {
            $seri = Redis::lindex('queues:default', 0);
            if($seri) {
                $task = unserialize($seri);
                $this->handle($task);
                Redis::lpop('queues:default');
                continue;
            }
            // ZREMRANGEBYSCORE returns only the number of removed members,
            // so read the due tasks first, then move them one by one.
            $seri_arr = Redis::zrangebyscore('queues:default:delayed', 0, time());
            if($seri_arr) {
                foreach($seri_arr as $seri) {
                    Redis::zrem('queues:default:delayed', $seri);
                    Redis::rpush('queues:default', $seri);
                }
                continue;
            }
            sleep(1);
        }
    }
    // ...

Task timeouts

Estimate the maximum time a task needs when it executes normally. If a task runs longer than that, something has probably gone wrong, and if it is allowed to stay stuck, the tasks behind it will never be executed. First, give the task a time limit attribute, timeout. Then, before executing the task, set an alarm signal for the process itself. Receiving the signal means the task has timed out and the current process needs to exit (when a process managed by the supervisor daemon exits, supervisor automatically starts it again). Note that pcntl_alarm($timeout) overwrites any previously set alarm, while pcntl_alarm(0) cancels the alarm. After a timeout, the current task is moved to the queues:default:delayed set so that it does not block the queue again.


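    // ...
    public function schedule() {
        while(1) {
            $seri = Redis::lindex('queues:default', 0);
            if($seri) {
                $task = unserialize($seri);
                $this->timeoutHandle($task);
                $this->handle($task);
                Redis::lpop('queues:default');
                continue;
            }
            // ZREMRANGEBYSCORE returns only a count, so read first, then move
            $seri_arr = Redis::zrangebyscore('queues:default:delayed', 0, time());
            if($seri_arr) {
                foreach($seri_arr as $seri) {
                    Redis::zrem('queues:default:delayed', $seri);
                    Redis::rpush('queues:default', $seri);
                }
                continue;
            }
            // Idle: cancel any alarm left over from the last task
            pcntl_alarm(0);
            sleep(1);
        }
    }

    public function timeoutHandle($task) {
        $timeout = (int)$task->timeout;
        if ($timeout > 0) {
            // Deliver signals without declare(ticks=1) (PHP >= 7.1)
            pcntl_async_signals(true);
            pcntl_signal(SIGALRM, function () {
                // Timed out: move the stuck task to the delayed set, then exit
                $seri = Redis::lpop('queues:default');
                Redis::zadd('queues:default:delayed', time()+10, $seri);
                posix_kill(getmypid(), SIGKILL);
            });
        }
        // Arm the alarm; a $timeout of 0 cancels any previous alarm
        pcntl_alarm($timeout);
    }
    // ...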

Concurrent execution

When multiple worker processes run concurrently, the lindex-then-lpop approach above lets several processes read the same task at the head of the queues:default list, so the task is executed more than once. To avoid repeated execution, we introduce a sorted set (SortedSet) named queues:default:reserved to hold the tasks that are being executed. A worker pops a task from the queues:default list with lpop, so only one process receives it, stores it in queues:default:reserved before executing it, and removes it from queues:default:reserved once the task completes. A process may quit unexpectedly while executing a task, so tasks that have stayed in queues:default:reserved for too long are moved to the queues:default:delayed set and executed later.


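    // ...
    public function schedule() {
        while(1) {
            // lpop is atomic, so each task goes to exactly one worker
            $seri = Redis::lpop('queues:default');
            if($seri) {
                Redis::zadd('queues:default:reserved', time()+10, $seri);
                $task = unserialize($seri);
                $this->timeoutHandle($task);
                $this->handle($task);
                Redis::zrem('queues:default:reserved', $seri);
                continue;
            }
            // Move due delayed tasks; zrem returns 1 only for the worker
            // that actually removed the member, so nothing is pushed twice
            $seri_arr = Redis::zrangebyscore('queues:default:delayed', 0, time());
            if($seri_arr) {
                foreach($seri_arr as $seri) {
                    if(Redis::zrem('queues:default:delayed', $seri)) {
                        Redis::rpush('queues:default', $seri);
                    }
                }
                continue;
            }
            // Tasks reserved for about an hour or more: their worker probably died
            $seri_arr = Redis::zrangebyscore('queues:default:reserved', 0, time()-60*60);
            foreach($seri_arr as $seri) {
                if(Redis::zrem('queues:default:reserved', $seri)) {
                    Redis::zadd('queues:default:delayed', time()+10, $seri);
                }
            }
            // Idle: cancel any alarm left over from the last task
            pcntl_alarm(0);
            sleep(1);
        }
    }

    public function timeoutHandle($task) {
        $timeout = (int)$task->timeout;
        if ($timeout > 0) {
            // Deliver signals without declare(ticks=1) (PHP >= 7.1)
            pcntl_async_signals(true);
            pcntl_signal(SIGALRM, function () use ($task) {
                $seri = serialize($task);
                Redis::zrem('queues:default:reserved', $seri);
                Redis::zadd('queues:default:delayed', time()+10, $seri);
                posix_kill(getmypid(), SIGKILL);
            });
        }
        pcntl_alarm($timeout);
    }
    // ...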

Other

Retry on failure

The code above does not check whether a task executed successfully; there should be a mechanism for handling failed tasks. For example, give the task a retry_times attribute that holds the maximum number of retries. Each time the task fails, decrement retry_times by one. When retry_times reaches 0, store the task in the queues:default:failed list; otherwise, put it into the queues:default:delayed set to be executed again later.
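
The retry rule described above could be sketched roughly as follows. To keep the example runnable without a Redis server, plain PHP arrays stand in for the queues:default:failed list and the queues:default:delayed sorted set. The helper name handleFailure, the sample task, and the 10-second retry delay are assumptions for illustration.

```php
<?php
// In-memory stand-ins (assumption): in a real worker these would be the
// Redis list queues:default:failed and the sorted set queues:default:delayed.
$failed = [];   // list of serialized tasks that have been given up on
$delayed = [];  // serialized task => timestamp at which to run it again

// Hypothetical helper, called by the worker whenever a task has failed.
function handleFailure(stdClass $task, array &$failed, array &$delayed, int $now): void
{
    $task->retry_times--;              // one attempt has been used up
    $seri = serialize($task);
    if ($task->retry_times <= 0) {
        $failed[] = $seri;             // no retries left: keep it for inspection
    } else {
        $delayed[$seri] = $now + 10;   // assumed delay: try again in 10 seconds
    }
}

$task = new stdClass;
$task->email = 'alice@example.com';
$task->retry_times = 2;

$now = time();
handleFailure($task, $failed, $delayed, $now); // retry_times becomes 1: scheduled for retry
handleFailure($task, $failed, $delayed, $now); // retry_times becomes 0: stored as failed

echo count($delayed), ' delayed entry, ', count($failed), " failed entry\n";
```

In a real worker the scheduler would have moved the delayed entry back to queues:default before the second failure; the arrays here only record where each failure sends the task.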

Sleep time

In the code above, a busy process executes tasks back to back, while an idle process sleeps for one second between polls. This interval can be adjusted and optimized as needed.
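
One possible adjustment, shown here only as a sketch and not as something the original prescribes, is to let the idle sleep grow while the queue stays empty and reset it once a task is found. The helper name nextSleepSeconds and the limits of 1 and 8 seconds are assumptions.

```php
<?php
// Hypothetical helper: double the idle sleep after every empty poll,
// up to a cap, and fall back to the minimum as soon as a task is found.
function nextSleepSeconds(int $current, bool $foundTask, int $min = 1, int $max = 8): int
{
    if ($foundTask) {
        return $min;
    }
    return min($current * 2, $max);
}

$sleep = 1;
$history = [];
// Simulated poll results: four empty polls, one task found, one empty poll.
foreach ([false, false, false, false, true, false] as $foundTask) {
    $sleep = nextSleepSeconds($sleep, $foundTask);
    $history[] = $sleep;
}
echo implode(',', $history), "\n"; // prints "2,4,8,8,1,2"
```

A worker would call sleep($sleep) where the code above calls sleep(1). The trade-off is fewer empty polls against a longer worst-case wait before a newly queued task is picked up.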

Event listeners

To perform an operation when a task succeeds or fails, define afterSucceeded() or afterFailed() on the task as needed, and have the scheduler call them back at the corresponding moment.
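
A minimal sketch of how such callbacks might be wired up is given below. The class names EmailTask and FailingEmailTask, the run() method, and the function handleWithCallbacks are hypothetical; only the callback names afterSucceeded() and afterFailed() come from the text above.

```php
<?php
// Hypothetical task with optional callbacks.
class EmailTask
{
    public $log = [];

    public function run(): void
    {
        // Pretend the time-consuming work succeeded.
    }

    public function afterSucceeded(): void
    {
        $this->log[] = 'succeeded';
    }

    public function afterFailed(Throwable $e): void
    {
        $this->log[] = 'failed: ' . $e->getMessage();
    }
}

// Hypothetical task whose work always fails.
class FailingEmailTask extends EmailTask
{
    public function run(): void
    {
        throw new RuntimeException('smtp down');
    }
}

// Sketch of how a worker's handle() step might invoke the callbacks.
function handleWithCallbacks($task): void
{
    try {
        $task->run();
    } catch (Throwable $e) {
        if (method_exists($task, 'afterFailed')) {
            $task->afterFailed($e);
        }
        return;
    }
    if (method_exists($task, 'afterSucceeded')) {
        $task->afterSucceeded();
    }
}

$ok = new EmailTask;
$bad = new FailingEmailTask;
handleWithCallbacks($ok);
handleWithCallbacks($bad);
echo implode(', ', $ok->log), ' / ', implode(', ', $bad->log), "\n";
```

Checking with method_exists() keeps the callbacks optional, so tasks that do not need them can simply leave them out.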

Finally

The above describes the gradual evolution of a task scheduler, whose design draws heavily on the Laravel Queue. When you use a tool, know what it does, and also know why.