Quick start

This quick start guide describes how to set up the RocketMQ messaging system on a local machine and how to send and receive messages.

Prerequisites

  1. 64-bit operating system
  2. 64-bit JDK 1.8+
  3. Maven 3.2.x+
  4. Git
  5. At least 4 GB of free disk space for the broker server

Download and build the release

The source code for the official 4.4.0 release can be downloaded here. You can also download a binary release here.

Now execute the following commands to unpack the 4.4.0 source release and build the binary artifact.

> unzip rocketmq-all-4.4.0-source-release.zip
> cd rocketmq-all-4.4.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq

The mvn -Prelease-all -DskipTests clean install -U command builds the downloaded source package and produces the target directory under the distribution directory, which is where the built package is located.

Start the Name Server

charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqnamesrv &
[1] 5273
charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup: ignoring input and appending output to 'nohup.out'
tail -f ~/logs/rocketmqlogs/namesrv.log
2019-03-31 19:24:10 INFO main - tls.client.keyPath = null
2019-03-31 19:24:10 INFO main - tls.client.keyPassword = null
2019-03-31 19:24:10 INFO main - tls.client.certPath = null
2019-03-31 19:24:10 INFO main - tls.client.authServer = false
2019-03-31 19:24:10 INFO main - tls.client.trustCertPath = null
2019-03-31 19:24:11 INFO main - Using OpenSSL provider
2019-03-31 19:24:11 INFO main - SSLContext created for server
2019-03-31 19:24:12 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started
2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON
2019-03-31 19:25:11 INFO NSScheduledThread1 - --------------------------------------------------------
2019-03-31 19:25:11 INFO NSScheduledThread1 - configTable SIZE: 0

The log line "The Name Server boot success" confirms that the Name Server started successfully, and that the serialization type is JSON.

Start the Broker Server

charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqbroker -n localhost:9876 &
[1] 5784
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/broker.log
2019-03-31 19:41:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:41:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 19:41:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:42:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 19:42:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK


The log shows that the broker has successfully registered with the Name Server.

In addition, you can run the jps command to check whether both services started successfully:

charse@charse-thinkpad:~$ jps
12128 Main
12549 Jps
5279 NamesrvStartup
5791 BrokerStartup

As you can see, both the NameServer and the Broker have started successfully, so we can move on to simulating a producer and a consumer.

Broker

When a program sends messages to a topic (TopicTest) that does not exist yet, the Broker detects this, creates the topic automatically, and applies the default configuration to it. The producer address shown in the log is my local LAN address (192.168.3.16).

2019-03-31 20:37:43 WARN SendMessageThread_1 - the topic TopicTest not exist, producer:
SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[192.168.3.16:47538]
2019-03-31 20:37:43 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: producer1 channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer[producer1] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863927]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer group[producer1] from groupChannelTable
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863933]
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 20:37:52 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes

When a producer is created, the log shows that CLIENT_INNER_PRODUCER is registered in the groupChannelTable, followed by the client's own producer group, producer1, together with some information about the producer. When the producer calls shutdown, producer1 is unregistered from the groupChannelTable, and then CLIENT_INNER_PRODUCER is removed as well. The broker also re-registers with the Name Server periodically.

When the client creates a consumer, the broker log below shows that a subscription group is created automatically, along with its configuration. After the new consumer connects, its subscriptions are added to the group: the topic you subscribed to (TopicTest) and a retry topic named %RETRY% followed by the consumer group name. A new producer, CLIENT_INNER_PRODUCER, is created as well.

2019-03-31 20:38:22 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - auto create a subscription group, SubscriptionGroupConfig [groupName=consumer1, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - create new topic TopicConfig [topicName=%RETRY%consumer1, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2019-03-31 20:38:35 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915211]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]]]
CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915412]
2019-03-31 20:38:35 INFO HeartbeatThread_3 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915833, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915843, expressionType=TAG]
2019-03-31 20:38:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 20:39:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:39:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:40:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
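
The retry topic name in the log follows a visible pattern: the %RETRY% prefix followed by the consumer group name. As a minimal sketch of that naming rule, the class below builds and parses such names. RetryTopicSketch and its methods are hypothetical names chosen for illustration and are not taken from the RocketMQ source.

```java
// Hypothetical helper that mirrors the naming seen in the broker log.
// It is an illustration only, not part of the RocketMQ client API.
final class RetryTopicSketch {
    static final String RETRY_PREFIX = "%RETRY%";

    // Builds the retry topic name for a consumer group, e.g. consumer1.
    static String retryTopicFor(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // True when the topic name carries the retry prefix.
    static boolean isRetryTopic(String topic) {
        return topic.startsWith(RETRY_PREFIX);
    }

    // Recovers the consumer group name from a retry topic name.
    static String consumerGroupOf(String retryTopic) {
        if (!isRetryTopic(retryTopic)) {
            throw new IllegalArgumentException("Not a retry topic: " + retryTopic);
        }
        return retryTopic.substring(RETRY_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(retryTopicFor("consumer1"));
    }
}
```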

NameServer

The NameServer log shows that, even before any topic has been created by a client, RocketMQ registers a number of topics by default.

2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:48210
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:48210]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, charse-thinkpad QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, BenchmarkTest QueueData [brokerName=charse-thinkpad, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, TBW102 QueueData [brokerName=charse-thinkpad, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, DefaultCluster QueueData [brokerName=charse-thinkpad, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
INFO RemotingExecutorThread_1 - new broker registered, 192.168.3.16:10911 HAServer:
RemotingExecutorThread_4 - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]


When the client creates a topic (TopicTest), the log shows that the topic is registered with the NameServer.

2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49208
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO RemotingExecutorThread_4 - new topic registered, TopicTest QueueData [brokerName=charse-thinkpad, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE [127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49224
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49224]
2019-03-31 20:38:35 INFO RemotingExecutorThread_4 - new topic registered, %RETRY%consumer1 QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]

When the consumer is closed, the client consumer consumer1 is unregistered from consumerGroupInfo, and then CLIENT_INNER_PRODUCER is unregistered from the groupChannelTable.

2019-03-31 21:50:08 INFO HeartbeatThread_3 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]]]
CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040208756, expressionType=TAG]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister a consumer[consumer1] from consumerGroupInfo ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208763]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister consumer ok, no any connection, and remove consumer group, consumer1
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208801]
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 21:50:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 21:50:28 WARN PullMessageThread_2 - the consumer's group info not exist, group:
ERROR NettyServerNIOSelector_3_3 - processRequestWrapper response to /192.168.3.16:49810 failed
java.nio.channels.ClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=11, language=JAVA, version=293, opaque=24, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=2, suspendTimeoutMillis=15000, commitOffset=0, topic=%RETRY%consumer1, queueOffset=0, expressionType=TAG, subVersion=1554040208756, consumerGroup=consumer1}, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=24, language=JAVA, version=293, opaque=24, flag(B)=1, remark=the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details., extFields=null, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 21:51:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes

Production and consumption example

  • RocketMQ can send messages in three ways: reliable synchronous, reliable asynchronous, and one-way transmission.
  • Consume messages using RocketMQ
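
The three sending modes differ mainly in what the caller waits for. The sketch below models that difference with a plain CompletableFuture and an in-memory stand-in for the broker. It does not use the RocketMQ client; SendModesSketch, transmit and the ACK string are assumptions made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-memory model of the three send modes; not the RocketMQ API.
final class SendModesSketch {
    // A single worker thread plays the role of the network and the broker.
    private final ExecutorService network = Executors.newSingleThreadExecutor();

    // Pretends to transmit a message and yields an acknowledgement later.
    private CompletableFuture<String> transmit(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body, network);
    }

    // Synchronous: block until the acknowledgement arrives and return it.
    String sendSync(String body) {
        return transmit(body).join();
    }

    // Asynchronous: return at once; the callback runs when the acknowledgement arrives.
    CompletableFuture<Void> sendAsync(String body, Consumer<String> onSuccess) {
        return transmit(body).thenAccept(onSuccess);
    }

    // One-way: hand the message over and never inspect the result.
    void sendOneway(String body) {
        transmit(body);
    }

    // Stops the worker thread after pending sends have finished.
    void shutdown() {
        network.shutdown();
        try {
            network.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SendModesSketch sender = new SendModesSketch();
        System.out.println(sender.sendSync("sync message"));
        sender.sendAsync("async message", ack -> System.out.println(ack)).join();
        sender.sendOneway("one-way message");
        sender.shutdown();
    }
}
```

In this model only the synchronous and asynchronous modes ever see an acknowledgement, which is why the one-way mode suits cases that tolerate occasional loss.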

1. Add dependencies

Maven:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

2.1 Sending messages synchronously

Reliable synchronous transmission is widely used in scenarios such as important notification messages, SMS notifications, and SMS marketing systems.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

2.2 Sending Messages Asynchronously

Asynchronous transmission is generally used in response-time-sensitive business scenarios.

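import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // Lets main() wait for the callbacks before the producer is shut down.
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait up to 5 seconds for outstanding callbacks.
        countDownLatch.await(5, TimeUnit.SECONDS);
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}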

2.3 Sending messages in one-way mode

One-way transmission is used for cases that require only moderate reliability, such as log collection.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        // Give in-flight one-way sends a moment to complete before shutting down.
        Thread.sleep(5000);
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

3. Consuming messages

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe to one or more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

More examples of RocketMQ usage can be found here.

Ordered messages

RocketMQ provides ordered messages delivered in first-in, first-out (FIFO) order. The following examples show how to send and receive globally and partially ordered messages.

Send message example

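import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        // Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            // Create a message instance, specifying topic, tag, key and message body.
            // The topic matches the one the OrderedConsumer example subscribes to.
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // Messages with the same orderId always go to the same queue.
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer.
        producer.shutdown();
    }
}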
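
The selector in the producer above picks a queue with orderId % queue count, so messages that share an order ID always land on the same queue, and each queue is consumed in FIFO order. The stand-alone sketch below replays that arithmetic with plain integers instead of MessageQueue objects. QueueSelectionSketch is a hypothetical name, and the queue count of 4 is an assumption taken from writeQueueNums=4 in the TopicTest log earlier.

```java
// Hypothetical stand-alone sketch; queues are plain indexes rather than
// RocketMQ MessageQueue objects.
final class QueueSelectionSketch {
    // Same arithmetic as the selector above: order ID modulo the number of queues.
    static int selectQueue(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed, matching writeQueueNums=4 in the broker log
        for (int i = 0; i < 20; i++) {
            int orderId = i % 10;
            System.out.printf("message %d, orderId %d -> queue %d%n",
                i, orderId, selectQueue(orderId, queueCount));
        }
    }
}
```

Because several order IDs can share one queue, ordering is guaranteed per queue and therefore per order ID, not across the whole topic.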

Subscribe message example

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

Broadcast messages

Broadcasting sends each message to all subscribers of a topic. It is a good choice if you want every subscriber to receive the messages published on a topic.
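
To contrast broadcasting with the clustering model seen in the consumer logs earlier (messageModel=CLUSTERING), here is a minimal in-memory sketch. It does not use the RocketMQ client, and DeliveryModelSketch is a hypothetical name. The per-message round-robin used for clustering is a simplification: RocketMQ balances load by assigning message queues to consumers rather than individual messages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the two delivery models; not the RocketMQ API.
final class DeliveryModelSketch {
    // Clustering: each message is handled by exactly one consumer of the group.
    // Round-robin per message is a simplification of queue-based load balancing.
    static List<List<String>> clustering(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Broadcasting: every consumer receives every message.
    static List<List<String>> broadcasting(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (List<String> inbox : inboxes) {
            inbox.addAll(messages);
        }
        return inboxes;
    }

    // Creates one empty inbox per consumer.
    private static List<List<String>> emptyInboxes(int consumers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        messages.add("m0");
        messages.add("m1");
        messages.add("m2");
        System.out.println("clustering:   " + clustering(messages, 2));
        System.out.println("broadcasting: " + broadcasting(messages, 2));
    }
}
```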

Producer example

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

Consumer example

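import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Set to broadcast mode.
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}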

Scheduled messages

Scheduled messages differ from normal messages in that they are not delivered until a specified delay has elapsed.

1. Start the consumer to wait for incoming subscription messages

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledMessageConsumer {

    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}

2. Send a scheduled message

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }

        // Shutdown producer after use.
        producer.shutdown();
    }
}

The delay levels are configured as messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. Level 0 means no delay, level 1 is the first delay in the list (1s), level 2 is the second (5s), and so on, so level 3 used above corresponds to 10s. As the consumer output shows, each message is consumed about 10 seconds after it was stored.
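
To make the level-to-delay mapping concrete, the following sketch parses a delay-level string of the form quoted above into milliseconds. DelayLevels and its methods are hypothetical helpers written for this illustration; they are not part of the RocketMQ client, and the broker performs its own parsing internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the RocketMQ API: maps delay levels to milliseconds.
final class DelayLevels {
    // The level string quoted in the text above.
    static final String DEFAULT = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses "1s 5s ... 2h" into a map from level (starting at 1) to delay in milliseconds.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            char unit = part.charAt(part.length() - 1);
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1_000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in " + part);
            }
            table.put(i + 1, amount * unitMillis);
        }
        return table;
    }

    // Level 0 means no delay; other levels are looked up in the parsed table.
    static long delayMillis(int level) {
        if (level == 0) {
            return 0L;
        }
        Long delay = parse(DEFAULT).get(level);
        if (delay == null) {
            throw new IllegalArgumentException("No such delay level: " + level);
        }
        return delay;
    }

    public static void main(String[] args) {
        System.out.println("level 3 = " + delayMillis(3) + " ms");
    }
}
```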

Batch messages

Why batch?

Sending messages in batches can improve the transmission performance of small messages.

Restrictions

Messages in the same batch must have the same topic and the same waitStoreMsgOK setting, and scheduled messages are not supported. In addition, the total size of the messages in one batch should not exceed 1 MB.
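
The restrictions above can be expressed as a simple pre-send check. The sketch below is an assumption-laden illustration: SimpleMessage is a hypothetical stand-in for the real Message class, the size is approximated by summing body lengths only, and the limit reuses the conservative 1000 * 1000 value from the ListSplitter example in this article.

```java
import java.util.List;

// Hypothetical, simplified message type for illustration only; the real client
// uses org.apache.rocketmq.common.message.Message.
final class SimpleMessage {
    final String topic;
    final boolean waitStoreMsgOK;
    final int delayLevel; // 0 means the message is not scheduled
    final byte[] body;

    SimpleMessage(String topic, boolean waitStoreMsgOK, int delayLevel, byte[] body) {
        this.topic = topic;
        this.waitStoreMsgOK = waitStoreMsgOK;
        this.delayLevel = delayLevel;
        this.body = body;
    }
}

// Hypothetical checker for the batch restrictions; not part of the RocketMQ API.
final class BatchRulesSketch {
    static final int SIZE_LIMIT = 1000 * 1000; // assumed limit, as in ListSplitter

    // Returns null when the batch satisfies the restrictions, otherwise a short reason.
    static String violation(List<SimpleMessage> batch) {
        if (batch.isEmpty()) {
            return null;
        }
        SimpleMessage first = batch.get(0);
        long totalBytes = 0;
        for (SimpleMessage message : batch) {
            if (!message.topic.equals(first.topic)) {
                return "mixed topics";
            }
            if (message.waitStoreMsgOK != first.waitStoreMsgOK) {
                return "mixed waitStoreMsgOK";
            }
            if (message.delayLevel != 0) {
                return "scheduled message in batch";
            }
            totalBytes += message.body.length; // body bytes only, an approximation
        }
        return totalBytes > SIZE_LIMIT ? "batch too large" : null;
    }
}
```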

How to use batch

Batching is simple if the messages you send at one time total less than 1 MB:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}
    

Splitting a large batch into lists

Batch sending only becomes complicated when you send a large number of messages and cannot be sure whether the batch exceeds the 1 MB size limit. In that case, split the messages into smaller lists:

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // estimate the message size: topic + body + all properties
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        //handle the error
    }
}

Message filtering example

In most cases, a tag is a simple and useful way to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

In the above example, the consumer will receive messages tagged TAGA, TAGB, or TAGC. The limitation is that each message can have only one tag, which may not be enough for complex scenarios. In such cases, SQL expressions can be used to filter messages.
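As a rough illustration of what the tag expression above means, the following sketch splits an expression on "||" and checks whether a message tag is one of the alternatives. TagExpression is a hypothetical class written for this article; it is not how RocketMQ implements tag filtering internally.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a tag subscription expression such as "TAGA || TAGB || TAGC".
final class TagExpression {
    private final Set<String> tags = new HashSet<>();

    TagExpression(String expression) {
        // Each alternative is separated by "||"; surrounding whitespace is ignored.
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
    }

    // A message carries at most one tag, so matching is a simple membership test.
    boolean matches(String messageTag) {
        return messageTag != null && tags.contains(messageTag);
    }

    public static void main(String[] args) {
        TagExpression expression = new TagExpression("TAGA || TAGB || TAGC");
        System.out.println("TAGB selected: " + expression.matches("TAGB"));
        System.out.println("TAGD selected: " + expression.matches("TAGD"));
    }
}
```

Because the test is a single membership check on one tag, it cannot express conditions over several attributes of a message, which is the gap that SQL filtering fills.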

Principle

The SQL feature can perform calculations on the properties that were set when the message was sent. Within the syntax defined by RocketMQ, you can implement some interesting logic. For example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

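The two cases in the diagram can be reproduced with a small hand-written check. This sketch hard-codes the expression a > 5 AND b = 'abc' rather than parsing SQL; PropertyFilterSketch is a hypothetical name, and treating a missing or non-numeric property as "no match" is an assumption made here for simplicity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, hard-coded equivalent of the expression: a > 5 AND b = 'abc'
final class PropertyFilterSketch {
    static boolean matches(Map<String, String> properties) {
        String a = properties.get("a");
        String b = properties.get("b");
        if (a == null || b == null) {
            return false; // assumption: a missing property never satisfies the expression
        }
        try {
            return Double.parseDouble(a) > 5 && b.equals("abc");
        } catch (NumberFormatException e) {
            return false; // assumption: a non-numeric value for "a" never matches
        }
    }

    // Builds the user properties of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> properties = new HashMap<>();
        properties.put("a", a);
        properties.put("b", b);
        properties.put("c", c);
        return properties;
    }

    public static void main(String[] args) {
        // First message in the diagram (a = 10) and second message (a = 1).
        System.out.println("a=10 selected: " + matches(message("10", "abc", "true")));
        System.out.println("a=1 selected: " + matches(message("1", "abc", "true")));
    }
}
```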

Syntax

RocketMQ defines only some basic syntax to support this feature, and you can easily extend it.

  1. Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  2. Character comparison, like =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, OR, NOT;

Constant type:

  1. Numeric, like 123, 3.1415;
  2. Character, like 'abc', must be enclosed in single quotes;
  3. NULL, special constant;
  4. Boolean, TRUE or FALSE;

Use restrictions

Only push consumers can select messages using SQL92. The interface is as follows:

public void subscribe(final String topic, final MessageSelector messageSelector)

Producer example

You can attach properties to a message when sending it by calling the putUserProperty method.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown();

Consumer example

Use MessageSelector.bySql to select messages with SQL92 when consuming.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

OpenMessaging example

OpenMessaging establishes industry guidelines and messaging and streaming specifications to provide a common framework for finance, e-commerce, the Internet of Things, and big data. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conforming to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha, and access to RocketMQ based on OpenMessaging is demonstrated in the following example.

OMSProducer

The following example shows how to use RocketMQ to send synchronous, asynchronous, and one-way messages.

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPullConsumer

Use an OMS PullConsumer to poll messages from a specified queue.

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

Attach an OMS PushConsumer to a specified queue and consume messages through a MessageListener.

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));

        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
    }
}

Examples of transactional messages

What are transactional messages?

A transactional message can be thought of as a two-phase commit message implementation that ensures eventual consistency in a distributed system. Transactional messages ensure that executing the local transaction and sending the message are performed atomically.

Use restrictions

  • Transactional messages do not support scheduled or batch sending.
  • To avoid a single message being checked too many times and causing half messages to accumulate in the queue, a single message is checked at most 15 times by default. Users can change this limit through the "transactionCheckMax" parameter in the broker configuration. If a message has been checked more than "transactionCheckMax" times, the broker by default discards it and prints an error log; users can change this behavior by overriding the "AbstractTransactionCheckListener" class.
  • A transactional message is checked after a period determined by the "transactionTimeout" parameter in the broker configuration. Users can also change this limit by setting the user property "CHECK_IMMUNITY_TIME_IN_SECONDS" when sending a transactional message; this property takes precedence over the "transactionMsgTimeout" parameter.
  • A transactional message may be checked or consumed more than once.
  • Re-delivering a committed message to the user's target topic may fail. For now, this relies on the log record. High availability is guaranteed by RocketMQ's own high availability mechanism. If you want to ensure that transactional messages are not lost and that transaction integrity is guaranteed, the synchronous double-write mechanism is recommended.
  • Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transactional messages allow backward queries: the MQ server queries clients by their producer ID.

Transaction status of transactional messages

  • TransactionStatus.CommitTransaction: commits the transaction, which allows consumers to consume the message.
  • TransactionStatus.RollbackTransaction: rolls back the transaction, which means the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: an intermediate state, which means MQ needs to check back to determine the status.
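The interaction between the three states and the check limit described above can be sketched as a small simulation. Everything here is hypothetical: TransactionCheckSketch, its Status and Outcome enums, and the resolve method are names invented for illustration and do not correspond to broker classes. The sketch only models the rule that an unknown message is checked again until it is committed, rolled back, or the maximum number of checks is reached.

```java
import java.util.function.IntFunction;

// Hypothetical simulation of how a half message is resolved; not broker code.
final class TransactionCheckSketch {
    enum Status { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { DELIVERED, DELETED, DISCARDED }

    // initial: status returned after the local transaction ran.
    // checker: returns the status reported for the n-th check (n starts at 1).
    // maxChecks: upper bound on checks (the text gives 15 as the default).
    static Outcome resolve(Status initial, IntFunction<Status> checker, int maxChecks) {
        Status status = initial;
        for (int check = 1; status == Status.UNKNOWN && check <= maxChecks; check++) {
            status = checker.apply(check);
        }
        switch (status) {
            case COMMIT:
                return Outcome.DELIVERED; // consumers may now consume the message
            case ROLLBACK:
                return Outcome.DELETED;   // the message is removed and never consumed
            default:
                return Outcome.DISCARDED; // still unknown after maxChecks checks
        }
    }

    public static void main(String[] args) {
        // The producer answers UNKNOWN twice and commits on the third check.
        Outcome outcome = resolve(Status.UNKNOWN,
            n -> n < 3 ? Status.UNKNOWN : Status.COMMIT, 15);
        System.out.println(outcome);
    }
}
```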

Creating a transactional producer

Use the TransactionMQProducer class to create a producer client and specify a unique producerGroup. You can also set a custom thread pool to handle check requests. After executing the local transaction, reply to MQ according to the execution result; the reply statuses are described in the section above.

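import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;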

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        // Keep the producer alive so that it can answer check requests.
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

Implementing the TransactionListener interface

The executeLocalTransaction method is used to execute the local transaction once the half message has been sent successfully. It returns one of the three transaction states mentioned in the previous section. The checkLocalTransaction method is used to check the status of the local transaction and respond to MQ check requests. It also returns one of the three transaction states mentioned in the previous section.

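   import org.apache.rocketmq.client.producer.LocalTransactionState;
   import org.apache.rocketmq.client.producer.TransactionListener;
   import org.apache.rocketmq.common.message.Message;
   import org.apache.rocketmq.common.message.MessageExt;

   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicInteger;

   public class TransactionListenerImpl implements TransactionListener {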
       private AtomicInteger transactionIndex = new AtomicInteger(0);
   
       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
   
       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }
   
       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }