The implementation of coroutine scheduling in this paper refers to zanPHP implementation: zanphp.io/

Write yourself a simple swoole+ PHP coroutine framework: github.com/neuSnail/Pc… Interested can have a look, very immature welcome advice.

Pcs is my graduation design based on ZANPHP. Different from Zan, Zan wrote a ZAN extension instead of Swoole to realize eventloop, Eventchain, etc. PCS chooses to continue to use Swoole and use asynchronous swoole_task to realize asynchrony, which has lower complexity and is easier to understand than ZAN.

About what is coroutines and PHP based on the generator coroutines is how to realize here is not to do a detailed explanation, did not understand the classmates can reference laruence this article: www.laruence.com/2015/…

The following description can be seen in many articles:

“Coroutines can interrupt active resource allocation in the event of a block, and the scheduler chooses another coroutine to run. However, PHP does not support native coroutines, so it makes no sense for the code to be executed synchronously if it is not executed by an asynchronous process, as follows:

function foo(a)
{
    $db=new Db();
    $result=(yield $db->query());
    yield $result;
}
Copy the code

The above database query operation is blocking. When the scheduler schedules the coroutine to this point, it finds that the operation is blocking. What should the scheduler do? Select remaining coroutines to execute? When and by whom should the coroutine block be performed? So talking about non-blocking IO without asynchronous calls in PHP coroutines is rogue. And swoole asynchronous task provides an implementation of the asynchronous solution, about swoole_task can refer to the official document https://wiki.swoole.com/wiki/…

Realization of core functions

  • Form one coroutine at a time

Start by creating a swoole_server and setting up the callback

class HttpServer implements Server
{
    private $swooleHttpServer;

    public function __construct(\swoole_http_server $swooleHttpServer)
    {
        $this->swooleHttpServer = $swooleHttpServer;
    }

    public function start(a)
    {
        $this->swooleHttpServer->on('start'[$this.'onStart']);
        $this->swooleHttpServer->on('shutdown'[$this.'onShutdown']);

        $this->swooleHttpServer->on('workerStart'[$this.'onWorkerStart']);
        $this->swooleHttpServer->on('workerStop'[$this.'onWorkerStop']);
        $this->swooleHttpServer->on('workerError'[$this.'onWorkerError']);
        $this->swooleHttpServer->on('task'[$this.'onTask']);
        $this->swooleHttpServer->on('finish'[$this.'onFinish']);


        $this->swooleHttpServer->on('request'[$this.'onRequest']);

        $this->swooleHttpServer->start();
    }
Copy the code

OnRequest method:

 public function onRequest(\swoole_http_request $request, \swoole_http_response $response)
    {
        $requestHandler = new RequestHandler($request, $response);
        $requestHandler->handle();
    }
Copy the code

The handle method is executed in the ReqeustHandler to resolve the route of the request, create a controller, and invoke the corresponding method. The implementation is not described here.

 public function handle(a)
    {
        $this->context = new Context($this->request, $this->response, $this->getFd());
        $this->router = new Router($this->request);

        try {
            if (false= = =$this->router->parse()) {
                $this->response->output(' ');
                return;
            }
            $coroutine = $this->doRun();
            $task = new Task($coroutine, $this->context);
            $task->run();
        } catch (\Exception $e) {
            PcsExceptionHandler::handle($e, $this->response); }}private function doRun(a)
    {
        $ret = (yield $this->dispatch());
        yield $this->response->send($ret);
    }
Copy the code

In the code aboveRet isThe result of the action() call,yield $this->response->send($ret);Is a response to a request from the client.

The $coroutine is a Genetator object formed by the request, which contains the flow of the request. The coroutine is then scheduled to get the actual execution result.

  • Coroutines scheduling
namespace Pcs\Coroutine;

use Pcs\Network\Context\Context;

class Task
{
    private $coroutine;
    private $context;
    private $status;
    private $scheduler;
    private $sendValue;

    public function __construct(\Generator $coroutine, Context $context)
    {
        $this->coroutine = $coroutine;
        $this->context = $context;
        $this->scheduler = new Scheduler($this);

    }

