Overview

Swoole's asynchronous Task feature executes work as asynchronous tasks.

Common scenarios include asynchronous payment processing, asynchronous order processing, asynchronous log processing, and asynchronous email/SMS sending.

In Swoole, worker processes handle incoming requests and hand the slow work over to task processes for execution.

Official introduction:

Task processes communicate over Unix socket pipes, entirely in memory, with no IO overhead. A single process can reach about 1 million reads/writes per second. Different processes use different pipes, which makes the most of multiple cores.

Local versions: PHP 7.2.6, Swoole 4.3.1.

Without further ado, let's look at the demo:

Code

server.php


     

    <?php

    class Server
    {
        private $serv;

        public function __construct() {
            $this->serv = new swoole_server('0.0.0.0', 9501);
            $this->serv->set([
                'worker_num'      => 2, // Start two worker processes
                'max_request'     => 4, // Each worker exits after handling 4 requests
                'task_worker_num' => 4, // Start four task processes
                'dispatch_mode'   => 2, // Packet dispatch policy: fixed mode
            ]);

            $this->serv->on('Start', [$this, 'onStart']);
            $this->serv->on('Connect', [$this, 'onConnect']);
            $this->serv->on('Receive', [$this, 'onReceive']);
            $this->serv->on('Close', [$this, 'onClose']);
            $this->serv->on('Task', [$this, 'onTask']);
            $this->serv->on('Finish', [$this, 'onFinish']);
            $this->serv->start();
        }

        public function onStart($serv) {
            echo "#### onStart ####".PHP_EOL;
            echo "SWOOLE ".swoole_version()." service started".PHP_EOL;
            echo "master_pid: {$serv->master_pid}".PHP_EOL;
            echo "manager_pid: {$serv->manager_pid}".PHP_EOL;
            echo "########".PHP_EOL.PHP_EOL;
        }

        public function onConnect($serv, $fd) {
            echo "#### onConnect ####".PHP_EOL;
            echo "Client {$fd} connected".PHP_EOL;
            echo "########".PHP_EOL.PHP_EOL;
        }

        public function onReceive($serv, $fd, $from_id, $data) {
            echo "#### onReceive ####".PHP_EOL;
            echo "worker_pid: {$serv->worker_pid}".PHP_EOL;
            echo "Email: {$data}".PHP_EOL;

            // Hand the slow work to a task process; task() returns immediately
            $param = [
                'fd'    => $fd,
                'email' => $data
            ];
            $rs = $serv->task(json_encode($param));
            if ($rs === false) {
                echo "Task assignment failed".PHP_EOL;
            } else {
                echo "Task assigned successfully, task_id={$rs}".PHP_EOL;
            }
            echo "########".PHP_EOL.PHP_EOL;
        }

        public function onTask($serv, $task_id, $from_id, $data) {
            echo "#### onTask ####".PHP_EOL;
            echo "#{$serv->worker_id} onTask: [PID={$serv->worker_pid}]: task_id={$task_id}".PHP_EOL;

            // Business code (simulated here with sleep)
            for ($i = 1; $i <= 5; $i++) {
                sleep(2);
                echo "Task {$task_id} has completed {$i}/5".PHP_EOL;
            }

            // Notify the client, then report completion back to the worker
            $data_arr = json_decode($data, true);
            $serv->send($data_arr['fd'], 'Email:'.$data_arr['email'].', send success');
            $serv->finish($data);
            echo "########".PHP_EOL.PHP_EOL;
        }

        public function onFinish($serv, $task_id, $data) {
            echo "#### onFinish ####".PHP_EOL;
            echo "Task {$task_id} finished".PHP_EOL;
            echo "########".PHP_EOL.PHP_EOL;
        }

        public function onClose($serv, $fd) {
            echo "Client Close.".PHP_EOL;
        }
    }

    $server = new Server();


client.php


     

    <?php

    class Client
    {
        private $client;

        public function __construct() {
            // Asynchronous TCP client; the callbacks below fire from the event loop
            $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
            $this->client->on('Connect', [$this, 'onConnect']);
            $this->client->on('Receive', [$this, 'onReceive']);
            $this->client->on('Close', [$this, 'onClose']);
            $this->client->on('Error', [$this, 'onError']);
        }

        public function connect() {
            if (!$this->client->connect('127.0.0.1', 9501, 1)) {
                // connect() returns a bool, so read the error code from the client object
                echo "Error: connect failed [{$this->client->errCode}]".PHP_EOL;
                return;
            }
        }

        public function onConnect($cli) {
            fwrite(STDOUT, "enter Email:");
            // Watch STDIN and send every line typed to the server
            swoole_event_add(STDIN, function() {
                fwrite(STDOUT, "enter Email:");
                $msg = trim(fgets(STDIN));
                $this->send($msg);
            });
        }

        public function onReceive($cli, $data) {
            echo PHP_EOL."Received: ".$data.PHP_EOL;
        }

        public function send($data) {
            $this->client->send($data);
        }

        public function onClose($cli) {
            echo "Client close connection".PHP_EOL;
        }

        public function onError() {
        }
    }

    $client = new Client();
    $client->connect();


Summary

I. How many processes does the configuration above start?

Eight in total: 1 master process, 1 manager process, 4 task processes, and 2 worker processes.

Process IDs change between runs, so yours will differ from these:

Master process: 22481

Manager process: 22485

Task process: 22488, 22489, 22490, 22491

Worker process: 22492, 22493
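The total follows directly from the settings: one master, one manager, plus the configured worker and task processes. A minimal sketch of that arithmetic, assuming both values are set explicitly as in server.php (the helper name countProcesses is made up for illustration):

```php
<?php
// Hypothetical helper: number of processes started for the given settings,
// counted as 1 master + 1 manager + workers + task workers.
function countProcesses(int $workerNum, int $taskWorkerNum): int
{
    return 1 + 1 + $workerNum + $taskWorkerNum;
}

// Values taken from the set() call in server.php
echo countProcesses(2, 4).PHP_EOL; // prints 8
```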

See the official process diagram:

II. Why does the worker process ID change after five requests?

Because max_request is set to 4, a worker process exits automatically once it has handled that many requests, which releases all of its memory and resources. The manager then starts a replacement worker with a new PID. This mechanism mainly guards against memory leaks in long-running PHP processes.
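The effect can be illustrated with a toy simulation. This is not Swoole code: it only models a single worker slot that is replaced after max_request requests, and the PIDs are made-up counters (real PIDs are assigned by the operating system and are not sequential).

```php
<?php
// Toy model of one worker slot: after $maxRequest handled requests the worker
// is replaced, which shows up as a new (fake) PID for the next request.
function simulateWorkerRecycling(int $maxRequest, int $totalRequests, int $firstPid = 1000): array
{
    $pid = $firstPid;
    $handled = 0;
    $pidPerRequest = [];
    for ($i = 1; $i <= $totalRequests; $i++) {
        if ($handled >= $maxRequest) {
            $pid++;       // replacement worker gets a different PID
            $handled = 0;
        }
        $pidPerRequest[$i] = $pid;
        $handled++;
    }
    return $pidPerRequest;
}

// With max_request = 4, requests 1 to 4 share a PID and request 5 gets a new one
print_r(simulateWorkerRecycling(4, 5));
```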

III. If we kill a task process, will a new one be created?

Yes. The manager process starts a replacement.

IV. How should task_worker_num be set?

The maximum value cannot exceed SWOOLE_CPU_NUM * 1000.

Check the number of CPU cores:


     

    echo "swoole_cpu_num:".swoole_cpu_num().PHP_EOL;


Size it from the project's task volume. For example, suppose 200 tasks are generated per second and each task takes 500 ms to execute.

One task process completes 2 tasks per second (1 / 0.5), so finishing 200 tasks per second takes 100 task processes.

100 = 200 / (1 / 0.5)
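The same calculation as a small helper. This is a sketch only: the function name requiredTaskWorkers is hypothetical, and it takes the task duration in whole milliseconds so that integer math can round up without floating-point surprises.

```php
<?php
// Hypothetical helper: task processes needed so that tasks arriving at
// $tasksPerSecond, each taking $taskMillis ms, are completed as fast as they arrive.
function requiredTaskWorkers(int $tasksPerSecond, int $taskMillis): int
{
    // Integer form of ceil($tasksPerSecond * $taskMillis / 1000)
    return intdiv($tasksPerSecond * $taskMillis + 999, 1000);
}

echo requiredTaskWorkers(200, 500).PHP_EOL; // prints 100
```

The result still has to stay below the SWOOLE_CPU_NUM * 1000 limit mentioned above.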

V. How should worker_num be set?

The default value is the number of CPU cores on the host. The maximum value cannot exceed SWOOLE_CPU_NUM * 1000.

For example, if each request takes 10 ms, providing 1000 QPS of processing capacity requires 10 worker processes.

10 = 0.01 * 1000

Assuming that each process occupies 40 MB of memory, 10 processes need 400 MB of memory.
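A rough planning sketch that combines the worker count with the memory estimate. The function name planWorkers and the array keys are illustrative, and the per-process memory figure is an input you would have to measure yourself.

```php
<?php
// Hypothetical helper: workers needed for a target QPS when each request
// takes $requestMillis ms, and the memory those workers would occupy.
function planWorkers(int $targetQps, int $requestMillis, int $memoryPerProcessMb): array
{
    // Integer form of ceil($targetQps * $requestMillis / 1000)
    $workers = intdiv($targetQps * $requestMillis + 999, 1000);
    return [
        'worker_num' => $workers,
        'memory_mb'  => $workers * $memoryPerProcessMb,
    ];
}

// 1000 QPS at 10 ms per request, 40 MB per process
print_r(planWorkers(1000, 10, 40)); // worker_num => 10, memory_mb => 400
```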

Going further

  • Server->taskwait

  • Server->taskWaitMulti

  • Server->taskCo
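Of the three, taskwait is the closest to the demo above: it dispatches a task and blocks the calling worker until the task process returns a result or the timeout expires. A minimal sketch, assuming $serv is the server object passed to onReceive. The function name sendEmailAndWait, the reply strings, and the 3-second default timeout are illustrative choices, not part of Swoole.

```php
<?php
// Sketch: hand a job to a task process and wait for its result.
// $serv is expected to be the Swoole server instance from a worker callback.
function sendEmailAndWait($serv, int $fd, string $email, float $timeout = 3.0): string
{
    $payload = json_encode(['fd' => $fd, 'email' => $email]);

    // taskwait() returns the task's result, or false on timeout or failure
    $result = $serv->taskwait($payload, $timeout);
    if ($result === false) {
        return "Email: {$email}, task timed out or failed";
    }
    return "Email: {$email}, send success";
}
```

For a result to come back, onTask has to pass one back with finish() or return, and the timeout has to exceed the task's running time (the demo task above sleeps for 10 seconds in total).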

Reference documentation

  • https://wiki.swoole.com/wiki/page/134.html


Feel free to share this article; when you do, please credit the author and the source. Thank you!