It has been more than a year since my last blog post. During that time I was busy with business work: arguing with product, testing and other departments every day, rushing between departments, and dealing with team matters. Looking back, I suddenly realized that my technical skills had fallen quite far behind. I hope I can keep writing regularly from now on.

I. Installation and configuration


For example: rocketmq-all-4.0.0-incubating-bin.zip

After unzipping the file, the directory structure is as follows:

	- benchmark
	- bin
	- conf
		- 2m-2s-async // Multi-master asynchronous replication mode
		- 2m-noslave  // Multi-master mode without slaves
		- 2m-2s-sync  // Multi-master synchronous replication mode
	- lib

The conf folder contains three subfolders: 2m-noslave, 2m-2s-async and 2m-2s-sync.

2m-noslave: two masters, no slaves

2m-2s-async: two masters, two slaves, asynchronous data replication

2m-2s-sync: two masters, two slaves, synchronous data replication

If you are only running a demo, any one of these will do.

Broker configuration file details

# Cluster name (brokerClusterName)
# Broker name (brokerName)
# Port on which the broker listens for external services (listenPort)
# Broker ID (brokerId): 0 is master, anything greater than 0 is slave
# Time of day at which commit log files past their retention time are deleted (deleteWhen)
# File retention time (fileReservedTime); the default retention time is 48 hours
# Broker role (brokerRole)
# - ASYNC_MASTER: master with asynchronous replication
# - SYNC_MASTER: master with synchronous double write
# - SLAVE: slave
# Disk flush mode (flushDiskType)
# - ASYNC_FLUSH: flush to disk asynchronously
# - SYNC_FLUSH: flush to disk synchronously
# Broker IP address (brokerIP1)
# NameServer addresses, separated by semicolons (namesrvAddr)
# Whether to allow the broker to create topics automatically (autoCreateTopicEnable)
# Whether to allow the broker to create subscription groups automatically (autoCreateSubscriptionGroup)
# Path where the commit log is stored
storePathCommitLog = /app/rocketmq/store/commitlog
# Path where the consume queue is stored
storePathConsumerQueue = /app/rocketmq/store/consumequeue
# Size of each commit log mapped file in bytes (1024 * 1024 * 1024 = 1 GB)
mapedFileSizeCommitLog = 1073741824
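
To show how these comments line up with actual property keys, here is a small Java sketch that assembles a single-master broker configuration and prints it in key=value form. The key names are the RocketMQ 4.x broker properties as I understand them. The class name BrokerConfSketch, its methods, and every value (cluster name, store root, and so on) are assumptions chosen for illustration, not the author's real configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BrokerConfSketch {

    /** Builds a single-master broker config; every value is an illustrative assumption. */
    static Map<String, String> singleMasterConf(String namesrvAddr, String storeRoot) {
        // LinkedHashMap keeps insertion order so the printed file stays readable
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("brokerClusterName", "DefaultCluster");
        conf.put("brokerName", "broker-a");
        conf.put("brokerId", "0");                // 0 = master, > 0 = slave
        conf.put("listenPort", "10911");
        conf.put("namesrvAddr", namesrvAddr);     // several addresses are separated by ';'
        conf.put("deleteWhen", "04");             // hour of day at which expired files are deleted
        conf.put("fileReservedTime", "48");       // retention time in hours
        conf.put("brokerRole", "ASYNC_MASTER");   // or SYNC_MASTER / SLAVE
        conf.put("flushDiskType", "ASYNC_FLUSH"); // or SYNC_FLUSH
        conf.put("autoCreateTopicEnable", "true");
        conf.put("autoCreateSubscriptionGroup", "true");
        conf.put("storePathCommitLog", storeRoot + "/commitlog");
        // The file needs a plain number, so the product is computed here: 1 GB in bytes
        conf.put("mapedFileSizeCommitLog", String.valueOf(1024L * 1024 * 1024));
        return conf;
    }

    /** Renders the map as key=value lines, one property per line. */
    static String render(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(singleMasterConf("localhost:9876", "/app/rocketmq/store")));
    }
}
```

Redirecting the output into a file gives something you can pass to mqbroker with -c. For a real cluster, brokerId, brokerRole and the store paths would differ per node.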
Startup configuration

RocketMQ's default JVM settings reserve a lot of memory, so on a small machine you need to lower the heap sizes in the startup scripts before starting.

Modify the broker startup parameters (JAVA_OPT in bin/runbroker.sh)

Modify the NameServer startup parameters (JAVA_OPT in bin/runserver.sh)

Startup commands
Start the NameServer
nohup sh mqnamesrv > /app/rocketmq/logs/mqnamesrv.log 2>&1 &
View the log at /app/rocketmq/logs/mqnamesrv.log

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory

Java HotSpot(TM) 64-Bit Server VM warning: MaxNewSize (131072k) is equal to or greater than the entire heap (131072k).  A new max generation size of 131008k will be used.
The Name Server boot success. serializeType=JSON
Start the broker
nohup sh ./mqbroker -c /app/rocketmq/conf/2m-2s-async/ > /app/rocketmq/logs/broker-a.log 2>&1 &
View the log at /app/rocketmq/logs/broker-a.log

Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid22422.log due to No such file or directory

The broker[broker-a, localhost:10911] boot success. serializeType=JSON and name server is localhost:9876
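
Besides reading the logs, one quick way to confirm that both processes are up is to check that their ports accept TCP connections. The sketch below does that with plain java.net sockets. Ports 9876 and 10911 come from the broker log line above. The class name PortCheck, the method isListening and the one-second timeout are made up for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat all as "not listening"
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer 9876 listening: " + isListening("localhost", 9876, 1000));
        System.out.println("Broker 10911 listening: " + isListening("localhost", 10911, 1000));
    }
}
```

An open port only shows that the process is accepting connections. It does not prove that the broker has registered with the NameServer, so the boot success lines in the logs remain the main thing to look for.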
Shutdown commands
# Shut down the broker
sh ./mqshutdown broker
# Shut down the NameServer
sh ./mqshutdown namesrv
A simple test

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
public class ConsumerTest {

    public static void main(String[] args) throws Exception {
        // Set the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the topic; "*" matches every tag
        consumer.subscribe("MyTopic", "*");
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
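
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerTest {

    public static void main(String[] args) throws Exception {
        // Set the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        for (int i = 0; i < 5; i++) {
            // Create a message containing topic, tag, and message content
            Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the message synchronously and print the result
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the producer once all messages are sent
        producer.shutdown();
    }
}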
Start the producer and the consumer separately.

Producer log

16:29:33.569 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D19D0000, offsetMsgId=7F00000100002A9F0000000001140F64, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=3], queueOffset=25002]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D1E90001, offsetMsgId=7F00000100002A9F0000000001141015, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=0], queueOffset=25002]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D2350002, offsetMsgId=7F00000100002A9F00000000011410C6, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=1], queueOffset=25003]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D2720003, offsetMsgId=7F00000100002A9F0000000001141177, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=2], queueOffset=25003]
16:29:35.635 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
16:29:35.639 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
16:29:35.640 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
Consumer log

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=177, queueOffset=25002, sysFlag=0, bornTimestamp=1568968175007, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175023, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000001140F64, commitLogOffset=18091876, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25003, CONSUME_START_TIME=1568968175082, UNIQ_KEY=0AD0C99C67F118B4AAC26562D19D0000, WAIT=true, TAGS=MyTag}, body=[], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=177, queueOffset=25002, sysFlag=0, bornTimestamp=1568968175081, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175105, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000001141015, commitLogOffset=18092053, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25003, CONSUME_START_TIME=1568968175118, UNIQ_KEY=0AD0C99C67F118B4AAC26562D1E90001, WAIT=true, TAGS=MyTag}, body=[], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=1, storeSize=177, queueOffset=25003, sysFlag=0, bornTimestamp=1568968175157, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175158, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F00000000011410C6, commitLogOffset=18092230, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25004, CONSUME_START_TIME=1568968175167, UNIQ_KEY=0AD0C99C67F118B4AAC26562D2350002, WAIT=true, TAGS=MyTag}, body=[], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=177, queueOffset=25003, sysFlag=0, bornTimestamp=1568968175218, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175223, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000001141177, commitLogOffset=18092407, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25004, CONSUME_START_TIME=1568968175228, UNIQ_KEY=0AD0C99C67F118B4AAC26562D2720003, WAIT=true, TAGS=MyTag}, body=[], transactionId='null'}]]
As the logs above show, a very simple RocketMQ producer/consumer setup is now up and running.
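
The two logs can also be cross-checked by hand. The offsetMsgId printed by the producer appears to pack the broker's IPv4 address (4 bytes), its port (4 bytes) and the commit log offset (8 bytes) into 32 hex characters, and the decoded values match the storeHost and commitLogOffset fields printed by the consumer. The sketch below decodes that layout. The class and method names are hypothetical, and the layout is my reading of the values in these logs, so treat it as an illustration rather than the client library's own decoder.

```java
public class OffsetMsgIdDecoder {

    /** Parts of an IPv4 offsetMsgId: broker address plus commit log offset. */
    static final class Decoded {
        final String host;
        final int port;
        final long commitLogOffset;

        Decoded(String host, int port, long commitLogOffset) {
            this.host = host;
            this.port = port;
            this.commitLogOffset = commitLogOffset;
        }
    }

    /** Splits a 32-character hex id into IP (8 chars), port (8 chars) and offset (16 chars). */
    static Decoded decode(String offsetMsgId) {
        if (offsetMsgId == null || offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        // Each of the first four bytes is one octet of the IPv4 address
        StringBuilder host = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                host.append('.');
            }
            host.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        long offset = Long.parseLong(offsetMsgId.substring(16, 32), 16);
        return new Decoded(host.toString(), port, offset);
    }

    public static void main(String[] args) {
        // First offsetMsgId from the producer log
        Decoded d = decode("7F00000100002A9F0000000001140F64");
        // prints "127.0.0.1:10911 offset=18091876"
        System.out.println(d.host + ":" + d.port + " offset=" + d.commitLogOffset);
    }
}
```

Decoding the second id, 7F00000100002A9F0000000001141015, gives offset 18092053, which is exactly 177 bytes after the first one and matches the storeSize=177 reported for each message.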

RocketMQ best practices

The official project provides a set of best practices. They are well written, and unless you have special requirements, following that configuration is generally fine…


Today's post was a very brief, introductory tutorial on running RocketMQ standalone. A somewhat deeper analysis of the RocketMQ source code will follow.