To solve the above problem, there are two scenarios: RocketMQ using aliYun’s cloud server and RocketMQ built by ourselves. However, either of these two methods can be used for business differentiation under the same topic through tag.
There are a lot of articles on the Internet that analyze related usage methods. Although the result of analysis is “no”, we can solve it through some other solutions.
RocketMQ
RocketMQ can be built independently, and then integrated by SpringBoot. You can refer to the article “SpringBoot Rapid integration RocketMQ Field tutorial” in the public account [program new vision]. You can pay attention to the public account search, and you can also reply to “1003” after the public account. Complete actual combat steps.
Here we only extract part of the code of the consumer:
@Service @RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED , selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED) public class MqRegisteredListenerDemo implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class); @Override public void onMessage(String message) { log.info("received registered message: {}", message); }}Copy the code
This is one of the consumers. The consumption topic is MQtopicConstant. DEMO_TOPIC, and the consumerGroup is REGISTERED. Tag is the REGISTERED tag specified by selectorExpression.
The consumer implementation of another tag for the same topic is as follows:
@Service @RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC , consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_MODIFY , selectorExpression = MqTopicConstant.DEMO_TAG_MODIFY) public class MqModifyListenerDemo implements RocketMQListener<String> { private static final Logger log = LoggerFactory.getLogger(MqRegisteredListenerDemo.class); @Override public void onMessage(String message) { log.info("received modify message: {}", message); }}Copy the code
We can see that the topic is the same, but the consumerGroup and tag are different. What does that mean? This indicates that the same topic can be distinguished by tag as long as the consumerGroup is different.
On other source code is no longer posted here, details can be concerned about the public number to see the corresponding article.
RocketMQ based on cloud services
RocketMQ based on cloud service is basically the same as self-built. As long as we make sure that the groupId (ali Cloud name) is different, then the tag under the same topic can be distinguished.
Only part of the code is posted here:
@Configuration public class ConsumerClient { @Resource private MqConfigProperties mqConfigProperties; @Resource private EquipmentMessageListener equipmentMessageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); / / configuration file Properties Properties = mqConfigProperties. GetMqProperties (); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfigProperties.getGroupId()); / / consumer fixed number of threads for 20 of 20 for the default properties. The setProperty (PropertyKeyConst ConsumeThreadNums, "20"); consumerBean.setProperties(properties); // Map<Subscription, MessageListener> subscriptionTable = new HashMap<>(); -------- Subscription = new Subscription(); Consume news belongs to the topic / / set the subscription. SetTopic (MqConfigProperties. GetInnerTopic ()); Consume news belongs to tag / / set the subscription. SetExpression (MqConfigProperties. GetEquipmentMonitorTag ()); // Implement MessageListener interfaces, and implement consume logic subscriptiontable. put(Subscription, equipmentMessageListener) in consume methods; // Subscribe to multiple topics as set above // -------- business block end -------- // Place subscriber messages into the consumerBean, and when Spring initially loads the bean, Listen to the news of the Topic and the tag consumerBean in the MQ. SetSubscriptionTable (subscriptionTable); return consumerBean; }}Copy the code
In the above code, the focus is on the code in the business block. If you copy the code in the business block in the subscription relationship and modify the corresponding Expression value (that is, the tag value), it will not be successful.
Often a large number of messages are sent and only a portion of them are received. The rest will be covered over. Of course, if you want to use a different topic to handle it, just modify the content in the business section and add it to the subscriptionTable.
So, how to solve the problem in the title? The idea is the same as the first solution. Ali Cloud just creates a ConsumerBean here, while the above independent construction adopts multiple consumers. The solution is to initialize multiple ConsumerBeans, each with a different groupId and tag configuration, and register different listeners.
This allows you to listen for different tags within a topic.
The principle of analysis
Two consumers of the same ConsumerGroup subscribe to the same Topic, but with different tags. Consumer1 subscribe to the tag1 of the Topic, and Consumer2 subscribe to the tag2 of the Topic, and then launch separately. Ten data are sent to Topic tag1 and ten data are sent to Topic tag2. Consumer1 and Consumer2 receive the corresponding 10 messages. It turns out that only Consumer2 received the message, and only 4-6 messages were received.
The reasons for this phenomenon are: Message allocation is determined by the Broker, not the Consumer. The Consumer sends a heartbeat to the Broker, which stores the message in the consumerTable (which is a Map). The key is the GroupName. The value is ConsumerGroupInfo. ConsumerGroupInfo contains topic and other information, but the problem lies in the previous step. The key is groupName, which is overwritten by the last Consumer received by the Broker’s heartbeat.
So this is the same as key, so it’s definitely overwriting. So Consumer1 doesn’t receive any messages, but why does Consumer2 only receive half (unfixed) messages?
That’s because cluster consumption, which is load-balanced among the nodes, so half of the messages (an unfixed number) go to Consumer1, which subscribs to TAG1, so there’s no output.
If you switch to BROADCASTING, the latter gets all the messages, not half, because radio is BROADCASTING all the consumers.
If there are other related problems, you can also pay attention to the public account “Program New Vision”, communicate and learn from each other.
RocketMQ: Is it possible to subscribe to the same topic using different tags?
Program new horizon
\
The public account “program new vision”, a platform for simultaneous improvement of soft power and hard technology, provides massive information
\