Series
- Redis applications – Distributed locks
- Redis applications – Asynchronous message queue and delay queue
- Redis applications – Bitmap
- Redis applications – HyperLogLog
- Redis applications – Bloom filter
- Redis applications – Rate limiting
- Redis applications – Geo
Asynchronous message queue
When it comes to message queues, you probably think of message middleware such as Kafka and RabbitMQ. These dedicated systems provide many features, but they are also fairly troublesome to deploy, use, and maintain. If your requirements for a message queue are modest and you want something lightweight, Redis is an option.
Redis implements a message queue with the list data structure. The following commands are used:
- LPUSH and RPUSH enqueue a message
- LPOP and RPOP dequeue a message
- BLPOP and BRPOP dequeue a message, blocking while the queue is empty
Without further ado, here is the code:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// Send a message
$redis->lPush($list, $value);

// Consume messages
while (true) {
    try {
        $msg = $redis->rPop($list);
        if ($msg === false) {
            // Queue is empty: wait a moment before polling again
            sleep(1);
            continue;
        }
        // business processing
    } catch (Exception $e) {
        echo $e->getMessage();
    }
}
One problem with a plain pop loop is that if the queue stays empty for a long time, the consumer keeps popping in a tight loop, which drives up the QPS on Redis and hurts performance. That is why the code calls sleep when there is no message. But sleep introduces another problem: a message may wait up to the full sleep interval before it is processed. Blocking reads with BLPOP/BRPOP avoid both problems.
BLPOP/BRPOP put the consumer to sleep while the queue is empty and wake it as soon as data arrives, so message latency is almost zero. Simply replace LPOP/RPOP with BLPOP/BRPOP.
Another important point: wrap the blocking call in try/catch. If a connection stays blocked with no traffic for too long, the Redis server may close it as an idle connection to free resources, and the blocked BLPOP/BRPOP call then throws an exception. Catch the exception and retry.
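The blocking variant with reconnect-on-exception might look like the sketch below. It assumes the phpredis extension; the names consumeBlocking, $connect, $handler, and $maxMessages are hypothetical and not part of the original code, and the 5-second block timeout is an arbitrary choice.

```php
<?php
// Sketch only: consumeBlocking() and its parameters are illustrative names.

/**
 * Block on $list until a message arrives, then pass it to $handler.
 * $connect is a callable that returns a connected Redis client, so the
 * loop can obtain a fresh connection after the old one is dropped.
 * $maxMessages bounds the loop (handy for scripts and tests).
 */
function consumeBlocking(callable $connect, string $list, callable $handler, int $maxMessages = PHP_INT_MAX): int
{
    $redis = $connect();
    $handled = 0;
    while ($handled < $maxMessages) {
        try {
            // brPop returns [listName, value]; the result is empty on timeout
            $item = $redis->brPop([$list], 5);
            if (!$item) {
                continue; // timed out with no message: block again
            }
            $handler($item[1]);
            $handled++;
        } catch (Exception $e) {
            // The connection was probably closed while idle: reconnect and retry
            $redis = $connect();
        }
    }
    return $handled;
}

// Possible usage (requires a running Redis server):
// consumeBlocking(function () {
//     $r = new Redis();
//     $r->connect('127.0.0.1', 6379);
//     return $r;
// }, 'my_queue', function ($msg) { echo $msg, PHP_EOL; });
```

Passing a connect callable rather than a client keeps the retry logic in one place; a production consumer would likely also back off between reconnect attempts.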
Delay queue
Have you run into scenarios like these in e-commerce projects?
- If the user has not paid within one hour of placing an order, the order is closed
- If an order has not been reviewed within 7 days, the system automatically generates a review
This is where a delay queue comes in: a task must be executed only after a certain amount of time has passed. In Redis this can be implemented with a sorted set (zset). We store the message (the task) as the member, set its score to the time at which the message becomes due, and then poll the sorted set for members whose due time has passed and process them.
The implementation code is as follows:
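$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// Add a task; $tts is the Unix timestamp at which it becomes due
$redis->zAdd($delayQueue, $tts, $value);

while (true) {
    try {
        // Fetch at most one task whose due time has already passed
        $msgs = $redis->zRangeByScore($delayQueue, 0, time(), ['limit' => [0, 1]]);
        if (!$msgs) {
            // Nothing is due yet: wait before polling again
            sleep(1);
            continue;
        }
        $msg = $msgs[0];
        // Delete the message; only the process whose zRem succeeds owns the task
        $ok = $redis->zRem($delayQueue, $msg);
        if ($ok) {
            // Business processing
        }
    } catch (\Exception $e) {
        // log the error and keep polling
    }
}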
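To connect the scenarios above to scores, here is a minimal sketch of a producer helper. It assumes phpredis; scheduleTask, $delaySeconds, and $now are hypothetical names introduced for illustration. The score is simply the current time plus the delay.

```php
<?php
// Sketch only: scheduleTask() is an illustrative helper, not part of phpredis.

/**
 * Schedule $task to become due $delaySeconds from now.
 * $now can be passed explicitly to make the due time deterministic.
 */
function scheduleTask($redis, string $delayQueue, string $task, int $delaySeconds, ?int $now = null)
{
    // Score = Unix timestamp at which the task should be processed
    $dueAt = ($now ?? time()) + $delaySeconds;
    return $redis->zAdd($delayQueue, $dueAt, $task);
}

// Possible usage: close an unpaid order after one hour
// scheduleTask($redis, 'delay_queue', 'close_order:1001', 3600);
```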
This raises another problem: the same task can be fetched by several processes at once, which then compete for it with ZREM. Only one ZREM succeeds, so the work done by the other processes is wasted. The solution is to make ZRANGEBYSCORE and ZREM a single atomic operation with a Lua script, so that no work is wasted when multiple processes compete for tasks.
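One way the atomic fetch-and-remove could be written is sketched below. It assumes phpredis and its eval(script, args, numKeys) method; POP_DUE_SCRIPT and popDueTask are hypothetical names. Redis runs a Lua script as a single atomic unit, so no other client can fetch the same member between the range query and the removal.

```php
<?php
// Sketch only: POP_DUE_SCRIPT and popDueTask() are illustrative names.

// KEYS[1] = the sorted set, ARGV[1] = current Unix timestamp
const POP_DUE_SCRIPT = <<<'LUA'
local items = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #items == 0 then
    return false
end
redis.call('ZREM', KEYS[1], items[1])
return items[1]
LUA;

/**
 * Atomically fetch and remove one due task.
 * Returns the task string, or false when nothing is due.
 */
function popDueTask($redis, string $delayQueue, ?int $now = null)
{
    $now = $now ?? time();
    // The first numKeys entries of the args array are passed to Lua as KEYS
    return $redis->eval(POP_DUE_SCRIPT, [$delayQueue, $now], 1);
}

// Possible usage inside the polling loop:
// $msg = popDueTask($redis, $delayQueue);
// if ($msg === false) { sleep(1); } else { /* business processing */ }
```

With this helper, every task returned to a process already belongs to that process, so the separate zRem check in the polling loop is no longer needed.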