This is the 22nd day of my participation in the August Writing Challenge.

Continuing from the previous chapter, this post covers more RocketMQ message samples and the integration of RocketMQ with various frameworks.

Filter messages

Tag is a message attribute specific to RocketMQ. In RocketMQ, one application can use one Topic, and the different kinds of business within that Topic are distinguished by tags. However, Topic plus Tag gives only two levels of classification, so in complex, multi-dimensional scenarios tag filtering alone is not enough.

RocketMQ also supports filtering messages with SQL expressions. The supported syntax is based on the SQL92 standard. The variables available in an expression are the built-in TAGS and any user properties the producer attached to the message. An example follows:

Producer

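import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Constant.NAMESERVER holds the NameServer address
        producer.setNamesrvAddr(Constant.NAMESERVER);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFiltertest", tags[i % tags.length],
                    "hello filter sql".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // The user property "a" can be referenced in the consumer's SQL expression
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}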

Consumer

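import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFiltertest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"
                + " and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Recive New Message: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.setNamesrvAddr(Constant.NAMESERVER);
        consumer.start();
        System.out.println("Consumer Started. %n");
    }
}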

The test results are as follows:

Consumer Started. %n
ConsumeMessageThread_1 Recive New Message: [MessageExt [brokerName=RaftNode00, queueId=0, storeSize=230, queueOffset=12, sysFlag=0, bornTimestamp=1629522139197, bornHost=/192.168.40.1:3424, storeTimestamp=1629522139489, storeHost=/192.168.40.129:30921, msgId=C0A82881000078C900000000011ED473, commitLogOffset=18797683, bodyCRC=1438617675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SqlFiltertest', flag=0, properties={MIN_OFFSET=0, a=0, MAX_OFFSET=13, CONSUME_START_TIME=1629522139362, UNIQ_KEY=24098A0031A928409DD7720C18F134F24A6818B4AAC269CB6C3B0000, CLUSTER=RaftCluster, WAIT=true, TAGS=TagA}, body=[104, 101, 108, 108, 111, 32, 102, 105, 108, 116, 101, 114, 32, 115, 113, 108], transactionId='null'}]]
ConsumeMessageThread_2 Recive New Message: [MessageExt [brokerName=RaftNode00, queueId=1, storeSize=230, queueOffset=12, sysFlag=0, bornTimestamp=1629522139259, bornHost=/192.168.40.1:3424, storeTimestamp=1629522139551, storeHost=/192.168.40.129:30921, msgId=C0A82881000078C900000000011ED589, commitLogOffset=18797961, bodyCRC=1438617675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SqlFiltertest', flag=0, properties={MIN_OFFSET=0, a=1, MAX_OFFSET=13, CONSUME_START_TIME=1629522139362, UNIQ_KEY=24098A0031A928409DD7720C18F134F24A6818B4AAC269CB6C7B0001, CLUSTER=RaftCluster, WAIT=true, TAGS=TagB}, body=[104, 101, 108, 108, 111, 32, 102, 105, 108, 116, 101, 114, 32, 115, 113, 108], transactionId='null'}]]
ConsumeMessageThread_3 Recive New Message: [MessageExt [brokerName=RaftNode00, queueId=3, storeSize=230, queueOffset=10, sysFlag=0, bornTimestamp=1629522139315, bornHost=/192.168.40.1:3424, storeTimestamp=1629522139614, storeHost=/192.168.40.129:30921, msgId=C0A82881000078C900000000011ED7B5, commitLogOffset=18798517, bodyCRC=1438617675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SqlFiltertest', flag=0, properties={MIN_OFFSET=0, a=3, MAX_OFFSET=11, CONSUME_START_TIME=1629522139371, UNIQ_KEY=24098A0031A928409DD7720C18F134F24A6818B4AAC269CB6CB30003, CLUSTER=RaftCluster, WAIT=true, TAGS=TagA}, body=[104, 101, 108, 108, 111, 32, 102, 105, 108, 116, 101, 114, 32, 115, 113, 108], transactionId='null'}]]
ConsumeMessageThread_4 Recive New Message: [MessageExt [brokerName=RaftNode00, queueId=0, storeSize=230, queueOffset=13, sysFlag=0, bornTimestamp=1629522139368, bornHost=/192.168.40.1:3424, storeTimestamp=1629522139657, storeHost=/192.168.40.129:30921, msgId=C0A82881000078C900000000011ED8CB, commitLogOffset=18798795, bodyCRC=1438617675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SqlFiltertest', flag=0, properties={MIN_OFFSET=0, a=4, MAX_OFFSET=14, CONSUME_START_TIME=1629522139407, UNIQ_KEY=24098A0031A928409DD7720C18F134F24A6818B4AAC269CB6CE80004, CLUSTER=RaftCluster, WAIT=true, TAGS=TagB}, body=[104, 101, 108, 108, 111, 32, 102, 105, 108, 116, 101, 114, 32, 115, 113, 108], transactionId='null'}]]
ConsumeMessageThread_5 Recive New Message: [MessageExt [brokerName=RaftNode00, queueId=2, storeSize=230, queueOffset=12, sysFlag=0, bornTimestamp=1629522139434, bornHost=/192.168.40.1:3424, storeTimestamp=1629522139731, storeHost=/192.168.40.129:30921, msgId=C0A82881000078C900000000011EDAF7, commitLogOffset=18799351, bodyCRC=1438617675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SqlFiltertest', flag=0, properties={MIN_OFFSET=0, a=6, MAX_OFFSET=13, CONSUME_START_TIME=1629522139763, UNIQ_KEY=24098A0031A928409DD7720C18F134F24A6818B4AAC269CB6D2A0006, CLUSTER=RaftCluster, WAIT=true, TAGS=TagA}, body=[104, 101, 108, 108, 111, 32, 102, 105, 108, 116, 101, 114, 32, 115, 113, 108], transactionId='null'}]]
ConsumeMessageThread_6 Recive New Message: [MessageExt [brokerName=RaftNode00, queueId=3, storeSize=230, queueOffset=11, sysFlag=0, bornTimestamp=1629522139470, bornHost=/192.168.40.1:3424, storeTimestamp=1629522139760, storeHost=/192.168.40.129:30921, msgId=C0A82881000078C900000000011EDC0D, commitLogOffset=18799629, bodyCRC=1438617675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='SqlFiltertest', flag=0, properties={MIN_OFFSET=0, a=7, MAX_OFFSET=12, CONSUME_START_TIME=1629522139765, UNIQ_KEY=24098A0031A928409DD7720C18F134F24A6818B4AAC269CB6D4D0007, CLUSTER=RaftCluster, WAIT=true, TAGS=TagB}, body=[104, 101, 108, 108, 111, 32, 102, 105, 108, 116, 101, 114, 32, 115, 113, 108], transactionId='null'}]]

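The results show that only TagA and TagB messages are consumed, so the filter does its job. Note, though, that the log also contains messages with a = 4, 6 and 7, which the "a between 0 and 3" condition would exclude, so this particular run appears to have been made with only the tag condition in effect. It should also be remembered that everything comes at a price.

With SQL filtering, the expression is evaluated on the Broker, which is why enablePropertyFilter=true has to be set there. The Broker still receives and stores every message from the producer, including the unwanted ones, and evaluating the expression for each message requires additional overhead.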
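To make the selection rule concrete, here is a minimal sketch in plain Java that replays the producer loop above and applies the same condition by hand. It does not use RocketMQ at all: the class SqlFilterSketch and its methods are made up for illustration, and the real evaluation happens inside the Broker.

```java
import java.util.ArrayList;
import java.util.List;

class SqlFilterSketch {
    // Hand-written equivalent of:
    // (TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)
    static boolean matches(String tags, Integer a) {
        boolean tagOk = tags != null && ("TagA".equals(tags) || "TagB".equals(tags));
        boolean propOk = a != null && a >= 0 && a <= 3; // BETWEEN is inclusive on both ends
        return tagOk && propOk;
    }

    // Replays the producer loop: 15 messages, tags cycling TagA/TagB/TagC, property a = i.
    static List<Integer> selected() {
        String[] tags = {"TagA", "TagB", "TagC"};
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            if (matches(tags[i % tags.length], i)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(selected()); // prints [0, 1, 3]
    }
}
```

Under the full expression, only the messages with a = 0, 1 and 3 pass: a = 2 carries TagC, and everything from a = 4 upward fails the range check.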

Transaction message

Transactional messaging is an important feature of RocketMQ, so it is worth understanding both the idea and the code.

The industry has several solutions to the distributed transaction problem: two-phase commit, the Saga pattern, transaction compensation (TCC), and so on. RocketMQ's transactional messages emulate a two-phase commit (2PC) on top of the message middleware. The general idea is shown here (from the distributed messaging middleware book):

1. System A sends a prepared message to the message middleware.
2. The message middleware saves the prepared message and returns a confirmation to system A.
3. System A executes its local transaction.
4. System A sends a commit message to the message middleware according to the result of the local transaction, as a second confirmation. After receiving the commit message, the middleware marks the prepared message as deliverable, and subscribers receive it.

These four steps are the basic flow of transaction message processing, and each of them can fail in a real environment. The analysis is as follows:

- Step 1 fails: the whole transaction fails, and system A's local transaction is not executed.
- Step 2 fails: the whole transaction fails, and system A's local transaction is not executed.
- Step 3 fails: the prepared message sent in step 1 has to be rolled back. System A implements a message check-back interface; when the MQ server gets no feedback from system A, it polls this interface to learn the result of system A's local transaction, and if the transaction failed it rolls back the prepared message.
- Step 4 fails: system A's local transaction has already been committed, and the MQ server can detect this through the check-back interface. It then marks the prepared message as deliverable, which completes the message transaction.
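The four steps and the check-back fallback described above can be sketched with an in-memory stand-in for the MQ server. This is only an illustration of the flow under simplifying assumptions: the Broker class, its method names, and the "tx-1" id are hypothetical and are not part of the RocketMQ API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

class HalfMessageFlowSketch {
    /** Hypothetical in-memory stand-in for the MQ server. */
    static class Broker {
        private final Map<String, String> prepared = new HashMap<>(); // stored but invisible
        private final List<String> deliverable = new ArrayList<>();   // visible to subscribers

        // Steps 1 and 2: store the prepared message and acknowledge it.
        boolean prepare(String id, String body) {
            prepared.put(id, body);
            return true;
        }

        // Commit: mark the prepared message as deliverable.
        void commit(String id) {
            String body = prepared.remove(id);
            if (body != null) {
                deliverable.add(body);
            }
        }

        // Rollback: drop the prepared message.
        void rollback(String id) {
            prepared.remove(id);
        }

        // Check-back: ask system A for its local transaction result, then commit or roll back.
        void checkBack(String id, BooleanSupplier localTxCommitted) {
            if (localTxCommitted.getAsBoolean()) {
                commit(id);
            } else {
                rollback(id);
            }
        }

        List<String> deliverable() {
            return deliverable;
        }
    }

    // localTxSucceeds: outcome of system A's local transaction (step 3).
    // confirmationArrives: whether the second confirmation (step 4) reaches the broker.
    static List<String> run(boolean localTxSucceeds, boolean confirmationArrives) {
        Broker broker = new Broker();
        broker.prepare("tx-1", "order created");
        if (confirmationArrives) {
            if (localTxSucceeds) {
                broker.commit("tx-1");
            } else {
                broker.rollback("tx-1");
            }
        } else {
            // The confirmation was lost, so the broker falls back to the check-back interface.
            broker.checkBack("tx-1", () -> localTxSucceeds);
        }
        return broker.deliverable();
    }

    public static void main(String[] args) {
        System.out.println(run(true, true));   // prints [order created]
        System.out.println(run(false, true));  // prints []
        System.out.println(run(true, false));  // prints [order created]
    }
}
```

In every case the message becomes visible to subscribers only when system A's local transaction has committed, whether the broker learns that from the second confirmation or from the check-back.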

With this transactional messaging mechanism, the distributed transaction is split into a message transaction (system A's local transaction plus sending the message) and system B's local transaction, where system B's operation is driven by the message. Once the message can be consumed by subscribers, system A's local transaction is known to have committed successfully; system B then receives the message and executes its own local transaction. If system B's local transaction fails, MQ redelivers the message until system B's local transaction is committed successfully. In this way the distributed transaction across system A and system B is achieved. See the figure below:

The RocketMQ website puts it roughly like this: the operation is atomic, that is, if any step in sending or receiving a message fails, the whole operation fails. There are only two outcomes: success or failure.

The key to transaction messages is the TransactionListener specified on the TransactionMQProducer, which acts as the controller for transaction messages. An example follows.

The example comes from Alan's slides (VIP02 RocketMQ development model). Rather than using the example shipped with the source code, he implemented a more straightforward transaction listener himself:

// StringUtils is assumed here to come from Apache Commons Lang 3
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // Executed after the transaction (half) message has been sent.
    // Messages returning COMMIT_MESSAGE can be consumed by the consumer immediately.
    // Messages returning ROLLBACK_MESSAGE are discarded.
    // Messages returning UNKNOW are checked again by the Broker after a period of time.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            // TagA messages are immediately consumed by consumers
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains(tags, "TagB")) {
            // TagB messages are discarded
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // Other messages wait for the Broker to perform a transaction state check
            return LocalTransactionState.UNKNOW;
        }
    }

    // Status check for messages whose state is still unknown. The return values mean the same as above.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagC")) {
            // TagC messages are consumed by consumers after a while
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains(tags, "TagD")) {
            // TagD messages are discarded during the status check
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // The remaining TagE messages are discarded after multiple status checks
            return LocalTransactionState.UNKNOW;
        }
    }
}
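To see what the listener above does to each tag, the following plain-Java sketch replays its two decision methods against a hypothetical broker loop that checks back a bounded number of times. The enum, the loop, and the limit passed to outcome are simplifications made for this illustration; they mirror the listener's logic but do not use the RocketMQ classes.

```java
class ListenerOutcomeSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Mirrors executeLocalTransaction in the listener above.
    static State execute(String tag) {
        if (tag.contains("TagA")) return State.COMMIT;
        if (tag.contains("TagB")) return State.ROLLBACK;
        return State.UNKNOWN;
    }

    // Mirrors checkLocalTransaction in the listener above.
    static State check(String tag) {
        if (tag.contains("TagC")) return State.COMMIT;
        if (tag.contains("TagD")) return State.ROLLBACK;
        return State.UNKNOWN;
    }

    // Hypothetical broker loop: check back at most maxChecks times, then give up.
    static String outcome(String tag, int maxChecks) {
        State state = execute(tag);
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            state = check(tag);
            checks++;
        }
        String result = (state == State.COMMIT) ? "delivered" : "discarded";
        return tag + " -> " + result + " after " + checks + " check(s)";
    }

    public static void main(String[] args) {
        for (String tag : new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}) {
            System.out.println(outcome(tag, 15));
        }
    }
}
```

With a limit of 15 checks, TagA is delivered and TagB discarded without any check, TagC and TagD are resolved on the first check, and TagE is discarded only after all 15 checks return an unknown state.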


Rules for transaction messages

To use transactional messages, we first need to understand its rules:

Transactional messages do not support delayed messages or batch messages.

To prevent half messages from accumulating because a single message is checked too many times, the number of checks for a single message is limited to 15 by default (according to the Turing course PDF). This limit is configurable through the transactionCheckMax parameter in the Broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the Broker discards the message and, by default, prints an error log.

If printing an error log is not enough, this behavior can be changed by overriding the AbstractTransactionCheckListener class.

A transactional message is checked only after a certain period of time, which is set by the transactionMsgTimeout parameter in the Broker configuration file. When sending a transactional message, this limit can be changed by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS, which takes precedence over the transactionMsgTimeout parameter.

Transactional messages may be checked or consumed more than once.

Committing a message to the user's target topic may fail; at present this relies on log records. High availability is guaranteed by RocketMQ's own high-availability mechanism. If you need to ensure that transactional messages are not lost and transaction integrity is preserved, the synchronous double-write mechanism is recommended.

The producer ID of transactional messages cannot be shared with the producer ID of other message types. Unlike other message types, transactional messages allow check-backs, in which the MQ server queries the producer client by its producer ID.
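Because a transactional message may be consumed more than once, the consumer's local transaction is commonly made idempotent. The sketch below shows one simple way to do that in plain Java, by remembering a business key per message. The class name, the key format, and the in-memory set are assumptions for illustration; a real system would typically keep the processed keys in durable storage.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentConsumerSketch {
    // Keys of messages whose business logic has already been applied.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    // Counts how many times the business logic actually ran.
    private final AtomicInteger applied = new AtomicInteger();

    // Returns true if the business logic ran, false if the key had been seen before.
    boolean consume(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: acknowledge without applying again
        }
        applied.incrementAndGet(); // stand-in for system B's local transaction
        return true;
    }

    int appliedCount() {
        return applied.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        System.out.println(consumer.consume("order-1")); // prints true
        System.out.println(consumer.consume("order-1")); // prints false
        System.out.println(consumer.appliedCount());     // prints 1
    }
}
```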

Transaction consumption process

So how are transaction messages implemented? See the figure below (from Alan's VIP02 RocketMQ development model):

The key point is that when a transactional message is sent, it is first converted into a half message and stored in an internal RocketMQ topic, RMQ_SYS_TRANS_HALF_TOPIC, which is not visible to consumers. After passing a series of transaction checks, the message is forwarded to the target Topic, and only then can the consumer see and consume it.

The interaction flow chart in Distributed Messaging Middleware in Action is also quite good.

The process, as explained in that book:

1. The transaction initiator first sends a prepare message to the MQ Server.
2. The MQ Server returns an ACK message to the transaction initiator.
3. The transaction initiator executes its local transaction.
4. Based on the result of the local transaction, the initiator submits a second confirmation (COMMIT or ROLLBACK) to the MQ Server.
5. If the result is ROLLBACK, MQ deletes the prepare message and does not deliver it. If the result is COMMIT, MQ delivers the message to the consumer.
6. If the local transaction fails or times out during execution, the second confirmation submitted in step 4 does not reach the MQ Server. The MQ Server therefore checks the message back after a period of time, polling other producers continuously to obtain the status.
7. After receiving the check-back request, the sender queries the local transaction result of the corresponding message.
8. The sender submits the second confirmation again according to the final result of the local transaction.
9. Consumption success on the consumer side is guaranteed by MQ.

ACL Permission Control

ACL provides topic-level user access control for RocketMQ. On the client, the AccessKey and SecretKey signature are injected through an RPCHook. The corresponding permission properties (Topic access rights, IP whitelist, AccessKey, SecretKey, and so on) are set in conf/plain_acl.yml under the RocketMQ installation directory. The Broker then checks the permissions of the AccessKey and throws an exception if the check fails.
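The AccessKey and SecretKey pair works as a shared-secret signature: the client signs the request with its SecretKey, and the Broker recomputes the signature with the SecretKey it has on file for that AccessKey. The sketch below illustrates the idea with the JDK only. It assumes HMAC-SHA1 with Base64 output, and the signed content string is invented for the example; the exact fields RocketMQ signs are not reproduced here, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class AclSignatureSketch {
    // Client side: sign the request content with the shared SecretKey.
    static String sign(String content, String secretKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            byte[] digest = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC-SHA1 is not available", e);
        }
    }

    // Broker side: recompute the signature with the stored SecretKey and compare.
    static boolean verify(String content, String storedSecretKey, String signature) {
        byte[] expected = sign(content, storedSecretKey).getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(expected, signature.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String content = "accessKey=RocketMQ;topic=topicB"; // made-up content for the example
        String signature = sign(content, "12345678");
        System.out.println(verify(content, "12345678", signature));  // prints true
        System.out.println(verify(content, "wrong-key", signature)); // prints false
    }
}
```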

Maven dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.7.1</version>
</dependency>

RocketMQ configuration details

In RocketMQ's broker.conf, enable ACL by setting aclEnable=true. Permissions can then be configured in plain_acl.yml. Like Apollo, this file is hot-loaded, meaning that you do not need to restart the Broker after modifying it. The file is annotated below:

# Global whitelist, not controlled by ACL
# It is usually necessary to add all the nodes in the master/slave architecture
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
# First account
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY # the default Topic access policy is deny
  defaultGroupPerm: SUB # the default Group access policy is subscription only
  topicPerms:
  - topicA=DENY # topicA is denied
  - topicB=PUB|SUB # topicB allows publishing and subscribing
  - topicC=SUB # topicC allows subscribing only
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB

# The second account, as long as the IP is from 192.168.1.*, can access all resources
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true
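How topicPerms and defaultTopicPerm combine can be illustrated with a small plain-Java sketch. It hard-codes the first account's topic permissions from the file above and looks a topic up with the default policy as the fallback. The class and method names are hypothetical, and the global whitelist, the admin flag, and group permissions are deliberately left out, so this is not how the Broker itself is implemented.

```java
import java.util.HashMap;
import java.util.Map;

class AclPermSketch {
    // perm is a value such as "DENY", "SUB" or "PUB|SUB"; action is "PUB" or "SUB".
    static boolean allows(String perm, String action) {
        if (perm == null || perm.equals("DENY")) {
            return false;
        }
        for (String granted : perm.split("\\|")) {
            if (granted.trim().equals(action)) {
                return true;
            }
        }
        return false;
    }

    // Falls back to the account's default policy when the topic has no explicit entry.
    static boolean canAccess(Map<String, String> topicPerms, String defaultTopicPerm,
                             String topic, String action) {
        return allows(topicPerms.getOrDefault(topic, defaultTopicPerm), action);
    }

    // Topic permissions of the first account in the file above.
    static Map<String, String> firstAccountTopicPerms() {
        Map<String, String> perms = new HashMap<>();
        perms.put("topicA", "DENY");
        perms.put("topicB", "PUB|SUB");
        perms.put("topicC", "SUB");
        return perms;
    }

    public static void main(String[] args) {
        Map<String, String> perms = firstAccountTopicPerms();
        System.out.println(canAccess(perms, "DENY", "topicB", "PUB")); // prints true
        System.out.println(canAccess(perms, "DENY", "topicC", "PUB")); // prints false
        System.out.println(canAccess(perms, "DENY", "topicX", "SUB")); // prints false
    }
}
```

The last call shows the effect of defaultTopicPerm: DENY, since a topic that is not listed for the account is rejected.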