Abstract: Original source www.iocoder.cn/RocketMQ/fi… "Taro source". Reprints are welcome; please keep this abstract. Thank you!
This article is based on the official RocketMQ 4.0.x release.
- 1. Overview
- 2. Filtersrv registers with the Broker
- 3. Filter class
- 3.1 Set the filter class code when the Consumer subscribes
- 3.2 Consumer uploads the filter class code
- 3.3 Filtersrv compiles the filter class code
- 4. Filter messages
- 4.1 The Consumer pulls messages from Filtersrv
- 4.2 Filtersrv pulls messages from the Broker
- 5. Filtersrv high availability
1. Overview
Filtersrv applies custom rules to filter the messages that a Consumer pulls from the Broker.
Why doesn't the Broker provide this message filtering itself? Here is the official explanation:
- Broker-side message filtering: Filtering on the Broker according to the Consumer's requirements has the advantage of keeping unwanted messages off the network to the Consumer. The disadvantage is that it increases the load on the Broker and is relatively complex to implement. (1) Taobao Notify supports several filtering methods, including direct filtering by message type and flexible syntax-expression filtering, which can meet almost any filtering requirement. (2) Taobao RocketMQ supports filtering by simple Message Tag, as well as by Message Header and body. (3) The CORBA Notification specification also supports flexible syntax-expression filtering.
- Consumer-side message filtering: This kind of filtering can be fully customized by the application, but the disadvantage is that many useless messages are transmitted to the Consumer.
Filtersrv was introduced with this trade-off in mind: it reduces the load on the Broker and reduces the number of unwanted messages the Consumer receives. The drawback is the network overhead of an extra Filtersrv layer.
2. Filtersrv registers with the Broker
- 🦅 One Filtersrv corresponds to only one Broker.
- 🦅 One Broker can correspond to multiple Filtersrv instances. Filtersrv high availability is achieved by starting multiple Filtersrv instances.
- 🦅 When registration fails, Filtersrv actively exits and shuts down.
The core code is as follows:
```java
// ⬇️⬇️⬇️【FiltersrvController.java】
public boolean initialize() {
    // .... (code omitted)

    // Register with the Broker at a fixed interval
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            FiltersrvController.this.registerFilterServerToBroker();
        }
    }, 15, 10, TimeUnit.SECONDS); // TODO Edit by Taro: the initialDelay was too short and could cause initialization to fail, so it was changed from 3 to 15

    // .... (code omitted)
}

/**
 * Register Filtersrv with the Broker.
 * !!! If registration fails, shut down Filtersrv.
 */
public void registerFilterServerToBroker() {
    try {
        RegisterFilterServerResponseHeader responseHeader =
            this.filterServerOuterAPI.registerFilterServerToBroker(
                this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());
        this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()
            .setDefaultBrokerId(responseHeader.getBrokerId());

        if (null == this.brokerName) {
            this.brokerName = responseHeader.getBrokerName();
        }

        log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",
            this.localAddr(),
            this.filtersrvConfig.getConnectWhichBroker(),
            responseHeader.getBrokerName(),
            responseHeader.getBrokerId());
    } catch (Exception e) {
        log.warn("register filter server Exception", e);

        log.warn("access broker failed, kill oneself");
        System.exit(-1); // Abnormal exit
    }
}
```
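The registration logic above boils down to a fixed-rate task that treats any failure as fatal. The following is a minimal sketch of that pattern, not the Filtersrv code itself: `Registrar`, `registerOnce` and `start` are hypothetical names, and the fatal action is passed in as a `Runnable` so the sketch can be exercised without calling `System.exit`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "register periodically, give up on the first failure".
final class RegistrationLoopSketch {

    /** Stand-in for the remote call that registers this process with a broker. */
    interface Registrar {
        void register() throws Exception;
    }

    /** Runs one registration attempt; invokes onFailure and returns false if it throws. */
    static boolean registerOnce(Registrar registrar, Runnable onFailure) {
        try {
            registrar.register();
            return true;
        } catch (Exception e) {
            onFailure.run();
            return false;
        }
    }

    /** Schedules registerOnce with a 15 s initial delay and a 10 s period, as in the listing above. */
    static ScheduledExecutorService start(Registrar registrar, Runnable onFailure) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> registerOnce(registrar, onFailure), 15, 10, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        boolean first = registerOnce(() -> { }, () -> System.out.println("fatal"));
        System.out.println("registered: " + first);

        boolean second = registerOnce(
            () -> { throw new IllegalStateException("broker unreachable"); },
            () -> System.out.println("a real process would exit here"));
        System.out.println("registered: " + second);
    }
}
```

Passing the failure action in as a parameter is a choice made for this sketch only; it keeps the exit policy visible and testable, whereas the original method calls `System.exit(-1)` directly.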
3. Filter class
3.1 Set the filter class code when the Consumer subscribes
- 🦅 For each Topic, the Consumer can subscribe with different filter class code.
```java
// ⬇️⬇️⬇️【DefaultMQPushConsumer.java】
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
    this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
}
```
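For orientation, a filter class is ordinary Java source that the Consumer passes along as a string. The sketch below shows the shape such a class might take, using stand-in types so that it runs on its own. `SimpleFilter` and the property map are assumptions made for illustration; a real filter class would implement RocketMQ's `MessageFilter` and receive a `MessageExt` and a `FilterContext`, as the `match(msg, filterContext)` call in section 4.2 suggests. The `SequenceId` property name is likewise hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in types only: a real filter would implement RocketMQ's MessageFilter.
final class FilterClassSketch {

    /** Hypothetical, simplified counterpart of a message filter. */
    interface SimpleFilter {
        boolean match(Map<String, String> userProperties);
    }

    /** Accepts messages whose "SequenceId" property is a multiple of 10 greater than 100. */
    static final class SequenceIdFilter implements SimpleFilter {
        @Override
        public boolean match(Map<String, String> userProperties) {
            String property = userProperties.get("SequenceId");
            if (property == null) {
                return false; // messages without the property are filtered out
            }
            try {
                int id = Integer.parseInt(property);
                return id % 10 == 0 && id > 100;
            } catch (NumberFormatException e) {
                return false; // a malformed property never matches
            }
        }
    }

    public static void main(String[] args) {
        SimpleFilter filter = new SequenceIdFilter();
        Map<String, String> properties = new HashMap<>();
        properties.put("SequenceId", "110");
        System.out.println("match: " + filter.match(properties));
    }
}
```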
3.2 Consumer uploads the filter class code
- 🦅 When the Consumer registers with the Broker via heartbeat, it also uploads the filter class code to all Filtersrv instances corresponding to that Broker.
```java
// ⬇️⬇️⬇️【MQClientInstance.java】
/**
 * Send heartbeats to all Brokers and upload the filter class source to Filtersrv
 */
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

/**
 * Upload the filter class source to Filtersrv
 */
private void uploadFilterClassSource() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> next = it.next();
        MQConsumerInner consumer = next.getValue();
        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {
                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                    final String consumerGroup = consumer.groupName();
                    final String className = sub.getSubString();
                    final String topic = sub.getTopic();
                    final String filterClassSource = sub.getFilterClassSource();
                    try {
                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                    } catch (Exception e) {
                        log.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        }
    }
}
```
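The body of `uploadFilterClassToAllFilterServer` is not shown above. As a rough sketch of what "upload to every Filtersrv" could look like, the following sends the source to each address and isolates failures per server. The `Uploader` interface, the address list, the CRC helper and the returned success count are all assumptions for illustration, not the client's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical sketch: fan one filter class out to several Filtersrv addresses.
final class UploadFanOutSketch {

    /** Stand-in for the remote call that sends the class to one Filtersrv. */
    interface Uploader {
        void upload(String filtersrvAddr, String className, int classCrc, byte[] source) throws Exception;
    }

    /** CRC32 of the source bytes, masked to a non-negative int. */
    static int crc32(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes);
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    /** Uploads to every address; one failing server does not stop the others. Returns the success count. */
    static int uploadToAll(List<String> filtersrvAddrs, String className, String source, Uploader uploader) {
        byte[] body = source.getBytes(StandardCharsets.UTF_8);
        int crc = crc32(body);
        int succeeded = 0;
        for (String addr : filtersrvAddrs) {
            try {
                uploader.upload(addr, className, crc, body);
                succeeded++;
            } catch (Exception e) {
                System.err.println("upload to " + addr + " failed: " + e.getMessage());
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<String> addrs = Arrays.asList("10.0.0.1:30000", "10.0.0.2:30000");
        int ok = uploadToAll(addrs, "com.example.MyFilter", "class MyFilter {}",
            (addr, name, crc, src) -> System.out.println("uploaded " + name + " to " + addr));
        System.out.println("succeeded: " + ok);
    }
}
```

Sending a checksum with the source lets the receiving side recognize an unchanged class cheaply, which matches the `classCRC` parameter that appears in section 3.3.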
3.3 Filtersrv compiles the filter class code
- 🦅 Filtersrv processes the filter class code uploaded by the Consumer and compiles it for use.
The core code is as follows:
```java
// ⬇️⬇️⬇️【FilterClassManager.java】
/**
 * Register a filter class
 *
 * @param consumerGroup consumer group
 * @param topic Topic
 * @param className filter class name
 * @param classCRC CRC of the filter class source
 * @param filterSourceBinary filter class source
 * @return whether registration succeeded
 */
public boolean registerFilterClass(final String consumerGroup, final String topic,
    final String className, final int classCRC, final byte[] filterSourceBinary) {
    final String key = buildKey(consumerGroup, topic);
    // Decide whether a new filter class needs to be registered
    boolean registerNew = false;
    FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);
    if (null == filterClassInfoPrev) {
        registerNew = true;
    } else {
        if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
            if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // The class has changed
                registerNew = true;
            }
        }
    }
    // Register the new filter class
    if (registerNew) {
        synchronized (this.compileLock) {
            filterClassInfoPrev = this.filterClassTable.get(key);
            if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {
                return true;
            }
            try {
                FilterClassInfo filterClassInfoNew = new FilterClassInfo();
                filterClassInfoNew.setClassName(className);
                filterClassInfoNew.setClassCRC(0);
                filterClassInfoNew.setMessageFilter(null);

                if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {
                    String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);
                    // Compile the new filter class
                    Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);
                    // Create the new filter object
                    Object newInstance = newClass.newInstance();
                    filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
                    filterClassInfoNew.setClassCRC(classCRC);
                }

                this.filterClassTable.put(key, filterClassInfoNew);
            } catch (Throwable e) {
                String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",
                    consumerGroup, topic, className);
                log.error(info, e);
                return false;
            }
        }
    }

    return true;
}
```
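The method above follows a check, lock, re-check pattern keyed on the source CRC, so concurrent uploads of the same class compile it only once. A minimal sketch of that pattern is given below under stated assumptions: `FilterRegistrySketch`, its `Entry` type, the key format and the `Function`-based compiler are all hypothetical stand-ins, and no real compilation takes place.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical sketch of CRC-based change detection with a re-check under a lock.
final class FilterRegistrySketch {

    /** What was registered: the source CRC and the object produced by the compiler stand-in. */
    static final class Entry {
        final int crc;
        final Object filter;

        Entry(int crc, Object filter) {
            this.crc = crc;
            this.filter = filter;
        }
    }

    private final ConcurrentMap<String, Entry> table = new ConcurrentHashMap<>();
    private final Object compileLock = new Object();
    private final Function<String, Object> compiler; // stand-in for real compilation
    private int compileCount;

    FilterRegistrySketch(Function<String, Object> compiler) {
        this.compiler = compiler;
    }

    /** Registers the source unless an entry with the same CRC already exists. */
    boolean register(String consumerGroup, String topic, int crc, String source) {
        String key = topic + "@" + consumerGroup; // arbitrary key format for this sketch
        Entry prev = table.get(key);
        boolean registerNew = prev == null || (prev.crc != crc && crc != 0);
        if (!registerNew) {
            return true;
        }
        synchronized (compileLock) {
            prev = table.get(key);
            if (prev != null && prev.crc == crc) {
                return true; // another thread registered the same version first
            }
            try {
                Object filter = compiler.apply(source);
                compileCount++;
                table.put(key, new Entry(crc, filter));
                return true;
            } catch (RuntimeException e) {
                return false; // a failed compile leaves the previous entry in place
            }
        }
    }

    int compileCount() {
        synchronized (compileLock) {
            return compileCount;
        }
    }

    public static void main(String[] args) {
        FilterRegistrySketch registry = new FilterRegistrySketch(source -> source.length());
        registry.register("groupA", "topicA", 11, "class A {}");
        registry.register("groupA", "topicA", 11, "class A {}"); // same CRC, no recompile
        registry.register("groupA", "topicA", 22, "class A { int x; }"); // changed CRC
        System.out.println("compiles: " + registry.compileCount());
    }
}
```

The first check outside the lock keeps the common "nothing changed" path cheap, and the second check inside the lock prevents two threads from compiling the same version.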
4. Filter messages
4.1 The Consumer pulls messages from Filtersrv
- 🦅 When the Consumer pulls messages for a subscription that uses a filter class, it randomly selects one Filtersrv from the list of Filtersrv instances corresponding to the Broker and pulls from it. If no Filtersrv is available, the messages cannot be pulled. Filtersrv must therefore be made highly available.
```java
// ⬇️⬇️⬇️【PullAPIWrapper.java】
/**
 * Core method for pulling messages
 *
 * @param mq message queue
 * @param subExpression subscription expression
 * @param subVersion subscription version number
 * @param offset queue offset to pull from
 * @param maxNums maximum number of messages to pull
 * @param sysFlag system flag of the pull request
 * @param commitOffset consumption progress to commit
 * @param brokerSuspendMaxTimeMillis maximum time the broker may suspend the request
 * @param timeoutMillis request timeout
 * @param communicationMode communication mode
 * @param pullCallback pull callback
 * @return pull result. Returned only when the communication mode is synchronous; otherwise null.
 * @throws MQClientException when no broker is found or another client exception occurs
 * @throws RemotingException when an exception occurs on the remote call
 * @throws MQBrokerException when an exception occurs on the broker. Thrown only when the communication mode is synchronous.
 * @throws InterruptedException when the thread is interrupted
 */
protected PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // .... (code omitted)
    // Request to pull messages
    if (findBrokerResult != null) {
        // .... (code omitted)
        // If the subscribed topic uses a filter class, pull messages through Filtersrv
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    // The Broker information does not exist
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

/**
 * Compute the Filtersrv address. If there are multiple Filtersrv instances, select one at random.
 *
 * @param topic Topic
 * @param brokerAddr Broker address
 * @return Filtersrv address
 * @throws MQClientException when no Filtersrv exists
 */
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
    throws MQClientException {
    ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
    if (topicRouteTable != null) {
        TopicRouteData topicRouteData = topicRouteTable.get(topic);
        List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);
        if (list != null && !list.isEmpty()) {
            return list.get(randomNum() % list.size());
        }
    }
    throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
        + topic, null);
}
```
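The selection step `list.get(randomNum() % list.size())` only works if the random value is never negative, since `%` in Java keeps the sign of the dividend. How `randomNum()` guarantees that is not shown here, so the sketch below sidesteps the question with `Math.floorMod`. `FilterServerPickerSketch` and `pick` are hypothetical names, and the exception type is a stand-in for `MQClientException`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: pick one filter server at random, failing when none is available.
final class FilterServerPickerSketch {

    /** Returns a randomly chosen address, or throws if the list is null or empty. */
    static String pick(List<String> filterServers, Random random) {
        if (filterServers == null || filterServers.isEmpty()) {
            throw new IllegalStateException("no filter server available");
        }
        // floorMod always yields an index in [0, size), even for negative random values
        int index = Math.floorMod(random.nextInt(), filterServers.size());
        return filterServers.get(index);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:30000", "10.0.0.2:30000", "10.0.0.3:30000");
        System.out.println("pull from: " + pick(servers, new Random()));
    }
}
```

Random selection spreads pulls across instances without any shared state, at the cost of not accounting for how busy each instance is.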
4.2 Filtersrv pulls messages from the Broker
- 🦅 When Filtersrv pulls messages, it suggests that the Consumer pull from the Broker master node.
- 🦅 Filtersrv can be thought of as a Consumer: when it pulls messages from the Broker, it actually uses the methods and logic of DefaultMQPullConsumer.java.
```java
// ⬇️⬇️⬇️【DefaultRequestProcessor.java】
/**
 * Pull messages
 *
 * @param ctx pull message context
 * @param request pull message request
 * @return response
 * @throws Exception when an exception occurs
 */
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
    final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    final PullMessageRequestHeader requestHeader =
        (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

    final FilterContext filterContext = new FilterContext();
    filterContext.setConsumerGroup(requestHeader.getConsumerGroup());

    response.setOpaque(request.getOpaque());

    DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();

    // Verify that the filter class for the Topic is complete
    final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
    if (null == findFilterClass) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, not registered");
        return response;
    }
    if (null == findFilterClass.getMessageFilter()) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("Find Filter class failed, registered but no class");
        return response;
    }

    // Suggest that the next pull request go to the Broker master node
    responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);

    MessageQueue mq = new MessageQueue();
    mq.setTopic(requestHeader.getTopic());
    mq.setQueueId(requestHeader.getQueueId());
    mq.setBrokerName(this.filtersrvController.getBrokerName());
    long offset = requestHeader.getQueueOffset();
    int maxNums = requestHeader.getMaxMsgNums();

    final PullCallback pullCallback = new PullCallback() {

        @Override
        public void onSuccess(PullResult pullResult) {
            responseHeader.setMaxOffset(pullResult.getMaxOffset());
            responseHeader.setMinOffset(pullResult.getMinOffset());
            responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
            response.setRemark(null);

            switch (pullResult.getPullStatus()) {
                case FOUND:
                    response.setCode(ResponseCode.SUCCESS);

                    List<MessageExt> msgListOK = new ArrayList<MessageExt>();
                    try {
                        for (MessageExt msg : pullResult.getMsgFoundList()) {
                            // Filter the message with the filter class
                            boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
                            if (match) {
                                msgListOK.add(msg);
                            }
                        }

                        if (!msgListOK.isEmpty()) {
                            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
                            return;
                        } else {
                            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        }
                    } catch (Throwable e) {
                        final String error =
                            String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
                                requestHeader.getConsumerGroup(), requestHeader.getTopic());
                        log.error(error, e);

                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
                        returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
                        return;
                    }

                    break;
                case NO_MATCHED_MSG:
                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                    break;
                case NO_NEW_MSG:
                    response.setCode(ResponseCode.PULL_NOT_FOUND);
                    break;
                case OFFSET_ILLEGAL:
                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                    break;
                default:
                    break;
            }

            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
        }

        @Override
        public void onException(Throwable e) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
            returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
            return;
        }
    };

    // Pull messages
    pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
    return null;
}
```
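The `FOUND` branch above reduces to three outcomes: some messages match, none match, or the filter itself throws. The sketch below isolates that decision with stand-in types. `FilterBatchSketch`, its `Code` enum and `Result` class are hypothetical and only mirror the names of the response codes in the listing; the sketch catches `RuntimeException` where the original catches `Throwable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: apply a filter to a pulled batch and choose a response code.
final class FilterBatchSketch {

    enum Code { SUCCESS, PULL_RETRY_IMMEDIATELY, SYSTEM_ERROR }

    /** The chosen response code together with the messages that passed the filter. */
    static final class Result<T> {
        final Code code;
        final List<T> messages;

        Result(Code code, List<T> messages) {
            this.code = code;
            this.messages = messages;
        }
    }

    /** Keeps matching messages; an empty result asks the caller to retry at once. */
    static <T> Result<T> filterFound(List<T> found, Predicate<T> filter) {
        List<T> matched = new ArrayList<>();
        try {
            for (T msg : found) {
                if (filter.test(msg)) {
                    matched.add(msg);
                }
            }
        } catch (RuntimeException e) {
            // A faulty user-supplied filter fails the whole batch
            return new Result<>(Code.SYSTEM_ERROR, new ArrayList<T>());
        }
        Code code = matched.isEmpty() ? Code.PULL_RETRY_IMMEDIATELY : Code.SUCCESS;
        return new Result<>(code, matched);
    }

    public static void main(String[] args) {
        Result<Integer> result = filterFound(Arrays.asList(1, 2, 3, 4), n -> n % 2 == 0);
        System.out.println(result.code + " " + result.messages);
    }
}
```

Returning a retry code for an empty batch lets the caller advance its offset and pull again, instead of treating a fully filtered batch as "no new messages".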