Note: This source code analysis series is based on RocketMQ 4.8.0. Gitee repository link: gitee.com/funcy/rocke…

In RocketMQ, messages are filtered in two ways:

  • tag
  • sql92

This article looks at some of the details of message filtering from a source code perspective.

1. Prepare the demo

The message filtering demos are in the org.apache.rocketmq.example.filter package. Here we look at the tag and SQL filtering approaches in turn.

1.1 Message filtering producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class FilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer 
            = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                // Specify the tag of the message
                tags[i % tags.length],
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

In the producer, we simply specify the tag of the message and call the send(…) method to send it.

In terms of message filtering, the producer just sends it as a normal message and does nothing extra.

1.2 Message filtering consumer

1. Tag filtering

The following is an example of a consumer for tag filtering:

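import java.io.IOException;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class TagFilterConsumer {
    public static void main(String[] args) throws 
            InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer 
            = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.subscribe("TagFilterTest",
            // Set the tags to filter on; separate multiple tags with ||
            "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", 
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}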

When subscribing, you specify the tags to filter on; multiple tags are separated with ||.

2. SQL filtering

An example of a consumer for SQL filtering is as follows:

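import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class SqlFilterConsumer {

    public static void main(String[] args) throws Exception {
        String nameServer = "localhost:9876";
        DefaultMQPushConsumer consumer 
            = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe("SqlFilterTest",
            // SQL filter statement
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", 
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}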

Unlike tag filtering, SQL filtering uses MessageSelector.bySql(…) to specify the SQL statement.

In addition, for the broker to support SQL filtering, you need to set the broker property enablePropertyFilter=true.

From the above code, the consumer specifies the filter rules that tell the broker which messages it can receive, and the broker returns the corresponding messages.

2. Getting messages from the broker

When a consumer pulls messages from the broker, it reports its own filtering rules. When the broker receives the pull request, it uses those rules to return the matching messages to the consumer. The broker handles the pull in PullMessageProcessor#processRequest(…):

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, 
        boolean brokerAllowSuspend) throws RemotingCommandException {
    ...
    // Build the data used for message filtering
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    if (hasSubscriptionFlag) {
        try {
            // Build filter data
            subscriptionData = FilterAPI.build(requestHeader.getTopic(), 
                requestHeader.getSubscription(), requestHeader.getExpressionType());
            // Create the consumerFilterData object if it is not a tag filter
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(),
                    requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                    requestHeader.getExpressionType(), requestHeader.getSubVersion());
                assert consumerFilterData != null;
            }
        } catch (Exception e) {
            ...
        }
    } else {
        ...
    }
    ...
    // Message filtering object
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }

    // Get the message
    // 1. Locate the ConsumeQueue file by topic and queueId
    // 2. Read the message content from the CommitLog using the information in the ConsumeQueue file
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
        requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    ...
    return response;
}

This is the core method through which the consumer pulls messages from the broker, but we are only interested in message filtering here, so most of the code is omitted and only the filtering-related parts are kept.

Message filtering proceeds as follows:

  1. Build subscriptionData
  2. Build ConsumerFilterData: if the filter is not of tag type, create a consumerFilterData object
  3. Create the message filtering object, MessageFilter
  4. Get the messages; the filtering itself happens in DefaultMessageStore#getMessage

Let’s take a look at each of these steps.

2.1 Building subscriptionData: FilterAPI#build

Build subscriptionData with FilterAPI#build:

public static SubscriptionData build(final String topic, final String subString,
    final String type) throws Exception {
    // Build the filter data of the tag type
    if (ExpressionType.TAG.equals(type) || type == null) {
        return buildSubscriptionData(null, topic, subString);
    }

    if (subString == null || subString.length() < 1) {
        throw new IllegalArgumentException("Expression can't be null! " + type);
    }
    // Build filtered data of SQL type
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);
    subscriptionData.setExpressionType(type);
    return subscriptionData;
}

/**
 * Build the filter data for tag filtering
 */
public static SubscriptionData buildSubscriptionData(final String consumerGroup, 
        String topic, String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) 
            || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        // If tags are specified, split them on ||
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // The tag goes into tagsSet, its hashCode into codeSet
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }

    return subscriptionData;
}

As the method shows, subscriptionData is built differently depending on whether the filter is tag-based or not:

  1. For a tag filter, the specified tags are split on "||"; each tag is put into tagsSet and each tag's hash value into codeSet
  2. For a non-tag filter, no tag-related processing is done; only the other properties are set
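The tag branch can be sketched with plain JDK types as follows. This is a simplified illustration, not the RocketMQ class: TagSubscriptionSketch and its fields are hypothetical names that stand in for SubscriptionData, and the "*" wildcard mirrors SubscriptionData.SUB_ALL.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for the tag branch of FilterAPI#buildSubscriptionData. */
final class TagSubscriptionSketch {
    static final String SUB_ALL = "*";

    final Set<String> tagsSet = new HashSet<>();
    final Set<Integer> codeSet = new HashSet<>();
    String subString;

    static TagSubscriptionSketch parse(String expression) {
        TagSubscriptionSketch data = new TagSubscriptionSketch();
        if (expression == null || expression.isEmpty() || expression.equals(SUB_ALL)) {
            data.subString = SUB_ALL; // no tag restriction
            return data;
        }
        data.subString = expression;
        // String#split takes a regular expression, so each '|' must be escaped
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                data.tagsSet.add(trimmed);            // used for exact matching
                data.codeSet.add(trimmed.hashCode()); // used for hash-only matching
            }
        }
        return data;
    }

    public static void main(String[] args) {
        TagSubscriptionSketch data = parse("TagA || TagC");
        System.out.println(data.tagsSet + " -> " + data.codeSet);
    }
}
```

Keeping both sets is what makes the later two-stage check possible: the hash codes are enough for a cheap first pass, while the tag strings allow an exact comparison afterwards.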

2.2 Building ConsumerFilterData: ConsumerFilterManager#build

For non-tag filter types, RocketMQ builds an additional ConsumerFilterData object using the method ConsumerFilterManager#build:

public static ConsumerFilterData build(final String topic, final String consumerGroup,
        final String expression, final String type,
        final long clientVersion) {
        if (ExpressionType.isTagType(type)) {
            return null;
        }

        ConsumerFilterData consumerFilterData = new ConsumerFilterData();
        // Set a series of properties
        consumerFilterData.setTopic(topic);
        consumerFilterData.setConsumerGroup(consumerGroup);
        consumerFilterData.setBornTime(System.currentTimeMillis());
        consumerFilterData.setDeadTime(0);
        consumerFilterData.setExpression(expression);
        consumerFilterData.setExpressionType(type);
        consumerFilterData.setClientVersion(clientVersion);
        try {
            // Sets the filter for processing expressions
            consumerFilterData.setCompiledExpression(
                FilterFactory.INSTANCE.get(type).compile(expression)
            );
        } catch (Throwable e) {
            log.error(...);
            return null;
        }

        return consumerFilterData;
    }

In this method, the key is the following line:

consumerFilterData.setCompiledExpression(
    FilterFactory.INSTANCE.get(type).compile(expression)
);

It compiles the expression with the filter registered for the expression type and stores the result. The FilterFactory code looks like this:

public class FilterFactory {

    /** Singleton */
    public static final FilterFactory INSTANCE = new FilterFactory();

    /** Store the filter map */
    protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER 
        = new HashMap<String, FilterSpi>(4);

    static {
        FilterFactory.INSTANCE.register(new SqlFilter());
    }

    /**
     * Add a filter to FILTER_SPI_HOLDER
     */
    public void register(FilterSpi filterSpi) {
        if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
            throw new IllegalArgumentException(...);
        }
        FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
    }

    /**
     * Get a filter by type
     */
    public FilterSpi get(String type) {
        return FILTER_SPI_HOLDER.get(type);
    }
    ...
}

As you can see, FILTER_SPI_HOLDER holds only one FilterSpi instance, SqlFilter, which is the one that handles SQL filtering.
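The register-then-look-up-by-type pattern used here can be sketched in isolation. The names below (ExpressionCompiler, EchoCompiler, CompilerRegistrySketch) are hypothetical and only illustrate the shape of the pattern; EchoCompiler does no real compilation, and "SQL92" is used as the type key on the assumption that it matches ExpressionType.SQL92.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for FilterSpi: compiles expressions of one type. */
interface ExpressionCompiler {
    String ofType();
    Object compile(String expression);
}

/** Toy compiler for illustration only: "compiling" just trims the expression. */
final class EchoCompiler implements ExpressionCompiler {
    @Override
    public String ofType() {
        return "SQL92";
    }

    @Override
    public Object compile(String expression) {
        return expression.trim();
    }
}

final class CompilerRegistrySketch {
    private final Map<String, ExpressionCompiler> holder = new HashMap<>();

    /** Registers a compiler; a second compiler for the same type is rejected. */
    void register(ExpressionCompiler compiler) {
        if (holder.containsKey(compiler.ofType())) {
            throw new IllegalArgumentException("Type already registered: " + compiler.ofType());
        }
        holder.put(compiler.ofType(), compiler);
    }

    /** Returns null when no compiler is registered for the type. */
    ExpressionCompiler get(String type) {
        return holder.get(type);
    }

    public static void main(String[] args) {
        CompilerRegistrySketch registry = new CompilerRegistrySketch();
        registry.register(new EchoCompiler());
        System.out.println(registry.get("SQL92").compile(" a between 0 and 3 "));
    }
}
```

Because lookup is by type string, supporting another expression type would only require registering one more implementation, with no change to the callers.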

2.3 Creating the MessageFilter object

This step creates a MessageFilter object, passing in the subscriptionData and consumerFilterData built above.

2.4 Obtaining Messages

Messages are read from the commitLog in DefaultMessageStore#getMessage:

public GetMessageResult getMessage(final String group, final String topic, final int queueId, 
        final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
    ...
    // Check whether the message meets the filtering criteria
    if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(
                isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        continue;
    }

    // Get the message
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    if (null == selectResult) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.MESSAGE_WAS_REMOVING;
        }

        nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
        continue;
    }

    if (messageFilter != null
        // Compare SQL expressions
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        // release...
        selectResult.release();
        continue;
    }
    ...
}

In this method, we still focus on the filtering process. This method does the following:

  1. Check whether the message matches the filter by the hashCode of its tag; messages that do not match are not fetched
  2. Fetch the message from the commitlog file
  3. Check whether the message matches the SQL-type filter; messages that do not match are not returned
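The three steps above can be sketched as a single loop. This is a deliberately simplified model, not the store implementation: Stored, fullReads and contentFilter are hypothetical names, the "read" is just a counter increment, and both checks are applied in one pass purely to show the ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class TwoPhaseFilterSketch {
    /** Hypothetical stored message: a tag plus user properties. */
    static final class Stored {
        final String tag;
        final Map<String, String> properties;

        Stored(String tag, Map<String, String> properties) {
            this.tag = tag;
            this.properties = properties;
        }
    }

    /** Counts how many full messages were read, to show what step 1 saves. */
    int fullReads = 0;

    List<Stored> pull(List<Stored> log, Set<Integer> codeSet,
                      Predicate<Map<String, String>> contentFilter) {
        List<Stored> result = new ArrayList<>();
        for (Stored stored : log) {
            // Step 1: cheap check on the tag hash; a miss skips the read entirely
            if (!codeSet.contains(stored.tag.hashCode())) {
                continue;
            }
            // Step 2: "read" the full message
            fullReads++;
            // Step 3: check the message content; a miss is read but not returned
            if (!contentFilter.test(stored.properties)) {
                continue;
            }
            result.add(stored);
        }
        return result;
    }
}
```

The ordering matters for cost: the first check needs only a hash code, so messages rejected there never incur the more expensive read.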

1. Filtering by the tag's hashCode

The broker's tag handling is in DefaultMessageFilter#isMatchedByConsumeQueue:

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == tagsCode || null == subscriptionData) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
            // Check whether the hashcode of the tag is satisfied
            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

Note that this only checks whether the hashCodes of the tags are equal, and different tags can have the same hashCode. The exact tag comparison is done in the consumer.
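A small, self-contained demonstration of why a hash-only check is not enough: the strings "Aa" and "BB" are a well-known pair with the same String#hashCode. The class and method names below are hypothetical and only imitate the hash comparison.

```java
import java.util.Collections;
import java.util.Set;

final class TagHashCollisionSketch {
    /** Hash-only check: compares hash codes, never the tag text itself. */
    static boolean matchesByCode(Set<Integer> subscribedCodes, String messageTag) {
        return subscribedCodes.contains(messageTag.hashCode());
    }

    public static void main(String[] args) {
        // The consumer subscribed to "Aa" only
        Set<Integer> subscribed = Collections.singleton("Aa".hashCode());

        System.out.println("Aa".hashCode());                 // 2112
        System.out.println("BB".hashCode());                 // 2112
        System.out.println(matchesByCode(subscribed, "BB")); // true, although "BB" was never subscribed
    }
}
```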

2. SQL filtering

After the message is retrieved from the commitlog, SQL filtering is performed using ExpressionMessageFilter#isMatchedByCommitLog:

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // Some code omitted
    ...
    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        // Evaluate the compiled expression
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + "," + tempProperties, e);
    }

    log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }

    return (Boolean) ret;
}
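The way the evaluation result is interpreted can be sketched without any RocketMQ classes. In this hedged example the compiled expression is modelled as a plain Function over the message properties, and aBetween0And3 is a hand-written, hypothetical stand-in for the compiled form of "a is not null and a between 0 and 3" from the demo; it is not how the SQL92 filter is actually implemented.

```java
import java.util.Map;
import java.util.function.Function;

final class PropertyFilterSketch {
    /** Anything other than a Boolean result, including a failed evaluation, counts as "no match". */
    static boolean matches(Function<Map<String, String>, Object> compiled,
                           Map<String, String> properties) {
        Object ret = null;
        try {
            ret = compiled.apply(properties);
        } catch (RuntimeException e) {
            // An evaluation failure is swallowed here; ret stays null
        }
        if (!(ret instanceof Boolean)) { // also covers ret == null
            return false;
        }
        return (Boolean) ret;
    }

    /** Hypothetical stand-in for the compiled form of "a is not null and a between 0 and 3". */
    static Object aBetween0And3(Map<String, String> properties) {
        String a = properties.get("a");
        if (a == null) {
            return Boolean.FALSE;
        }
        int value = Integer.parseInt(a); // throws NumberFormatException for non-numeric values
        return value >= 0 && value <= 3;
    }
}
```

Treating every non-Boolean outcome as a mismatch means a malformed property or a failing expression causes the message to be filtered out rather than delivered.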

realFilterData holds the filter data, including the compiled expression evaluated above (the figure showing its contents is not available here).

3. Filtering tags on the consumer

For tag filtering, the broker filters only by the tag's hashCode, while the consumer filters by the tag's actual content. Let's look at the message pull method, DefaultMQPushConsumerImpl#pullMessage:

public void pullMessage(final PullRequest pullRequest) {
    ...
    // Pull callback: pulled messages are handled here once the pull completes
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // Process the messages: decode the binary data into Java objects and filter by tag
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                    pullRequest.getMessageQueue(), pullResult, subscriptionData);
                ...
            }
            ...
        }
        ...
    };
    ...
}

Next, look at the PullAPIWrapper#processPullResult method:

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        // Decode binary data into objects
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        // Filter by tag
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // Filter messages by tag
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        ...
    }
    ...
}

As the code shows, the method decides whether to add a message to msgListFilterAgain based on whether its tag is in tagsSet. msgListFilterAgain is the list of messages that remain after filtering.
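The same second pass can be sketched with plain JDK types. Msg and filterAgain are hypothetical names for illustration; the logic follows the loop above: with no subscribed tags the list is returned as is, otherwise only messages whose tag string is in the set are kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class ConsumerTagFilterSketch {
    /** Hypothetical minimal message: only the tag and body matter here. */
    static final class Msg {
        final String tag;
        final String body;

        Msg(String tag, String body) {
            this.tag = tag;
            this.body = body;
        }
    }

    static List<Msg> filterAgain(List<Msg> pulled, Set<String> tagsSet) {
        if (tagsSet.isEmpty()) {
            return pulled; // no tag restriction: keep everything
        }
        List<Msg> kept = new ArrayList<>(pulled.size());
        for (Msg msg : pulled) {
            // Exact string comparison, so a tag that merely shares a hash code is dropped
            if (msg.tag != null && tagsSet.contains(msg.tag)) {
                kept.add(msg);
            }
        }
        return kept;
    }
}
```

Note that, as in the original loop, a message without a tag is dropped whenever the subscription names specific tags.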

4. Summary

RocketMQ message filtering supports two modes: tag and SQL.

1. Tag mode

When the broker fetches messages, it first filters them by the hashCode of the tag. The messages that pass may not all carry the specified tag, so a second round of filtering is needed on the consumer.

For example, suppose a consumer subscribes to messages tagged tag1, and that tag1 and tag11 both happen to have a hashCode of 100. When the broker filters by the tag's hashCode, messages with either tag are sent to the consumer. The consumer therefore has to compare the actual tag values to keep only the messages it really needs.

2. SQL mode

SQL filtering happens entirely within the broker.


Given the limits of the author's own knowledge, mistakes in this article are inevitable, and corrections are welcome. Original writing takes effort: for commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.

This article was first published on the WeChat official account "Java technology exploration". If you like it, you are welcome to follow the account and explore the world of technology together.