Delayed message
Delayed message means that after the message is sent, consumers do not want to get the message immediately, but wait for a specified time before consumers get the message for consumption.
Typical scenarios for using delayed messages, such as:
- In e-commerce systems, if a customer does not pay within 30 minutes of placing an order, the order may be cancelled.
- In the e-commerce system, if a user does not comment on a product within seven days, the product will be praised by default.
Solutions for these scenarios include:
- Poll through database records
- The JDK DelayQueue
- ScheduledExecutorService
- Timed tasks based on Quartz
- Zset based on Redis delay queue.
In addition, message queues can be used to implement delayed messages, such as RocketMQ.
2. RocketMQ
RocketMQ is a distributed messaging and streaming data platform with low latency, high performance, high reliability, trillion-scale capacity, and flexible scalability. RocketMQ is the third generation distributed messaging middleware opened source by Alibaba in 2012.
RocketMQ implements delayed messaging
3.1 Service Background
When our system completes an operation, it pushes an event message to the business side’s interface. When the return value of the notification interface of the business side is success, the push message is successful. When the return value is failure, the message is pushed multiple times until success is returned (at least one success is guaranteed).
When we failed to push, we would push the message multiple times, but not immediately. There will be a delay and push messages according to certain rules.
For example: try to push after 1 hour, try to push after 3 hours, try to push after 1 day, try to push after 3 days, etc. Therefore, consider using delayed messages to implement this feature.
3.2 Producers
The producer is responsible for generating the messages, and the producer sends the messages generated by the business application system to the message server.
First, define an AbstractProducer that supports delayed sending.
abstract class AbstractProducer :ProducerBean() {
var producerId: String? = null
var topic: String? = null
vartag: String? =null
var timeoutMillis: Int? = null
var delaySendTimeMills: Long? = null
val log = LogFactory.getLog(this.javaClass)
open fun sendMessage(messageBody: Any, tag: String) {
val msgBody = JSON.toJSONString(messageBody)
val message = Message(topic, tag, msgBody.toByteArray())
if(delaySendTimeMills ! =null) {
val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
message.startDeliverTime = startDeliverTime
log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")}val logMessageId = buildLogMessageId(message)
try {
val sendResult = send(message)
log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
} catch (e: Exception) {
log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
}
}
fun buildLogMessageId(message: Message): String {
return "topic: " + message.topic + "\n" +
"producer: " + producerId + "\n" +
"tag: " + message.tag + "\n" +
"key: " + message.key + "\n"}}Copy the code
According to service requirements, a Producer supporting retry mechanism is added
@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer :AbstractProducer() {
lateinit var delaySecondList:List<Long>
fun sendMessage(messageBody: CleanReportPushEventMessage){
// The event will not be emitted after the number of retries exceeds
if(delaySecondList! =null) {
if(messageBody.times>=delaySecondList.size){
return
}
val msgBody = JSON.toJSONString(messageBody)
val message = Message(topic, tag, msgBody.toByteArray())
val delayTimeMills = delaySecondList[messageBody.times]*1000L
message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )
val logMessageId = buildLogMessageId(message)
try {
val sendResult = send(message)
log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
} catch (e: Exception) {
log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
}
}
}
}
Copy the code
More than the number of retry in CleanReportPushEventProducer wouldn’t send the message again.
Each delayed message also takes a different time, so the delayTimeMills need to be retrieved based on the number of retries.
To set the startDeliverTime of message, run system.currentTimemillis () + delayTimeMills. Then call Send (message) to send a delayed message.
We use the commercial version of RocketMQ, so we support delayed messages with accuracy of seconds. In the open source version, RocketMQ supports only 18 specific levels of delayed messages. : (
3.3 Consumer
The consumer is responsible for consuming the message, and the consumer pulls the information from the message server and enters it into the user application.
AbstractConsumer:
@Data
abstract class AbstractConsumer ():MessageListener{
var consumerId: String? = null
lateinit var subscribeOptions: List<SubscribeOptions>
var threadNums: Int? = null
val log = LogFactory.getLog(this.javaClass)
override fun consume(message: Message, context: ConsumeContext): Action {
val logMessageId = buildLogMessageId(message)
val body = String(message.body)
try {
log.info(logMessageId + " body: " + body)
val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
log.info(logMessageId + " result: " + result.name)
return result
} catch (e: Exception) {
if (message.reconsumeTimes >= 3) {
log.error(logMessageId + " error: " + e.message, e)
}
return Action.ReconsumeLater
}
}
abstract fun getMessageBodyType(tag: String): Type?
abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action
protected fun buildLogMessageId(message: Message): String {
return "topic: " + message.topic + "\n" +
"consumer: " + consumerId + "\n" +
"tag: " + message.tag + "\n" +
"key: " + message.key + "\n" +
"MsgId:" + message.msgID + "\n" +
"BornTimestamp" + message.bornTimestamp + "\n" +
"StartDeliverTime:" + message.startDeliverTime + "\n" +
"ReconsumeTimes:" + message.reconsumeTimes + "\n"}}Copy the code
The specific consumer is redefined and the message can be sent again after the consumer fails.
@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {
val logger: Logger = LoggerFactory.getLogger(this.javaClass)
override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
if(obj is CleanReportPushEventMessage){
// Clear the event
logger.info("consumer clean-report event report_id:${obj.id} ")
// Send the message again after the consumption fails
if(! cleanReportService.sendCleanReportEvent(obj.id)){val times = obj.times+1
eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
}
}
return Action.CommitMessage
}
override fun getMessageBodyType(tag: String): Type? {
return CleanReportPushEventMessage::class.java}}Copy the code
SendCleanReportEvent () of cleanReportService invokes the interface provided by the service side through HTTP to push event messages. If the push fails, the next push will be made. (The sendMessage() method of eventProducer is used to repost the message because the message is successfully sent based on the content returned by the calling HTTP interface.)
Finally, define ConsumerFactory
@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {
val logger: Logger = LoggerFactory.getLogger(this.javaClass)
@PostConstruct
fun start(a) {
CompletableFuture.runAsync{
consumers.stream().forEach {
valproperties = buildProperties(it.consumerId!! , it.threadNums)val consumer = ONSFactory.createConsumer(properties)
if(it.subscribeOptions ! =null&&! it.subscribeOptions!! .isEmpty()) {for (options in it.subscribeOptions!!) {
consumer.subscribe(options.topic, options.tag, it)
}
consumer.start()
val message = "\n".plus( it.subscribeOptions!! .stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
.collect(Collectors.toList<Any>()))
logger.info(String.format("consumer: %s\n", message))
}
}
}
}
private fun buildProperties(consumerId: String,threadNums: Int?).: Properties {
val properties = Properties()
properties.put(PropertyKeyConst.ConsumerId, consumerId)
properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
} else {
// Test environment access RocketMQ
properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
}
properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
return properties
}
}
Copy the code
4. To summarize
As mentioned at the beginning of this article, delayed messages can be implemented in a number of ways. However, our system already makes heavy use of RocketMQ, and it is a reliable and convenient way to implement delayed messaging with mature RocketMQ.
Java and Android technology stack: update and push original technical articles every week, welcome to scan the qr code of the public account below and pay attention to, looking forward to growing and progress with you together.