    public function run(a)
    {
        while (true) {
            try {
                $this->status = $this->scheduler->schedule();
                switch ($this->status) {
                    case TaskStatus::TASK_WAIT:
                        echo "task status: TASK_WAIT\n";
                        return null;

                    case TaskStatus::TASK_DONE:
                        echo "task status: TASK_DONE\n";
                        return null;

                    case TaskStatus::TASK_CONTINUE;
                        echo "task status: TASK_CONTINUE\n";
                        break; }}catch (\Exception $e) {
                $this->scheduler->throwException($e); }}}public function setCoroutine($coroutine)
    {
        $this->coroutine = $coroutine;
    }

    public function getCoroutine(a)
    {
        return $this->coroutine;
    }

    public function valid(a)
    {
        if ($this->coroutine->valid()) {
            return true;
        } else {
            return false; }}public function send($value)
    {
        $this->sendValue = $value;
        $ret = $this->coroutine->send($value);
        return $ret;
    }

    public function getSendVal(a)
    {
        return $this->sendValue; }}Copy the code

Task relies on the Generator object $coroutine. The Task class defines some get/set methods and some Generator methods. The Task::run() method is used to Schedule the coroutine. Each schedule returns the status of the current schedule. Laruence more than most of the articles, and online information are collaborators ride sharing a scheduler, which will run method for each collaborators process to create a scheduler, the reason is that each collaborators cheng is a client request, using a separate scheduler can reduce the influence of each other, and many collaborators cheng is the scheduling sequence between swoole to deal with, Don’t worry about the scheduler here. The code for scheduling is given below:

namespace Pcs\Coroutine;

class Scheduler
{
    private $task;
    private $stack;
    const SCHEDULE_CONTINUE = 10;

    public function __construct(Task $task)
    {
        $this->task = $task;
        $this->stack = new \SplStack();
    }
    
