1. Delayed messages

A delayed message is one that consumers should not receive as soon as it is sent. Instead, it becomes available for consumption only after a specified delay.

Typical scenarios for delayed messages include:

  • In an e-commerce system, an order is cancelled if the customer does not pay within 30 minutes of placing it.
  • In an e-commerce system, a product receives a default positive review if the user does not review it within seven days.

Solutions for these scenarios include:

  • Polling database records
  • The JDK DelayQueue
  • ScheduledExecutorService
  • Scheduled tasks based on Quartz
  • A delay queue built on a Redis sorted set (ZSET)
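
As a point of comparison, here is a minimal sketch of the JDK DelayQueue option applied to the order-timeout scenario. The names OrderTimeout and drainInExpiryOrder are hypothetical and exist only for this illustration.

```kotlin
import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.TimeUnit

// Hypothetical task type: an order to cancel once its payment window has passed.
class OrderTimeout(val orderId: String, delayMillis: Long) : Delayed {
    // Absolute time (epoch millis) at which the task becomes available.
    private val fireAt = System.currentTimeMillis() + delayMillis

    // Remaining delay; zero or negative means the task is ready to be taken.
    override fun getDelay(unit: TimeUnit): Long =
        unit.convert(fireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS)

    // The queue orders its elements by remaining delay.
    override fun compareTo(other: Delayed): Int =
        getDelay(TimeUnit.MILLISECONDS).compareTo(other.getDelay(TimeUnit.MILLISECONDS))
}

// Enqueues (orderId, delayMillis) pairs and returns the order IDs in expiry order.
fun drainInExpiryOrder(tasks: List<Pair<String, Long>>): List<String> {
    val queue = DelayQueue<OrderTimeout>()
    tasks.forEach { (id, delay) -> queue.put(OrderTimeout(id, delay)) }
    // take() blocks until the head element's delay has elapsed.
    return List(tasks.size) { queue.take().orderId }
}
```

A DelayQueue lives in the memory of a single JVM, so pending tasks are lost when the process restarts. That limitation is one reason to consider a message queue instead.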

Message queues such as RocketMQ can also be used to implement delayed messages.

2. RocketMQ

RocketMQ is a distributed messaging and streaming data platform that offers low latency, high performance, high reliability, trillion-level capacity, and flexible scalability. It is a third-generation distributed messaging middleware that Alibaba open-sourced in 2012.

3. Implementing delayed messages with RocketMQ

3.1 Service Background

When our system completes an operation, it pushes an event message to an interface on the business side. If the business side's notification interface returns success, the push has succeeded. If it returns failure, the message is pushed again until success is returned (the goal is at least one successful delivery).

After a failed push, the retry is not sent immediately. Each retry is delayed according to a fixed set of rules.

For example: retry after 1 hour, then after 3 hours, then after 1 day, then after 3 days, and so on. Delayed messages are a natural fit for this feature.
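
The retry rules above can be expressed as plain data. The following is a minimal sketch, not the code used in our system. The names retrySchedule, delayForAttempt, and deliverAt are hypothetical.

```kotlin
import java.time.Duration

// Illustrative schedule taken from the example above: 1 hour, 3 hours, 1 day, 3 days.
val retrySchedule: List<Duration> = listOf(
    Duration.ofHours(1),
    Duration.ofHours(3),
    Duration.ofDays(1),
    Duration.ofDays(3)
)

// Delay before the retry with the given zero-based index,
// or null once the schedule is exhausted.
fun delayForAttempt(attempt: Int, schedule: List<Duration> = retrySchedule): Duration? =
    schedule.getOrNull(attempt)

// Absolute delivery time in epoch millis for a retry, or null when no retry is left.
fun deliverAt(attempt: Int, nowMillis: Long, schedule: List<Duration> = retrySchedule): Long? =
    delayForAttempt(attempt, schedule)?.let { nowMillis + it.toMillis() }
```

Keeping the schedule as a list means the number of retries and their spacing can be changed through configuration rather than code.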

3.2 Producers

The producer generates messages: it sends the messages produced by the business application to the message server.

First, define an AbstractProducer that supports delayed sending:

// Imports assume the Aliyun ONS Java SDK, fastjson, and Apache Commons Logging.
import com.alibaba.fastjson.JSON
import com.aliyun.openservices.ons.api.Message
import com.aliyun.openservices.ons.api.bean.ProducerBean
import org.apache.commons.logging.LogFactory

abstract class AbstractProducer : ProducerBean() {
    var producerId: String? = null
    var topic: String? = null
    var tag: String? = null
    var timeoutMillis: Int? = null
    var delaySendTimeMills: Long? = null

    val log = LogFactory.getLog(this.javaClass)

    open fun sendMessage(messageBody: Any, tag: String) {
        val msgBody = JSON.toJSONString(messageBody)
        val message = Message(topic, tag, msgBody.toByteArray())

        // If a delay is configured, schedule delivery for now + delay.
        if (delaySendTimeMills != null) {
            val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
            message.startDeliverTime = startDeliverTime
            log.info("send delay message producer startDeliverTime: ${startDeliverTime} currentTime: ${System.currentTimeMillis()}")
        }
        val logMessageId = buildLogMessageId(message)
        try {
            val sendResult = send(message)
            log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
        } catch (e: Exception) {
            log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
        }
    }

    // Builds a common prefix that identifies the message in log output.
    fun buildLogMessageId(message: Message): String {
        return "topic: " + message.topic + "\n" +
                "producer: " + producerId + "\n" +
                "tag: " + message.tag + "\n" +
                "key: " + message.key + "\n"
    }
}

To meet the service requirements, add a producer that supports the retry mechanism:

import com.alibaba.fastjson.JSON
import com.aliyun.openservices.ons.api.Message
import lombok.Data
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Configuration
import org.springframework.stereotype.Component

@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer : AbstractProducer() {

    // Delay before each attempt, in seconds, indexed by the retry count.
    lateinit var delaySecondList: List<Long>

    fun sendMessage(messageBody: CleanReportPushEventMessage) {
        // Send only when a retry schedule has been configured.
        if (::delaySecondList.isInitialized) {
            // Stop sending once the retry count exceeds the configured schedule.
            if (messageBody.times >= delaySecondList.size) {
                return
            }
            val msgBody = JSON.toJSONString(messageBody)
            val message = Message(topic, tag, msgBody.toByteArray())
            val delayTimeMills = delaySecondList[messageBody.times] * 1000L
            message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
            log.info("messageBody: " + msgBody + " startDeliverTime: " + message.startDeliverTime)
            val logMessageId = buildLogMessageId(message)
            try {
                val sendResult = send(message)
                log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
            } catch (e: Exception) {
                log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
            }
        }
    }
}

CleanReportPushEventProducer stops sending the message once the number of retries exceeds the configured limit.

Each retry uses a different delay, so delayTimeMills is looked up from delaySecondList by the current retry count.

The message's startDeliverTime is set to System.currentTimeMillis() + delayTimeMills, and send(message) is then called to send the delayed message.

We use the commercial version of RocketMQ, which supports delayed messages with second-level precision. The open-source version of RocketMQ supports only 18 fixed delay levels. :(
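
To illustrate what the 18 fixed levels mean in practice, here is a hedged sketch that maps a desired delay to a level. It assumes the default level table of open-source RocketMQ 4.x (the broker setting messageDelayLevel, which can be overridden). The names delayLevelSeconds and levelFor are hypothetical. With the open-source client, the resulting level would be passed to Message.setDelayTimeLevel(int).

```kotlin
// Default delay levels of open-source RocketMQ 4.x, in seconds (assumed defaults).
// Level numbers are 1-based: level 1 = 1s, level 18 = 2h.
val delayLevelSeconds: List<Long> = listOf<Long>(
    1, 5, 10, 30,                                     // 1s 5s 10s 30s
    60, 120, 180, 240, 300, 360, 420, 480, 540, 600,  // 1m to 10m
    1200, 1800, 3600, 7200                            // 20m 30m 1h 2h
)

// Returns the smallest level whose delay is at least the desired delay,
// or the last level when the desired delay exceeds the longest one.
fun levelFor(desiredSeconds: Long): Int {
    val index = delayLevelSeconds.indexOfFirst { it >= desiredSeconds }
    return if (index >= 0) index + 1 else delayLevelSeconds.size
}
```

Under these defaults the longest delay is 2 hours, so a schedule such as 3 hours, 1 day, or 3 days cannot be expressed with a single level. This is why second-level precision matters for our use case.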

3.3 Consumer

The consumer consumes messages: it pulls them from the message server and hands them to the user application.

AbstractConsumer:

import com.alibaba.fastjson.JSON
import com.aliyun.openservices.ons.api.Action
import com.aliyun.openservices.ons.api.ConsumeContext
import com.aliyun.openservices.ons.api.Message
import com.aliyun.openservices.ons.api.MessageListener
import lombok.Data
import org.apache.commons.logging.LogFactory
import java.lang.reflect.Type

@Data
abstract class AbstractConsumer : MessageListener {

    var consumerId: String? = null

    lateinit var subscribeOptions: List<SubscribeOptions>

    var threadNums: Int? = null

    val log = LogFactory.getLog(this.javaClass)

    override fun consume(message: Message, context: ConsumeContext): Action {
        val logMessageId = buildLogMessageId(message)
        val body = String(message.body)
        try {
            log.info(logMessageId + " body: " + body)
            val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
            log.info(logMessageId + " result: " + result.name)
            return result
        } catch (e: Exception) {
            // Log the error only after the message has already been redelivered three times.
            if (message.reconsumeTimes >= 3) {
                log.error(logMessageId + " error: " + e.message, e)
            }
            // Ask the broker to redeliver the message later.
            return Action.ReconsumeLater
        }
    }

    // Target type used to deserialize the message body for the given tag.
    abstract fun getMessageBodyType(tag: String): Type?

    abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action

    protected fun buildLogMessageId(message: Message): String {
        return "topic: " + message.topic + "\n" +
                "consumer: " + consumerId + "\n" +
                "tag: " + message.tag + "\n" +
                "key: " + message.key + "\n" +
                "MsgId:" + message.msgID + "\n" +
                "BornTimestamp:" + message.bornTimestamp + "\n" +
                "StartDeliverTime:" + message.startDeliverTime + "\n" +
                "ReconsumeTimes:" + message.reconsumeTimes + "\n"
    }
}

Next, define the concrete consumer, which sends the message again when consumption fails:

import com.aliyun.openservices.ons.api.Action
import com.aliyun.openservices.ons.api.ConsumeContext
import com.aliyun.openservices.ons.api.Message
import lombok.Data
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Configuration
import java.lang.reflect.Type

@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(
    val cleanReportService: CleanReportService,
    val eventProducer: CleanReportPushEventProducer
) : AbstractConsumer() {

    val logger: Logger = LoggerFactory.getLogger(this.javaClass)

    override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
        if (obj is CleanReportPushEventMessage) {
            // Handle the clean-report event.
            logger.info("consumer clean-report event report_id:${obj.id} ")

            // If the push fails, send the message again with an incremented retry count.
            if (!cleanReportService.sendCleanReportEvent(obj.id)) {
                val times = obj.times + 1
                eventProducer.sendMessage(CleanReportPushEventMessage(obj.id, times))
            }
        }
        return Action.CommitMessage
    }

    override fun getMessageBodyType(tag: String): Type? {
        return CleanReportPushEventMessage::class.java
    }
}

The sendCleanReportEvent() method of cleanReportService calls the interface provided by the business side over HTTP to push the event message. If the push fails, another push is scheduled. (Whether a push succeeded is determined by the content the HTTP interface returns, so the consumer re-sends the message through eventProducer's sendMessage() method.)
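
The interaction between the consumer and the producer can be traced without a broker. The following in-memory simulation is only a sketch of the flow described above. PushEvent stands in for CleanReportPushEventMessage, the push parameter stands in for the HTTP call, and simulate is a hypothetical helper.

```kotlin
import java.util.ArrayDeque

// Hypothetical stand-in for CleanReportPushEventMessage: report id plus retry counter.
data class PushEvent(val id: Long, val times: Int)

// Simulates the consume-and-resend loop in memory.
// `delaySeconds` plays the role of delaySecondList.
// Returns the delays (in seconds) that were scheduled before the loop stopped.
fun simulate(delaySeconds: List<Long>, push: (PushEvent) -> Boolean): List<Long> {
    val scheduled = mutableListOf<Long>()
    val pending = ArrayDeque<PushEvent>()

    // Producer side: drop the event once the retry schedule is exhausted.
    fun send(event: PushEvent) {
        if (event.times >= delaySeconds.size) return
        scheduled += delaySeconds[event.times]
        pending.addLast(event)
    }

    send(PushEvent(id = 1, times = 0))
    while (pending.isNotEmpty()) {
        val event = pending.removeFirst()
        // Consumer side: on failure, resend with an incremented counter.
        if (!push(event)) send(event.copy(times = event.times + 1))
    }
    return scheduled
}
```

Note that the consumer always commits the original message and relies on a newly sent message for the retry, so each attempt can carry its own delay.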

Finally, define ConsumerFactory:

import com.aliyun.openservices.ons.api.ONSFactory
import com.aliyun.openservices.ons.api.PropertyKeyConst
import org.apache.commons.lang3.StringUtils // assumed to be Apache Commons Lang 3
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.Properties
import java.util.concurrent.CompletableFuture
import javax.annotation.PostConstruct

@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>, val aliyunOnsOptions: AliyunOnsOptions) {

    val logger: Logger = LoggerFactory.getLogger(this.javaClass)

    @PostConstruct
    fun start() {
        // Start the consumers asynchronously so that application startup is not blocked.
        CompletableFuture.runAsync {
            consumers.forEach { listener ->
                val properties = buildProperties(listener.consumerId!!, listener.threadNums)
                val consumer = ONSFactory.createConsumer(properties)
                if (listener.subscribeOptions.isNotEmpty()) {
                    for (options in listener.subscribeOptions) {
                        consumer.subscribe(options.topic, options.tag, listener)
                    }
                    consumer.start()
                    val message = "\n" + listener.subscribeOptions.joinToString("\n") { a ->
                        String.format("topic: %s, tag: %s has been started", a.topic, a.tag)
                    }
                    logger.info(String.format("consumer: %s\n", message))
                }
            }
        }
    }

    private fun buildProperties(consumerId: String, threadNums: Int?): Properties {
        val properties = Properties()
        properties.put(PropertyKeyConst.ConsumerId, consumerId)
        properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
        properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
        if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
            properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
        } else {
            // The test environment connects to RocketMQ through the name server address.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
        }
        properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
        return properties
    }
}

4. Summary

As mentioned at the beginning of this article, delayed messages can be implemented in a number of ways. Because our system already makes heavy use of RocketMQ, implementing delayed messages on top of the mature RocketMQ platform is a reliable and convenient choice.

