Install dependencies

Json {"require": {"php-amqplib/php-amqplib": ">=3.0"}} > composer.phar install

The model structure

RPC workflow:

  • When the client starts, an anonymous exclusive callback queue is created.
  • For RPC requests, the client sends a message with two properties:

    • REPLY_TO, set to the callback queue;
    • Correlation_id, which sets a unique value for each request.
  • The request is sent to the rpc_queue queue.
  • The server waits for requests on this queue. When the request occurs, the work is performed and the message with the result is sent back to the client via the queue specified by reply_to.
  • The client waits for a message on the callback queue. When a message appears, check the correlation_id property. If it matches the value in the request, the response is returned to the application.

The client

<? php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class FibonacciRpcClient { private $connection; private $channel; private $callback_queue; private $response; private $corr_id; Public function __construct() {$this->connection = new amqpstreamConnection ('localhost', 5672, 'guest'); 'guest' ); $this->channel = $this->connection->channel(); // create a queue, existing ones will not be created again, List ($this->callback_queue,,) = $this-> Channel-> queue_declare("", false, false, true, false); // Set the fourth parameter to false to turn off automatic message acknowledgement, To true open automated message to confirm the delivery immediately after marked for deletion $this - > channel - > basic_consume ($this - > callback_queue, ", false, true, false, false, array( $this, 'onResponse' ) ); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array( 'correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue ) ); // Send a message to the queue via the default switch (message content, default switch, routing key); $this->channel->basic_publish($msg, '', 'rpc_queue'); while (! $this->response) { $this->channel->wait(); } return intval($this->response); } } $fibonacci_rpc = new FibonacciRpcClient(); $fibonacci_rpc->call(30); $fibonacci_rpc->call(30); echo ' [.] Got ', $response, "\n";

The service side

<? php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new amqpstreamConnection ('localhost', 5672, 'guest', 'guest'); $connection = new amqpstreamConnection ('localhost', 5672, 'guest'); $connection->channel(); $connection->channel(); $channel->queue_declare('rpc_queue', false, false, false, false); Function fib($n) {if ($n == 0) {return 0; } if ($n == 1) { return 1; } return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requests\n"; $callback = function ($req) {$n = intval($req->body); echo ' [.] fib(', $n, ")\n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to') ); $req->ack(); $req->ack(); }; $channel->basic_qos(null, 1, null); $channel->basic_qos(null, 1, null); $channel->basic_qos(null, 1, null); // Set the fourth parameter to false to turn off automatic message acknowledgement, $channel->basic_consume('rpc_queue', '', false, false, false, false, false, false, $callback); while ($channel->is_open()) { $channel->wait(); } $channel->close(); $connection->close();

run

Open a terminal and run the server:

PHP rpc_server.php # => [x] waits for RPC requests

Open another terminal and run the server:

PHP rpc_server.php # => [x] waits for RPC requests

Open another terminal and run the client:

PHP rpc_client.php # => [x] request fib(30)

Expand functionality

  • If no server is running, how should the client react?
  • Should clients set some kind of timeout for RPC?
  • If the server fails and throws an exception, should it be forwarded to the client?
  • Prevent invalid incoming messages (for example, checking boundaries, types) before processing.