It has been more than a year since I last blogged. During that time I was busy with business work: arguing with product, testing, and other departments every day, running between teams, and handling team matters. Looking back, I realized that my technical skills had fallen behind quite a bit. I hope I can keep writing from now on.
Welcome to follow the WeChat official account [sharedCode], dedicated to source code analysis of mainstream middleware. Personal website: www.shared-code.com/
I. Installation and configuration
Download
Official website: rocketmq.apache.org/
For example: rocketmq-all-4.0.0-incubating-bin.zip
After decompressing the file, the directory structure is as follows
- rocketmq
  - LICENSE
  - NOTICE
  - README.md
  - benchmark
  - bin
  - conf
    - 2m-2s-async // Multi-master, multi-slave, asynchronous replication mode
    - 2m-noslave // Multi-master mode without slaves
    - 2m-2s-sync // Multi-master, multi-slave, synchronous replication mode
  - lib
The conf folder contains three subfolders: 2m-noslave, 2m-2s-async, and 2m-2s-sync.
2m-noslave: two masters, no slaves
2m-2s-async: two masters, two slaves, asynchronous data replication
2m-2s-sync: two masters, two slaves, synchronous data replication
If you are only running a demo, any one of these configurations will do.
Broker configuration file details
# The default cluster name
brokerClusterName=DefaultCluster
# The name of the broker
brokerName=broker-a
# The port on which the broker listens for external requests
listenPort=10911
# 0 is master; anything greater than 0 is slave
brokerId=0
# Hour of the day at which commit logs past the retention time are deleted
deleteWhen=04
# File retention time in hours (48 in the bundled sample configs; set to 120 here)
fileReservedTime=120
# The role of the broker
# - ASYNC_MASTER: master with asynchronous replication
# - SYNC_MASTER: master with synchronous double write
# - SLAVE: slave
brokerRole=ASYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH: flush to disk asynchronously
# - SYNC_FLUSH: flush to disk synchronously
flushDiskType=ASYNC_FLUSH
# Broker IP address
brokerIP1=localhost
# NameServer addresses, separated by semicolons
namesrvAddr=localhost:9876
# Whether to allow the broker to create topics automatically
autoCreateTopicEnable=true
# Whether to allow the broker to create subscription groups automatically
autoCreateSubscriptionGroup=true
# Path for storing the commit log
storePathCommitLog = /app/rocketmq/store/commitlog
# Path for storing the consume queue
storePathConsumerQueue = /app/rocketmq/store/consumequeue
# Commit log mapped file size in bytes (1024 * 1024 * 1024 = 1 GB)
mapedFileSizeCommitLog = 1073741824
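One detail of this file format is worth checking before editing the numeric settings. The sketch below assumes the broker file is read as a standard java.util.Properties file; the class and method names (BrokerConfigSketch, isMaster, isIntegerLiteral) are made up for illustration and are not part of RocketMQ. It shows that a value is kept as a plain string, so an arithmetic expression is not evaluated, and how the brokerId rule (0 is master) could be checked.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class BrokerConfigSketch {

    // Loads settings from text in java.util.Properties format.
    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // brokerId 0 means master; any value greater than 0 means slave.
    static boolean isMaster(Properties props) {
        return Integer.parseInt(props.getProperty("brokerId").trim()) == 0;
    }

    // Returns true only if the value is a plain integer literal.
    // "1024 * 1024 * 1024" is just a string here; nothing multiplies it.
    static boolean isIntegerLiteral(String value) {
        try {
            Integer.parseInt(value.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Properties props = load(String.join("\n",
                "# 0 is master",
                "brokerId=0",
                "brokerRole=ASYNC_MASTER",
                "mapedFileSizeCommitLog = 1024 * 1024 * 1024"));
        System.out.println("master: " + isMaster(props));
        System.out.println("size is an integer literal: "
                + isIntegerLiteral(props.getProperty("mapedFileSizeCommitLog")));
        System.out.println("1 GB written out: " + (1024L * 1024L * 1024L));
    }
}
```

Under these assumptions, the safe way to express 1 GB in the file is the literal 1073741824.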
Startup configuration
RocketMQ's default JVM settings take up a lot of memory, so you need to modify the startup parameters.
Modify the broker startup parameters (the JVM options in bin/runbroker.sh)
Modify the NameServer startup parameters (the JVM options in bin/runserver.sh)
Startup commands
Start the NameServer
nohup sh mqnamesrv > /app/rocketmq/logs/mqnamesrv.log 2>&1 &
View the log at /app/rocketmq/logs/mqnamesrv.log
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/rmq_srv_gc.log due to No such file or directory
Java HotSpot(TM) 64-Bit Server VM warning: MaxNewSize (131072k) is equal to or greater than the entire heap (131072k). A new max generation size of 131008k will be used.
The Name Server boot success. serializeType=JSON
Start the broker
nohup sh ./mqbroker -c /app/rocketmq/conf/2m-2s-async/broker-a.properties > /app/rocketmq/logs/broker-a.log 2>&1 &
View the log at /app/rocketmq/logs/broker-a.log
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /dev/shm/mq_gc_pid22422.log due to No such file or directory
The broker[broker-a, localhost:10911] boot success. serializeType=JSON and name server is localhost:9876
Shutdown commands
# Shut down the broker
sh ./mqshutdown broker
# Shut down the NameServer
sh ./mqshutdown namesrv
A simple test
Consumer
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerTest {
    public static void main(String[] args) throws MQClientException {
        // Set the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");
        // Set the NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to the topic; "*" matches every tag
        consumer.subscribe("MyTopic", "*");
        // Register a message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
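Producer
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class ProducerTest {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // Set the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer
        producer.start();
        for (int i = 0; i < 5; i++) {
            // Create a message containing topic, tag, and message content
            Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the message and print the result
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            Thread.sleep(50);
        }
        // Shut down the producer when it is no longer in use
        producer.shutdown();
    }
}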
Start producers and consumers separately
Producer log
16:29:33.569 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D19D0000, offsetMsgId=7F00000100002A9F0000000001140F64, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=3], queueOffset=25002]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D1E90001, offsetMsgId=7F00000100002A9F0000000001141015, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=0], queueOffset=25002]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D2350002, offsetMsgId=7F00000100002A9F00000000011410C6, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=1], queueOffset=25003]
SendResult [sendStatus=SEND_OK, msgId=0AD0C99C67F118B4AAC26562D2720003, offsetMsgId=7F00000100002A9F0000000001141177, messageQueue=MessageQueue [topic=MyTopic, brokerName=broker-a, queueId=2], queueOffset=25003]
16:29:35.635 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
16:29:35.639 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
16:29:35.640 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
Consumer log
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=177, queueOffset=25002, sysFlag=0, bornTimestamp=1568968175007, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175023, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000001140F64, commitLogOffset=18091876, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25003, CONSUME_START_TIME=1568968175082, UNIQ_KEY=0AD0C99C67F118B4AAC26562D19D0000, WAIT=true, TAGS=MyTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=177, queueOffset=25002, sysFlag=0, bornTimestamp=1568968175081, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175105, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000001141015, commitLogOffset=18092053, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25003, CONSUME_START_TIME=1568968175118, UNIQ_KEY=0AD0C99C67F118B4AAC26562D1E90001, WAIT=true, TAGS=MyTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=1, storeSize=177, queueOffset=25003, sysFlag=0, bornTimestamp=1568968175157, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175158, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F00000000011410C6, commitLogOffset=18092230, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25004, CONSUME_START_TIME=1568968175167, UNIQ_KEY=0AD0C99C67F118B4AAC26562D2350002, WAIT=true, TAGS=MyTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=177, queueOffset=25003, sysFlag=0, bornTimestamp=1568968175218, bornHost=/127.0.0.1:55037, storeTimestamp=1568968175223, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000001141177, commitLogOffset=18092407, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='MyTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25004, CONSUME_START_TIME=1568968175228, UNIQ_KEY=0AD0C99C67F118B4AAC26562D2720003, WAIT=true, TAGS=MyTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]]
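The IDs in these logs can be decoded by hand, which is a handy check that producer and consumer are looking at the same message. The sketch below assumes the 16-byte offsetMsgId layout used for IPv4 brokers (4-byte address, 4-byte port, 8-byte commit log offset, big-endian). The class and method names (OffsetMsgIdSketch, describe, bodyToText) are made up for illustration and are not part of the RocketMQ client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class OffsetMsgIdSketch {

    // Converts a hex string into bytes, two hex digits per byte.
    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Assumed layout: 4-byte IPv4 address, 4-byte port, 8-byte commit log offset.
    static String describe(String offsetMsgId) {
        ByteBuffer buf = ByteBuffer.wrap(hexToBytes(offsetMsgId)); // big-endian by default
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(buf.get() & 0xFF);
        }
        int port = buf.getInt();
        long offset = buf.getLong();
        return ip + ":" + port + " offset=" + offset;
    }

    // Turns the numeric body printed in the consumer log back into text.
    static String bodyToText(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // offsetMsgId of the first SendResult in the producer log
        System.out.println(describe("7F00000100002A9F0000000001140F64"));
        // body of the first message in the consumer log
        System.out.println(bodyToText(72, 101, 108, 108, 111, 32, 82, 111,
                99, 107, 101, 116, 77, 81, 32, 48));
    }
}
```

For the first message this gives 127.0.0.1:10911 offset=18091876, which matches storeHost and commitLogOffset in the consumer log, and the body decodes to Hello RocketMQ 0.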
As the logs above show, a very simple RocketMQ producer/consumer setup is now working.
RocketMQ best practices
The official project provides a well-written set of best practices. Unless you have special requirements, following that configuration is generally fine.
Github.com/apache/rock…
Conclusion
This was a very brief, introductory walkthrough of standalone RocketMQ. Later posts will go deeper into RocketMQ source code analysis.