Reset consumption point
Resetting the consumption point by time does not appear to happen in Rocket's normal execution logic; it seems to be triggered only by the reset function in the console. Let's analyze how it is implemented.
Entry point
Rocket's operations and maintenance commands are wrapped as SubCommand objects, and resetting the consumption point corresponds to the ResetOffsetByTimeCommand object. When Rocket's MQAdmin starts, it calls initCommand() to instantiate the command objects and register them in subCommandList. When a command is received, findSubCommand() locates the corresponding SubCommand object, and the command object's buildCommandlineOptions() is called to parse and assemble the parameters, finally returning an Options object that holds the parameter collection. The real logic runs in the command object's execute() method.
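The register-then-look-up flow described above can be sketched in a few lines. This is a minimal illustration, not Rocket's actual code: SimpleSubCommand and SubCommandRegistrySketch are hypothetical stand-ins, and only the method names initCommand() and findSubCommand() are borrowed from the text.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the SubCommand abstraction. */
interface SimpleSubCommand {
    String commandName();

    String execute(String[] args);
}

final class SubCommandRegistrySketch {
    private final List<SimpleSubCommand> subCommandList = new ArrayList<>();

    /** Plays the role of initCommand(): register a command instance. */
    void initCommand(SimpleSubCommand command) {
        subCommandList.add(command);
    }

    /** Plays the role of findSubCommand(): look a command up by name, or return null. */
    SimpleSubCommand findSubCommand(String name) {
        for (SimpleSubCommand cmd : subCommandList) {
            if (cmd.commandName().equalsIgnoreCase(name)) {
                return cmd;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SubCommandRegistrySketch registry = new SubCommandRegistrySketch();
        // The command name and behavior below are illustrative only.
        registry.initCommand(new SimpleSubCommand() {
            @Override
            public String commandName() {
                return "resetOffsetByTime";
            }

            @Override
            public String execute(String[] a) {
                return "reset requested for group " + a[0];
            }
        });
        SimpleSubCommand cmd = registry.findSubCommand("resetOffsetByTime");
        System.out.println(cmd.execute(new String[] {"demo-group"}));
    }
}
```

The point of the pattern is that the admin tool never needs a switch statement over command names: adding a command means registering one more object.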
ResetOffsetByTimeCommand.execute()
The call chain that leads to the timestamp-based lookup in ConsumeQueue starts on the admin side:
ResetOffsetByTimeCommand.execute() -> DefaultMQAdminExt.resetOffsetByTimestamp() -> DefaultMQAdminExtImpl.resetOffsetByTimestamp() -> MQClientAPIImpl.invokeBrokerToResetOffset() -> RemotingClient.invokeSync()
If you have no server-side development experience and have not studied network programming systematically, the call chain appears to break off here. You probably know that Rocket components communicate over TCP and exchange information through RPC calls. The communication protocol between Rocket components defines a business type for each data frame, and all of these types are listed in RequestCode. Resetting the consumption point has its own business type, whose constant is RequestCode.INVOKE_BROKER_TO_RESET_OFFSET. Following that constant leads to the handler for the message (this assumes you understand how Rocket processes packets; if not, you can still find it by following the same line of reasoning):
AdminBrokerProcessor.processRequest() -> AdminBrokerProcessor.resetOffset() -> Broker2Client.resetOffset() -> DefaultMessageStore.getOffsetInQueueByTime() -> ConsumeQueue.getOffsetInQueueByTime()
The lookup that turns a timestamp into an offset is implemented in ConsumeQueue.getOffsetInQueueByTime().
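The idea of routing a data frame to its handler by business type can be sketched as a lookup table keyed by request code. This is a simplified illustration under stated assumptions: RequestDispatchSketch is hypothetical, payloads are plain strings, and the numeric codes are placeholders rather than the real RequestCode values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class RequestDispatchSketch {
    // Placeholder values chosen for this sketch, not the real RequestCode constants.
    static final int INVOKE_BROKER_TO_RESET_OFFSET = 1;
    static final int RESET_CONSUMER_CLIENT_OFFSET = 2;

    // Maps a request code to the handler responsible for that business type.
    private final Map<Integer, Function<String, String>> processorTable = new HashMap<>();

    void registerProcessor(int requestCode, Function<String, String> processor) {
        processorTable.put(requestCode, processor);
    }

    /** Routes the payload to the handler registered for the request code. */
    String processRequest(int requestCode, String payload) {
        Function<String, String> processor = processorTable.get(requestCode);
        if (processor == null) {
            return "unsupported request code " + requestCode;
        }
        return processor.apply(payload);
    }

    public static void main(String[] args) {
        RequestDispatchSketch broker = new RequestDispatchSketch();
        broker.registerProcessor(INVOKE_BROKER_TO_RESET_OFFSET,
            payload -> "broker resets offsets for " + payload);
        System.out.println(broker.processRequest(INVOKE_BROKER_TO_RESET_OFFSET, "demo-topic"));
        System.out.println(broker.processRequest(RESET_CONSUMER_CLIENT_OFFSET, "demo-topic"));
    }
}
```

With this mental model, "following the trail" simply means searching for the place where a given request code is mapped to its handler.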
Broker2Client
After the data frame is unpacked and parsed, the Broker has the key information needed to reset the consumption point: topic, group, and timeStamp. TopicConfigManager stores the configuration of each Topic, so the configuration can easily be looked up by Topic name. What matters here is the Topic's number of writable queues; the key code snippet follows:
Map<MessageQueue, Long> offsetTable = new HashMap<>();
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
    /* i is the queueId; build the MessageQueue object */
    MessageQueue mq = new MessageQueue();
    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
    mq.setTopic(topic);
    mq.setQueueId(i);
    /* Calculate the offset in ConsumeQueue based on the timestamp */
    long timeStampOffset = this.brokerController.getMessageStore()
        .getOffsetInQueueByTime(topic, i, timeStamp);
    offsetTable.put(mq, timeStampOffset);
}
The implementation of getOffsetInQueueByTime() is also quite neat: it ends with a call to ConsumeQueue#getOffsetInQueueByTime(), which uses binary search to find the final offset. At this point the Broker has calculated the offset for the specified point in time in every ConsumeQueue of the Topic, but the Client knows nothing about it yet. So, as before, the Broker makes an RPC call to tell the Client that its Group must update the consumption offsets for this Topic based on the data transmitted in this call.
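To make the binary search step concrete, here is a minimal sketch. It is not the ConsumeQueue implementation: it assumes the store timestamps of a queue are already available as a sorted array indexed by logical offset, and it uses a plain lower-bound rule (first entry at or after the target time). OffsetByTimeSketch and offsetByTime are hypothetical names.

```java
final class OffsetByTimeSketch {
    /**
     * Returns the index of the first entry whose timestamp is >= target,
     * or storeTimestamps.length if every entry is older than target.
     * Assumes storeTimestamps is sorted in non-decreasing order.
     */
    static long offsetByTime(long[] storeTimestamps, long target) {
        int low = 0;
        int high = storeTimestamps.length;
        while (low < high) {
            int mid = (low + high) >>> 1; // unsigned shift avoids int overflow
            if (storeTimestamps[mid] < target) {
                low = mid + 1;  // entry is too old, search the right half
            } else {
                high = mid;     // entry qualifies, keep looking for an earlier one
            }
        }
        return low;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 3000L, 4000L};
        System.out.println(offsetByTime(timestamps, 2500L)); // prints 2
    }
}
```

Because messages are appended in time order, each probe halves the search range, so the lookup stays cheap even for very long queues.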
Broker RPC to the Client
An excerpt of the key code:
public RemotingCommand resetOffset(String topic, String group, long timeStamp,
        boolean isForce, boolean isC) {
    /* ... offsetTable is built as shown in the previous snippet ... */
    /* Construct the request header */
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader
    );
    /* Put the queue-to-offset table into the request body */
    ResetOffsetBody body = new ResetOffsetBody();
    body.setOffsetTable(offsetTable);
    request.setBody(body.encode());
    /* Execute the RPC call; entry.getKey() is a client channel (the enclosing loop is omitted from this excerpt) */
    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
    /* ... remainder of the method omitted ... */
}
Client reset
Using the request code RequestCode.RESET_CONSUMER_CLIENT_OFFSET, we can find the corresponding handler on the client side:
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    ResetOffsetRequestHeader requestHeader =
        (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
    /* Deserialize the body to get the mapping between MessageQueue and consumption offset */
    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    if (request.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
        offsetTable = body.getOffsetTable();
    }
    /* Update the client's local consumption progress based on the data above */
    this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
    return null;
}
MqClientFactory
MqClientFactory encapsulates Rocket's network-handling API and acts as the network channel through which message producers, message consumers, the NameServer, and Brokers exchange information. It also holds every Consumer instance in the current JVM. The key code extracted from MqClientFactory:
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer;
    /* Look up the Consumer registered under this Group */
    MQConsumerInner impl = this.consumerTable.get(group);
    /* If it is a push-mode consumer, cast it to DefaultMQPushConsumerImpl */
    if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
        consumer = (DefaultMQPushConsumerImpl) impl;
    } else {
        /* If not, return without resetting anything */
        return;
    }
    /* Suspend consumption */
    consumer.suspend();
    /* Loop over the queues and update the consumption progress
       (the declaration of iterator is omitted from this excerpt) */
    while (iterator.hasNext()) {
        MessageQueue mq = iterator.next();
        Long offset = offsetTable.get(mq);
        if (topic.equals(mq.getTopic()) && offset != null) {
            try {
                consumer.updateConsumeOffset(mq, offset);
            } catch (Exception e) {
                log.warn("reset offset failed. group={}, {}", group, mq, e);
            }
        }
    }
    /* Resume consumption */
    consumer.resume();
}
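The suspend, update, resume sequence above can be reduced to a small self-contained sketch. Everything here is a hypothetical simplification: ClientResetSketch is not a Rocket class, queues are identified by "topic:queueId" strings instead of MessageQueue objects, and offsets live in a plain map.

```java
import java.util.HashMap;
import java.util.Map;

final class ClientResetSketch {
    // Local consumption progress, keyed by "topic:queueId" (an assumption of this sketch).
    private final Map<String, Long> localOffsets = new HashMap<>();
    private boolean suspended;

    /** Applies the offsets that belong to the given topic while consumption is paused. */
    void resetOffset(String topic, Map<String, Long> offsetTable) {
        suspended = true; // suspend consumption
        try {
            for (Map.Entry<String, Long> entry : offsetTable.entrySet()) {
                boolean sameTopic = entry.getKey().startsWith(topic + ":");
                if (sameTopic && entry.getValue() != null) {
                    localOffsets.put(entry.getKey(), entry.getValue());
                }
            }
        } finally {
            suspended = false; // always resume, even if an update fails
        }
    }

    Long offsetOf(String queueKey) {
        return localOffsets.get(queueKey);
    }

    boolean isSuspended() {
        return suspended;
    }

    public static void main(String[] args) {
        ClientResetSketch client = new ClientResetSketch();
        Map<String, Long> offsetTable = new HashMap<>();
        offsetTable.put("TopicA:0", 42L);
        offsetTable.put("TopicB:0", 7L); // different topic, ignored by the reset
        client.resetOffset("TopicA", offsetTable);
        System.out.println(client.offsetOf("TopicA:0")); // prints 42
        System.out.println(client.offsetOf("TopicB:0")); // prints null
    }
}
```

The try/finally is a design choice of this sketch: it guarantees that consumption resumes even when an update throws, which is the same goal the per-queue try/catch serves in the excerpt.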
Question:
While reading resetOffset(), I noticed that it returns early whenever the consumer is not a DefaultMQPushConsumerImpl. So how is the consumption point reset in pull mode?