After more than a year, I stopped blogging for more than a year, during which I have been busy with business, arguing with product, testing and other departments every day, rushing between departments every day and dealing with some things on the team. Looking back, I suddenly found that the technology was actually lagging behind a lot, I hope I can continue to adhere to it in the future.

Welcome to the public number [sharedCode] committed to mainstream middleware source code analysis, personal website: www.shared-code.com/

I. Installation and configuration

download

Liverpoolfc.tv: rocketmq.apache.org/

Such as: rocketmq – all – 4.0.0 – incubating – bin – the zip

After decompressing the file, the directory structure is as follows

-rocketmq
	- LICENSE
	- NOTICE
	- README.md
	- benchmark
	- bin
	- conf
		- 2m-2s-async // Multi-master asynchronous replication mode
		- 2m-noslave  // Multi-master mode without slave mode
		- 2m-2s-sync  // Multi-master synchronous replication mode
	- lib
Copy the code

2m-noslave, 2m-2s-async and 2m-2s-sync folders exist in the conf folder.

2m-noslave Indicates that the two masters are unavailable

2m-2s-async Two master, two slave, synchronous data replication configuration

2m-2s-sync Indicates the configuration of two master, two slave, and asynchronous data replication

If you’re running a demo, you can just play with one of these.

Broker configuration file details

The default cluster name
brokerClusterName=DefaultCluster
The name of the # broke
brokerName=broker-a
The port on which the Broker listens for external services
listenPort=10911
# 0 is master, and anything greater than 0 is slave
brokerId=0   
At what time should I delete commit logs that have exceeded the file retention time
deleteWhen=04
The default retention time is 48 hours
fileReservedTime=120
The role of the Broker
# -async_master Asynchronous replication Master
# -sync_master Synchronizes the double write Master
# - SLAVE from the machine
brokerRole=ASYNC_MASTER
# Brush plate mode
# -async_flush Asynchronously flush disks
# -sync_flush Synchronously flush disks
flushDiskType=ASYNC_FLUSH
Broker IP address
brokerIP1=localhost
# nameServer address, semicolon split
namesrvAddr=localhost:9876
Whether to allow the Broker to create topics automatically
autoCreateTopicEnable=true
Whether to enable the Broker to automatically create subscription groups
autoCreateSubscriptionGroup=true
The path to store the commit log
storePathCommitLog = /app/rocketmq/stroe/commitlog
Store the path to consume queue
storePathConsumerQueue = /app/rocketmq/stroe/commitlog
# commit log mapping file size
mapedFileSizeCommitLog = 1024 * 1024 * 1024
Copy the code

Configuration to start

RocketMq’s default memory configuration takes up a lot of memory, so you need to modify the startup parameters

Modify broker start parameters

Modify nameServer startup parameters

Start the command
Start the nameSrv
nohup sh mqnamesrv > /app/rocketmq/logs/mqnamesrv.log 2>&1 &
Copy the code

View/app/rocketmq/logs/mqnamesrv log log

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory

Java HotSpot(TM) 64-Bit Server VM warning: MaxNewSize (131072k) is equal to or greater than the entire heap (131072k).  A new max generation size of 131008k will be used.
The Name Server boot success. serializeType=JSON
Copy the code
Start the broker
nohup sh ./mqbroker -c /app/rocketmq/conf/2m-2s-async/broker-a.properties > /app/rocketmq/logs/broker-a.log 2>&1 &
Copy the code

View/app/rocketmq/logs/broker – a. og log

Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid22422.log due to No such file or directory

The broker[broker-a, localhost:10911] boot success. serializeType=JSON and name server is localhost:9876
Copy the code
The shutdown command
#Close the borker
sh ./mqshutdown broker
#Close the nameSrv
sh ./mqshutdown namesrv
Copy the code

A simple test

