Series
- RocketMQ Producer startup process
- RocketMQ Producer route synchronization
- RocketMQ Producer message sending
- RocketMQ Producer ordered messages
Introduction
- This series introduces the principles and usage of the RocketMQ Producer. It covers the producer's startup process, route synchronization, message sending process, and ordered messages.
- This article covers the producer's message sending process.
Producer example
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. Create the producer object
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        // 2. Start the producer
        producer.start();
        // 3. Send messages
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}
Producer message sending process
- To send a message, the producer passes through DefaultMQProducerImpl, MQClientAPIImpl, and NettyRemotingClient in sequence.
- DefaultMQProducerImpl selects the MessageQueue to send to; it encapsulates MessageQueue selection and retry on send failure.
- MQClientAPIImpl provides synchronous sending (sendMessageSync), asynchronous sending (sendMessageAsync), and one-way sending (invokeOneway).
- MQClientAPIImpl ultimately sends the message through NettyRemotingClient.
Producer message sending source code
DefaultMQProducerImpl#sendDefaultImpl
public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;
}

public class DefaultMQProducerImpl implements MQProducerInner {

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;

        // 1. Look up the TopicPublishInfo for the topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // Send, retrying up to timesTotal attempts
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2. Select a MessageQueue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            // Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // 3. Send the message with the remaining time budget
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // 4. Handle the result according to the communication mode
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        continue;
                    } catch (MQClientException e) {
                        continue;
                    } catch (MQBrokerException e) {
                    } catch (InterruptedException e) {
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }
            // omit related code
        }
    }
}
- The fields of the Message class are shown above.
- DefaultMQProducerImpl#sendDefaultImpl sends the message and processes the send result.
- Step 1: tryToFindTopicPublishInfo looks up the TopicPublishInfo (route information) for the topic.
- Step 2: selectOneMessageQueue selects one MessageQueue from the topic's queues.
- Step 3: DefaultMQProducerImpl#sendKernelImpl is called to send the message.
- Step 4: The result is handled according to the communicationMode. ASYNC and ONEWAY return null immediately; SYNC returns the sendResult, or moves on to the next attempt when the status is not SEND_OK and isRetryAnotherBrokerWhenNotStoreOK() is true.
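The retry behaviour described in these steps can be modelled with a small, self-contained sketch. It is not the RocketMQ implementation: RetrySendSketch, sendWithRetry, and the trySend predicate are hypothetical names, brokers are plain strings, and timeouts and fault tracking are left out. It only illustrates that SYNC sends get 1 + retries attempts while other modes get a single attempt, and that each retry prefers a broker other than the one just tried.

```java
import java.util.List;
import java.util.function.Predicate;

final class RetrySendSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Mirrors the timesTotal expression: only SYNC sends are retried by this loop. */
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /**
     * Tries brokers in round-robin order, preferring a broker different from the
     * previous attempt's. Returns the broker that accepted the message, or null
     * if every attempt failed. trySend is a hypothetical stand-in for the real send.
     */
    static String sendWithRetry(List<String> brokers, Mode mode, int retries, Predicate<String> trySend) {
        if (brokers.isEmpty()) {
            return null;
        }
        int timesTotal = totalAttempts(mode, retries);
        String lastBroker = null;
        int index = 0;
        for (int times = 0; times < timesTotal; times++) {
            String broker = null;
            // Scan at most one full round looking for a broker other than the last one.
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get(index++ % brokers.size());
                if (!candidate.equals(lastBroker)) {
                    broker = candidate;
                    break;
                }
            }
            if (broker == null) {
                broker = brokers.get(0); // only one distinct broker: reuse it
            }
            lastBroker = broker;
            if (trySend.test(broker)) {
                return broker;
            }
        }
        return null;
    }
}
```

With two brokers and a predicate that only accepts the second one, a SYNC send with 2 retries succeeds on its second attempt, while a ONEWAY send gives up after the first.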
MQFaultStrategy#selectOneMessageQueue
public class MQFaultStrategy {

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // omit the relevant code
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    // Round-robin selection. The selectOneMessageQueue(lastBrokerName) overload
    // called above is not shown in this excerpt.
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
}
- selectOneMessageQueue selects one of the topic's MessageQueues to send the message to; the selection logic is round-robin (polling).
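The round-robin selection can be tried in isolation with a minimal sketch. It makes some assumptions: RoundRobinSketch is a hypothetical class, queues are plain strings, and a single AtomicInteger starting at 0 stands in for the ThreadLocalIndex used in the source. It also shows why the pos < 0 guard is there: Math.abs(Integer.MIN_VALUE) is still negative, so the modulo can yield a negative position once the counter wraps around.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final List<String> queues;
    // Assumption: one shared counter instead of the per-thread index in the source.
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    RoundRobinSketch(List<String> queues) {
        this.queues = queues;
    }

    /** Maps a counter value to a valid list position, as the article's method does. */
    static int position(int index, int size) {
        int pos = Math.abs(index) % size;
        if (pos < 0) {
            pos = 0; // reached when index == Integer.MIN_VALUE, whose abs() stays negative
        }
        return pos;
    }

    /** Returns the next queue in round-robin order. */
    String selectOne() {
        return queues.get(position(sendWhichQueue.getAndIncrement(), queues.size()));
    }
}
```

With three queues q0, q1, q2, four consecutive calls to selectOne() return q0, q1, q2, q0.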
DefaultMQProducerImpl#sendKernelImpl
public class DefaultMQProducerImpl implements MQProducerInner {

    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // Resolve the broker address; refresh the route once if it is not cached
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                // omit related code (the omitted part declares sysFlag, msgBodyCompressed and topicWithNamespace)

                // Assemble the request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        // Send a clone so that msg itself can be restored right away
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                throw e;
            } catch (MQBrokerException e) {
                throw e;
            } catch (InterruptedException e) {
                throw e;
            } finally {
                // Restore the caller's message body and topic
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }
        // omit related code (handling of a missing broker address)
    }
}
- DefaultMQProducerImpl#sendKernelImpl first assembles the SendMessageRequestHeader object.
- It then sends the message via mQClientFactory.getMQClientAPIImpl().sendMessage().
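One pattern recurs in sendDefaultImpl, sendKernelImpl, and the layers below them: each layer measures how long it has already taken and passes only the remaining time (timeout - costTime) to the next call. A minimal sketch of that bookkeeping follows. TimeoutBudgetSketch and remainingOrThrow are hypothetical names, timestamps are passed in explicitly so the example is deterministic, and IllegalStateException stands in for the RocketMQ exception types.

```java
final class TimeoutBudgetSketch {

    /**
     * Returns the time left for the next layer, or throws if the elapsed time
     * already exceeds the timeout. Mirrors the "timeout < costTime" checks in the source.
     */
    static long remainingOrThrow(String stage, long timeoutMillis, long beginMillis, long nowMillis) {
        long costTime = nowMillis - beginMillis;
        if (timeoutMillis < costTime) {
            throw new IllegalStateException(stage + " call timeout");
        }
        return timeoutMillis - costTime;
    }

    public static void main(String[] args) {
        // The outer layer starts at t=1000 with a 3000 ms budget and has used 500 ms.
        long forInner = remainingOrThrow("sendKernelImpl", 3000, 1000, 1500);
        // The inner layer starts at t=1500 with what is left and has used 200 ms.
        long forTransport = remainingOrThrow("sendMessage", forInner, 1500, 1700);
        System.out.println(forInner + " " + forTransport);
    }
}
```

Because every layer subtracts its own cost before delegating, the total time spent across all layers stays bounded by the timeout the caller originally supplied.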
MQClientAPIImpl#sendMessage
public class MQClientAPIImpl {

    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        // Assemble a different RemotingCommand depending on the message type
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());

        // Dispatch on communicationMode
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }
}
- MQClientAPIImpl#sendMessage first assembles the RemotingCommand object, choosing the request code from the message type.
- MQClientAPIImpl#sendMessage then sends the message via NettyRemotingClient.
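The request code selection in sendMessage reduces to a decision over three flags. The sketch below restates that branching as a pure function so the combinations are easy to check. RequestCodeSketch and select are hypothetical names, and the enum only borrows the constant names from the RequestCode class used in the source.

```java
final class RequestCodeSketch {
    enum Code {
        SEND_MESSAGE,
        SEND_MESSAGE_V2,
        SEND_BATCH_MESSAGE,
        SEND_REPLY_MESSAGE,
        SEND_REPLY_MESSAGE_V2
    }

    /** Same branching as the if/else block that builds the RemotingCommand. */
    static Code select(boolean isReply, boolean sendSmartMsg, boolean isBatch) {
        if (isReply) {
            // Reply messages ignore the batch flag
            return sendSmartMsg ? Code.SEND_REPLY_MESSAGE_V2 : Code.SEND_REPLY_MESSAGE;
        }
        if (sendSmartMsg || isBatch) {
            // Batch takes precedence over the plain V2 code
            return isBatch ? Code.SEND_BATCH_MESSAGE : Code.SEND_MESSAGE_V2;
        }
        return Code.SEND_MESSAGE;
    }
}
```

Note that a batch message always maps to SEND_BATCH_MESSAGE regardless of sendSmartMsg, while a reply message never maps to a batch code.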
NettyRemotingClient
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                // omit code
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        final int opaque = request.getOpaque();
        // Limit the number of in-flight asynchronous requests
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }

            // Register the pending response under the request's opaque id
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            this.responseTable.put(opaque, responseFuture);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                // omit code
            }
        } else {
            // omit code
        }
    }

    private Channel createChannel(final String addr) throws InterruptedException {
        // Fast path: reuse a healthy cached channel without locking
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                boolean createNewConnection;
                // Re-check under the lock
                cw = this.channelTables.get(addr);
                if (cw != null) {
                    if (cw.isOK()) {
                        return cw.getChannel();
                    } else if (!cw.getChannelFuture().isDone()) {
                        createNewConnection = false;
                    } else {
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                } else {
                    createNewConnection = true;
                }

                if (createNewConnection) {
                    // Create the ChannelFuture through the Netty client's bootstrap
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                    cw = new ChannelWrapper(channelFuture);
                    this.channelTables.put(addr, cw);
                }
            } catch (Exception e) {
            } finally {
                this.lockChannelTables.unlock();
            }
        } else {
        }

        if (cw != null) {
            // Wait for the connection attempt to finish, up to the connect timeout
            ChannelFuture channelFuture = cw.getChannelFuture();
            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                if (cw.isOK()) {
                    return cw.getChannel();
                } else {
                }
            } else {
            }
        }

        return null;
    }
}
- NettyRemotingClient provides invokeSync, invokeAsync, and invokeOneway to send messages.
- NettyRemotingClient#createChannel connects to the specified address via bootstrap and returns the Channel over which messages are sent.
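The caching logic in createChannel follows a check, lock, re-check shape: look in the table without locking, then take the lock with a timeout and look again before connecting. The sketch below isolates that shape under simplifying assumptions. ChannelCacheSketch is a hypothetical class, the connector function stands in for bootstrap.connect(...), the 3000 ms lock timeout is an assumed value, and the isOK/isDone checks and the wait for the connection to complete are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

final class ChannelCacheSketch<C> {
    private static final long LOCK_TIMEOUT_MILLIS = 3000; // assumed value

    private final Map<String, C> channelTables = new ConcurrentHashMap<>();
    private final Lock lockChannelTables = new ReentrantLock();
    private final Function<String, C> connector; // hypothetical stand-in for the Netty bootstrap

    ChannelCacheSketch(Function<String, C> connector) {
        this.connector = connector;
    }

    /** Returns the cached connection for addr, creating it once; null if the lock cannot be obtained. */
    C getOrCreate(String addr) {
        C cached = channelTables.get(addr); // fast path, no lock
        if (cached != null) {
            return cached;
        }
        boolean locked;
        try {
            locked = lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        if (!locked) {
            return null;
        }
        try {
            cached = channelTables.get(addr); // re-check: another thread may have connected meanwhile
            if (cached == null) {
                cached = connector.apply(addr);
                channelTables.put(addr, cached);
            }
            return cached;
        } finally {
            lockChannelTables.unlock();
        }
    }
}
```

Using tryLock with a timeout rather than a plain lock means a caller blocked behind a slow connection attempt gives up after a bounded wait instead of hanging indefinitely.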