We’ve moved from a single-process consumption asynchronous task system with Redis to a multi-process consumption model with Swoole.
Based on the experience of the first two simple systems, the rabbitMq-based asynchronous task system is designed to be more complete, including multi-process consumption, exception retry, etc.
System is introduced
As can be seen from the figure, our system is an event-based asynchronous task system. That is, when an event is generated, the producer throws the event to the scheduler, which is responsible for querying the tasks under the event, and then dropping these tasks into the corresponding queue. Finally, the consumer consumes the tasks in the task queue.
There are three major parts in the whole system
1. Event producer, that is, the party that produces the message event. 2. Task Scheduler, which registers events and schedules tasks. 3. Consumer (Worker), responsible for the tasks in the consumption task queue.
Event producer
Event producers are simple to call directly from a business system, with the following code.
require_once __DIR__.'/.. /autoload.php';
use Asynclib\Ebats\Event;
try{
$event = new Event('order_paied'); // Define events
$event->setOptions(['order_id'= >'FB138020392193312']); // Event generated parameters
$event->publish();
}catch (Exception $exc){
echo $exc->getMessage();
}Copy the code
Task scheduler
The scheduler does two main things, one is to register events and the other is to schedule tasks.
The registration event code is as follows:
// Register events
EventManager::register('order_create'.'closeOrder'.'demo'.10);// Close outstanding orders (defer tasks)
EventManager::register('order_paied'.'virtualShipping'.'demo'); // Virtual goods automatically shipCopy the code
This registers two events, each with a task under it.
Specific scheduling part of the code is very simple, will not repeat, interested can go to see the code.
consumers
The most important part of an asynchronous task system is the consumption end. Now let’s look at the flow chart of Worker.
As you can see, here we have two switches and two queues, one for normal tasks (NTask) and one for deferred tasks (DTask). Briefly describe the life cycle of the next task.
Normal task
Exchange[ebatS_CORE_nTask] sends the task to the corresponding queue according to topic. Ntask blocks the subprocess and waits for the task to be successfully obtained. Throw RetryException when retry is required, If there is no need to retry, a TaskException is thrown. 5. If the sub-process nTask caught the retry exception, it will throw the task to Exchange[ebatS_CORE_dTask], which is the Exchange of the delayed task
Delayed tasks
The subprocess dTask blocks and waits for the task to be successfully obtained and executed. 2. If the task fails to be executed and retry is required, RetryException will be thrown. If there is no need to retry, a TaskException will be thrown. 3. If the sub-process DTask caught the retry exception, it will throw the task to Exchange[ebatS_CORE_DTask]
The consumer code is as follows:
require_once __DIR__.'/.. /autoload.php';
require_once __DIR__.'/task/TaskDemoModel.php';
use Asynclib\Ebats\Worker;
// Execute the result callback function
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){}; $worker =new Worker($callback); // Support multi-process consumption the default value is 1
$worker->setQueue('demo'); // Queue names correspond to the topic of the event
$worker->run();Copy the code
Custom scheduler
Generally speaking, this is an event-based task system, so can you generate tasks directly? The answer is yes.
You simply create a custom scheduler, implement the scheduling logic yourself, and eventually generate a task. The code is as follows:
require_once __DIR__.'/.. /autoload.php';
use Asynclib\Ebats\Task;
use Asynclib\Core\Consumer;
use Asynclib\Amq\ExchangeTypes;
use Asynclib\Exception\ExceptionInterface;
/** * This example demonstrates how to create a custom scheduler. Developers can develop their own task scheduler according to their own needs */
try{
$worker = new Consumer();
$worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
$worker->setQueue('shzf_order_paied'['*.*.WAIT_SELLER_SEND_GOODS']);
$worker->run(function($key, $msg){
$order_data = json_encode($msg);
echo " [$key] $order_data \n";
Task::create('demo'.'orderAsync', $msg);// Create a task, after which the message will be taken over by the task as an argument
});
}catch (ExceptionInterface $exc){
echo $exc->getMessage();
}Copy the code
Thus, an orderAsync task is generated when the message is received, and you just need to start a Worker to consume the Topic.
You might think you can just code the business logic here, and you can. You can do this when you can tolerate a process consuming slowly. But in most cases, we still hope it can be consumed as soon as possible, so it is suggested that we only create tasks here, and the business logic of specific tasks should be executed by the worker.
advertising
Github.com/luojilab/as… For the first time open source, we still have a long way to go.