1. Introduction to RocketMQ
RocketMQ is a high-performance, high-throughput distributed messaging middleware that Alibaba open-sourced in 2012. It was later donated to the Apache Software Foundation and became an Apache Incubator project in November 2016.
2. Functions and features
RocketMQ is a low-latency, highly reliable, scalable, easy-to-use messaging middleware. It has the following features:
① It is a queue-model message middleware that is high-performance, highly reliable, near real-time, and distributed.
② Producers, consumers, and queues can all be distributed.
③ The Producer sends messages in round-robin fashion to a set of queues; this set of queues is called a Topic. In broadcast consumption, each Consumer instance consumes all queues of the Topic. In cluster consumption, multiple Consumer instances split the Topic's queues evenly among themselves.
④ Guarantees strict message ordering and supports message backtracking.
⑤ Provides rich message pull modes.
⑥ Efficient horizontal scaling of subscribers.
⑦ Real-time message subscription mechanism.
⑧ Message accumulation capacity on the order of hundreds of millions of messages.
⑨ Few dependencies.
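The round-robin sending and the even split of queues among Consumer instances described in ③ can be sketched in a few lines of shell. This is only an illustrative model, not RocketMQ's own code: the function names pick_queue and assign_queues are hypothetical, and handing each instance a contiguous block of queue ids is an assumed way of averaging.

```sh
#!/bin/sh
# Illustrative model only; pick_queue and assign_queues are hypothetical helpers.

# Round-robin: message number N (0-based) goes to queue N mod queue_count.
# Args: message_number queue_count
pick_queue() {
    echo $(( $1 % $2 ))
}

# Even split: print the queue ids owned by one consumer instance.
# Args: queue_count consumer_count consumer_index (0-based)
assign_queues() {
    total=$1
    consumers=$2
    index=$3
    base=$(( total / consumers ))
    rem=$(( total % consumers ))
    if [ "$index" -lt "$rem" ]; then
        # The first "rem" consumers take one extra queue each.
        size=$(( base + 1 ))
        start=$(( index * size ))
    else
        size=$base
        start=$(( index * base + rem ))
    fi
    out=""
    i=0
    while [ "$i" -lt "$size" ]; do
        out="$out${out:+ }$(( start + i ))"
        i=$(( i + 1 ))
    done
    echo "$out"
}
```

With four queues and two instances in cluster consumption, assign_queues 4 2 0 prints 0 1 and assign_queues 4 2 1 prints 2 3. In broadcast consumption every instance behaves like the only consumer, which corresponds to assign_queues 4 1 0 printing all four ids.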
3. Comparison of MQ middleware
Dimension | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|
Positioning | Traditional message middleware focused on message reliability | Log messages | Reliable transport of non-log messages |
Availability | Normal cluster mode, mirrored queue mode | Data may be lost when disks are flushed asynchronously | Supports both asynchronous and synchronous disk flushing |
Single-machine throughput (order of magnitude) | 10,000 | 100,000 | 100,000 |
Message backlog capacity | Limited by memory and disk thresholds | Very good, limited by disk | Very good, limited by disk |
Ordered consumption | Supported | Supported | Supported |
Scheduled messages | Supported | Not supported | Supported |
Transactional messages | Not supported | Not supported | Supported |
Message retry | Supported | Not supported | Supported |
Dead-letter queue | Supported | Not supported | Supported |
4. RocketMQ system composition
1. RocketMQ provides several message sending modes: synchronous, asynchronous, ordered, and one-way. Synchronous and asynchronous sends both require a response from the Broker; one-way sends do not. Compare the acks settings 0, 1, -1 in Kafka.
2. Two consumption modes: pull and push.
Broker: Acts as the relay, storing and forwarding messages.
Name Server: Provides routing. Similar to a ZooKeeper registry; each Broker registers its address with the Name Server.
Topic: Represents a collection of messages. Each Topic can store many messages, and each message belongs to a Topic. The Topic is the basic unit of message subscription in RocketMQ.
Tag: A message tag, used to distinguish different types of messages under the same Topic.
MessageQueue: Each Topic can be configured with multiple message queues, from which data is pulled.
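As a rough illustration of how a Tag narrows a subscription within a Topic, the sketch below checks a tag against a subscription expression of the form TagA || TagB, where * stands for every tag. The function name tag_matches is hypothetical and the matching is deliberately simplified; the actual filtering is done by RocketMQ itself.

```sh
#!/bin/sh
# Hypothetical helper: succeed if TAG is selected by a subscription
# expression such as "TagA || TagB" or "*".
# Args: expression tag
tag_matches() {
    expression=$1
    tag=$2
    if [ "$expression" = "*" ]; then
        return 0
    fi
    rest=$expression
    while [ -n "$rest" ]; do
        case $rest in
            *"||"*)
                item=${rest%%"||"*}   # text before the first "||"
                rest=${rest#*"||"}    # text after the first "||"
                ;;
            *)
                item=$rest
                rest=""
                ;;
        esac
        # Drop the spaces around each alternative before comparing.
        item=$(printf '%s' "$item" | tr -d ' ')
        if [ "$item" = "$tag" ]; then
            return 0
        fi
    done
    return 1
}
```

For example, tag_matches "TagA || TagB" TagB succeeds, while tag_matches "TagA || TagB" TagC fails.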
5. Single machine quick start and installation
1. Download the RocketMQ installation package: https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
2. Unzip rocketmq-all-4.7.1-bin-release.zip
3. cd /usr/local/rocketmq-all-4.7.1-release to enter the RocketMQ directory
Start and stop the NameServer
Start the NameServer first, then start the Broker, which registers itself with the NameServer.
    # Before starting, adjust the JAVA_OPT memory settings in
    # /usr/local/rocketmq-all-4.7.1-release/bin/runserver.sh
    # Start the NameServer
    nohup /usr/local/rocketmq-all-4.7.1-release/bin/mqnamesrv &
    # Check the NameServer log
    tail -f ~/logs/rocketmqlogs/namesrv.log
    # Verify by running jps; if the output contains a line such as
    #   21980 NamesrvStartup
    # the NameServer was installed successfully
    jps
    # Stop the NameServer
    /usr/local/rocketmq-all-4.7.1-release/bin/mqshutdown namesrv
Start and stop the Broker
    # Before starting, adjust the JAVA_OPT memory settings in runbroker.sh
    # to fit the memory of the local machine
    # Start the Broker (-n points it at the NameServer it registers with)
    nohup /usr/local/rocketmq-all-4.7.1-release/bin/mqbroker -n localhost:9876 &
    # Check the Broker startup log
    tail -f ~/logs/rocketmqlogs/broker.log
    # Verify by running jps; if the output contains a line such as
    #   21911 BrokerStartup
    # the Broker was installed successfully
    jps
    # Stop the Broker
    /usr/local/rocketmq-all-4.7.1-release/bin/mqshutdown broker
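The jps check can also be scripted. The following is a minimal sketch, assuming the main-class names that jps reports for a stock installation are NamesrvStartup and BrokerStartup; the helper name has_java_process is hypothetical. It reads jps-style lines ("pid main-class") from standard input, so it can be tried without a running JVM.

```sh
#!/bin/sh
# Hypothetical helper: succeed if the jps-style input on stdin
# ("<pid> <main class>" per line) contains the given main-class name.
# Args: main_class_name
has_java_process() {
    awk -v name="$1" '$2 == name { found = 1 } END { exit found ? 0 : 1 }'
}

# Example use against live output (requires jps from a JDK on the PATH):
#   jps | has_java_process NamesrvStartup && echo "NameServer is running"
#   jps | has_java_process BrokerStartup  && echo "Broker is running"
```

Matching on the second field rather than grepping the whole line avoids false positives when one class name is a substring of another.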
6. Install in cluster mode
Prepare two servers, 192.168.1.1 and 192.168.1.2
1. On server 192.168.1.1, modify the configuration file broker-a.properties
    # Name of the cluster this broker belongs to
    brokerClusterName=rocketmq-cluster
    # Broker name; it differs between configuration files
    brokerName=broker-a
    # 0 means Master, a value greater than 0 means Slave
    brokerId=0
    # NameServer addresses, separated by semicolons
    namesrvAddr=rocketmq-node1:9876;rocketmq-node2:9876
    defaultTopicQueueNums=4
    # Whether to allow the Broker to create topics automatically; should be disabled in production
    autoCreateTopicEnable=true
    # Whether to allow the Broker to create subscription groups automatically; should be disabled in production
    autoCreateSubscriptionGroup=true
    # Port on which the Broker listens for external service
    listenPort=10911
    # Time of day at which files are deleted; the default is 4 a.m.
    deleteWhen=04
    # File retention time
    fileReservedTime=120
    mapedFileSizeCommitLog=1073741824
    # Each ConsumeQueue file stores 300,000 entries by default; adjust to the business situation
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    diskMaxUsedSpaceRatio=88
    # Storage root path
    storePathRootDir=/data/rocketmq/store
    # commitLog storage path
    storePathCommitLog=/data/rocketmq/store/commitlog
    # Consume queue storage path
    storePathConsumeQueue=/data/rocketmq/store/consumequeue
    # Message index storage path
    storePathIndex=/data/rocketmq/store/index
    # checkpoint file storage path
    storeCheckpoint=/data/rocketmq/store/checkpoint
    # abort file storage path
    abortFile=/data/rocketmq/store/abort
    # Message size limit
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    # Broker role
    # - ASYNC_MASTER: asynchronous replication Master
    # - SYNC_MASTER: synchronous double-write Master
    # - SLAVE
    brokerRole=SYNC_MASTER
    # Disk flush type
    # - ASYNC_FLUSH: asynchronous flush
    # - SYNC_FLUSH: synchronous flush
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    # Number of threads in the send-message thread pool
    #sendMessageThreadPoolNums=128
    # Number of threads in the pull-message thread pool
    #pullMessageThreadPoolNums=128
2. On server 192.168.1.1, modify the configuration file broker-b-s.properties
    brokerClusterName=rocketmq-cluster
    brokerName=broker-b
    brokerId=1
    namesrvAddr=rocketmq-node1:9876;rocketmq-node2:9876
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    listenPort=11011
    deleteWhen=04
    fileReservedTime=120
    mapedFileSizeCommitLog=1073741824
    mapedFileSizeConsumeQueue=300000
    diskMaxUsedSpaceRatio=88
    storePathRootDir=/data/rocketmq/store-s
    storePathCommitLog=/data/rocketmq/store-s/commitlog
    storePathConsumeQueue=/data/rocketmq/store-s/consumequeue
    storePathIndex=/data/rocketmq/store-s/index
    storeCheckpoint=/data/rocketmq/store-s/checkpoint
    abortFile=/data/rocketmq/store-s/abort
    maxMessageSize=65536
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
3. On server 192.168.1.2, modify the configuration file broker-a-s.properties
    # Name of the cluster this broker belongs to
    brokerClusterName=rocketmq-cluster
    # Broker name; it differs between configuration files
    brokerName=broker-a
    # 0 means Master, a value greater than 0 means Slave
    brokerId=1
    # NameServer addresses, separated by semicolons
    namesrvAddr=rocketmq-node1:9876;rocketmq-node2:9876
    defaultTopicQueueNums=4
    # Whether to allow the Broker to create topics automatically; should be disabled in production
    autoCreateTopicEnable=true
    # Whether to allow the Broker to create subscription groups automatically; should be disabled in production
    autoCreateSubscriptionGroup=true
    # Port on which the Broker listens for external service
    listenPort=11011
    # Time of day at which files are deleted; the default is 4 a.m.
    deleteWhen=04
    # File retention time
    fileReservedTime=120
    mapedFileSizeCommitLog=1073741824
    # Each ConsumeQueue file stores 300,000 entries by default; adjust to the business situation
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    diskMaxUsedSpaceRatio=88
    # Storage root path
    storePathRootDir=/data/rocketmq/store-s
    # commitLog storage path
    storePathCommitLog=/data/rocketmq/store-s/commitlog
    # Consume queue storage path
    storePathConsumeQueue=/data/rocketmq/store-s/consumequeue
    # Message index storage path
    storePathIndex=/data/rocketmq/store-s/index
    # checkpoint file storage path
    storeCheckpoint=/data/rocketmq/store-s/checkpoint
    # abort file storage path
    abortFile=/data/rocketmq/store-s/abort
    # Message size limit
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    # Broker role
    # - ASYNC_MASTER: asynchronous replication Master
    # - SYNC_MASTER: synchronous double-write Master
    # - SLAVE
    brokerRole=SLAVE
    # Disk flush type
    # - ASYNC_FLUSH: asynchronous flush
    # - SYNC_FLUSH: synchronous flush
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    # Number of threads in the send-message thread pool
    #sendMessageThreadPoolNums=128
    # Number of threads in the pull-message thread pool
    #pullMessageThreadPoolNums=128
4. On server 192.168.1.2, modify the configuration file broker-b.properties
    # Name of the cluster this broker belongs to
    brokerClusterName=rocketmq-cluster
    # Broker name; it differs between configuration files
    brokerName=broker-b
    # 0 means Master, a value greater than 0 means Slave
    brokerId=0
    # NameServer addresses, separated by semicolons
    namesrvAddr=rocketmq-node1:9876;rocketmq-node2:9876
    defaultTopicQueueNums=4
    # Whether to allow the Broker to create topics automatically; should be disabled in production
    autoCreateTopicEnable=true
    # Whether to allow the Broker to create subscription groups automatically; should be disabled in production
    autoCreateSubscriptionGroup=true
    # Port on which the Broker listens for external service
    listenPort=10911
    # Time of day at which files are deleted; the default is 4 a.m.
    deleteWhen=04
    # File retention time
    fileReservedTime=120
    mapedFileSizeCommitLog=1073741824
    # Each ConsumeQueue file stores 300,000 entries by default; adjust to the business situation
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    diskMaxUsedSpaceRatio=88
    # Storage root path
    storePathRootDir=/data/rocketmq/store
    # commitLog storage path
    storePathCommitLog=/data/rocketmq/store/commitlog
    # Consume queue storage path
    storePathConsumeQueue=/data/rocketmq/store/consumequeue
    # Message index storage path
    storePathIndex=/data/rocketmq/store/index
    # checkpoint file storage path
    storeCheckpoint=/data/rocketmq/store/checkpoint
    # abort file storage path
    abortFile=/data/rocketmq/store/abort
    # Message size limit
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    # Broker role
    # - ASYNC_MASTER: asynchronous replication Master
    # - SYNC_MASTER: synchronous double-write Master
    # - SLAVE
    brokerRole=SYNC_MASTER
    # Disk flush type
    # - ASYNC_FLUSH: asynchronous flush
    # - SYNC_FLUSH: synchronous flush
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    # Number of threads in the send-message thread pool
    #sendMessageThreadPoolNums=128
    # Number of threads in the pull-message thread pool
    #pullMessageThreadPoolNums=128
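With four similar files it is easy to mix up brokerId and brokerRole. The sketch below is one possible sanity check, assuming the convention stated in the configuration comments (brokerId 0 is a Master, anything greater is a Slave). The helper names get_prop and check_broker_role are hypothetical and not part of RocketMQ.

```sh
#!/bin/sh
# Hypothetical helpers for sanity-checking a broker properties file.

# Print the value of a key, ignoring comment lines and trailing spaces.
# Args: file key
get_prop() {
    sed -n "s/^[[:space:]]*$2[[:space:]]*=[[:space:]]*//p" "$1" \
        | sed 's/[[:space:]]*$//' \
        | tail -n 1
}

# Succeed when brokerId and brokerRole agree:
# brokerId=0 needs SYNC_MASTER or ASYNC_MASTER, brokerId>0 needs SLAVE.
# Args: file
check_broker_role() {
    id=$(get_prop "$1" brokerId)
    role=$(get_prop "$1" brokerRole)
    case "$id:$role" in
        0:SYNC_MASTER|0:ASYNC_MASTER) return 0 ;;
        0:*|:*) return 1 ;;
        *:SLAVE) return 0 ;;
        *) return 1 ;;
    esac
}

# Example:
#   check_broker_role broker-a.properties || echo "brokerId and brokerRole disagree"
```

Running the check over every file in the configuration directory before starting the brokers catches a Slave accidentally left with brokerId=0, or a Master with a non-zero id.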
5. Start the cluster
    # Run on both servers: create the store directories used by the Master
    mkdir -p /data/rocketmq/store
    mkdir -p /data/rocketmq/store/commitlog
    mkdir -p /data/rocketmq/store/consumequeue
    mkdir -p /data/rocketmq/store/index
    # Run on both servers: create the store directories used by the Slave
    mkdir -p /data/rocketmq/store-s
    mkdir -p /data/rocketmq/store-s/commitlog
    mkdir -p /data/rocketmq/store-s/consumequeue
    mkdir -p /data/rocketmq/store-s/index
    # Stop any running broker and namesrv
    cd /usr/local/rocketmq/conf/2m-2s-sync
    sh /usr/local/rocketmq/bin/mqshutdown broker
    sh /usr/local/rocketmq/bin/mqshutdown namesrv
    # Run on 192.168.1.1
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
    # Run on 192.168.1.2
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
    nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
    # Check the cluster information
    /usr/local/rocketmq/bin/mqadmin clusterList -n "192.168.1.1:9876;192.168.1.2:9876"
    # Shut down
    sh /usr/local/rocketmq/bin/mqshutdown broker
    sh /usr/local/rocketmq/bin/mqshutdown namesrv
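The directory preparation in the commands above repeats the same three sub-directories for each store root, so it can be folded into a small function. This is only a convenience sketch; the name make_store_dirs is hypothetical, and the sub-directory names are the ones the configuration files in this section point at.

```sh
#!/bin/sh
# Hypothetical helper: create a store root together with the commitlog,
# consumequeue and index sub-directories referenced by the broker config.
# Args: store_root
make_store_dirs() {
    for sub in commitlog consumequeue index; do
        mkdir -p "$1/$sub" || return 1
    done
}

# Example (run on each server):
#   make_store_dirs /data/rocketmq/store
#   make_store_dirs /data/rocketmq/store-s
```

Because mkdir -p creates missing parents and does not fail on existing directories, the function can be run repeatedly without harm.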
6. RocketMQ Web interface
    git clone https://github.com/apache/rocketmq-externals
    cd rocketmq-externals/rocketmq-console
    mvn clean package -Dmaven.test.skip=true
    # Start
    nohup java -jar rocketmq-console-ng-2.0.0.jar --server.port=8081 --rocketmq.config.namesrvAddr=192.168.1.1:9876\;192.168.1.2:9876 >> /root/logs/rocketmqlogs/mq-console.log 2>&1 &
7. Send and receive messages
    # Go to the /usr/local/rocketmq-all-4.7.1-release/bin directory
    cd /usr/local/rocketmq-all-4.7.1-release/bin
    # Create the topic
    sh mqadmin updateTopic -t TopicHello -n localhost:9876 -b localhost:10911
    # Tell the example clients where the NameServer is
    export NAMESRV_ADDR=localhost:9876
    # Send messages with the tools.sh script shipped with RocketMQ
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    # Consume messages
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer