18. RocketMQ sequential messages
Sequential message sending
@GetMapping(value = "/orderly")
public String orderly() {
    List<String> typeList = Arrays.asList("Create", "Pay", "Refund");
    for (String type : typeList) {
        Order order = new Order("123", type);
        MessageBuilder<Order> builder = MessageBuilder.withPayload(order);
        Message<Order> message = builder.build();
        // The order ID is passed as the hash key, so every message of this order goes to the same queue
        SendResult sendResult = rocketMQTemplate.syncSendOrderly("TopicTest", message, order.getOrderId());
        System.out.println("MsgId = " + sendResult.getMsgId() + ", QueueId = " + sendResult.getMessageQueue().getQueueId());
    }
    return "OK";
}
Sending sequential messages differs from sending ordinary messages in two ways:
- In the configuration file, change the default asynchronous send to synchronous send
- Set the header so that related messages are sent to the same message queue
Receiving sequential messages differs from receiving ordinary messages in one way:
- Change the default concurrent consumption to sequential consumption
RocketMQ sequential messages are divided into two types:
- Partial order: messages sent to the same queue are ordered. The producer chooses the queue when sending, and the consumer consumes each queue in order. For example, messages with the same order ID must be consumed in order, while messages with different order IDs do not affect each other and can be processed in parallel
- Global order: the Topic has only one queue, so every message in the Topic is ordered. The queue count is set manually when the Topic is created. Performance is poor, so this is not recommended
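To make partial ordering concrete, here is a minimal in-memory sketch. It does not use RocketMQ at all: PartialOrderDemo, route() and the "orderId:step" message format are assumptions made for illustration only. Messages are routed to queues by order ID, so each order's steps stay in sequence inside one queue while different orders can land in different queues.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: these names are hypothetical, not RocketMQ APIs.
class PartialOrderDemo {

    // Routes each "orderId:step" message to one of queueCount in-memory queues by order ID.
    static List<List<String>> route(List<String> messages, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (String message : messages) {
            String orderId = message.substring(0, message.indexOf(':'));
            // floorMod keeps the index non-negative even when hashCode() is negative.
            int index = Math.floorMod(orderId.hashCode(), queueCount);
            queues.get(index).add(message);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList(
                "A:Create", "B:Create", "A:Pay", "B:Pay", "A:Refund", "B:Refund");
        List<List<String>> queues = route(messages, 4);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("queue " + i + " -> " + queues.get(i));
        }
    }
}
```

With a single queue (queueCount = 1) the same code degenerates into global ordering: everything is in order, but nothing can be processed in parallel.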
RocketMQ messages can be sent in three ways: synchronous, asynchronous, and one-way.
- Synchronous: after sending the network request, the caller waits for the Broker to return the result. Retrying a failed send is supported
- Asynchronous: sends the network request without blocking the current thread, and the result is delivered through a callback. Retry behaviour differs from synchronous mode and is governed by the producer's retryTimesWhenSendAsyncFailed setting
- One-way: works on the same principle as asynchronous sending but has no callback, so the sender never learns the result
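The difference between the three modes is easiest to see from the caller's side. The following is only a sketch under stated assumptions: SendModesDemo and deliver() are hypothetical, and deliver() is an in-memory stand-in for a Broker round trip rather than a real RocketMQ call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative sketch only: no RocketMQ classes are involved.
class SendModesDemo {

    // Hypothetical stand-in for "send to the Broker and get an acknowledgement".
    static String deliver(String message) {
        return "ACK:" + message;
    }

    // Synchronous: the caller blocks until the result is available.
    static String sendSync(String message) {
        return deliver(message);
    }

    // Asynchronous: the caller returns immediately; the result reaches the callback later.
    static CompletableFuture<Void> sendAsync(String message, Consumer<String> callback) {
        return CompletableFuture.supplyAsync(() -> deliver(message)).thenAccept(callback);
    }

    // One-way: fire and forget; there is no result and no callback.
    static void sendOneway(String message) {
        CompletableFuture.runAsync(() -> deliver(message));
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + sendSync("Create"));
        // join() is used here only so the demo waits for the callback before exiting.
        sendAsync("Pay", result -> System.out.println("async callback: " + result)).join();
        sendOneway("Refund");
    }
}
```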
The principle of sequential message sending is simple: messages of the same kind (for example, the same order ID) are sent to the same queue. To ensure that the message sent first is also stored in the queue first, the synchronous sending mode must be used
RocketMQTemplate’s syncSend() method:
public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
    if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
        log.error("syncSend failed. destination:{}, message is null ", destination);
        throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
    }
    try {
        long now = System.currentTimeMillis();
        org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
            charset, destination, message);
        if (delayLevel > 0) {
            rocketMsg.setDelayTimeLevel(delayLevel);
        }
        SendResult sendResult = producer.send(rocketMsg, timeout);
        long costTime = System.currentTimeMillis() - now;
        log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
        return sendResult;
    } catch (Exception e) {
        log.error("syncSend failed. destination:{}, message:{} ", destination, message);
        throw new MessagingException(e.getMessage(), e);
    }
}
The queue is chosen by a MessageQueueSelector. The implementation class SelectMessageQueueByHash:
public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }
        value = value % mqs.size();
        return mqs.get(value);
    }
}
- Calculate the hash value of the hashKey (the arg parameter)
- Take the hash value modulo the number of queues to get an index that is smaller than the number of queues
- Fetch the queue at that index from the queue list. The same hash value always yields the same queue
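The three steps above can be tried out without a Broker. This sketch copies only the index arithmetic into a hypothetical helper, selectIndex(), and assumes a Topic with four queues; HashQueueSelectDemo is not part of RocketMQ.

```java
// Illustrative sketch only: the class and method names are hypothetical.
class HashQueueSelectDemo {

    // Same arithmetic as the selector shown above: hash, make non-negative, take the modulus.
    static int selectIndex(Object hashKey, int queueCount) {
        int value = hashKey.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }
        return value % queueCount;
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the Topic
        String[] types = {"Create", "Pay", "Refund"};
        for (String type : types) {
            // The hash key is the order ID, not the message type, so the index never changes.
            int index = selectIndex("123", queueCount);
            System.out.println("order 123 / " + type + " -> queue index " + index);
        }
    }
}
```

Because the index depends only on the hash key and the queue count, all three messages of order "123" map to one queue as long as the number of queues stays the same.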
Sending ordinary messages
Ordinary messages select a queue with two mechanisms: polling (round robin) and fault avoidance
Polling works as follows: the routing information TopicPublishInfo maintains a counter, sendWhichQueue. Each time a message is sent and a route is looked up, the counter is incremented by 1, and the queue index is the counter value modulo the number of queues, which yields round-robin selection.
package org.apache.rocketmq.client.impl.producer;

import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    public boolean isOrderTopic() {
        return orderTopic;
    }

    public void setOrderTopic(boolean orderTopic) {
        this.orderTopic = orderTopic;
    }

    public boolean ok() {
        return null != this.messageQueueList && !this.messageQueueList.isEmpty();
    }

    public List<MessageQueue> getMessageQueueList() {
        return messageQueueList;
    }

    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }

    public ThreadLocalIndex getSendWhichQueue() {
        return sendWhichQueue;
    }

    public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
        this.sendWhichQueue = sendWhichQueue;
    }

    public boolean isHaveTopicRouterInfo() {
        return haveTopicRouterInfo;
    }

    public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
        this.haveTopicRouterInfo = haveTopicRouterInfo;
    }

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

    public int getQueueIdByBroker(final String brokerName) {
        for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
            final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
            if (queueData.getBrokerName().equals(brokerName)) {
                return queueData.getWriteQueueNums();
            }
        }
        return -1;
    }

    @Override
    public String toString() {
        return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
            + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
    }

    public TopicRouteData getTopicRouteData() {
        return topicRouteData;
    }

    public void setTopicRouteData(final TopicRouteData topicRouteData) {
        this.topicRouteData = topicRouteData;
    }
}
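The two selectOneMessageQueue() overloads are easier to follow in a stripped-down form. The sketch below is a simplification under stated assumptions, not the library code: RoundRobinDemo and QueueRef are hypothetical, an AtomicInteger stands in for ThreadLocalIndex, and Math.floorMod replaces the Math.abs arithmetic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: a simplified model of round-robin queue selection.
class RoundRobinDemo {

    // Hypothetical stand-in for MessageQueue: a broker name plus a queue ID.
    static final class QueueRef {
        final String brokerName;
        final int queueId;

        QueueRef(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "-" + queueId;
        }
    }

    private final List<QueueRef> queues;
    // Assumption: a shared AtomicInteger replaces ThreadLocalIndex to keep the sketch self-contained.
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinDemo(List<QueueRef> queues) {
        this.queues = queues;
    }

    // Plain polling: counter + 1 on every call, index = counter mod queue count.
    QueueRef selectOne() {
        int index = counter.getAndIncrement();
        return queues.get(Math.floorMod(index, queues.size()));
    }

    // Polling that skips queues on the broker used by the previous (failed) attempt.
    QueueRef selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = counter.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            QueueRef candidate = queues.get(Math.floorMod(index++, queues.size()));
            if (!candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue belongs to lastBrokerName, so fall back to plain polling.
        return selectOne();
    }

    public static void main(String[] args) {
        RoundRobinDemo demo = new RoundRobinDemo(Arrays.asList(
                new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
                new QueueRef("broker-b", 0), new QueueRef("broker-b", 1)));
        for (int i = 0; i < 3; i++) {
            System.out.println("plain polling   -> " + demo.selectOne());
        }
        System.out.println("skip broker-b   -> " + demo.selectOne("broker-b"));
    }
}
```

Note that skipping lastBrokerName only helps within one send attempt and its retries; the selector itself keeps no memory of which brokers have been failing.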
The polling algorithm may select a queue on a Broker that is down, causing the send to fail, which is why the fault avoidance mechanism exists