1. Background

Enterprise developers turn to message queues to decouple systems, shave traffic peaks, and make processing asynchronous. Open-source messaging middleware is popular with developers: Kafka is known for its high throughput, and RabbitMQ is widely used in financial and order systems for its strong reliability guarantees. This article looks at message queues built on Redis. They are mainly used inside internal systems, in business scenarios that have no strict requirements on message reliability, and they are lightweight, efficient, and simple.

2. Redis data structures and commands for message queues

Redis provides several data structures and commands that make it easy to build a simple message queue during system development. A list gives a simple queue, and a zset gives a delayed message queue. For message multicast, Redis provides publish/subscribe commands. Redis 5.0 added the Stream data structure, which enables a more full-featured message queue. Each of these is introduced below.

2.1 Implementing a simple queue with list

A Redis list is an ordered sequence that supports pushing and popping at both ends, so it can serve as a FIFO (first-in, first-out) queue: producers add messages at the tail and consumers take messages from the head, as shown in the following figure:

To enqueue a message, a producer pushes the message body onto one end of the list with LPUSH or RPUSH, which takes O(1) time. A consumer pops from the opposite end (RPOP after LPUSH, LPOP after RPUSH) so that messages come out in FIFO order, and polls at an interval (e.g. 1 s) when the queue is empty. The consumer PHP code looks like this:

```php
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

while (true) {
    $message = $redis->rPop('queue_list_test');
    if ($message === false) {
        // The queue is empty: sleep for 1 s before polling again.
        sleep(1);
        continue;
    }
    var_dump($message); // process the message here
}
```

The sleep is necessary. Without it, while queue_list_test is empty the loop keeps sending requests to the Redis server, driving up CPU usage on the client machine and causing a spike in traffic to Redis.
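Which ends the producer and consumer use determines the ordering. The following pure-PHP model is only an analogy (no Redis connection; fifoOrder and lifoOrder are hypothetical names): index 0 of an array plays the head (left end) of a Redis list, and array functions stand in for the list commands named in the comments.

```php
<?php
// Pure-PHP analogy of a Redis list: index 0 is the head (left), the last element is the tail (right).
// No Redis is involved; the array functions stand in for the list commands named in the comments.

/** RPUSH every message, then LPOP until empty: opposite ends, so the order is FIFO. */
function fifoOrder(array $messages): array
{
    $list = [];
    foreach ($messages as $message) {
        $list[] = $message;               // RPUSH: append at the tail
    }
    $consumed = [];
    while ($list !== []) {
        $consumed[] = array_shift($list); // LPOP: remove from the head
    }
    return $consumed;
}

/** LPUSH every message, then LPOP until empty: same end, so the list behaves as a stack (LIFO). */
function lifoOrder(array $messages): array
{
    $list = [];
    foreach ($messages as $message) {
        array_unshift($list, $message);   // LPUSH: insert at the head
    }
    $consumed = [];
    while ($list !== []) {
        $consumed[] = array_shift($list); // LPOP: remove from the head
    }
    return $consumed;
}

echo implode(',', fifoOrder(['a', 'b', 'c'])), PHP_EOL; // a,b,c
echo implode(',', lifoOrder(['a', 'b', 'c'])), PHP_EOL; // c,b,a
```

This is why a consumer that calls rPop should be paired with producers that use LPUSH.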

To avoid both the polling delay and the wasted requests, Redis also provides BRPOP/BLPOP, the blocking versions of RPOP/LPOP: when the list is empty, the command blocks until a message arrives or the timeout expires. The modified consumer PHP code is as follows:

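```php
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// Block for up to 3600 s; brPop returns [key, message], or an empty result on timeout (ending the loop).
while ($message = $redis->brPop('queue_list_test', 3600)) {
    var_dump($message);
}
```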

In the example above, when the list queue_list_test is empty, BRPOP blocks for up to 3600 seconds waiting for a message. However, PHP's redis extension reads from a PHP socket, so PHP's socket read timeout applies: if it expires while BRPOP is still blocking, the call fails with an error and the script exits. The default socket timeout in php.ini is:

default_socket_timeout = 60

A good practice is to disable the socket timeout for the consumer process with PHP's ini_set function:

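```php
<?php
// Disable the socket read timeout for this process (-1 means no timeout).
ini_set('default_socket_timeout', -1);

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// A timeout of 0 makes brPop block until a message arrives.
while ($message = $redis->brPop('queue_list_test', 0)) {
    var_dump($message);
}
```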

Setting the BRPOP timeout to 0 makes the script wait indefinitely when no message arrives. The consumer now picks up each message as soon as it is pushed, without the polling delay or the wasted requests of the sleep-based version.
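An alternative to disabling the socket timeout is to keep the BRPOP timeout below default_socket_timeout and simply block again whenever BRPOP times out. The sketch below rests on assumptions: the 30-second value, the consumeBlocking helper, its $maxRounds bound, and the FakeListClient stand-in are all hypothetical. In production $redis would be a phpredis Redis instance, whose brPop() returns [key, value] for a message and an empty result on timeout.

```php
<?php
/**
 * Hypothetical consumer loop: keep the BRPOP timeout (30 s here, an assumed value) below
 * default_socket_timeout (60 s by default) and block again after every timeout.
 * $redis is expected to be a phpredis \Redis instance; $maxRounds only bounds the demo.
 */
function consumeBlocking(object $redis, string $queue, callable $handle, int $timeout = 30, int $maxRounds = PHP_INT_MAX): int
{
    $handled = 0;
    for ($round = 0; $round < $maxRounds; $round++) {
        $item = $redis->brPop([$queue], $timeout);
        if (empty($item)) {
            continue; // timed out with no message: block again
        }
        [, $message] = $item; // $item is [key, message]
        $handle($message);
        $handled++;
    }
    return $handled;
}

// In-memory stand-in for \Redis so the loop can run without a server.
final class FakeListClient
{
    public function __construct(private array $items) {}

    public function brPop(array $keys, int $timeout): array
    {
        if ($this->items === []) {
            return []; // simulate a timeout
        }
        return [$keys[0], array_pop($this->items)]; // pop from the tail, like BRPOP
    }
}

$seen = [];
$count = consumeBlocking(new FakeListClient(['m1', 'm2']), 'queue_list_test', function (string $message) use (&$seen): void {
    $seen[] = $message;
}, 1, 5);
echo $count, PHP_EOL; // 2
```

With a 30-second timeout the loop wakes at least twice a minute even when idle, which is also a natural place to check a shutdown flag such as the one used for graceful restarts in section 3.3.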

2.2 Implementing a delayed consumption queue with zset

A Redis zset (sorted set), like a set, is a collection of strings that does not allow duplicate members. The difference is that each member of a sorted set has an associated score, and Redis keeps the members ordered by score from smallest to largest. If we add each message to a zset and use the timestamp at which it should be consumed as its score, we get a time-ordered consumption queue, as shown below:

This is how a delayed message queue is implemented:

  • The producer uses the ZADD command to add a message to the time-ordered queue, with the score set to the timestamp at which the message should be consumed (a point in the future).
  • The consumer uses the ZRANGEBYSCORE command to fetch the messages whose score is less than or equal to the current time, i.e. the messages that are due, and then removes them from the zset (for example with ZREM).
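The two steps can be modelled in plain PHP to make the score arithmetic concrete. This is an in-memory illustration only: the array maps member => score as a sorted set does, and delay() and takeDue() are hypothetical names. With a real server the consumer would call ZRANGEBYSCORE and then ZREM, and with several consumers those two steps must be made atomic, which section 3.1 handles with a Lua script.

```php
<?php
// In-memory model of the zset delayed queue; the array maps member => score like a sorted set.

/** Producer: equivalent of ZADD key (now + delay) message. */
function delay(array &$zset, string $message, int $delaySeconds, int $now): void
{
    $zset[$message] = $now + $delaySeconds;
}

/** Consumer: equivalent of ZRANGEBYSCORE key -inf now, followed by ZREM of the returned members. */
function takeDue(array &$zset, int $now): array
{
    asort($zset); // order members by score, smallest first
    $due = array_keys(array_filter($zset, fn (int $score): bool => $score <= $now));
    foreach ($due as $member) {
        unset($zset[$member]); // remove so the message is not consumed twice
    }
    return $due;
}

$queue = [];
delay($queue, 'send-sms', 10, 1000);
delay($queue, 'close-order', 1800, 1000);
echo json_encode(takeDue($queue, 1005)), PHP_EOL; // []
echo json_encode(takeDue($queue, 1010)), PHP_EOL; // ["send-sms"]
```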

2.3 Implementing message multicast with PubSub

Neither the list-based simple queue nor the zset-based delayed queue supports message multicast. If several different consumer groups need to consume the same messages, their logic can only be chained together and run one after another:

Redis provides a separate PubSub module, short for Publisher/Subscriber, to support message multicast. With multicast, a producer produces a message only once, and the middleware copies it to multiple message queues, each consumed by its own consumer group:

The PubSub commands and their usage are not covered here; readers can refer to the documentation. Note, however, that PubSub has a fatal design shortcoming: it does not support message persistence.

When a PubSub producer sends a message, Redis immediately looks up the current subscribers and delivers it to them; if there are no subscribers, the message is discarded. For example, suppose there are three consumers and one of them suddenly goes down. The producer keeps sending messages and the other two consumers keep receiving them, but when the failed consumer reconnects, every message sent while it was disconnected is lost to it. If Redis crashes or restarts, there are no consumers at all and every message is simply discarded: PubSub messages are never persisted.
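A toy model in plain PHP makes this fire-and-forget behaviour easy to see. ToyPubSub is a hypothetical class, not Redis code: publish() mirrors PUBLISH by returning the number of subscribers that received the message, and unsubscribe() followed by subscribe() stands in for a consumer that disconnects and reconnects.

```php
<?php
// Toy model of PubSub delivery (not Redis code): a message reaches only the subscribers
// connected at publish time, and nothing is stored for anyone else.
final class ToyPubSub
{
    /** @var array<string, array<string, string[]>> channel => subscriber => messages received */
    private array $inboxes = [];

    public function subscribe(string $channel, string $subscriber): void
    {
        $this->inboxes[$channel][$subscriber] = []; // a (re)connected subscriber starts empty
    }

    public function unsubscribe(string $channel, string $subscriber): void
    {
        unset($this->inboxes[$channel][$subscriber]); // e.g. the consumer crashed
    }

    /** Like PUBLISH, returns the number of subscribers that received the message. */
    public function publish(string $channel, string $message): int
    {
        $subscribers = array_keys($this->inboxes[$channel] ?? []);
        foreach ($subscribers as $name) {
            $this->inboxes[$channel][$name][] = $message;
        }
        return count($subscribers);
    }

    public function inbox(string $channel, string $subscriber): array
    {
        return $this->inboxes[$channel][$subscriber] ?? [];
    }
}

$bus = new ToyPubSub();
echo $bus->publish('orders', 'nobody-listening'), PHP_EOL; // 0: the message is dropped
$bus->subscribe('orders', 'c1');
$bus->subscribe('orders', 'c2');
$bus->publish('orders', 'm1');
$bus->unsubscribe('orders', 'c2');  // c2 goes offline
$bus->publish('orders', 'm2');      // only c1 receives m2
$bus->subscribe('orders', 'c2');    // c2 reconnects
$bus->publish('orders', 'm3');
echo implode(',', $bus->inbox('orders', 'c2')), PHP_EOL; // m3
```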

Because of these shortcomings, PubSub is rarely used as a message queue.

The Redis author also started a separate project, Disque, for message queuing, but it never got past beta. In June 2018, Redis 5.0 added the Stream data structure, which brought persistent message queues to Redis. Since then, PubSub has largely dropped out of consideration as a message queue, and Disque is unlikely ever to see a stable release.

2.4 Stream: a powerful persistent message queue that supports multicast

The biggest new feature of Redis 5.0 is a data structure called Stream, designed for message queues. It borrows heavily from Kafka's design and gives Redis a truly professional message queue. The structure is shown in the figure below:

A stream is an append-only queue of messages ordered by ID, similar to the time-ordered queue built from a zset. Each message has an ID of the form <millisecondsTime>-<sequenceNumber>, such as 1607226700982-5, which means the message was created at millisecond timestamp 1607226700982 with sequence number 5 within that millisecond (sequence numbers start at 0, so it is the sixth message of that millisecond). The ID can be generated automatically by the server or specified by the client. Both parts of the ID are integers, and each new message's ID must be greater than the ID of the message before it.
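Because an ID has two integer parts, IDs must be compared numerically part by part, not as strings: as strings, "1607226700982-10" sorts before "1607226700982-5". The hypothetical helpers below (pure PHP, no Redis call) split an ID and compare two IDs in stream order; they assume both parts fit in a PHP integer, which holds for millisecond timestamps on 64-bit builds.

```php
<?php
/** Split a stream ID such as "1607226700982-5" into [milliseconds, sequence]. */
function parseStreamId(string $id): array
{
    if (!preg_match('/^(\d+)-(\d+)$/', $id, $m)) {
        throw new InvalidArgumentException("Not a stream ID: $id");
    }
    return [(int) $m[1], (int) $m[2]];
}

/** Order two IDs like Redis does: by the millisecond part first, then by the sequence number. */
function compareStreamIds(string $a, string $b): int
{
    return parseStreamId($a) <=> parseStreamId($b);
}

$ids = ['1607226700982-10', '1607226700983-0', '1607226700982-5'];
usort($ids, 'compareStreamIds');
echo implode(' ', $ids), PHP_EOL; // 1607226700982-5 1607226700982-10 1607226700983-0
```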

Each stream has a unique name, which is its Redis key; the stream is created automatically the first time XADD is called on it. Each consumer group has a unique name within the stream and keeps a cursor, last_delivered_id, which moves forward along the stream to record which messages the group has already consumed. A consumer group is created with the XGROUP CREATE command, which specifies the message ID to start from; this ID initializes the group's last_delivered_id.

Every stream message is delivered to every consumer group. Each group can have multiple consumers, each with a unique name within the group. Consumers in the same group compete with one another: whenever any of them reads a message, the group's last_delivered_id cursor moves forward.

Each consumer also has a state variable, pending_ids, that records the messages it has read but not yet acknowledged. pending_ids ensures that every message is consumed at least once: a message that is lost in network transmission or never processed stays pending, so it can be recovered instead of silently disappearing.
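The bookkeeping described above can be mimicked in plain PHP to show how the shared cursor and the per-consumer pending lists interact. ToyConsumerGroup is a hypothetical, single-process model: read() only approximates XREADGROUP with the ">" ID, ack() approximates XACK, and none of it talks to Redis.

```php
<?php
// Toy, single-process model of one Stream consumer group (illustration only, not Redis code).
final class ToyConsumerGroup
{
    private string $lastDeliveredId;
    /** @var array<string, string[]> consumer name => IDs delivered but not yet acked */
    private array $pendingIds = [];

    /** @param array<string, string> $stream entry ID => payload, already in ID order */
    public function __construct(private array $stream, string $startId = '0-0')
    {
        $this->lastDeliveredId = $startId; // plays the start ID given to XGROUP CREATE
    }

    /** Approximates XREADGROUP ... STREAMS key >: hand out entries after the group cursor. */
    public function read(string $consumer, int $count): array
    {
        $batch = [];
        foreach ($this->stream as $id => $payload) {
            if (count($batch) >= $count) {
                break;
            }
            if (self::compare($id, $this->lastDeliveredId) > 0) {
                $batch[$id] = $payload;
                $this->lastDeliveredId = $id;          // one cursor shared by all consumers
                $this->pendingIds[$consumer][] = $id;  // kept until the consumer acks it
            }
        }
        return $batch;
    }

    /** Approximates XACK: remove the ID from the consumer's pending list. */
    public function ack(string $consumer, string $id): void
    {
        $remaining = array_diff($this->pendingIds[$consumer] ?? [], [$id]);
        $this->pendingIds[$consumer] = array_values($remaining);
    }

    public function pending(string $consumer): array
    {
        return $this->pendingIds[$consumer] ?? [];
    }

    private static function compare(string $a, string $b): int
    {
        return array_map('intval', explode('-', $a)) <=> array_map('intval', explode('-', $b));
    }
}

$group = new ToyConsumerGroup(['1-0' => 'a', '1-1' => 'b', '2-0' => 'c']);
$group->read('alice', 2);   // alice gets 1-0 and 1-1
$group->read('bob', 2);     // bob competes for the rest and gets 2-0
$group->ack('alice', '1-0');
echo implode(',', $group->pending('alice')), PHP_EOL; // 1-1
```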

Stream design is not the focus of this section, but readers are strongly encouraged to study it further: Stream is the best option for building message queues on Redis going forward.

3. Traditional Redis message queue design

Because Redis 5.0 was released only recently, and for historical reasons, the Redis queues inside most enterprise systems still use the traditional design: a list + zset structure wrapped in application code. Many of the problem-solving ideas in this traditional design are worth learning, so this section discusses the problems traditional Redis message queues face and how they are solved.

3.1 Integration of “delay” queue and “instant” queue

As mentioned earlier, the traditional Redis message queue uses a list as the instant consumption queue, fetched with LPOP/RPOP or BLPOP/BRPOP, and a zset as the delayed consumption queue, fetched with ZRANGEBYSCORE. For the consumer, however, it does not matter which structure a message comes from. A good practice is to wrap this in a single method that consumers call to take the next message. Each time it is called, the system first migrates the expired tasks from the zset delayed queue to the list instant queue, and then pops a message from the list with LPOP/RPOP and returns it to the consumer, as shown in the following diagram:

Note that this wrapped method, which migrates expired messages from the delayed queue to the instant queue and then fetches a message, is not atomic the way a single LPOP/RPOP is. To keep the system safe under concurrency, the migration is done in a Lua script. Example code is as follows:

```php
/**
 * Get the Lua script to migrate expired jobs back onto the queue.
 *
 * KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
 * KEYS[2] - The queue we are moving jobs to, for example: queues:foo
 * ARGV[1] - The current UNIX timestamp
 *
 * @return string
 */
public static function migrateExpiredJobs()
{
    return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    for i = 1, #val, 100 do
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
    end
end

return val
LUA;
}
```

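The Lua script above bundles the ZRANGEBYSCORE, ZREMRANGEBYRANK, and RPUSH commands so that the migration happens atomically, which lets the system safely run multiple consumers. ZREMRANGEBYRANK can remove ranks 0 through #val - 1 because the expired messages are exactly the lowest-scored members of the zset.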
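On the consumer side, the script would be run just before popping. The sketch below assumes a phpredis client, whose eval() takes the script, one flat argument array whose first $numKeys entries become KEYS and the rest ARGV, and the key count. migrateThenPop() and RecordingClient (a stand-in that records calls so the sketch runs without a server) are hypothetical, as is the key name queues:foo:delayed; queues:foo follows the script's doc comment.

```php
<?php
/**
 * Hypothetical consumer helper: run the migration script atomically, then pop from the instant list.
 * $redis is expected to be a phpredis \Redis instance (or anything with the same eval/lPop methods).
 */
function migrateThenPop(object $redis, string $script, string $delayedKey, string $readyKey): mixed
{
    // The first two entries become KEYS[1] and KEYS[2]; the timestamp becomes ARGV[1].
    $redis->eval($script, [$delayedKey, $readyKey, time()], 2);
    // The script RPUSHes onto the tail, so LPOP from the head keeps FIFO order; false means empty.
    return $redis->lPop($readyKey);
}

// Stand-in client that records calls, so the sketch runs without a Redis server.
final class RecordingClient
{
    public array $calls = [];

    public function eval(string $script, array $args = [], int $numKeys = 0): mixed
    {
        $this->calls[] = ['eval', $args, $numKeys];
        return [];
    }

    public function lPop(string $key): string|false
    {
        $this->calls[] = ['lPop', $key];
        return false;
    }
}

$client = new RecordingClient();
var_dump(migrateThenPop($client, 'return {}', 'queues:foo:delayed', 'queues:foo')); // bool(false)
```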

Producers also need a wrapper method. Since ZADD and LPUSH/RPUSH are each atomic, it only has to decide whether a message should be delayed, then push delayed messages to the delayed queue with ZADD and instant messages to the instant queue with LPUSH/RPUSH.
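A producer wrapper along these lines might look like the sketch below. pushJob(), the JSON envelope, the "$queue:delayed" key naming, and the ArrayClient stand-in are hypothetical; with phpredis, zAdd($key, $score, $member) and rPush($key, $value) are the calls involved. A random id is included in the envelope because a zset stores identical members only once, so two identical delayed payloads would otherwise collapse into one.

```php
<?php
/**
 * Hypothetical producer wrapper: delayed jobs go to a zset scored by their due time,
 * instant jobs go to the tail of the list. Returns the encoded message.
 */
function pushJob(object $redis, string $queue, array $data, int $delaySeconds = 0): string
{
    // A random id keeps identical payloads distinct; a zset would otherwise store them once.
    $message = json_encode(['id' => bin2hex(random_bytes(8)), 'data' => $data], JSON_THROW_ON_ERROR);

    if ($delaySeconds > 0) {
        $redis->zAdd($queue . ':delayed', time() + $delaySeconds, $message); // ZADD
    } else {
        $redis->rPush($queue, $message); // RPUSH
    }
    return $message;
}

// In-memory stand-in for \Redis so the sketch runs without a server.
final class ArrayClient
{
    public array $lists = [];
    public array $zsets = [];

    public function rPush(string $key, string $value): int
    {
        $this->lists[$key][] = $value;
        return count($this->lists[$key]);
    }

    public function zAdd(string $key, int $score, string $member): int
    {
        $isNew = !isset($this->zsets[$key][$member]);
        $this->zsets[$key][$member] = $score;
        return $isNew ? 1 : 0;
    }
}

$client = new ArrayClient();
pushJob($client, 'queues:foo', ['order' => 42]);
pushJob($client, 'queues:foo', ['order' => 43], 60);
echo count($client->lists['queues:foo']), ' instant, ', count($client->zsets['queues:foo:delayed']), " delayed\n"; // 1 instant, 1 delayed
```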

3.2 Implementing “ACK” and “timeout retry” for messages

Professional message queues such as Kafka and RabbitMQ have an ACK (acknowledgement) mechanism, which ensures that each message in the queue is consumed at least once. Such queues usually offer both manual and automatic ACK; consumers with high reliability requirements should use manual ACK. To implement ACK in a traditional Redis message queue, the first idea is to use a set: add each message a consumer takes to the set, and when the consumer finishes processing it, delete it from the set to complete the ACK.

We already use a zset for the delayed queue; a zset is also a set, with a score attached to each member. If we use a zset as the queue system's ACK set, the score can provide another very useful feature: timeout retry. As shown below:

With “timeout retry” and the “ACK mechanism”, the Redis message system becomes considerably more complex. When a consumer takes a task through the wrapped method, the system first migrates the due messages from the delayed queue, and the unacknowledged messages whose retry time has passed from the ACK queue, to the instant queue (to keep the system concurrency-safe, both migrations are done with Lua scripts). Before handing a message to the consumer, the system adds it to the ACK queue with a timeout retry time, retry_after_time, as its score: if the message has not been acknowledged by retry_after_time, it is migrated back to the instant queue and consumed again. In the normal case, the consumer takes the message, processes it in its handler, and then acks it (deletes it from the ACK queue).
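The flow can be traced with an in-memory model. ToyReliableQueue is hypothetical and single-process: $ready plays the instant list, $reserved plays the ACK zset (message => retry_after_time), and reserve() runs the migration step first, much as the Lua script would. Timestamps are passed in explicitly so the behaviour is deterministic.

```php
<?php
// In-memory model of a list + zset queue with ACK and timeout retry (illustration only, no Redis).
final class ToyReliableQueue
{
    /** @var string[] instant queue, head first (the list) */
    private array $ready = [];
    /** @var array<string, int> ACK queue: message => retry_after_time (the zset) */
    private array $reserved = [];

    public function __construct(private int $retryAfter) {}

    public function push(string $message): void
    {
        $this->ready[] = $message; // RPUSH
    }

    /** Take a message: migrate expired reservations, pop, then record the message in the ACK queue. */
    public function reserve(int $now): ?string
    {
        foreach ($this->reserved as $message => $retryAt) {
            if ($retryAt <= $now) {              // not acked in time: back to the instant queue
                unset($this->reserved[$message]);
                $this->ready[] = $message;
            }
        }
        $message = array_shift($this->ready);   // LPOP; null when the queue is empty
        if ($message !== null) {
            $this->reserved[$message] = $now + $this->retryAfter; // ZADD with retry_after_time
        }
        return $message;
    }

    /** ACK: delete the message from the ACK queue (ZREM). */
    public function ack(string $message): void
    {
        unset($this->reserved[$message]);
    }
}

$queue = new ToyReliableQueue(60);
$queue->push('job-1');
$first = $queue->reserve(1000);   // job-1 handed out, retry at 1060
$none = $queue->reserve(1030);    // still reserved, nothing to hand out
$retried = $queue->reserve(1060); // never acked, so job-1 is delivered again
$queue->ack('job-1');
```

Because a message whose handler finishes after retry_after_time can be delivered twice, consumers built on this scheme should be idempotent, which is the usual price of at-least-once delivery.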

3.3 Consumers’ “graceful restart”

ACK ensures that each message is consumed at least once, but in real business scenarios large tasks often take a long time to process. We do not want to kill a consumer forcibly halfway through a message; instead, we want it to finish the current message before it restarts. We call this a graceful restart of the consumer.

Readers familiar with nginx will know that nginx -s reload makes nginx reload its configuration file and restart its worker processes while it keeps serving traffic. The master process receives the signal, starts new workers with the new configuration, and signals the old workers, which exit once they finish the requests they are handling. As a result, the master process ID stays the same before and after the reload.

Queue consumers can work the same way. Programming languages offer different signal-handling facilities; in PHP, signal handling is provided by the PCNTL extension:

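```php
<?php
class Worker
{
    public $shouldQuit = false;

    public function daemon(string $queueName = '')
    {
        $this->listenForSignals();

        while (! $this->shouldQuit) {
            // Fetch and process one message from $queueName here.
        }
    }

    protected function listenForSignals()
    {
        if ($this->supportsAsyncSignals()) {
            pcntl_async_signals(true);
            // Each handler only sets the flag; the loop exits after the current task.
            pcntl_signal(SIGTERM, function () { $this->shouldQuit = true; });
            pcntl_signal(SIGUSR2, function () { $this->shouldQuit = true; });
            pcntl_signal(SIGCONT, function () { $this->shouldQuit = true; });
        }
    }

    protected function supportsAsyncSignals()
    {
        return extension_loaded('pcntl');
    }
}
```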

With this code in place, when we send a signal to the consumer process with kill (for example kill <pid>, which sends SIGTERM), the handler sets shouldQuit to true, the loop condition becomes false, and the consumer exits after finishing its current task: a graceful restart. In practice, consumers are often managed by Supervisor, which can also communicate with consumers through signals; readers interested in using Supervisor with PHP projects can look into it further.

3.4 Handling failed messages

Whether through the ACK mechanism that ensures at-least-once consumption or a graceful restart driven by signals, some consumption failures are inevitable in business. In a Redis-based message queue, we can give each consumer a failure handler: when a message fails, the consumer decides what to do with it (usually by recording the error in a database for later manual investigation). Here is the PHP code:

```php
<?php
class Worker
{
    public $shouldQuit = false;

    // Name of the failure callback on consumer classes ('failed' is an assumed value).
    protected $failedHandlerMethod = 'failed';

    public function daemon(string $queueName = '')
    {
        while (! $this->shouldQuit) {
            $jobInfo = null;
            try {
                // Pop a message from $queueName, decode it into $jobInfo
                // (['commandName' => ..., 'data' => ...]) and run the consumer.
            } catch (\Exception $e) {
                if ($jobInfo !== null) {
                    $this->failedHandler($e, $jobInfo);
                }
                die($e->getMessage() . PHP_EOL);
            } catch (\Throwable $e) {
                die($e->getMessage() . PHP_EOL);
            }
        }
    }

    public function failedHandler(\Exception $e, array $jobInfo)
    {
        if (class_exists($jobInfo['commandName'])) {
            $class = $jobInfo['commandName'];
            $consumerInstance = new $class($jobInfo['data']);
            $failedMethod = $this->failedHandlerMethod;
            if (method_exists($consumerInstance, $failedMethod)) {
                $consumerInstance->$failedMethod($e);
            }
        }
    }
}
```

4. Summary

Implementing a fully functional, robust, and highly available Redis message queue is not simple; there are many factors to consider. Driven by business needs, Redis has come a long way to Stream, which is now a relatively reliable message queue solution. Traditional Redis message queues built on list + zset have served enterprises for many years, and there are reasons they have not been fully replaced. This article analyzed the problems that traditional message queue implementations face and how they can be solved. Although Stream will likely replace the traditional Redis message queue, studying the traditional design helps us understand the ideas behind Redis message queues and some general methods for building complex systems, which makes the journey worthwhile.