preface

During this period, the project encountered a new function proposed by the customer, which required users to push a wechat message to the user’s wechat in the first 5 minutes and the last 10 minutes when they participated in volunteer activities. In fact, there are many scenarios of such delayed operation, such as the common e-commerce system in 30 minutes order to pay off this function.

Then I conducted a survey of technical solutions and found that there are mainly the following solutions that can meet the business scenario at present:

  • Poll the database tables, build messages and store them in the database, and then start a scheduled task every minute to scan the database tables. The start time of the activity is 5 minutes after the current time, and the end time is 10 minutes after the current time. This approach is simple to implement, but puts a lot of strain on the database (least)
  • Based on DelayQueue in the JDKJava with a delay queue function, through the implementationDelaydInterface can implement custom delay logic, very simple. But message data is not persisted, and when something goes down, the messages don’t exist. However, consider persisting messages to the database and using delayed queues to delete messages from the database after successful consumption. If the application is down, the messages that meet the time requirements are put into the delay queue when the application is restarted. (a)
  • Based on the Key expiration notification of Redis, after the user subscribs to the message, the message is stored in Redis and the expiration time of the Key is set. The server starts a thread to listen for the callback of the Redis Key expiration event. This method is simple to implement and uses the features provided by Redis itself. However, the Value corresponding to the Key cannot be obtained in the callback after the Key expires. Therefore, a Key/Value pair of Key+ “special characters” ->Value mapping needs to be redundant in Redis, which leads to the existence of unnecessary Key/Value pairs. (a)
  • RabbitMQ uses a normal queue and a dead letter queue to implement a delay queue. A message is placed with an expiration date on a normal queue, which has no customers, and when it expires it will be transferred to a dead letter queue. Open the data in a consumer dead-letter queue; RabbitMQ does not support delay queues by default, but plugins that do provide delay queues can be integrated into RabbitMQ. (good)

I’m going to implement all four of the above (except for the first one), and then give you a taste of the advantages of the different implementations

Various implementations

DelayQueue based on the JDK

public class DelayedMessage implements Delayed {
    // Wechat user Id
    private final String openId;
    // Activity name
    private final String activityName;
    private final Long expireTime;

    public DelayedMessage(String openId, String activityName, long expireTime) {
        this.openId = openId;
        this.activityName = activityName;
        this.expireTime = expireTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime, TimeUnit.NANOSECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedMessage message = (DelayedMessage) o;
        return this.expireTime.compareTo(message.getExpireTime());
    }

    public String getOpenId(a) {
        return openId;
    }

    public String getActivityName(a) {
        return activityName;
    }

    public Long getExpireTime(a) {
        return expireTime;
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedMessage> delayQueue = new DelayQueue<>();
        // Drop the message thread
        new Thread(() -> {
            DelayedMessage message1 = new DelayedMessage("Wechat User 1"."Snow Cleaning campaign",
                                                         System.currentTimeMillis() + 1000 * 10); / / expired after 10 s
            delayQueue.put(message1);
            System.out.println("Drop message :" + message1 + "Drop time :" + LocalDateTime.now());

        }).start();
        // Consume message threads
        new Thread(() -> {
            while(! Thread.currentThread().isInterrupted()) { DelayedMessage delayedMessage =null;
                try {
                    delayedMessage = delayQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Received information :" + delayedMessage + "Get time :" + LocalDateTime.now());

            }
        }).start();
        Thread.currentThread().join();
    }

    @Override
    public String toString(a) {
        return "DelayedMessage{" +
            "openId='" + openId + '\' ' +
            ", activityName='" + activityName + '\' ' +
            ", expireTime=" + expireTime +
            '} '; }}Copy the code

Running results:

Redis-based notification of Key expiration events

First, if you want to use Rdis notification of Key expiration events, you need to modify the Redis Config configuration file. IO /topics/noti…

Go to your Redis redis.conf file and open the following configuration:

notify-keyspace-events Ex # x stands for expiration event
Copy the code

The following uses the way of integrating Redis with SpringBoot to achieve this function

pom.xml

<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.7.9</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

</dependencies>
Copy the code

RedisConfig.java

@Configuration
public class RedisConfig{
    @Bean
    public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        returncontainer; }}Copy the code

