The message queue

Let’s start with a brief introduction.

MQ is primarily used for:

  • decoupling applications
  • asynchronous messaging
  • peak shaving and valley filling (smoothing traffic spikes)

ActiveMQ, RabbitMQ, ZeroMQ, Kafka, MetaMQ, RocketMQ and others are widely used today. Resources on the web already cover each of them in detail, so I won’t repeat that here. This article only covers how to implement a lightweight MQ with Redis.

Why implement lightweight MQ with Redis?

MQ matters a great deal in business systems: even without large volumes of traffic, decoupling and asynchronous processing are needed almost everywhere. At the same time, MQ is a heavy component. If we use RabbitMQ, for example, we have to set up a server for it, build a cluster for availability, and be able to troubleshoot it in production. For small to medium-sized businesses, the business itself may not be that heavy, and a heavy component multiplies the workload. Fortunately, the List data structure provided by Redis is well suited for a message queue. But how do we achieve instant consumption? How do we implement an ack mechanism? These are the keys to the implementation.

How to achieve instant consumption?

One method circulating on the web is to use the Redis list commands BLPOP or BRPOP, the blocking pops of a list. Let’s look at how a blocking pop is used:

BRPOP key [key ...] timeout

The description of this command is:

1. If none of the given lists has an element to pop, the connection is blocked by BRPOP until the wait times out or an element becomes available to pop.
2. When multiple key parameters are given, the lists are checked in the order of the keys, and the tail element of the first non-empty list is popped.

In addition, BLPOP behaves in the same way as BRPOP, except for where the element is popped from: BLPOP pops from the head of the list, BRPOP from the tail.
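For example, with Jedis a blocking pop looks roughly like this (a minimal sketch; the key name, timeout and connection settings are assumptions):

import redis.clients.jedis.Jedis;
import java.util.List;

public class BlockingPopDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Block for up to 5 seconds waiting for an element on "task:queue";
            // a timeout of 0 would block indefinitely.
            List<String> result = jedis.brpop(5, "task:queue");
            if (result != null && !result.isEmpty()) {
                // result.get(0) is the key, result.get(1) is the popped element
                System.out.println("popped " + result.get(1) + " from " + result.get(0));
            } else {
                System.out.println("timed out with no element");
            }
        }
    }
}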

In this regard, the blocking pop of a list has two characteristics:

  • If the list has elements, one is popped and returned immediately
  • If the list is empty, the connection blocks until an element becomes available or the timeout expires

At this point the problem is obvious: how should the timeout be set? Can we make sure a connection stays blocked until a message enters the queue? That is clearly hard to do, because the blocking call and the arrival of messages are not bound to each other. Fortunately, Redis also supports Pub/Sub (publish/subscribe). When message A is pushed onto the list, a notification message B is published to a channel. A worker that has subscribed to the channel receives B, knows that A has entered the list, and can then LPOP or RPOP in a loop to consume the messages in the list.
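A minimal producer-side sketch of this flow might look as follows (a hypothetical FileProducer using Jedis directly; the key and channel names are assumptions, and the article’s own code goes through a RedisService wrapper instead):

import redis.clients.jedis.Jedis;

public class FileProducer {

    private static final String QUEUE_KEY = "syn:file:queue";   // hypothetical key name
    private static final String CHANNEL   = "syn:file:channel"; // hypothetical channel name

    // Push the real payload onto the list, then publish a lightweight
    // notification so that subscribed workers know new data is available.
    public void send(String message) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.lpush(QUEUE_KEY, message);   // message A: the actual task
            jedis.publish(CHANNEL, "new");     // message B: just a wake-up signal
        }
    }
}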

The worker can be a separate thread or an independent service, acting as both the Consumer and the business handler. The following is an example.

Instant consumption example

The example scenario is: a worker is responsible for synchronizing files, and it waits until a file is generated to do so.

First, start a thread on behalf of the worker to subscribe to the channel:

@Service
public class SubscribeService {

    @Resource
    private RedisService redisService;
    @Resource
    private SynListener synListener;

    @Subscribe
    public void subscribe() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                LogCvt.info("service subscribed to channel: {}", channel);
                redisService.subscribe(synListener, channel);
            }
        }).start();
    }
}

SynListener in the code is the declared subscriber and channel is the name of the subscribed channel. The specific subscription logic is as follows:

@Service
public class SynListener extends JedisPubSub {

    @Resource
    private DispatchMessageHandler dispatchMessageHandler;

    @Override
    public void onMessage(String channel, String message) {
        LogCvt.info("channel:{}, receives message:{}", channel, message);
        try {
            // process the business (synchronize the file)
            dispatchMessageHandler.synFile();
        } catch (Exception e) {
            LogCvt.error(e.getMessage(), e);
        }
    }
}

In the business handler, we consume the messages in the list:

@Service
public class DispatchMessageHandler {

    @Resource
    private RedisService redisService;
    @Resource
    private MessageHandler messageHandler;

    public void synFile() {
        while (true) {
            try {
                String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());
                if (null == message) {
                    // the list is drained, stop consuming
                    break;
                }
                Thread.currentThread().setName(Tools.uuid());
                messageHandler.synFile(message);
            } catch (Exception e) {
                LogCvt.error(e.getMessage(), e);
            }
        }
    }
}

Thus we achieve the purpose of real-time consumption of messages.

How to implement ack mechanism?

Ack stands for Acknowledge.

RabbitMQ ack mechanism

  • The Broker delivers a message to the Consumer; once the Consumer has finished processing it, it sends an ack back to the Broker, telling it that the message was processed successfully and can be removed from the queue. If the Consumer does not send an ack, the Broker considers the processing failed and redelivers the message (and subsequent unacked messages) to another Consumer, with the redeliver flag set to true.
  • This confirmation mechanism is similar to establishing a connection in TCP/IP, except that TCP/IP needs a three-way handshake while RabbitMQ needs only a single ack.
  • Note that RabbitMQ redelivers a message to another Consumer if and only if it detects that no ack was sent and the Consumer’s connection has terminated, so there is no need to worry about redelivery just because processing takes a long time.
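For reference, a manual-ack consumer with the RabbitMQ Java client might look roughly like this (a sketch only; the queue name, connection settings and the explicit basicNack-with-requeue on failure are assumptions, not the article’s code):

import com.rabbitmq.client.*;
import java.io.IOException;

public class AckDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                  // assumed broker address
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        boolean autoAck = false;                       // manual ack mode
        channel.basicConsume("task_queue", autoAck, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // ... process the message body here ...
                    channel.basicAck(envelope.getDeliveryTag(), false);  // success: broker removes it
                } catch (Exception e) {
                    // failure: reject and requeue so it can be redelivered
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                }
            }
        });
    }
}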

So what should we pay attention to when implementing an ack mechanism for a message queue in Redis? Two points:

  1. If the worker fails to process a message, the message is rolled back to the original pending queue
  2. If the worker hangs, the message is also rolled back to the original pending queue

The first case can be handled in business code: after a failure, simply roll the message back.

Implementation scheme

(This scheme mainly addresses the case where the worker hangs.)

  1. Maintain two queues: a pending queue and a doing queue.
  2. The worker is defined as a ThreadPool.
  3. When a message is popped from the pending queue, the worker assigns a thread to process it: the current timestamp and the current thread’s name are appended to the message, which is then pushed onto the doing queue.
  4. Start a scheduled task that scans the doing queue at a fixed interval and checks the timestamp of each element. If an element has timed out, look up the recorded thread in the worker’s ThreadPoolExecutor; if it still exists, cancel the current task and roll back the business operation. Finally, pop the task out of the doing queue and push it back onto the pending queue (a minimal sketch of this recovery task follows this list).
  5. Inside a worker thread, if business processing fails, the thread actively rolls back: it removes the task from the doing queue and pushes it back onto the pending queue.
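A minimal sketch of the recovery task in step 4, under assumed key names and an assumed message format (payload with timestamp and thread name appended, separated by "|"); real code would go through the article’s RedisService wrapper rather than raw Jedis, and would also cancel the running task via the ThreadPoolExecutor:

import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DoingQueueMonitor {

    private static final String PENDING_KEY = "syn:file:pending"; // hypothetical key names
    private static final String DOING_KEY   = "syn:file:doing";
    private static final long   TIMEOUT_MS  = 60_000;             // assumed processing timeout

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start() {
        // Step 4: periodically scan the doing queue for timed-out tasks.
        scheduler.scheduleAtFixedRate(this::recoverTimedOutTasks, 10, 10, TimeUnit.SECONDS);
    }

    private void recoverTimedOutTasks() {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            List<String> doing = jedis.lrange(DOING_KEY, 0, -1);
            for (String entry : doing) {
                // Assumed format: payload|timestamp|threadName (appended when the task was dequeued).
                String[] parts = entry.split("\\|");
                if (parts.length < 3) {
                    continue;
                }
                long startedAt = Long.parseLong(parts[1]);
                if (System.currentTimeMillis() - startedAt > TIMEOUT_MS) {
                    // Roll back: remove the entry from doing and push the original payload back to pending.
                    jedis.lrem(DOING_KEY, 1, entry);
                    jedis.rpush(PENDING_KEY, parts[0]);
                    // In the full scheme we would also look up parts[2] in the worker's
                    // ThreadPoolExecutor and cancel the task if that thread is still running.
                }
            }
        }
    }
}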

Conclusion

Redis is quite limited as a message queue; given its main features and intended uses, it can only serve as a lightweight message queue. One last thought: there are no absolutely good technologies, only technologies that fit the business best. This is dedicated to all developers.