Introduction to the
RabbitMQ is a highly available messaging middleware and it is essential to learn and use RabbitMQ.
- Asynchronous messaging
- Supports various development languages such as Java, Python and PHP
- Pluggable authentication, authorization
- Rabbitmq-manager can be used for administration and monitoring.
The installation
Docker is directly used here, which is very convenient for installation
Docker pull RabbitMQ :3.8.3-management-alpine
Run docker run –name run-rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq
Port 15672 is the RabbitMQ Web management page, directly access:http://localhost:15672/, initial user password: guest
use
There are basically two scenarios when RabbitMQ is used as a producer and a consumer
- One/more producers, more shared consumers
- One/more producers, multiple independent consumers
Independent consumers do not share queues. Each consumer has its own queue and can define rules to pull data from the Exchange into its own queue
The various scenarios are implemented in code below
Basic concept
queue
Data queues, where data can be pushed to or consumed from a queue
Exchange switches
When data is pushed to the switch, the queue can be bound to the switch. Different types of switches support different binding rules
- Fanout has no rules, all data in Exchange
- Direct matches exactly, binding only data for the values specified in the RoutingKey
- Topic more flexible rules, routingkey routingkey must be a by
.
Separate words,*
The asterisk is used to indicate a word,#
(hash sign) is used to indicate any number of words (zero or more)
Encapsulate common operations for RabbitMQ
<? php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; Class RabbitMQ {private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } /** * @param $exchangeName * @param $type * @param $pasive * @param $durable * @param $autoDelete */ public function createExchange($exchangeName, $type, $pasive = false, $durable = false, $autoDelete = false) { $this->channel->exchange_declare($exchangeName, $type, $pasive, $durable, $autoDelete); } /** * @param $queueName * @param $pasive * @param $durable * @param $exlusive * @param $autoDelete */ public function createQueue($queueName, $pasive = false, $durable = false, $exlusive = false, $autoDelete = false, $nowait = false, $arguments = []) { $this->channel->queue_declare($queueName, $pasive, $durable, $exlusive, $autoDelete, $nowait, $arguments); } @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * Consume message * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName, $callback, $tag = '', $noLocal = false, $noAck = false, $exclusive = false, $noWait = false) { $this->channel->basic_consume($queueName, $tag, $noLocal, $noAck, $exclusive, $noWait, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); }}Copy the code
Multiple shared consumers
Multiple consumers can increase consumption speed and provide system throughput
Two, let’s go to the code producer code
<? php require_once '.. /.. /vendor/autoload.php'; use RabbitMQ\RabbitMQ; use PhpAmqpLib\Message\AMQPMessage; $rabbit = new RabbitMQ(); $queueName = 'test-single-queue'; $rabbit->createQueue($queueName,false,true,false,false); for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a test message.", $queueName, "",['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); } unset($rabbit); // Close the connectionCopy the code
Running producerphp Producer
You can see the queue information on the Manager Web page
Consumer code
<? php require_once '.. /.. /vendor/autoload.php'; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $queueName = 'test-single-queue'; $callback = function ($message){ var_dump("Received Message : " . $message->body); //print message sleep(2); $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //ack }; $rabbit->consumeMessage($queueName,$callback); unset($rabbit); // Close the connectionCopy the code
Run consumer twicephp Consumer.php
You can see that two consumers are not consuming messages twice or you can see through the Manager Web that messages from this queue are being consumed
Multiple independent consumers
RabbitMQ producers push messages to exchanges, enabling multiple independent consumers by binding multiple queues to exchanges
Define a topic type switch with the consumption rule: test.ex. Add a word
<? php require_once '.. /.. /vendor/autoload.php'; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $exchangeName = 'test-ex-topic'; $queueName = 'test-consumer-ex-topic'; $routingKey = 'test.ex.*'; $rabbit->createQueue($queueName, false, true); $rabbit->bindQueue($queueName, $exchangeName, $routingKey); Function ($message) {var_dump("Received message: ". $message->body); //print message sleep(2); $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); //ack }; $rabbit->consumeMessage($queueName, $callback); unset($rabbit); // Close the connectionCopy the code
Launch the Consumer PHP consumer.php
Define producers that push messages to two different Routingkeys
<? php require_once '.. /.. /vendor/autoload.php'; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $routingKey1 = 'test.ex.queue1'; $routingKey2 = 'test.ex.queue2'; $exchangeName = 'test-ex-topic'; $rabbit->createExchange($exchangeName, AMQPExchangeType::TOPIC, false, true, false); Routingkey = test-ex-queue1 for ($I = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a queue1 message.", $routingKey1, $exchangeName, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // Messages persist, restart RabbitMQ, messages will not be lost]); } // For ($I = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a queue2 message.", $routingKey2, $exchangeName, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // Messages persist, restart RabbitMQ, messages will not be lost]); } unset($rabbit); // Close the connectionCopy the code
Running producerphp Producer.php
, you can see that the consumer has 20,000 messages to consume, containing data from two Routingkeys
Delay queue
concept
Rabbitmq queues can be set to a TTL, set expired messages to dead, and then pushed to delay_queue. Delay_queue can be consumed to implement the delay queue function
scenario
Assume a scenario: Xiao Ming places an order on the takeout platform. If the order is not paid within 10 minutes, the system will automatically cancel the order and push the “order cancelled” message to the user.
Development idea: OrderId is pushed to the order queue order_queue when the order is placed, and the validity period of the next message is set to 10 minutes. When this message expires after 10 minutes, the message will be dead letter pushed to exchange. Bind exchange and queue, open one or more consumer consumption queues, and check whether the message order in queue has been paid, if not, push notification, cancel the order.
The flow chart,Failure of message consumption is not considered
The core code
Simple encapsulation of RabbitMQ
<? php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; Class RabbitMQ {private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * Consume message * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName,$callback) { $this->channel->basic_consume($queueName, '', false, false, false, false, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); }}Copy the code
Creating a delay queue
<? php namespace RabbitMQ; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * Class DelayQueue * @package RabbitMQ */ Class DelayQueue extends RabbitMQ {/** * @param $ttl * @param $delayExName * @param $delayQueueName * @param $queueName */ public function createQueue($ttl, $delayExName, $delayQueueName, $queueName) { $args = new AMQPTable([ 'x-dead-letter-exchange' => $delayExName, 'x-message-ttl' => $TTL, 'x-dead-letter-routing-key' => $queueName]); 'x-message-ttl' => $TTL, 'x-dead-letter-routing-key' => $queueName]); $this->channel->queue_declare($queueName, false, true, false, false, false, $args); / / bind dead-letter queue $this - > channel - > exchange_declare ($delayExName, AMQPExchangeType: : DIRECT, false, true, false); $this->channel->queue_declare($delayQueueName, false, true, false, false); $this->channel->queue_bind($delayQueueName, $delayExName, $queueName, false); }}Copy the code
Producer, the code is very simple, look at the effect after running, order message more and more
<? php require_once '.. /vendor/autoload.php'; $delay = new \RabbitMQ\DelayQueue(); $ttl = 1000 * 100; $delayExName = 'delay-order-exchange'; Exchange $delayQueueName = 'delay-order-queue'; Queue $queueName = 'ttl-order-queue'; $delay->createQueue($TTL, $delayExName, $delayQueueName, $queueName); $I = 0; $I = 0; $i < 100; $i++) { $data = [ 'order_id' => $i + 1, 'remark' => 'this is a order test' ]; $delay->sendMessage(json_encode($data), $queueName); sleep(1); }Copy the code
The consumer, looking at the queue after the consumption, will later observe that there is an expiration message pushed to the delay_order_queueConsumers also consume messages!
<? php require_once '.. /vendor/autoload.php'; $delay = new \RabbitMQ\DelayQueue(); $delayQueueName = 'delay-order-queue'; $callback = function ($msg) { echo $msg->body . PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); // Handle order timeout logic, push notifications to users, etc... sleep(10); }; $delay->consumeMessage($delayQueueName, $callback);Copy the code
code
The code is shown:Github.com/jiaoyang3/r…
🏆 nuggets technical essay | double festival special articles