RedisKeyExpireListener.java

package com.pkit.config;

import cn.hutool.core.lang.Console;
import cn.hutool.json.JSONUtil;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/ * * *@author zhuxy  [email protected]
 * @dateThe 2021-08-25 22:05 * /
@Component
public class RedisKeyExpireListener extends KeyExpirationEventMessageListener {


    private final StringRedisTemplate redisTemplate;


    public RedisKeyExpireListener(RedisMessageListenerContainer listenerContainer, StringRedisTemplate redisTemplate) {
        super(listenerContainer);
        this.redisTemplate = redisTemplate;
    }


    @PostConstruct
    public void startProducerThread(a){
        // Start a producer thread
        Thread producerThread = new Thread(() -> {
            MqMessage message = new MqMessage("123456"."Snow Cleaning campaign");
            String key = "ExpireData"+message.getOpenId();
            // The value of this key pair can be written freely, not necessarily message.getopenid ()
            redisTemplate.setEnableTransactionSupport(true);
            redisTemplate.multi();
            redisTemplate.opsForValue().set(key,message.getOpenId(),10, TimeUnit.SECONDS); // Set the expiration time to 10S
            Console.log("Drop message time :{}, the message is :{}",LocalDateTime.now(),message);
            redisTemplate.opsForValue().set("Cache_"+key, JSONUtil.toJsonStr(message));
            redisTemplate.exec();
        });
        producerThread.start();
    }

    @Override
    protected void doHandleMessage(Message message) {
        String key = message.toString();
        // Indicates that the business Key is expired
        if (key.contains("ExpireData")){
            String messageData = redisTemplate.opsForValue().get("Cache_" + key);
            Console.log("Consume data at time :{}, message :{}",LocalDateTime.now(),messageData); }}static class MqMessage{
        // Wechat user Id
        private String openId;
        // Activity name
        private String activityName;

        public MqMessage(String openId, String activityName) {
            this.openId = openId;
            this.activityName = activityName;
        }

        public String getOpenId(a) {
            return openId;
        }

        public void setOpenId(String openId) {
            this.openId = openId;
        }

        public String getActivityName(a) {
            return activityName;
        }

        public void setActivityName(String activityName) {
            this.activityName = activityName;
        }

        @Override
        public String toString(a) {
            return "MqMessage{" +
                "openId='" + openId + '\' ' +
                ", activityName='" + activityName + '\' ' +
                '} '; }}}Copy the code

Running results:

Rabbitmq-based delay queue

Here is one of our most important implementations, the same Spring Boot project as above, with RabbitMQ integrated.

First of all, we need to give the RabbitMQ delay queue plug-in installation, here I am not going to explain in detail how to install the plugin, installation is very simple, baidu once you know www.cnblogs.com/isunsine/p/…

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.6. RELEASE</version>
    <relativePath/>
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>. The 2.0.3 RELEASE</version>
</dependency>
Copy the code

** Note the spring-cloud-starter-stream-rabbit and Spring Boot versions above! 支那

Note that the abovespring-cloud-starter-stream-rabbitAnd Spring Boot corresponding version!

Note that the abovespring-cloud-starter-stream-rabbitAnd Spring Boot corresponding version!

application.yml

spring:
  redis:
    database: 1
    password: PUKKA028
    host: 192.168102.69.
    port: 6379
  rabbitmq:
    addresses: 192.168102.69.
    listener:
      direct:
        acknowledge-mode: manual
      type: direct
    port: 5672
    username: pukka
    password: PUKKA028
    virtual-host: /industry
  cloud:
    stream:
      bindings:
        WxMessageOutChannel:
          destination: wxMessageExchange Bind producer's Channel
        WxMessageInputChannel:
          destination: wxMessageExchange Bind the consumer's Channel
          group: wxMessageGroup
      rabbit:
        bindings:
          WxMessageOutChannel:
            producer:
              delayed-exchange: true Create a delay switch