    public function schedule(a)
    {
        $coroutine = $this->task->getCoroutine();
        $value = $coroutine->current();

        $status = $this->handleSystemCall($value);
        if($status ! = =self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleStackPush($value);
        if($status ! = =self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handleAsyncJob($value);
        if($status ! = =self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handelYieldValue($value);
        if($status ! = =self::SCHEDULE_CONTINUE) return $status;

        $status = $this->handelStackPop();
        if($status ! = =self::SCHEDULE_CONTINUE) return $status;


        return TaskStatus::TASK_DONE;
    }

    public function isStackEmpty(a)
    {
        return $this->stack->isEmpty();
    }

    private function handleSystemCall($value)
    {
        if(! $valueinstanceof SystemCall) {
            return self::SCHEDULE_CONTINUE; }}private function handleStackPush($value)
    {
        if(! $valueinstanceof \Generator) {
            return self::SCHEDULE_CONTINUE;
        }

        $coroutine = $this->task->getCoroutine();
        $this->stack->push($coroutine);
        $this->task->setCoroutine($value);

        return TaskStatus::TASK_CONTINUE;
    }

    private function handleAsyncJob($value)
    {
        if(! is_subclass_of($value, Async::class)) {return self::SCHEDULE_CONTINUE;
        }

        $value->execute([$this.'asyncCallback']);

        return TaskStatus::TASK_WAIT;
    }

    public function asyncCallback($response, $exception = null)
    {
        if($exception ! = =null
            && $exception instanceof \Exception
        ) {
            $this->throwException($exception, true);
        } else {
            $this->task->send($response);
            $this->task->run(); }}private function handelYieldValue($value)
    {
        if (!$this->task->valid()) {
            return self::SCHEDULE_CONTINUE;
        }

        $ret = $this->task->send($value);
        return TaskStatus::TASK_CONTINUE;
    }


    private function handelStackPop(a)
    {
        if ($this->isStackEmpty()) {
            return self::SCHEDULE_CONTINUE;
        }

        $coroutine = $this->stack->pop();
        $this->task->setCoroutine($coroutine);

        $value = $this->task->getSendVal();
        $this->task->send($value);

        return TaskStatus::TASK_CONTINUE;
    }

    public function throwException($e, $isFirstCall = false)
    {
        if ($this->isStackEmpty()) {
            $this->task->getCoroutine()->throw($e);
            return;
        }

        try {
            if ($isFirstCall) {
                $coroutine = $this->task->getCoroutine();
            } else {
                $coroutine = $this->stack->pop();
            }

            $this->task->setCoroutine($coroutine);
            $coroutine->throw($e);

            $this->task->run();
        } catch (\Exception $e) {
            $this->throwException($e); }}}Copy the code

The Schedule method in the Scheduler gets the coroutine of the current Task and the return value of the current breakpoint through the current() method, which then calls five methods in turn to process the return value. 1:handleSystemCall If the value returned is an object of type SystemCall, then system calls, such as killTask, are executed, with SystemCall being the first priority. 2: The handleStackPush calls the B function in the A function, and the B function is called A subroutine (subfunction) of the A function, whereas in the coroutine it cannot be called like A normal function.

function funcA(a)
{
    return funcB();
}

function genA(a)
{
    yield genB();
}
Copy the code

In funcA funcB (); Returns the execution result of funcB, but in genA, yield genB(); A Generator object is returned instead of the final execution result of genB. To obtain the execution results of genB, it is necessary to schedule genB, and there may be coroutine nesting of genC() and genD() in genB. Therefore, in order to make the coroutine function call normally at a glance, coroutine stack is used to achieve this.

3:handleAsyncJob

private function handleAsyncJob($value)
    {
        if(! is_subclass_of($value, Async::class)) {
            return self::SCHEDULE_CONTINUE;
        }

        $value->execute([$this.'asyncCallback']);

        return TaskStatus::TASK_WAIT;
    }

    public function asyncCallback($response.$exception = null)
    {
        if ($exception! == null &&$exception instanceof \Exception
        ) {
            $this->throwException($exception.true);
        } else {
            $this->task->send($response);
            $this->task->run(); }}Copy the code

The execute method of Async is executed when the return value of coroutine scheduling is a subclass of Async or an instance implementing Asycn interface. Here use mysqli database query class example.

    public function execute(callable $callback)
    {
        $this->callback = $callback;
        $serv = ServerHolder::getServer();
        $serv->task($this->sql, -1, [$this.'queryReady']);

    }

    public function queryReady(\swoole_http_server $serv.$task_id.$data)
    {
        $queryResult = unserialize($data);
        $exception = null;
        if ($queryResult->errno ! = 0) {$exception = new \Exception($queryResult->error);
        }
        call_user_func_array($this->callback, [$queryResult.$exception]);
    }

Copy the code

The execute method receives a function as a callback after the asynchronous operation is complete. In the execute method of Mysqli, an asynchronous swoole_task is started, the SQL operation is handed to the swoole_task for asynchronous execution, and the queryReady method is executed after the execution. $this->callback(); $this->callback(); $this->callback(); HandleAsyncJob does not wait for the result of an asynchronous operation, but returns a TASK_WAIT signal, which causes the run() method to return null, releasing the current worker. (SegmentFault doesn’t support zooming or HTML syntax is really uncomfortable)

4:handleYieldValue

private function handelYieldValue($value)
    {
        if (!$this->task->valid()) {
            return self::SCHEDULE_CONTINUE;
        }

        $ret = $this->task->send($value);
        return TaskStatus::TASK_CONTINUE;
    }

Copy the code

If the yield return is neither an asynchronous call nor a Generator, determine whether the current Generator is valid. If so, proceed with the following handleStackPush method, Otherwise, return Task_Continue, that is, yield multiple times in a generator, and only the last yield will be returned. 5:handleStackPush when judged in the previous step! $this->task->valid(); $this->task->valid();

What are the advantages of coroutines

When a request encounters AN IO, the synchronization operation will cause the current request to be blocked at the IO and wait for the IO to return. In Swoole, a request occupies a worker all the time.

However, when coroutine scheduling is used, the user can use yield to manually interrupt the blocking place, hand over to swoole_task for asynchronous operation, and release worker occupation to process other requests. Scheduling resumes when asynchronous processing is complete.

Note that PHP's coroutines only handle interrupts; asynchronous operations are done by Swoole_task