Overview
Swoole's asynchronous Task feature is used to execute tasks asynchronously.
Common scenarios include asynchronous payment processing, asynchronous order processing, asynchronous log processing, and asynchronous email/SMS sending.
Swoole implements this by having worker processes handle incoming requests and hand the work off to task processes for execution.
Official introduction:
The task layer communicates over Unix socket pipes, entirely in memory, with no I/O overhead. A single process can reach about 1 million reads/writes per second. Different processes communicate over different pipes, which makes the most of multiple cores.
Local versions: PHP 7.2.6, Swoole 4.3.1.
Without further ado, here is the result:
Code
server.php
<?php
class Server
{
    private $serv;
    public function __construct() {
        $this->serv = new swoole_server('0.0.0.0', 9501);
        $this->serv->set([
            'worker_num'      => 2, // Start two worker processes
            'max_request'     => 4, // Each worker process exits after handling 4 requests
            'task_worker_num' => 4, // Start four task processes
            'dispatch_mode'   => 2, // Packet dispatch policy: fixed mode
        ]);
        $this->serv->on('Start', [$this, 'onStart']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on('Receive', [$this, 'onReceive']);
        $this->serv->on('Close', [$this, 'onClose']);
        $this->serv->on('Task', [$this, 'onTask']);
        $this->serv->on('Finish', [$this, 'onFinish']);
        $this->serv->start();
    }
    public function onStart($serv) {
        echo "#### onStart ####".PHP_EOL;
        echo "SWOOLE ".SWOOLE_VERSION." service started".PHP_EOL;
        echo "master_pid: {$serv->master_pid}".PHP_EOL;
        echo "manager_pid: {$serv->manager_pid}".PHP_EOL;
        echo "########".PHP_EOL.PHP_EOL;
    }
    public function onConnect($serv, $fd) {
        echo "#### onConnect ####".PHP_EOL;
        echo "Client: ".$fd.PHP_EOL;
        echo "########".PHP_EOL.PHP_EOL;
    }
    public function onReceive($serv, $fd, $from_id, $data) {
        echo "#### onReceive ####".PHP_EOL;
        echo "worker_pid: {$serv->worker_pid}".PHP_EOL;
        echo "Client data: {$data}".PHP_EOL;
        // Pass the connection fd along so the task process can reply to the client
        $param = [
            'fd'    => $fd,
            'email' => $data
        ];
        // task() returns the task ID on success, or false on failure
        $rs = $serv->task(json_encode($param));
        if ($rs === false) {
            echo "Task assignment failed".PHP_EOL;
        } else {
            echo "Task assigned successfully, task_id=".$rs.PHP_EOL;
        }
        echo "########".PHP_EOL.PHP_EOL;
    }
    public function onTask($serv, $task_id, $from_id, $data) {
        echo "#### onTask ####".PHP_EOL;
        echo "#{$serv->worker_id} onTask: [PID={$serv->worker_pid}]: task_id={$task_id}".PHP_EOL;
        // Business code (simulated here with a slow loop)
        for ($i = 1; $i <= 5; $i++) {
            sleep(2);
            echo "Task {$task_id} has completed {$i}/5".PHP_EOL;
        }
        $data_arr = json_decode($data, true);
        $serv->send($data_arr['fd'], 'Email:'.$data_arr['email'].', send success');
        // Notify the worker process that the task is done (triggers onFinish)
        $serv->finish($data);
        echo "########".PHP_EOL.PHP_EOL;
    }
    public function onFinish($serv, $task_id, $data) {
        echo "#### onFinish ####".PHP_EOL;
        echo "Task {$task_id} finished".PHP_EOL;
        echo "########".PHP_EOL.PHP_EOL;
    }
    public function onClose($serv, $fd) {
        echo "Client Close.".PHP_EOL;
    }
}
$server = new Server();
client.php
<?php
class Client
{
    private $client;
    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->client->on('Connect', [$this, 'onConnect']);
        $this->client->on('Receive', [$this, 'onReceive']);
        $this->client->on('Close', [$this, 'onClose']);
        $this->client->on('Error', [$this, 'onError']);
    }
    public function connect() {
        if (!$this->client->connect("127.0.0.1", 9501, 1)) {
            // connect() returned false, so read the error code from the client object
            $code = $this->client->errCode;
            echo "Error: ".swoole_strerror($code)."[{$code}]".PHP_EOL;
            return;
        }
    }
    public function onConnect($cli) {
        fwrite(STDOUT, "Enter email: ");
        // Watch STDIN in the event loop and send each entered line to the server
        swoole_event_add(STDIN, function() {
            fwrite(STDOUT, "Enter email: ");
            $msg = trim(fgets(STDIN));
            $this->send($msg);
        });
    }
    public function onReceive($cli, $data) {
        echo PHP_EOL."Received: ".$data.PHP_EOL;
    }
    public function send($data) {
        $this->client->send($data);
    }
    public function onClose($cli) {
        echo "Client close connection".PHP_EOL;
    }
    public function onError() {
    }
}
$client = new Client();
$client->connect();
Summary
I. How many processes does the configuration above start?
8 processes in total: 1 master process, 1 manager process, 4 task processes, and 2 worker processes.
Process IDs vary from run to run. In one run they were:
Master process: 22481
Manager process: 22485
Task process: 22488, 22489, 22490, 22491
Worker process: 22492, 22493
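To make the count above concrete, here is a minimal sketch that adds up the processes for a given configuration. The helper name countServerProcesses is made up for illustration and is not part of Swoole; it also assumes that worker_num and task_worker_num are both set explicitly, as they are in server.php.

```php
<?php
// Hypothetical helper (not a Swoole API): counts the processes started
// for a config that sets worker_num and task_worker_num explicitly.
function countServerProcesses(array $settings): array
{
    $breakdown = [
        'master'  => 1, // one master process
        'manager' => 1, // one manager process
        'worker'  => $settings['worker_num'],
        'task'    => $settings['task_worker_num'],
    ];
    $breakdown['total'] = array_sum($breakdown);
    return $breakdown;
}

// Same values as in server.php
$counts = countServerProcesses(['worker_num' => 2, 'task_worker_num' => 4]);
echo "total processes: {$counts['total']}".PHP_EOL; // total processes: 8
```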
For reference, see the official process structure diagram.
II. Why does the worker process ID change once the fifth request is sent?
Because we set max_request to 4 for the worker processes. A worker process exits automatically once it has handled that many requests, which releases all of its memory and resources, and the next request is handled by a freshly started worker with a new process ID. This mechanism exists mainly to contain memory leaks in PHP processes.
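The recycling behaviour can be illustrated with a toy model. This is plain PHP, not Swoole code: the function simulateWorkerPids and the PID values are invented for illustration, and the model assumes that every request lands on the same worker (for example, a single client connection).

```php
<?php
// Toy model, not Swoole code: a "worker" is just a PID plus a request counter.
function simulateWorkerPids(int $maxRequest, int $requests, int $firstPid): array
{
    $pid = $firstPid;
    $handled = 0;
    $pids = [];
    for ($i = 1; $i <= $requests; $i++) {
        if ($handled === $maxRequest) {
            // The old worker has exited after reaching max_request;
            // a replacement worker takes over.
            $pid++;       // stand-in for "some new PID"
            $handled = 0;
        }
        $handled++;
        $pids[$i] = $pid; // which PID served request $i
    }
    return $pids;
}

// With max_request = 4, requests 1-4 share one PID and request 5 gets a new one.
print_r(simulateWorkerPids(4, 5, 1000));
```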
III. If we kill a task process, will a new one be created?
Yes.
IV. How should task_worker_num be set?
The maximum value cannot exceed SWOOLE_CPU_NUM * 1000.
Check the number of CPU cores:
echo "swoole_cpu_num:".swoole_cpu_num().PHP_EOL;
Size it according to the project's task volume. For example, suppose 200 tasks are generated per second and each task takes 500 ms to execute.
One task process finishes 1 / 0.5 = 2 tasks per second, so completing 200 tasks in 1 s takes 100 task processes:
100 = 200 / (1/0.5)
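The same arithmetic as a small sketch. The function name requiredTaskWorkers is hypothetical, and the result is rounded up on the assumption that a fractional process count should become one extra process.

```php
<?php
// Hypothetical sizing helper: how many task processes are needed
// to keep up with a given task arrival rate.
function requiredTaskWorkers(float $tasksPerSecond, float $secondsPerTask): int
{
    // One process completes 1 / $secondsPerTask tasks per second.
    $perProcessRate = 1 / $secondsPerTask;
    // Round up: a partial process still means one more process.
    return (int) ceil($tasksPerSecond / $perProcessRate);
}

echo requiredTaskWorkers(200, 0.5).PHP_EOL; // 100
```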
V. How should worker_num be set?
The default value is the number of CPU cores on the host. The maximum value cannot exceed SWOOLE_CPU_NUM * 1000.
For example, if a request takes 10 ms, providing 1000 QPS of processing capacity requires 10 processes:
10 = 0.01 * 1000
Assuming that each process occupies 40 MB of memory, 10 processes need 400 MB of memory.
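A rough sketch of that estimate follows. The names requiredWorkers and estimatedMemoryMb are made up for illustration, and the request time is passed in whole milliseconds to keep the arithmetic free of floating-point rounding.

```php
<?php
// Hypothetical sizing helpers for worker_num and its memory footprint.
function requiredWorkers(int $targetQps, int $requestMs): int
{
    // Workers needed = QPS * seconds per request, rounded up.
    return (int) ceil($targetQps * $requestMs / 1000);
}

function estimatedMemoryMb(int $workers, int $mbPerWorker): int
{
    return $workers * $mbPerWorker;
}

$workers = requiredWorkers(1000, 10);         // 1000 QPS at 10 ms per request
echo $workers.PHP_EOL;                        // 10
echo estimatedMemoryMb($workers, 40).PHP_EOL; // 400
```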
Going further
- Server->taskwait
- Server->taskWaitMulti
- Server->taskCo
Reference documentation
- https://wiki.swoole.com/wiki/page/134.html