The preface
When it comes to message queue middleware, we often think of RabbitMQ, RocketMQ and Kafka to enable asynchronous messaging for applications. These are professional message queue middleware with more features than we can understand.
These message-oriented middleware are complex to use, such as RabbitMQ, which creates an Exchange and a Queue before sending a message, then binds the Exchange to the Queue by some sort of rule, routing the message, and controlling the header message. It is only the producer, but also the consumer, who has to go through the above series of tedious steps before consuming the message.
So for those who don’t need to be 100% reliable and want to implement simple message queuing requirements, Redis can relieve us of the tedious steps of message queuing middleware.
Redis message queues are not professional message queues. They do not have many of the advanced features of message queues, nor do they have ACK guarantees. If you want the ultimate in message reliability, turn to professional MQ middleware.
Asynchronous message queue
Starting with the simplest asynchronous message queue, Redis’s list data structure is commonly used as an asynchronous message queue, with lRpush /lpush for column entry and RPOP/lPOP for column exit.
Problem one: Empty queues
For POP operations, when the message queue is empty, the client gets caught in an endless loop of POP, causing a lot of empty polling that wastes life, causing the client CPU to get high, and the QPS of Redis to get high.
The solution to the above problem is to use the blPOP/brPOP of the list structure, where the B prefix stands for blocking. Blocking reads go to sleep when there is no data in the queue and wake up as soon as data arrives. This is the perfect solution to the above problem.
Problem 2: The idle connection is down
The blocking read scenario seems perfect, but then another problem arises: idle connections. If the thread keeps blocking somewhere, the Redis client connection becomes idle. If the idle time is too long, the Redis server will actively disconnect to reduce idle resource usage. Blpop/BRPOP will throw an exception.
So, we need to be careful when we write the client (application) consumer, catching exceptions and retrying them.
Application 1: delay queue
There are generally three strategies in Redis distributed locking to handle lock failures:
-
Directly throw an exception, the front end to remind the user whether to continue operations;
-
Sleep for a while and try again.
-
Put the request on a delay queue and retry later.
In Redis, we can realize the delay queue by zSET (ordered list) data structure. We treat message serialization as a string as the value of zSE, and message expiration processing time (latency) as score. Then polling ZSET to obtain the expiration time for processing. Removing key from Zset through ZREM represents successful consumption, and then processing the task.
The core code is as follows:
/ / production \
public void delay(T msg) {\
TaskItem task = newTaskItem(); \ task.id = UUID.randomUUID().toString();// Assign a unique uuid\task.msg = msg; \ String s = JSON.toJSONString(task);// fastjson serialization \
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // Put it into the delay queue, try again after 5s} \/ / consumer \
public void loop(a) {\
while(! Thread.interrupted()) {\// zrangeByScore = 0; system.currentTimemills () = 0; Inf = -inf; +inf = -inf; -inf = -inf
Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0.1); \if (values.isEmpty()) {\
try {\
Thread.sleep(500); // The rest will continue} \catch (InterruptedException e) {\
break; \} \continue; \ }\ String s = values.iterator().next();// Consume queue \
if (jedis.zrem(queueKey, s) > 0) { // If a message is consumed successfully, it will be consumed successfully. \
TaskItem task = JSON.parseObject(s, TaskType); // fastjson deserialization \
this.handleMsg(task.msg);\
}\
}\
}
Copy the code
The above code in multi-threading for the same task is the case of multiple threads, although it can be processed after ZREM to avoid a task is consumed multiple times. However, threads that acquire tasks but fail to consume them are wasting their time fetching a task. So consider optimizing this logic with Lua Scripting. Moving zrangeByScore and ZREM together to the server for atomic manipulation is a perfect solution.
Recommended book: Redis Deep Adventure: Core Principles and Application Practice by Qian Wenpin