Preface
This article examines the sendOrderly methods (syncSendOrderly and asyncSendOrderly) of RocketMQ.
sendOrderly
rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean { //...... public SendResult syncSendOrderly(String destination, Message<? > message, StringhashKey, long timeout) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
long now = System.currentTimeMillis();
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
long costTime = System.currentTimeMillis() - now;
if (log.isDebugEnabled()) {
log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
}
return sendResult;
} catch (Exception e) {
log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } public void asyncSendOrderly(String destination, Message<? > message, StringhashKey, SendCallback sendCallback,
long timeout) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}
try {
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
} catch (Exception e) {
log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); }} / /... }Copy the code
- syncSendOrderly ultimately calls producer.send(rocketMsg, messageQueueSelector, hashKey, timeout), while asyncSendOrderly ultimately calls producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); the only difference is the additional sendCallback argument.
DefaultMQProducer
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer {
//......
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
returnthis.defaultMQProducerImpl.send(msg, selector, arg, timeout); } public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } / /... }Copy the code
- DefaultMQProducer's send methods first apply the namespace to the message topic and then delegate to defaultMQProducerImpl.send(msg, selector, arg, timeout) or defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout).
DefaultMQProducerImpl
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner {
//......
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if(topicPublishInfo ! = null && topicPublishInfo.ok()) { MessageQueue mq = null; try { List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if(mq ! = null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, "+ msg.getTopic(), null); } / /... }Copy the code
- DefaultMQProducerImpl's send method calls sendSelectImpl. After Validators.checkMessage, that method looks up the topicPublishInfo via tryToFindTopicPublishInfo(msg.getTopic()); if no usable topicPublishInfo is found, it throws an MQClientException.
- If topicPublishInfo is found, the messageQueueList is obtained via mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()).
- The mq is then chosen via mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); if mq is not null, the message is sent with sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime), otherwise an MQClientException is thrown.
Summary
- DefaultMQProducerImpl's send method calls sendSelectImpl. After Validators.checkMessage, that method looks up the topicPublishInfo via tryToFindTopicPublishInfo(msg.getTopic()); if no usable topicPublishInfo is found, it throws an MQClientException.
- If topicPublishInfo is found, the messageQueueList is obtained via mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()).
- The mq is then chosen via mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); if mq is not null, the message is sent with sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime), otherwise an MQClientException is thrown.
Note that sendOrderly does not go through sendDefaultImpl, which picks the MessageQueue via selectOneMessageQueue(topicPublishInfo, lastBrokerName); instead, the MessageQueue is chosen by the supplied MessageQueueSelector.
doc
- DefaultMQProducerImpl