Preface

Redis can also be used for simple message queues. We can use the list data structure to implement queues.

A few list commands

lpush (left push): pushes an element onto the left side of the list

rpush (right push): pushes an element onto the right side of the list

lpop (left pop): pops an element from the left side of the list

rpop (right pop): pops an element from the right side of the list

With these four commands, a list can be used to implement either a queue or a stack. A queue is first in, first out (FIFO), while a stack is first in, last out (LIFO).

So a queue can be implemented with lpush + rpop or rpush + lpop, and a stack with lpush + lpop or rpush + rpop.

Demonstrating the queue with commands

The producer publishes messages

First, we use rpush to add five elements to a notify-queue: 1, 2, 3, 4, 5
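The original demo uses redis-cli; as a minimal sketch, the same step with the Jedis Java client (an assumption, any Redis client works) looks like this:

    import redis.clients.jedis.Jedis;

    public class ProducerDemo {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // Push five elements onto the right side of notify-queue
                jedis.rpush("notify-queue", "1", "2", "3", "4", "5");
                // llen returns the current length of the list: 5
                System.out.println(jedis.llen("notify-queue"));
            }
        }
    }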

The consumer consumes messages

Since the producer uses rpush, the consumer uses lpop. We consume notify-queue in order, from 1 to 5, until there are no messages left in the queue and every further pop returns nil.
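A matching consumer sketch with Jedis (same assumption as above); lpop returns the elements in insertion order, then null once the list is empty:

    import redis.clients.jedis.Jedis;

    public class ConsumerDemo {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // Pops "1", "2", "3", "4", "5" in order, then null once the queue is empty
                String msg;
                while ((msg = jedis.lpop("notify-queue")) != null) {
                    System.out.println("consumed: " + msg);
                }
            }
        }
    }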

The empty-polling problem

When we consumed messages with lpop above, we saw that once all messages had been consumed, every subsequent pop returned nil.

Above, the commands were executed manually, but if code keeps popping (pulling) data in a loop, the result is empty polling (useless reads).

This increases the client's CPU consumption and drives up Redis QPS with useless operations, which may slow down other clients' access to Redis.
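The problematic pattern looks roughly like this (a sketch, assuming the Jedis client used above): the loop keeps spinning even when the queue is empty.

    import redis.clients.jedis.Jedis;

    public class BusyPollingConsumer {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                while (true) {
                    // When the queue is empty, lpop returns null immediately,
                    // so this loop spins: wasted client CPU and useless Redis QPS.
                    String msg = jedis.lpop("notify-queue");
                    if (msg != null) {
                        System.out.println("consumed: " + msg);
                    }
                }
            }
        }
    }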

Solution A (Sleep)

Since empty polling causes high resource consumption on both the client and Redis, we can have the client sleep for 1s whenever it receives no data, and only pull again after that 1s, which reduces the consumption, for example:

Thread.sleep(1000)
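Wrapped in a polling loop, a sketch looks like this (the same Jedis-based consumer as above; the 1s sleep is the only change):

    import redis.clients.jedis.Jedis;

    public class SleepPollingConsumer {
        public static void main(String[] args) throws InterruptedException {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                while (true) {
                    String msg = jedis.lpop("notify-queue");
                    if (msg != null) {
                        System.out.println("consumed: " + msg);
                    } else {
                        // Queue is empty: sleep 1s before polling again to reduce consumption,
                        // at the cost of up to 1s of extra latency.
                        Thread.sleep(1000);
                    }
                }
            }
        }
    }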

This scheme has a drawback: the latency of message consumption increases. With a single consumer, the worst-case delay is 1s: right after an empty poll the consumer goes to sleep, and if a message arrives at that moment it can only be consumed once the consumer wakes up 1s later.

With multiple consumers, their sleep intervals are staggered, so the latency is reduced, but is there a better way to achieve nearly zero latency?

Solution B (blocking read)

Redis actually provides two commands for blocking reads from a queue:

blpop (blocking left pop)

brpop (blocking right pop)

A blocking read puts the client to sleep while the queue is empty; as soon as a message arrives, it wakes up and reads the data immediately. So the latency problem can be solved by replacing lpop/rpop with blpop/brpop.

We continue by enqueueing 3 more pieces of data: 6, 7, 8.

Then we use blpop to read from the queue. The last parameter is the blocking timeout: if no message arrives within that time, nil is returned.
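A consumer sketch using blpop via Jedis (an assumption): the first argument is the timeout in seconds, and the reply contains the key name and the value, or is empty/null when the timeout expires.

    import java.util.List;
    import redis.clients.jedis.Jedis;

    public class BlockingConsumer {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // Block for up to 5 seconds waiting for a message on notify-queue
                List<String> reply = jedis.blpop(5, "notify-queue");
                if (reply == null || reply.isEmpty()) {
                    System.out.println("timed out, no message (nil)");
                } else {
                    // reply.get(0) is the key name, reply.get(1) is the popped value
                    System.out.println("consumed: " + reply.get(1));
                }
            }
        }
    }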

Idle blocking-read connections are disconnected automatically

When a client blocks on a read for a long time, the server may consider the connection idle and close it to reduce resource usage, in which case the client throws an exception.

So when using blocking reads on the client, be sure to catch such exceptions and handle them appropriately, for example by retrying.
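A sketch of that pattern (assuming Jedis, where a closed connection typically surfaces as a JedisConnectionException):

    import java.util.List;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.exceptions.JedisConnectionException;

    public class SafeBlockingRead {
        public static void main(String[] args) {
            while (true) {
                try (Jedis jedis = new Jedis("localhost", 6379)) {
                    List<String> reply = jedis.blpop(30, "notify-queue");
                    if (reply != null && !reply.isEmpty()) {
                        System.out.println("consumed: " + reply.get(1));
                    }
                } catch (JedisConnectionException e) {
                    // The server closed the idle connection (or the network dropped):
                    // log it and retry with a fresh connection.
                    System.out.println("connection lost, retrying: " + e.getMessage());
                }
            }
        }
    }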

Implementing the message queue with a Java client

The idea is the same as above, except that the command-line client redis-cli is replaced by Java code: one or more threads publish with rpush, and one or more threads consume with blpop. The code is at: github.com/qiaomengnan…

The publisher
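The actual code is in the repository linked above; purely as an illustration, a publisher thread might look roughly like this (Jedis assumed, names hypothetical):

    import redis.clients.jedis.Jedis;

    // Illustrative publisher thread (not the repository code): rpush one message per second.
    public class QueuePublisher implements Runnable {
        @Override
        public void run() {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                for (int i = 1; i <= 10; i++) {
                    jedis.rpush("notify-queue", "message-" + i);
                    System.out.println("published message-" + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public static void main(String[] args) {
            new Thread(new QueuePublisher()).start();
        }
    }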

The subscriber
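And an illustrative subscriber thread (again not the repository code), blocking on blpop; in production, wrap it with the JedisConnectionException retry shown earlier:

    import java.util.List;
    import redis.clients.jedis.Jedis;

    // Illustrative subscriber thread: block on blpop and print whatever arrives.
    public class QueueSubscriber implements Runnable {
        @Override
        public void run() {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                while (!Thread.currentThread().isInterrupted()) {
                    List<String> reply = jedis.blpop(5, "notify-queue");
                    if (reply != null && !reply.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + " consumed " + reply.get(1));
                    }
                }
            }
        }

        public static void main(String[] args) {
            new Thread(new QueueSubscriber(), "subscriber-1").start();
        }
    }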

Implementing a delay queue

A delay queue is one where a message becomes consumable only after a certain amount of time has passed since it was sent, rather than being read by the consumer immediately.

A zset can help us do this. A zset sorts its members by score, and the score can store a timestamp, so when we publish a message we set its score to the current timestamp plus the delay.

When consumers fetch messages, they take a slice of the zset containing the messages that are due (that is, members whose score is less than or equal to the current timestamp are ready to be consumed; members whose score is greater than the current timestamp can only be consumed later).

Key commands: zadd (publisher), zrangebyscore (subscriber), zrem (the subscriber deletes the data after consuming it).

Command implementation

We use zadd to add four elements: three that become consumable after 1, 2, and 3 seconds, and one ("kafka") that becomes consumable after 10 seconds.

Now suppose we are at the third second. We fetch the members of the zset whose score is between the 1-second and 3-second timestamps, because that is the data we are allowed to consume, and we get the three elements that meet the criteria.

If you only want to consume one piece of data at a time, you can add a limit to the query; the first consumable element returned is "redis".

Also note that, unlike the list commands lpop/blpop, which remove the data from the queue as they pop it, zrangebyscore only reads the data: it stays in the zset and can still be read by other consumers unless it is deleted with zrem.

However, another consumer may have already deleted it (consumed it) first, so the code must also check whether the return value of zrem is greater than 0 to determine whether this consumer grabbed the message successfully, and only then actually consume it.

Code implementation

The publisher
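An illustrative publisher sketch (not the repository code; the zset key name delay-queue is assumed): the score is the current time plus the delay in milliseconds.

    import redis.clients.jedis.Jedis;

    // Illustrative delay-queue publisher (key name "delay-queue" is assumed).
    public class DelayQueuePublisher {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                long now = System.currentTimeMillis();
                // score = current timestamp + delay, so the message becomes due at that moment
                jedis.zadd("delay-queue", now + 1000, "msg-after-1s");
                jedis.zadd("delay-queue", now + 2000, "msg-after-2s");
                jedis.zadd("delay-queue", now + 3000, "msg-after-3s");
                jedis.zadd("delay-queue", now + 10000, "kafka");
            }
        }
    }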

The subscriber
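And an illustrative subscriber sketch: fetch one due message with zrangeByScore (limit 1), then use the zrem return value to decide whether this consumer won it.

    import java.util.Collection;
    import redis.clients.jedis.Jedis;

    // Illustrative delay-queue subscriber (key name "delay-queue" is assumed).
    public class DelayQueueSubscriber {
        public static void main(String[] args) throws InterruptedException {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                while (true) {
                    long now = System.currentTimeMillis();
                    // Fetch at most one message whose score (due time) is <= now
                    Collection<String> due = jedis.zrangeByScore("delay-queue", 0, now, 0, 1);
                    if (due.isEmpty()) {
                        Thread.sleep(100); // nothing due yet
                        continue;
                    }
                    String msg = due.iterator().next();
                    // zrem returns 1 only for the consumer that actually removed the member,
                    // so it tells us whether we grabbed the message before anyone else
                    if (jedis.zrem("delay-queue", msg) > 0) {
                        System.out.println("consumed: " + msg);
                    }
                }
            }
        }
    }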

Testing the delay effect

Full code address: github.com/qiaomengnan…

Optimization: implementation using Lua

The delay queue implementation above has a problem: it uses zrem to determine whether this consumer grabbed the data, and a consumer may well fail to grab it, keep reading, and lose several rounds in a row, wasting resources. We can optimize this with a Lua script that makes zrangebyscore and zrem a single atomic operation, which avoids the multi-threaded contention and the wasted work.
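A sketch of that optimization (assuming Jedis's eval and the hypothetical delay-queue key): the Lua script fetches one due message and removes it in the same atomic step, so only one consumer can ever receive a given message.

    import redis.clients.jedis.Jedis;

    // Illustrative atomic "fetch one due message and remove it" using a Lua script.
    public class LuaDelayQueueSubscriber {
        // KEYS[1] = zset key, ARGV[1] = current timestamp;
        // returns the message if one was due and removed, or nil otherwise.
        private static final String FETCH_AND_REMOVE =
            "local msgs = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1) " +
            "if #msgs > 0 then " +
            "  redis.call('zrem', KEYS[1], msgs[1]) " +
            "  return msgs[1] " +
            "end " +
            "return nil";

        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                Object result = jedis.eval(FETCH_AND_REMOVE, 1,
                        "delay-queue", String.valueOf(System.currentTimeMillis()));
                if (result != null) {
                    System.out.println("consumed: " + result);
                } else {
                    System.out.println("nothing due yet");
                }
            }
        }
    }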

Conclusion

Professional queue middleware such as RabbitMQ requires you to create an Exchange before sending messages, then create a Queue, and then bind the Exchange and the Queue. When sending a message, you need to specify a routing key so that it is matched by the Exchange and finally reaches the Queue.

If your scenario is simple, you can use Redis to implement a queue, but be aware that Redis lacks the features of a professional queue: there is no ack guarantee, so messages are not reliable, and if consumption fails the message is simply gone. If you need full reliability, you still need professional queue middleware with mechanisms such as acks as a guarantee.