consumers
public class ConsumerTest {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // Set the producer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group");
        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the topic
        consumer.subscribe("MyTopic"."*");
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});/ / start
        consumer.start();
        System.out.printf("Consumer Started.%n"); }}Copy the code
producers
public class ProducerTest {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // Set the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        / / start
        producer.start();
        for (int i = 0; i < 5; i++) {
            // Create a message containing topic, tag, and message content
            Message msg = new Message("MyTopic"."MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the result
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            Thread.sleep(50);
        }
        // Close when not in useproducer.shutdown(); }}Copy the code

Start producers and consumers separately

Producer log

16:29:33569. [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D19D0000, offsetMsgId=7F00000100002A9F0000000001140F64, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=3], queueOffset=25002]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D1E90001, offsetMsgId=7F00000100002A9F0000000001141015, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=0], queueOffset=25002]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D2350002, offsetMsgId=7F00000100002A9F00000000011410C6, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=1], queueOffset=25003]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D2720003, offsetMsgId=7F00000100002A9F0000000001141177, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=2], queueOffset=25003]
16:29:35635. [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127. 0. 01.:10911] result: true
16:29:35639. [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127. 0. 01.:10911] result: true
16:29:35640. [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127. 0. 01.:9876] result: true
Copy the code

Consumer log

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=177, queueOffset=25002, sysFlag=0, bornTimestamp=1568968175007, bornHost=/127. 0. 01.:55037, storeTimestamp=1568968175023, storeHost=/127. 0. 01.:10911, msgId=7F00000100002A9F0000000001140F64, commitLogOffset=18091876, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25003, CONSUME_START_TIME=1568968175082, UNIQ_KEY=0AD0C99C67F118B4AAC26562D19D0000, WAIT=true, TAGS=MyTag}, body=[72.101.108.108.111.32.82.111.99.107.101.116.77.81.32.48], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=177, queueOffset=25002, sysFlag=0, bornTimestamp=1568968175081, bornHost=/127. 0. 01.:55037, storeTimestamp=1568968175105, storeHost=/127. 0. 01.:10911, msgId=7F00000100002A9F0000000001141015, commitLogOffset=18092053, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25003, CONSUME_START_TIME=1568968175118, UNIQ_KEY=0AD0C99C67F118B4AAC26562D1E90001, WAIT=true, TAGS=MyTag}, body=[72.101.108.108.111.32.82.111.99.107.101.116.77.81.32.49], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=1, storeSize=177, queueOffset=25003, sysFlag=0, bornTimestamp=1568968175157, bornHost=/127. 0. 01.:55037, storeTimestamp=1568968175158, storeHost=/127. 0. 01.:10911, msgId=7F00000100002A9F00000000011410C6, commitLogOffset=18092230, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25004, CONSUME_START_TIME=1568968175167, UNIQ_KEY=0AD0C99C67F118B4AAC26562D2350002, WAIT=true, TAGS=MyTag}, body=[72.101.108.108.111.32.82.111.99.107.101.116.77.81.32.50], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=177, queueOffset=25003, sysFlag=0, bornTimestamp=1568968175218, bornHost=/127. 0. 01.:55037, storeTimestamp=1568968175223, storeHost=/127. 0. 01.:10911, msgId=7F00000100002A9F0000000001141177, commitLogOffset=18092407, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25004, CONSUME_START_TIME=1568968175228, UNIQ_KEY=0AD0C99C67F118B4AAC26562D2720003, WAIT=true, TAGS=MyTag}, body=[72.101.108.108.111.32.82.111.99.107.101.116.77.81.32.51], transactionId='null'}]] 
Copy the code

Looking at the above log, a very simple rocketMq producer consumer model is in place

RocketMq Best practices

Some of the best practices provided by the official, well written, generally no special requirements, using this configuration is basically no problem

Github.com/apache/rock…

conclusion

Today is a very brief introduction to rocketMq standalone, a very introductory tutorial, followed by a bit of in-depth rocketMq source analysis