Quick start
This quick start guide describes how to set up the RocketMQ messaging system on a local machine and how to send and consume basic messages.
Prerequisites
- 64-bit operating system
- 64-bit JDK 1.8+
- Maven 3.2.x+
- Git
- At least 4 GB of free disk space for the Broker server
Download and build the release
The source code for the official 4.4.0 release can be downloaded here. You can also download a binary release here.
Now execute the following commands to unpack the 4.4.0 source release and build the binary artifact.
> unzip rocketmq-all-4.4.0-source-release.zip
> cd rocketmq-all-4.4.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq
The mvn -Prelease-all -DskipTests clean install -U command builds the downloaded source package and produces a target directory under the distribution directory, which contains the built package.
Start the Name Server
charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqnamesrv &
[1] 5273
charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup: ignoring input and appending output to 'nohup.out'
tail -f ~/logs/rocketmqlogs/namesrv.log
2019-03-31 19:24:10 INFO main - tls.client.keyPath = null
2019-03-31 19:24:10 INFO main - tls.client.keyPassword = null
2019-03-31 19:24:10 INFO main - tls.client.certPath = null
2019-03-31 19:24:10 INFO main - tls.client.authServer = false
2019-03-31 19:24:10 INFO main - tls.client.trustCertPath = null
2019-03-31 19:24:11 INFO main - Using OpenSSL provider
2019-03-31 19:24:11 INFO main - SSLContext created for server
2019-03-31 19:24:12 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started
2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON
2019-03-31 19:25:11 INFO NSScheduledThread1 - --------------------------------------------------------
2019-03-31 19:25:11 INFO NSScheduledThread1 - configTable SIZE: 0
The log line "The Name Server boot success. serializeType=JSON" confirms that the Name Server started successfully and that its serialization type is JSON.
Start the Broker Server
charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ nohup sh bin/mqbroker -n localhost:9876 &
[1] 5784
nohup: ignoring input and appending output to 'nohup.out'
charse@charse-thinkpad:/media/charse/documentation/Code/study/MQ/rocketmq-all-4.4.0/distribution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/broker.log
2019-03-31 19:41:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:41:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 19:41:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:42:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 19:42:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
You can see that the Broker has successfully registered with the Name Server.
In addition, you can run the jps command to check whether both services started successfully.
charse@charse-thinkpad:~$ jps
12128 Main
12549 Jps
5279 NamesrvStartup
5791 BrokerStartup
As you can see, both the Name Server and the Broker have started successfully, so we can move on to simulating a producer and a consumer.
Broker
When a client produces messages to a topic (TopicTest) that does not exist yet, the Broker detects this and creates the topic automatically, using the configuration of the default topic TBW102. The producer address shown in the log is my local LAN address (192.168.3.16).
2019-03-31 20:37:43 WARN SendMessageThread_1 - the topic TopicTest not exist, producer:
SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] producer:[192.168.3.16:47538]
2019-03-31 20:37:43 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: producer1 channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer[producer1] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863927]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer group[producer1] from groupChannelTable
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863933]
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 20:37:52 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
When a producer is created, the log shows that CLIENT_INNER_PRODUCER is registered in the groupChannelTable first, and then the client's producer group producer1 is registered there as well, together with some information about the producer's channel. When the producer shuts down, producer1 is unregistered from the groupChannelTable, followed by CLIENT_INNER_PRODUCER. The Broker also re-registers with the Name Server periodically (every 30 seconds in this log).
When the client creates a consumer, the Broker log below shows what happens. A subscription group is created automatically, together with its configuration. A retry topic named %RETRY% followed by the consumer group name (here %RETRY%consumer1) is created as well. Once the new consumer connects, the topics it subscribes to are added to the group: the topic you subscribed to (TopicTest) and the retry topic. The subscriptions are then updated. A new producer, CLIENT_INNER_PRODUCER, is also created.
2019-03-31 20:38:22 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - auto create a subscription group, SubscriptionGroupConfig [groupName=consumer1, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - create new topic TopicConfig [topicName=%RETRY%consumer1, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2019-03-31 20:38:35 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915211]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]]]
CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915412]
2019-03-31 20:38:35 INFO HeartbeatThread_3 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915833, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915843, expressionType=TAG]
2019-03-31 20:38:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 20:39:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:39:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:40:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
NameServer
As the Name Server log shows, RocketMQ registers a number of default topics even before any client has created a topic.
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:48210
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:48210]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, charse-thinkpad QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, BenchmarkTest QueueData [brokerName=charse-thinkpad, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, TBW102 QueueData [brokerName=charse-thinkpad, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, DefaultCluster QueueData [brokerName=charse-thinkpad, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
INFO RemotingExecutorThread_1 - new broker registered, 192.168.3.16:10911 HAServer:
RemotingExecutorThread_4 - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
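The Name Server log prints topic permissions as a number (perm=6, perm=7), while the Broker log prints them as a string (perm=RW-). The following is a minimal sketch of that mapping, assuming the bit layout used by RocketMQ's PermName class (read = 4, write = 2, inherit = 1). The class and method names are made up for illustration and are not the library's API.

```java
final class PermSketch {
    // Assumed bit layout, mirroring RocketMQ's PermName constants.
    static final int PERM_READ = 1 << 2;   // 4
    static final int PERM_WRITE = 1 << 1;  // 2
    static final int PERM_INHERIT = 1;     // 1

    /** Renders a numeric perm value as a three-character string such as "RW-". */
    static String permToString(int perm) {
        StringBuilder sb = new StringBuilder("---");
        if ((perm & PERM_READ) != 0) {
            sb.setCharAt(0, 'R');
        }
        if ((perm & PERM_WRITE) != 0) {
            sb.setCharAt(1, 'W');
        }
        if ((perm & PERM_INHERIT) != 0) {
            sb.setCharAt(2, 'X');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("perm=6 -> " + permToString(6));
        System.out.println("perm=7 -> " + permToString(7));
    }
}
```

Under this reading, perm=6 is a readable and writable topic, and perm=7 (as on TBW102) additionally carries the inherit bit.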
When the client creates a topic (TopicTest), the log shows that the topic is registered with the Name Server.
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49208
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO RemotingExecutorThread_4 - new topic registered, TopicTest QueueData [brokerName=charse-thinkpad, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE [127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49224
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49224]
2019-03-31 20:38:35 INFO RemotingExecutorThread_4 - new topic registered, %RETRY%consumer1 QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
When the consumer shuts down, the Broker log shows the client's consumer consumer1 being unregistered from consumerGroupInfo, and then CLIENT_INNER_PRODUCER being unregistered from the groupChannelTable.
2019-03-31 21:50:08 INFO HeartbeatThread_3 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]]]
CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040208756, expressionType=TAG]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister a consumer[consumer1] from consumerGroupInfo ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208763]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister consumer ok, no any connection, and remove consumer group, consumer1
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208801]
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 21:50:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 21:50:28 WARN PullMessageThread_2 - the consumer's group info not exist, group:
ERROR NettyServerNIOSelector_3_3 - processRequestWrapper response to /192.168.3.16:49810 failed
java.nio.channels.ClosedChannelException: null
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=11, language=JAVA, version=293, opaque=24, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=2, suspendTimeoutMillis=15000, commitOffset=0, topic=%RETRY%consumer1, queueOffset=0, expressionType=TAG, subVersion=1554040208756, consumerGroup=consumer1}, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=24, language=JAVA, version=293, opaque=24, flag(B)=1, remark=the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details., extFields=null, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 21:51:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
Production and consumption example
- RocketMQ can send messages in three ways: reliable synchronous, reliable asynchronous, and one-way.
- Consume messages using RocketMQ.
1. Add dependencies
Maven:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
2.1 Sending Messages Synchronously
Reliable synchronous sending is widely used in scenarios such as important notification messages, SMS notifications, and SMS marketing systems.
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message to one of the brokers and block until the result arrives.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
2.2 Sending Messages Asynchronously
Asynchronous sending is typically used in response-time-sensitive business scenarios.
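import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        int messageCount = 100;
        // Lets main() wait for the callbacks before the producer is shut down.
        final CountDownLatch latch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create a message instance, specifying topic, tag, key and message body.
            Message msg = new Message("TopicTest", "TagA", "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    latch.countDown();
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    latch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait up to 5 seconds for outstanding callbacks, then shut down.
        latch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}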
2.3 Sending Messages in One-Way Mode
One-way sending is used in cases that require only moderate reliability, such as log collection.
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message to one of the brokers without waiting for a result.
            producer.sendOneway(msg);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
3. Consume messages
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Instantiate with the specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to one or more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register a callback to execute when messages fetched from brokers arrive.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch the consumer instance.
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
More examples of RocketMQ usage can be found here.
Ordered messages
RocketMQ delivers ordered messages in FIFO order. The following example shows how to send and receive globally and partially ordered messages.
Send message example
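import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        // Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            // Create a message instance, specifying topic, tag, key and message body.
            // The topic matches the one OrderedConsumer subscribes to.
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Messages with the same orderId always go to the same queue.
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer.
        producer.shutdown();
    }
}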
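The selector above routes every message with the same orderId to the same queue, which is what keeps those messages in order relative to each other. The following plain-Java sketch replays that mapping without any RocketMQ dependency. The queue count of 4 is an assumption taken from the writeQueueNums=4 that the Broker log reports for TopicTest, and the class name is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class QueueSelectionSketch {
    /** Same rule as the selector in OrderedProducer: orderId modulo queue count. */
    static int selectQueue(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    /** Groups message indexes 0..messageCount-1 by the queue they would be sent to. */
    static Map<Integer, List<Integer>> assign(int messageCount, int queueCount) {
        Map<Integer, List<Integer>> byQueue = new TreeMap<>();
        for (int i = 0; i < messageCount; i++) {
            int orderId = i % 10; // as in OrderedProducer
            int queue = selectQueue(orderId, queueCount);
            byQueue.computeIfAbsent(queue, k -> new ArrayList<Integer>()).add(i);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        // Assumed queue count: 4.
        for (Map.Entry<Integer, List<Integer>> entry : assign(20, 4).entrySet()) {
            System.out.println("queue " + entry.getKey() + " <- messages " + entry.getValue());
        }
    }
}
```

Ordering is only guaranteed within one queue, so the choice of the sharding key (here orderId) decides which messages are ordered with respect to each other.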
Subscribe message example
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                // Pass the data as arguments: message text may contain '%' (e.g. %RETRY% topics).
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
Broadcast messages
Broadcasting sends a message to all subscribers of a topic. It is a good choice if you want every subscriber to receive the messages on a topic.
Production example
public class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest", "TagA", "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}
Consumption example
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Set to broadcast mode.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}
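To make the difference from the default clustering model concrete (the Broker log earlier shows messageModel=CLUSTERING), here is a plain-Java simulation with no RocketMQ dependency. It assumes a simplified view: in clustering mode each message goes to exactly one consumer of the group (round-robin here, which is only a stand-in for the real queue-based allocation), while in broadcasting mode every consumer receives every message. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MessageModelSketch {
    /** Clustering: each message goes to exactly one consumer (round-robin stand-in). */
    static List<List<String>> clustering(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    /** Broadcasting: every consumer receives every message. */
    static List<List<String>> broadcasting(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (List<String> inbox : received) {
            inbox.addAll(messages);
        }
        return received;
    }

    private static List<List<String>> emptyInboxes(int consumers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            inboxes.add(new ArrayList<String>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("clustering:   " + clustering(messages, 2));
        System.out.println("broadcasting: " + broadcasting(messages, 2));
    }
}
```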
Scheduled messages
Scheduled messages differ from normal messages in that they are not delivered until a specified delay has elapsed.
1. Start the consumer to wait for incoming subscription messages
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                        + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Launch consumer
        consumer.start();
    }
}
2. Send a scheduled message
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to the consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }
        // Shut down the producer after use.
        producer.shutdown();
    }
}
The delay levels are defined by messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. Level 0 means no delay, level 1 means a delay of 1s, level 2 means a delay of 5s, and so on. With level 3, as used here, you can observe that each message is consumed about 10 seconds after it was stored.
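The level-to-delay mapping can be sketched in plain Java. This is only an illustration that parses the level string quoted above; the class and method names are made up for this example and are not RocketMQ APIs, and only the units s, m and h that appear in that string are handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses a level string into level -> delay in milliseconds (levels start at 1). */
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            char unit = part.charAt(part.length() - 1);
            long value = Long.parseLong(part.substring(0, part.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + part);
            }
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println("level 3 -> " + table.get(3) + " ms");
        System.out.println("highest level: " + table.size());
    }
}
```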
Batch sending
Why send in batches?
Sending messages in batches improves the throughput of small messages.
Usage restrictions
Messages in the same batch must have the same topic and the same waitStoreMsgOK setting, and scheduled messages are not supported. In addition, the total size of the messages in one batch should not exceed 1 MB.
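These restrictions can be written down as a pre-send check. The sketch below is plain Java with a hypothetical BatchItem class standing in for a message (it is not the RocketMQ Message class). It counts only topic and body bytes against an assumed limit of 1000 * 1000 bytes, so treat the size figure as an approximation rather than what the client actually enforces.

```java
import java.util.Arrays;
import java.util.List;

final class BatchCheckSketch {
    static final int SIZE_LIMIT = 1000 * 1000; // assumed conservative figure just under 1 MB

    /** Hypothetical stand-in for a message; not a RocketMQ class. */
    static final class BatchItem {
        final String topic;
        final byte[] body;
        final boolean waitStoreMsgOK;
        final int delayLevel; // 0 means "not a scheduled message"

        BatchItem(String topic, byte[] body, boolean waitStoreMsgOK, int delayLevel) {
            this.topic = topic;
            this.body = body;
            this.waitStoreMsgOK = waitStoreMsgOK;
            this.delayLevel = delayLevel;
        }
    }

    /** Returns null if the batch looks acceptable, otherwise a description of the problem. */
    static String validate(List<BatchItem> batch) {
        if (batch.isEmpty()) {
            return "batch is empty";
        }
        BatchItem first = batch.get(0);
        long total = 0;
        for (BatchItem item : batch) {
            if (!item.topic.equals(first.topic)) {
                return "mixed topics";
            }
            if (item.waitStoreMsgOK != first.waitStoreMsgOK) {
                return "mixed waitStoreMsgOK";
            }
            if (item.delayLevel != 0) {
                return "scheduled messages are not supported";
            }
            total += item.topic.length() + item.body.length;
        }
        return total > SIZE_LIMIT ? "batch exceeds size limit" : null;
    }

    public static void main(String[] args) {
        List<BatchItem> ok = Arrays.asList(
            new BatchItem("BatchTest", "Hello world 0".getBytes(), true, 0),
            new BatchItem("BatchTest", "Hello world 1".getBytes(), true, 0));
        System.out.println("problem: " + validate(ok));
    }
}
```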
How to use batch sending
Batch sending is easy if you send no more than 1 MB of messages at a time.
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    // handle the error
}
Splitting a large batch into lists
Batch sending only becomes more complicated when you send a large number of messages and cannot be sure whether the batch exceeds the 1 MB size limit. In that case, you need to split the messages into smaller lists.
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            // Estimate the message size: topic + body + all properties.
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // for log overhead
            if (tmpSize > SIZE_LIMIT) {
                // It is unexpected that a single message exceeds SIZE_LIMIT.
                // Here just let it go, otherwise it will block the splitting process.
                if (nextIndex - currIndex == 0) {
                    // If the next sublist has no element, add this one and then break; otherwise just break.
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

// Then you can split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        // handle the error
    }
}
Message filter example
In most cases, a tag is a simple and useful way to select the messages you want. For example:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
In the example above, the consumer receives messages tagged TAGA, TAGB, or TAGC. The limitation is that a message can carry only one tag, which may not be enough for complex scenarios. In such cases, you can use SQL expressions to filter messages.
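A tag expression can be read as a set of accepted tags. The following plain-Java sketch shows that reading under stated assumptions: tags are separated by "||", and "*" or an empty expression accepts everything (consistent with the subString=*, tagsSet=[] entries in the Broker log earlier). The class and method names are illustrative and are not part of the RocketMQ client.

```java
import java.util.HashSet;
import java.util.Set;

final class TagFilterSketch {
    /** Splits an expression such as "TAGA || TAGB" into its tags; "*" or empty yields an empty set. */
    static Set<String> parseTags(String expression) {
        Set<String> tags = new HashSet<>();
        if (expression == null || expression.trim().isEmpty() || expression.trim().equals("*")) {
            return tags; // an empty set means "accept every tag"
        }
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    /** Returns true if a message carrying messageTag would pass the expression. */
    static boolean matches(String expression, String messageTag) {
        Set<String> tags = parseTags(expression);
        return tags.isEmpty() || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "TAGA || TAGB || TAGC";
        System.out.println("TAGB accepted: " + matches(expression, "TAGB"));
        System.out.println("TAGD accepted: " + matches(expression, "TAGD"));
    }
}
```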
Principle
The SQL feature evaluates an expression over the properties that were set when the message was sent. Within the syntax defined by RocketMQ, you can implement some interesting logic. For example:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
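The two cases in the diagram can be reproduced with a hand-written predicate. This is only a sketch of the selection outcome, not of how RocketMQ evaluates SQL92 expressions; the class SqlFilterSketch is hypothetical, properties are modeled as a plain string map, and treating a missing property as "not selected" is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the expression: a > 5 AND b = 'abc'
class SqlFilterSketch {

    static final Predicate<Map<String, String>> SELECTOR = props -> {
        String a = props.get("a");
        String b = props.get("b");
        if (a == null || b == null) {
            return false; // assumption: a missing property never matches
        }
        // Double.parseDouble throws on non-numeric input; fine for a sketch.
        return Double.parseDouble(a) > 5 && b.equals("abc");
    };

    // Builds the property map of one message from the diagram.
    static Map<String, String> message(String a, String b, String c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", a);
        props.put("b", b);
        props.put("c", c);
        return props;
    }

    public static void main(String[] args) {
        // First message in the diagram: a = 10, so it is selected.
        System.out.println(SELECTOR.test(message("10", "abc", "true"))); // prints true
        // Second message: a = 1 fails "a > 5", so it is missed.
        System.out.println(SELECTOR.test(message("1", "abc", "true")));  // prints false
    }
}
```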
Grammar
RocketMQ defines only some basic grammar to support this feature, and you can easily extend it.
- Numeric comparison, like >, >=, <, <=, BETWEEN, =;
- Character comparison, like =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical AND, OR, NOT;
Constant types:
- Numeric, like 123, 3.1415;
- Character, like 'abc'; it must be enclosed in single quotes;
- NULL, special constant;
- Boolean, TRUE or FALSE;
Usage restrictions
Only push consumers can select messages with SQL92. The interface is:
public void subscribe(final String topic, final MessageSelector messageSelector)
Producer example
When sending, you can attach properties to a message with the putUserProperty method.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
// tag and i are not defined in this excerpt; they are assumed to come
// from the surrounding code, for example a send loop.
Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
Consumer example
Use MessageSelector.bySql to select messages with SQL92 when consuming.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subscribe to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
OpenMessaging example
OpenMessaging establishes industry guidelines and messaging and streaming specifications to provide a common framework for finance, e-commerce, the Internet of Things, and big data. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.
RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through OpenMessaging.
OMSProducer
The following example shows how to use RocketMQ to send synchronous, asynchronous, and one-way messages.
public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        producer.startup();
        System.out.printf("Producer startup OK%n");
        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }
        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }
                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }
        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }
        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
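The access point string passed to getMessagingAccessPoint packs several pieces of information into one value. The sketch below splits it purely by its visible shape; it is not how the OpenMessaging library parses it. The class AccessPointSketch is hypothetical, and reading the three parts as a driver name, a list of name server addresses, and a namespace is an assumption based on the example string.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits an access point string by its visible shape only.
class AccessPointSketch {
    final String driver;
    final List<String> addresses;
    final String namespace;

    AccessPointSketch(String driver, List<String> addresses, String namespace) {
        this.driver = driver;
        this.addresses = addresses;
        this.namespace = namespace;
    }

    static AccessPointSketch parse(String url) {
        String prefix = "openmessaging:";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("missing openmessaging: prefix");
        }
        String rest = url.substring(prefix.length());
        int schemeEnd = rest.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("missing :// separator");
        }
        int pathStart = rest.indexOf('/', schemeEnd + 3);
        if (pathStart < 0) {
            throw new IllegalArgumentException("missing namespace part");
        }
        String driver = rest.substring(0, schemeEnd);
        List<String> addresses = Arrays.asList(rest.substring(schemeEnd + 3, pathStart).split(","));
        String namespace = rest.substring(pathStart + 1);
        return new AccessPointSketch(driver, addresses, namespace);
    }

    public static void main(String[] args) {
        AccessPointSketch ap = parse("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        // prints rocketmq [IP1:9876, IP2:9876] namespace
        System.out.println(ap.driver + " " + ap.addresses + " " + ap.namespace);
    }
}
```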
OMSPullConsumer
Use OMSPullConsumer to pull messages from a specified queue.
public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        consumer.startup();
        System.out.printf("Consumer startup OK%n");
        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }
        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}
OMSPushConsumer
Attach an OMS PushConsumer to the specified queue and consume messages through a MessageListener.
public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
    }
}
Transactional message example
What are transactional messages?
A transactional message can be thought of as a two-phase commit message implementation that ensures eventual consistency in a distributed system. Transactional messages guarantee that executing the local transaction and sending the message are performed atomically.
Usage restrictions
- Transactional messages do not support scheduled (delayed) delivery or batch sending.
- To avoid a single message being checked too many times and causing half messages to pile up in the queue, the number of checks for a single message is limited to 15 by default. Users can change this limit through the "transactionCheckMax" parameter in the broker configuration. If a message has been checked more than "transactionCheckMax" times, the broker by default discards the message and prints an error log. Users can change this behavior by overriding the "AbstractTransactionCheckListener" class.
- A transactional message is checked after a certain period of time, determined by the "transactionTimeout" parameter in the broker configuration. Users can also change this limit by setting the user property "CHECK_IMMUNITY_TIME_IN_SECONDS" when sending a transactional message; this property takes precedence over the "transactionMsgTimeout" parameter.
- A transactional message may be checked or consumed more than once.
- Re-putting a committed message to the user's target topic may fail. For now, recovery depends on the log records. High availability is ensured by RocketMQ's own high-availability mechanism. If you want to ensure that transactional messages are not lost and that transaction integrity is guaranteed, the synchronous double-write mechanism is recommended.
- Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transactional messages allow backward queries: the MQ server queries clients by their producer IDs.
Transactional status
A transactional message has three states. In the code that follows they appear as LocalTransactionState.COMMIT_MESSAGE, ROLLBACK_MESSAGE, and UNKNOW.
- TransactionStatus.CommitTransaction: commit the transaction, which means that consumers are allowed to consume this message.
- TransactionStatus.RollbackTransaction: roll back the transaction, which means that the message will be deleted and cannot be consumed.
- TransactionStatus.Unknown: intermediate state, which means that MQ needs to check back to determine the status.
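To make the three states and the check limit concrete, here is a small simulation of how a half message could end up, written as plain Java without RocketMQ. It is a sketch under stated assumptions, not the broker's implementation: the names TransactionCheckSketch, State, Outcome, and resolve are hypothetical, the limit of 15 is the default quoted in the restrictions above, and the exact boundary at which the broker gives up is an assumption.

```java
import java.util.function.Supplier;

// Hypothetical model of how a half message is resolved; not broker code.
class TransactionCheckSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DELETED, DISCARDED }

    static final int MAX_CHECKS = 15; // default limit quoted in the text

    // initial: state returned right after the local transaction ran.
    // checker: stands in for the producer answering a check request.
    static Outcome resolve(State initial, Supplier<State> checker) {
        State state = initial;
        int checks = 0;
        while (state == State.UNKNOWN && checks < MAX_CHECKS) {
            state = checker.get(); // the server checks back with the producer
            checks++;
        }
        switch (state) {
            case COMMIT:
                return Outcome.DELIVERED; // consumers may consume the message
            case ROLLBACK:
                return Outcome.DELETED;   // the message is never delivered
            default:
                return Outcome.DISCARDED; // still unknown after all checks
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(State.COMMIT, () -> State.UNKNOWN));   // prints DELIVERED
        System.out.println(resolve(State.UNKNOWN, () -> State.ROLLBACK)); // prints DELETED
        System.out.println(resolve(State.UNKNOWN, () -> State.UNKNOWN));  // prints DISCARDED
    }
}
```

The third call shows why a listener that keeps answering "unknown" is a problem: the message is eventually dropped rather than delivered.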
Creating a transactional producer
Use the TransactionMQProducer class to create a producer client and specify a unique producerGroup. You can also set a custom thread pool to process check requests. After executing the local transaction, reply to MQ according to the execution result; the possible reply states are described in the section above.
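import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;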
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
Implementing the TransactionListener interface
The "executeLocalTransaction" method executes the local transaction once the half message has been sent successfully. It returns one of the three transaction states described in the previous section. The "checkLocalTransaction" method checks the local transaction status in response to MQ check requests. It also returns one of those three states.
import ...
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
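Since executeLocalTransaction always answers UNKNOW, every message in this example is decided by the later check. The sketch below tallies what the check would answer for the 10 messages sent by TransactionProducer. It is plain Java with hypothetical names (ListenerTallySketch, checkResult, tally), and it assumes that transaction indexes are handed out in send order, starting at 0.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical tally of the answers TransactionListenerImpl would give.
class ListenerTallySketch {
    enum State { UNKNOW, COMMIT_MESSAGE, ROLLBACK_MESSAGE }

    // Same mapping as the switch in checkLocalTransaction: index % 3.
    static State checkResult(int transactionIndex) {
        switch (transactionIndex % 3) {
            case 0:
                return State.UNKNOW;
            case 1:
                return State.COMMIT_MESSAGE;
            default:
                return State.ROLLBACK_MESSAGE;
        }
    }

    // Counts how many of the first messageCount messages land in each state.
    static Map<State, Integer> tally(int messageCount) {
        Map<State, Integer> counts = new EnumMap<>(State.class);
        for (int i = 0; i < messageCount; i++) {
            counts.merge(checkResult(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {UNKNOW=4, COMMIT_MESSAGE=3, ROLLBACK_MESSAGE=3}
        System.out.println(tally(10));
    }
}
```

Under these assumptions only three of the ten messages are committed and become visible to consumers; the four that stay UNKNOW keep being checked until the check limit is reached.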