Message consumption
First, let’s look at the concerns that message consumption involves in RocketMQ:
- Message queue load balancing and reallocation
- Message consumption mode
- Message pull mode
- Consumption progress feedback
- Message filtering
- Ordered messages
Overview
Messages are consumed by consumer groups. A consumer group can contain multiple consumers, and all consumers in a group must keep identical subscriptions. Within a single JVM instance, only one consumer with a given consumer group name is allowed.
There are two consumption modes within a consumer group:
- Broadcast mode: every consumer in the group consumes each message of a topic once.
- Cluster mode: each message of a topic is consumed by only one consumer in the group.
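To make the difference between the two modes concrete, here is a toy sketch. It is not RocketMQ code: the class `ConsumptionModeSketch` and its `deliver` method are made up for illustration, and messages are handed out one by one in round-robin order, whereas the real cluster mode assigns whole message queues to consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names): shows who receives what in each mode.
class ConsumptionModeSketch {
    enum Mode { BROADCASTING, CLUSTERING }

    /** Returns, per consumer, the messages that consumer would receive. */
    static Map<String, List<String>> deliver(Mode mode, List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String message = messages.get(i);
            if (mode == Mode.BROADCASTING) {
                // Every consumer in the group gets its own copy.
                for (String consumer : consumers) {
                    received.get(consumer).add(message);
                }
            } else {
                // Exactly one consumer in the group gets the message (round-robin here).
                String owner = consumers.get(i % consumers.size());
                received.get(owner).add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println(deliver(Mode.BROADCASTING, consumers, messages));
        System.out.println(deliver(Mode.CLUSTERING, consumers, messages));
    }
}
```

In broadcast mode both consumers end up with all three messages; in cluster mode the three messages are split between them.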
There are also two ways to deliver messages from the message server to the consumer:
- Pull mode: the consumer actively sends a pull request to the server.
- Push mode: once a message reaches the server, it is pushed to the consumer. In RocketMQ, push mode is actually built on top of pull mode as a wrapper around it.
Source code analysis
Consumer startup process
Let’s look at the DefaultMQPushConsumerImpl#start method.
// Check the configuration
this.checkConfig();
// Build the topic subscription data (SubscriptionData)
this.copySubscription();
The configuration check is straightforward, so let’s look at the copySubscription method.
private void copySubscription() throws MQClientException {
    try {
        // Subscriptions configured by the user
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subString);
                // Add to rebalanceImpl's subscriptionInner,
                // a concurrent map keyed by topic
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        // Register the listener
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            // Broadcast mode: no retry topic is needed
            case BROADCASTING:
                break;
            // Cluster mode: add the retry topic to subscriptionInner.
            // Message retry works per consumer group; the topic is %RETRY% + consumer group name
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
As you can see, subscriptionInner stores more than the subscriptions configured by the user: in cluster mode the consumer also automatically subscribes to a retry topic named %RETRY% + consumer group name.
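The effect of copySubscription can be sketched with plain maps. This is a simplified stand-in, not the RocketMQ implementation: `RetrySubscriptionSketch`, `retryTopic` and `buildSubscriptions` are hypothetical names, and a subscription is reduced to a topic and an expression string, with `"*"` standing for "subscribe to everything".

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in (hypothetical names) for what ends up in subscriptionInner.
class RetrySubscriptionSketch {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String SUB_ALL = "*";

    /** Retry topic name: %RETRY% + consumer group name. */
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    /** topic -> subscription expression, including the retry topic in cluster mode. */
    static Map<String, String> buildSubscriptions(String consumerGroup,
                                                  Map<String, String> userSubscriptions,
                                                  boolean clustering) {
        Map<String, String> inner = new TreeMap<>(userSubscriptions);
        if (clustering) {
            // Cluster mode also subscribes to the group's retry topic.
            inner.put(retryTopic(consumerGroup), SUB_ALL);
        }
        return inner;
    }

    public static void main(String[] args) {
        Map<String, String> user = new TreeMap<>();
        user.put("TopicTest", "TagA || TagB");
        System.out.println(buildSubscriptions("order_group", user, true));
        System.out.println(buildSubscriptions("order_group", user, false));
    }
}
```

With a consumer group called order_group, the cluster-mode map contains both TopicTest and %RETRY%order_group, while the broadcast-mode map contains only TopicTest.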
Let’s continue with the consumer startup process.
// In cluster mode, change instanceName to the process ID
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// Initialize the MQClientInstance. As noted in the previous article, MQClientManager is a singleton.
// The instance is added to MQClientManager's factoryTable with IP + PID as the key.
// This means a process has only one MQClientInstance unless instanceName is set explicitly.
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// Initialize the message load balancing implementation class
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
This step creates an MQClientInstance, which handles communication with the outside world, and registers it in the factoryTable of MQClientManager.
The message load balancing implementation class is then initialized.
// Long connection: wrapper around the pull API
this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,
    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
PullAPIWrapper is initialized.
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // Broadcast mode: the offset is stored locally
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        // Cluster mode: the offset is stored on the broker side
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// Load the offsets; this is an empty method in cluster mode
this.offsetStore.load();
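The offsetStore is initialized: the offset is stored on the broker side in cluster mode and locally in broadcast mode. This makes sense. In cluster mode a message is consumed by only one consumer of the group, so the whole group shares a single consumption progress, which has to live on the broker. In broadcast mode every consumer consumes every message, so each consumer only needs to track its own progress locally.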
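The reasoning behind the two storage locations can be shown with a small sketch. Everything here is hypothetical and heavily simplified: a static map named `BROKER_SIDE` stands in for the broker's offset table of one consumer group, and a per-consumer map stands in for the local file.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of where consumption progress lives in each mode.
class OffsetStoreSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    /** Stand-in for the broker-side offset table shared by one consumer group. */
    static final Map<String, Long> BROKER_SIDE = new ConcurrentHashMap<>();

    static class Consumer {
        private final Map<String, Long> offsetStore;

        Consumer(MessageModel model) {
            // Cluster mode shares progress through the broker; broadcast mode keeps it local.
            this.offsetStore = (model == MessageModel.CLUSTERING)
                ? BROKER_SIDE
                : new ConcurrentHashMap<String, Long>();
        }

        void commit(String queue, long offset) {
            offsetStore.put(queue, offset);
        }

        /** Returns -1 when no progress has been recorded for the queue. */
        long read(String queue) {
            return offsetStore.getOrDefault(queue, -1L);
        }
    }

    public static void main(String[] args) {
        Consumer a = new Consumer(MessageModel.CLUSTERING);
        Consumer b = new Consumer(MessageModel.CLUSTERING);
        a.commit("queue-0", 42L);
        System.out.println("cluster peer sees: " + b.read("queue-0"));

        Consumer x = new Consumer(MessageModel.BROADCASTING);
        Consumer y = new Consumer(MessageModel.BROADCASTING);
        x.commit("queue-0", 42L);
        System.out.println("broadcast peer sees: " + y.read("queue-0"));
    }
}
```

In the cluster case the second consumer sees the progress committed by the first, so it can take over the queue after a rebalance. In the broadcast case each consumer only ever sees its own progress.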
// Ordered message consumption
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // Concurrent message consumption
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// Start the consumeMessageService thread service
this.consumeMessageService.start();
This initializes and starts consumeMessageService, the consumption thread service responsible for consuming messages, both ordered and concurrent. Let’s first look at the implementation of the concurrent consumption thread service; the ordered consumption logic comes later.
Look at the constructor of ConsumeMessageConcurrentlyService.
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    // Set defaultMQPushConsumerImpl
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    // Set the message listener
    this.messageListener = messageListener;
    // Set defaultMQPushConsumer
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    // Set the consumer group
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // Queue of pending consume requests
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    // Consumer thread pool
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    // Single-threaded scheduled executor
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    // Single-threaded scheduled executor that periodically cleans up expired messages
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
Basically, various thread pools are initialized.
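One detail of the consumer thread pool is worth a small experiment. The constructor above passes a LinkedBlockingQueue created without a capacity, and a standard java.util.concurrent.ThreadPoolExecutor only creates threads beyond its core size when the queue rejects a task. The sketch below uses made-up numbers (2 core threads, 4 maximum, 6 tasks) and a hypothetical class name to observe that behavior; it says nothing about how RocketMQ tunes these values.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical experiment: pool size and queue length while every task is blocked.
class ConsumePoolSketch {
    /** Returns {poolSize, queuedTasks} observed while all submitted tasks are still blocked. */
    static int[] observe(int core, int max, int tasks) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // unbounded
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(core, max, 1000 * 60, TimeUnit.MILLISECONDS, queue);
        final CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until we have looked at the pool
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] observed = {pool.getPoolSize(), queue.size()};
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        int[] observed = observe(2, 4, 6);
        System.out.println("pool size = " + observed[0] + ", queued = " + observed[1]);
    }
}
```

With these numbers the pool stays at its 2 core threads and the other 4 tasks wait in the queue, so with an unbounded queue the maximum pool size is effectively never reached.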
// Start a scheduled task that periodically cleans up expired messages
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            cleanExpireMsg();
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
More on that later. There are essentially three thread pools here, and the way they cooperate asynchronously is one of the elegant parts of RocketMQ’s design.
Back to the consumer startup process.
// Register this DefaultMQPushConsumerImpl with the MQClientInstance, keyed by consumer group.
// Only one consumer instance with the same consumer group name is allowed in a process.
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// If the registration fails, throw an exception
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}
// Start the MQClientInstance
mQClientFactory.start();
This mainly registers the DefaultMQPushConsumerImpl with the MQClientInstance and then starts the MQClientInstance. Let’s take a look at what starting the MQClientInstance does.
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
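Let’s see what the scheduled tasks started by startScheduledTask do.
// Fetch the name server address every 2 minutes, after an initial delay of 10 seconds
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {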
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
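// Update the topic route info from the name server every pollNameServerInterval ms, after an initial delay of 10 ms
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {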
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
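// Clean up offline brokers and send heartbeats to all brokers every heartbeatBrokerInterval ms, after an initial delay of 1 second
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {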
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
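// Persist the offsets of all consumers every persistConsumerOffsetInterval ms, after an initial delay of 10 seconds
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {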
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// Adjust the thread pool once a minute, after an initial delay of 1 minute
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
Consumer startup process summary
- Check the configuration
- Copy the subscriptions into the load balancing implementation class (rebalanceImpl)
- In cluster mode, change the instance name to the process ID
- Create the MQClientInstance and register it with MQClientManager
- Initialize the load balancing implementation class
- Initialize the PullAPIWrapper (long connection)
- Initialize the offsetStore
- Initialize and start the consumeMessageService consumption thread service
- Register the consumer with the MQClientInstance
- Start the MQClientInstance
Message pull process
PullMessageService
As shown above, a process has only one MQClientInstance (unless you set instanceName yourself). The startup process of MQClientInstance also shows that it uses a dedicated thread, PullMessageService, to pull messages.
PullMessageService extends ServiceThread, which implements Runnable. ServiceThread essentially just provides methods such as start and sets the thread as a daemon thread. Let’s take a look at the run method of PullMessageService.
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
Its main task is to loop, blocking on pullRequestQueue until a PullRequest is available.
The pullMessage method is then called to process it.
Let’s take a look at when a PullRequest is put into the pullRequestQueue. In PullMessageService:
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
A PullRequest object is put back into the pullRequestQueue after each pull task ends.
RebalanceImpl also puts PullRequest objects into the queue.
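The take, pull and re-enqueue cycle can be sketched as follows. This is a simplified model with hypothetical names: `PullLoopSketch` plays the role of PullMessageService, the pull itself is simulated by advancing the offset by a fixed batch size, and the loop runs a bounded number of rounds instead of running until the service stops.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model (hypothetical names) of the pull loop and its re-enqueue step.
class PullLoopSketch {
    static class PullRequest {
        final String queueName;
        long nextOffset;

        PullRequest(String queueName, long nextOffset) {
            this.queueName = queueName;
            this.nextOffset = nextOffset;
        }
    }

    final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();

    /** Hands a request to the pull loop. */
    void executePullRequestImmediately(PullRequest request) {
        try {
            pullRequestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Simulated pull: pretend the broker returned batchSize messages, then re-enqueue. */
    private void pullMessage(PullRequest request, int batchSize) {
        request.nextOffset += batchSize;
        executePullRequestImmediately(request); // the same object goes back for the next round
    }

    /** Bounded version of the run() loop so that the sketch terminates. */
    void runRounds(int rounds, int batchSize) {
        for (int i = 0; i < rounds; i++) {
            try {
                PullRequest request = pullRequestQueue.take(); // blocks until a request is available
                pullMessage(request, batchSize);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        PullLoopSketch loop = new PullLoopSketch();
        PullRequest request = new PullRequest("TopicTest-queue-0", 0L);
        loop.executePullRequestImmediately(request);
        loop.runRounds(3, 32);
        System.out.println("next offset = " + request.nextOffset);
    }
}
```

The point of the design is that one PullRequest per message queue keeps circulating through the blocking queue, so a single thread can serve many queues without busy waiting.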
PullMessageService performs a pull only when it obtains a PullRequest object. Let’s take a look at PullRequest.
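public class PullRequest {
    /** Consumer group */
    private String consumerGroup;
    /** Message queue to pull from */
    private MessageQueue messageQueue;
    /** Processing queue (consumer-side snapshot of the message queue) */
    private ProcessQueue processQueue;
    /** Offset of the next pull */
    private long nextOffset;
    /** Whether the queue is locked */
    private boolean lockedFirst = false;
}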
Back to PullMessageService.
private void pullMessage(final PullRequest pullRequest) {
    // Look up the consumer implementation in the MQClientInstance by consumer group name
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        // Call pullMessage of DefaultMQPushConsumerImpl
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
Before going into the message pull process, let’s start with ProcessQueue.
ProcessQueue is the consumer-side snapshot of a MessageQueue. By default, PullMessageService pulls 32 messages at a time from the message server and stores them in the ProcessQueue in order of queue offset. The messages are then submitted to the consumer’s consumption thread pool. A message is removed from the ProcessQueue after it has been consumed successfully.
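A minimal sketch of this snapshot idea is shown below, assuming a TreeMap keyed by queue offset (the getMsgTreeMap calls in the flow-control code later in this article suggest such a structure). The class `ProcessQueueSketch` and its methods are simplified stand-ins, with message bodies reduced to strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Simplified stand-in for ProcessQueue: pulled messages ordered by queue offset.
class ProcessQueueSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    /** Stores a pulled batch; the first message has offset firstOffset. */
    synchronized void putMessages(long firstOffset, List<String> bodies) {
        long offset = firstOffset;
        for (String body : bodies) {
            msgTreeMap.put(offset++, body);
        }
    }

    /** Called after a message has been consumed successfully. */
    synchronized void removeMessage(long offset) {
        msgTreeMap.remove(offset);
    }

    /** Distance between the largest and smallest offset still cached. */
    synchronized long getMaxSpan() {
        return msgTreeMap.isEmpty() ? 0L : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    synchronized int size() {
        return msgTreeMap.size();
    }

    public static void main(String[] args) {
        ProcessQueueSketch queue = new ProcessQueueSketch();
        queue.putMessages(100L, Arrays.asList("a", "b", "c", "d"));
        queue.removeMessage(101L); // consumed out of order
        System.out.println("size = " + queue.size() + ", maxSpan = " + queue.getMaxSpan());
    }
}
```

Note that consuming offset 101 first does not shrink the span: as long as the message at offset 100 is still unconsumed, the smallest cached offset stays where it is. That is why a single slow message can make the span grow.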
Message pull process
Message pull is divided into three main steps.
- Client message pull request encapsulation
- The Broker finds and returns messages
- The Client processes the returned message
Message pull
// Get the queue snapshot
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// If the processing queue has been dropped, stop here
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// Update the processQueue's lastPullTimestamp to the current time
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
    // Make sure the consumer is in a usable state
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    return;
}
// If the consumer is paused, put the pull task back into PullMessageService's queue after a delay and end this pull
if (this.isPause()) {
    log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}
These are the pre-checks performed before a pull.
// If the number of cached messages is greater than 1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    // Put the pull task back into PullMessageService's queue after a delay
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    // Log a warning once every 1000 flow-control events
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
// If the size of the cached messages is greater than 100 MiB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    // Put the pull task back into PullMessageService's queue after a delay
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    // Log a warning once every 1000 flow-control events
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
Flow control is applied along two dimensions:
- The number of messages cached in the ProcessQueue must not exceed 1000
- The total size of the cached messages must not exceed 100 MiB
// For concurrent (non-ordered) consumption
if (!this.consumeOrderly) {
    // If the maximum offset span of the messages in the snapshot is greater than 2000
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                pullRequest, queueMaxSpanFlowControlTimes);
        }
        return;
    }
}
For concurrent message consumption, the consumer also checks whether the maximum offset span of the messages in the snapshot is greater than 2000. If it is, flow control is applied and the pull is delayed.
The logic for ordered messages is covered later.
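Taken together, the three flow-control checks form a small decision function. The sketch below is an illustration only: the class, the `Decision` enum and the constants are hypothetical names, with the thresholds hard-coded to the values quoted in this article (1000 messages, 100 MiB, span of 2000) rather than read from the consumer configuration.

```java
// Illustrative decision function for the three flow-control checks (hypothetical names).
class FlowControlSketch {
    static final long PULL_THRESHOLD_FOR_QUEUE = 1000;       // cached message count
    static final long PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB = 100; // cached message size in MiB
    static final long CONSUME_CONCURRENTLY_MAX_SPAN = 2000;   // max offset span

    enum Decision { PULL_NOW, DELAY_BY_COUNT, DELAY_BY_SIZE, DELAY_BY_SPAN }

    static Decision check(long cachedCount, long cachedSizeMiB, long maxSpan, boolean consumeOrderly) {
        if (cachedCount > PULL_THRESHOLD_FOR_QUEUE) {
            return Decision.DELAY_BY_COUNT;
        }
        if (cachedSizeMiB > PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB) {
            return Decision.DELAY_BY_SIZE;
        }
        // The span check only applies to concurrent (non-ordered) consumption.
        if (!consumeOrderly && maxSpan > CONSUME_CONCURRENTLY_MAX_SPAN) {
            return Decision.DELAY_BY_SPAN;
        }
        return Decision.PULL_NOW;
    }

    public static void main(String[] args) {
        System.out.println(check(500, 10, 100, false));
        System.out.println(check(1001, 10, 100, false));
        System.out.println(check(500, 101, 100, false));
        System.out.println(check(500, 10, 2001, false));
        System.out.println(check(500, 10, 2001, true));
    }
}
```

All comparisons are strict, so a queue holding exactly 1000 cached messages is still allowed to pull.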
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
// If it is null, retry the pull after a 3 s delay
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
Get the subscription information for this topic from the rebalanceImpl.
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// In cluster consumption mode
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    // Read the consumption progress (offset) from the local in-memory cache
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    // If the consumption progress is greater than 0
    if (commitOffsetValue > 0) {
        // Tell the broker to save the consumption progress
        commitOffsetEnable = true;
    }
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
    if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
        subExpression = sd.getSubString();
    }
    classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);
The system flag (sysFlag) uses four binary bits to represent four states.
/** Commit the consumption offset along with the pull request */
private final static int FLAG_COMMIT_OFFSET = 0x1;
/** Allow the request to be suspended */
private final static int FLAG_SUSPEND = 0x1 << 1;
/** The request carries a subscription expression */
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
/** Class filter mode */
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
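How four booleans are packed into one int, and how a single bit is cleared again, can be sketched like this. The constants mirror the values shown above; the class `PullSysFlagSketch` and the bodies of its methods are a plausible reimplementation written for this article, not a copy of the RocketMQ class.

```java
// Sketch of bit-flag packing with the four flag values shown above (reimplemented for illustration).
class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    /** Packs the four booleans into the low four bits of an int. */
    static int buildSysFlag(boolean commitOffset, boolean suspend,
                            boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    /** Clears only the commit-offset bit and leaves the other bits untouched. */
    static int clearCommitOffsetFlag(int sysFlag) {
        return sysFlag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
    }

    public static void main(String[] args) {
        int flag = buildSysFlag(true, true, false, false);
        System.out.println(Integer.toBinaryString(flag));                        // 11
        System.out.println(Integer.toBinaryString(clearCommitOffsetFlag(flag))); // 10
    }
}
```

Packing the states into one int keeps the request header small, and clearing a bit with `& ~FLAG` is exactly the kind of operation needed when a pull is sent to a slave broker that must not record offsets.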
Back to pulling messages.
try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(), // message queue to pull from
        subExpression, // message filter expression
        subscriptionData.getExpressionType(), // filter expression type
        subscriptionData.getSubVersion(), // filter expression version
        pullRequest.getNextOffset(), // offset to pull from
        this.defaultMQPushConsumer.getPullBatchSize(), // maximum number of messages per pull
        sysFlag, // system flag
        commitOffsetValue, // current consumption progress
        BROKER_SUSPEND_MAX_TIME_MILLIS, // maximum time the broker may suspend the request
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // client-side timeout
        CommunicationMode.ASYNC, // asynchronous pull
        pullCallback // callback invoked after messages are pulled from the broker
    );
} catch (Exception e) {
    log.error("pullKernelImpl exception", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
This is where the message is actually pulled.
Let’s look at the org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl method.
// Look up the broker address in memory by brokerName and brokerId
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
// If nothing is found in memory, refresh the broker addresses from the NameServer and look again
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
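The broker information is obtained first, using a classic caching strategy: look in the local cache, and on a miss refresh from the NameServer and look again.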
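The lookup pattern can be sketched in isolation. Everything in this block is hypothetical: `BrokerAddressCacheSketch` stands in for the client-side address table, and a `Function` passed to the constructor stands in for the remote NameServer query.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of "check the cache, refresh on a miss, check again".
class BrokerAddressCacheSketch {
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final Function<String, String> nameServerLookup; // stand-in for the remote query
    int refreshCount = 0;

    BrokerAddressCacheSketch(Function<String, String> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    /** Returns the cached address, refreshing once on a miss; null if still unknown. */
    String findBrokerAddress(String brokerName) {
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            refreshFromNameServer(brokerName);
            addr = brokerAddrTable.get(brokerName);
        }
        return addr;
    }

    private void refreshFromNameServer(String brokerName) {
        refreshCount++;
        String addr = nameServerLookup.apply(brokerName);
        if (addr != null) {
            brokerAddrTable.put(brokerName, addr);
        }
    }

    public static void main(String[] args) {
        BrokerAddressCacheSketch cache = new BrokerAddressCacheSketch(
            name -> "broker-a".equals(name) ? "10.0.0.1:10911" : null);
        System.out.println(cache.findBrokerAddress("broker-a")); // miss, then refresh
        System.out.println(cache.findBrokerAddress("broker-a")); // served from the cache
        System.out.println("refreshes = " + cache.refreshCount);
    }
}
```

Only the first lookup pays for the remote call. A broker that is unknown even after the refresh still yields null, which is why the calling code has to handle a missing result.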
if (findBrokerResult != null) {
    {
        // If the filter expression is not of TAG type and the broker version is lower than V4_1_0_SNAPSHOT, fail:
        // brokers older than V4_1_0 support only the TAG type
        if (!ExpressionType.isTagType(expressionType)
            && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
            throw new MQClientException("The broker[" + mq.getBrokerName() + ","
                + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
        }
    }
    int sysFlagInner = sysFlag;
    // If the node is a slave, clear the FLAG_COMMIT_OFFSET flag
    if (findBrokerResult.isSlave()) {
        sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
    }
    PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
    requestHeader.setConsumerGroup(this.consumerGroup);
    requestHeader.setTopic(mq.getTopic());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setQueueOffset(offset);
    requestHeader.setMaxMsgNums(maxNums);
    requestHeader.setSysFlag(sysFlagInner);
    requestHeader.setCommitOffset(commitOffset);
    requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
    requestHeader.setSubscription(subExpression);
    requestHeader.setSubVersion(subVersion);
    requestHeader.setExpressionType(expressionType);
    String brokerAddr = findBrokerResult.getBrokerAddr();
    if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
        brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
    }
    // Send the request
    PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
        brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
    return pullResult;
}
Finally, the request is sent asynchronously by pullMessageAsync.
The client processes the pulled messages
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                assert pullResult != null;
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                    responseFuture.getCause()));
            } else {
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});
If the response is parsed successfully, the onSuccess method of the pullCallback is invoked.
// Remember the offset this pull request started from
long prevRequestOffset = pullRequest.getNextOffset();
// Set the offset for the next pull
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
First set the offset for the next pull.
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
If no message is returned, the pullRequest is immediately put back into the pullRequestQueue of the PullMessageService so that the next pull can start right away.
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
The pulled messages are stored in processQueue. The pulled message is then handed over to consumeMessageService’s thread pool for processing.
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
Once the pullRequest has been put back into the PullMessageService (after pullInterval if it is greater than 0, otherwise immediately), this pull task is finished.
The exception-handling logic is not covered here; take a look at it if you’re interested.
TODO: flow chart
TODO: load balancing
… to be continued