Swoole_process has two communication modes between swoole processes, one is message queue and the other is pipe. The research on swoole_process is particularly important in Swoole.

Preliminary knowledge

IO multiplexing

IO multiplexing in Swoole is represented as the underlying Epoll process model and in C language as the epoll function.

The epoll function responds only when an event that the socket is listening on is triggered. The epoll function returns a set of sockets that are listening on at that time. Its advantage is that it can process a large number of socket connection Event loop events

Swoole implements a Reactor thread model wrapper around epoll and sets up listening callbacks for read and write events. (See swoole_event_add for details)

The Event Loop is a Reactor thread that runs an epoll instance. An event of the socket descriptor is added to the epoll listener via swoole_event_add. The callback function will be executed when the event occurs. It is not available in the FPM environment because FPM may shut down the process at the end of the task.

swoole_process

Process management module based on C language encapsulation, convenient PHP to call the built-in pipeline, message queue interface, convenient implementation of inter-process communication we found in php-fpm configuration file, there are two kinds of process pool management Settings.

Static mode initializes a fixed number of processes and, when a request comes in, selects one of them to process. The dynamic mode specifies the minimum and maximum number of processes. If the number of processes exceeds the maximum number of requests, new threads are added to process the requests

The swoole code is then used to implement it. This is just for understanding the use of swoole_process, interprocess communication, timer, etc. In practice, it is more convenient to use the encapsulated swoole_server to implement the task queue pool.

Suppose there is a timed delivery task queue:


      

/** * Dynamic process pool, similar to FPM * dynamic new process * has the initial number of processes, the minimum number of processes, when the process is insufficient to handle the process, the maximum number of processes */

// a process is scheduled to deliver tasks

/** * 1. Tick * 2. Process and pipeline communication * 3
class processPool
{
  private $pool;

  / * * *@varSwoole_process [] records the process object of all workers */
  private $workers = [];

  / * * *@varArray Records worker status */
  private $used_workers = [];

  / * * *@varInt Minimum number of processes */
  private $min_woker_num = 5;

  / * * *@varInt Number of initial processes */
  private $start_worker_num = 10;

  / * * *@varInt Maximum number of processes */
  private $max_woker_num = 20;

  /** * The number of seconds the process is idle *@var int
   */
  private $idle_seconds = 5;

  / * * *@varInt Number of processes */
  private $curr_num;

  /** * Idle process timestamp *@var array
   */
  private $active_time = [];

  public function __construct()
  {
    $this->pool = new swoole_process(function () {
      // Loop to build the worker process
      for ($i = 0; $i < $this->start_worker_num; $i{+ +)$this->createWorker();
      }
      echo 'Number of initialization processes:' . $this->curr_num . PHP_EOL;
      // Post tasks to idle workers' pipes at regular intervals every second
      swoole_timer_tick(1000.function ($timer_id) {
        static $count = 0;
        $count+ +;$need_create = true;
        foreach ($this->used_workers as $pid= >$used) {
          if ($used= =0) {
            $need_create = false;
            $this->workers[$pid]->write($count . ' job');
            // The tag is in use
            $this->used_workers[$pid] = 1;
            $this->active_time[$pid] = time();
            break; }}foreach ($this->used_workers as $pid= >$used)
          // If no worker queue is idle, create a new worker to process it
          if ($need_create && $this->curr_num < $this->max_woker_num) {
            $new_pid = $this->createWorker();
            $this->workers[$new_pid]->write($count . ' job');
            $this->used_workers[$new_pid] = 1;
            $this->active_time[$new_pid] = time();
          }

        // The process is destroyed when idle for more than a period of time
        foreach ($this->active_time as $pid= >$timestamp) {
          if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) {
            // Destroy the process
            if (isset($this->workers[$pid&&])$this->workers[$pid] instanceof swoole_process) {
              $this->workers[$pid]->write('exit');
              unset($this->workers[$pid]);
              $this->curr_num = count($this->workers);
              unset($this->used_workers[$pid]);
              unset($this->active_time[$pid]);
              echo "{$pid} destroyed\n";
              break; }}}echo "The task{$count}/{$this->curr_num}\n";

        if ($count= =20) {
          foreach ($this->workers as $pid= >$worker) {
            $worker->write('exit');
          }
          // Turn off the timer
          swoole_timer_clear($timer_id);
          Exit the process pool
          $this->pool->exit(0);
          exit();
        }
      });

    });

    $master_pid = $this->pool->start();
    echo "Master $master_pid start\n";

    while ($ret = swoole_process::wait()) {
      $pid = $ret['pid'];
      echo "process {$pid} existed\n"; }}/** * Create a new process@returnInt Pid */ of the new process
  public function createWorker()
  {
    $worker_process = new swoole_process(function (swoole_process $worker) {
      // Bind events to the child process pipeline
      swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
        $data = trim($worker->read());
        if ($data= ='exit') {
          $worker->exit(0);
          exit(a); }echo "{$worker->pid}Are dealing with{$data}\n";
        sleep(5);
        // Return the result, indicating idle
        $worker->write("complete");
      });
    });

    $worker_pid = $worker_process->start();

    // Bind events to the parent process pipe
    swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
      $data = trim($worker_process->read());
      if ($data= ='complete') {
        // Mark it as idle
Echo "{$worker_process->pid} ";
        $this->used_workers[$worker_process->pid] = 0; }});// Save the process object
    $this->workers[$worker_pid] = $worker_process;
    // Mark it as idle
    $this->used_workers[$worker_pid] = 0;
    $this->active_time[$worker_pid] = time();
    $this->curr_num = count($this->workers);
    return $worker_pid; }}new processPool();
Copy the code

I hope the above content can help you. Many PHPer will encounter some problems and bottlenecks when they are advanced, and they have no sense of direction when writing too many business codes. I have sorted out some information, including but not limited to: Distributed architecture, high scalability, high performance, high concurrency, server performance tuning, TP6, Laravel, Redis, Swoft, Kafka, Mysql optimization, shell scripting, Docker, microservices, Nginx, etc. Many knowledge points can be free to share with you