Copy the code

When the project is successfully launched, you can see the Exchange shown below

MessageInputChannel. Java Consumer channel interface

package com.pkit.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

/ * * *@author zhuxy  [email protected]
 * @dateThe 2021-08-27 23:50:27 * /
public interface MessageInputChannel {
    String WX_MESSAGE_INPUT_CHANNEL = "WxMessageInputChannel";

    @Input(MessageInputChannel.WX_MESSAGE_INPUT_CHANNEL)
    MessageChannel wxMessageInputChannel(a);
}

Copy the code

MessageInputChannelHandler. Java consumer process

package com.pkit.config;

import cn.hutool.core.lang.Console;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/ * * *@author zhuxy  [email protected]
 * @dateThe 2021-08-27 23:52:06 * /
@Component
@EnableBinding(MessageInputChannel.class)
public class MessageInputChannelHandler {

    @StreamListener(MessageInputChannel.WX_MESSAGE_INPUT_CHANNEL)
    public void sendWxMessageToUser(Message<MqMessage> message){
        MqMessage mqMessage = message.getPayload();
        Console.log("Consume message time :{}, the message is :{}", LocalDateTime.now(),mqMessage); }}Copy the code

MessageOutPutChannel. Java production in channel interface

package com.pkit.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/ * * *@author zhuxy  [email protected]
 * @dateThe 2021-08-27 23:43:17 * /
public interface MessageOutPutChannel {
    String WX_MESSAGE_OUT_CHANNEL = "WxMessageOutChannel";

    @Output(MessageOutPutChannel.WX_MESSAGE_OUT_CHANNEL)
    MessageChannel wxMessageOutChannel(a);

}

Copy the code

Logic MessageOutPutChannelHandler. Java producers on the news

package com.pkit.config;

import cn.hutool.core.lang.Console;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;



/ * * *@author zhuxy  [email protected]
 * @dateThe 2021-08-27 23:45:30 * /
@Component
@EnableBinding(MessageOutPutChannel.class)
public class MessageOutPutChannelHandler {


    private final MessageChannel wxMessageOutChannel;

    public MessageOutPutChannelHandler(@Qualifier(MessageOutPutChannel.WX_MESSAGE_OUT_CHANNEL) MessageChannel wxMessageOutChannel) {
        this.wxMessageOutChannel = wxMessageOutChannel;
    }

    public void sendWxMessage(MqMessage message,long delayTime){
        Message<MqMessage> messageMessage = MessageBuilder.withPayload(message)
            .setHeader("x-delay", delayTime).build(); // Set the timeout for sending messages to the queue
        Console.log("Message delivery time :{}, the message is :{}", LocalDateTime.now(),message); wxMessageOutChannel.send(messageMessage); }}Copy the code

Test.java

package com.pkit;

import com.pkit.config.MessageOutPutChannelHandler;
import com.pkit.config.MqMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/ * * *@author zhuxy  [email protected]
 * @dateThe 2021-08-25 * / calm
@RestController
public class Test {

    @Autowired
    private MessageOutPutChannelHandler outPutChannelHandler;

    @GetMapping("/test")
    public void test(a){
        MqMessage mqMessage = new MqMessage("RabbitMq-123456"."Sweep the snow out front.");
        outPutChannelHandler.sendWxMessage(mqMessage,1000*10); / / 10 s delay}}Copy the code

Running results:

conclusion

The above describes three kinds of delay notification scheme, in fact, through the implementation of the process you can also see the pros and cons of their points. If you need to ensure message reliability in a distributed environment, you are advised to use two methods based on Redis and RabbitMQ. These two methods provide message persistence, Reids (RDB and AOF), and RabbitMQ enables Exchange and Queue persistence. I believe that in distributed environments Redis will be available in your system, however RabbitMQ may not be used. To ensure high availability of the overall system without introducing unnecessary components, use Redis to implement delayed notification. I think it’s the best way.