🙂🙂🙂 follow wechat public number: [Taro’s back end hut] have benefits:
- RocketMQ/MyCAT/Sharding-JDBC all source code analysis article list
- RocketMQ/MyCAT/Sharding-JDBC 中文 解 决 source GitHub address
- Any questions you may have about the source code will be answered carefully. Even do not know how to read the source can also ask oh.
- New source code parsing articles are notified in real time. It’s updated about once a week.
- 1. An overview of the
- 2. Namesrv is highly available
- 2.1 Broker registered with Namesrv
- 2.2 Producer and Consumer access Namesrv
- 3. Broker is highly available
- 3.2 the Broker master-slave
- 3.1.1 configuration
- 3.1.2 components
- 3.1.3 Communication protocol
- 3.1.4 Slave
- 3.1.5 Master
- 3.1.6 Master_SYNC
- 3.2 Producer Sends Messages
- 3.3 Consumer messages
- 3.2 the Broker master-slave
- 4. To summarize
1. An overview of the
This article focuses on how Namesrv and Broker achieve high availability, and how Producer and Consumer communicate with them to ensure high availability.
2. Namesrv is highly available
Start multiple NamesRVs for high availability. Compared with Zookeeper, Consul, Etcd, Namesrv is an ultra-lightweight registry that provides naming services.
2.1 Broker registered with Namesrv
- 📌 Multiple NamesRVs do not have any relationship (such as the Leader and Follower roles of Zookeeper) and do not communicate with each other or synchronize data. Register multiple NamesRVs through a Broker loop.
1: // ⬇️⬇️⬇️【 brokerouterapi.java 】
2: public RegisterBrokerResult registerBrokerAll(
3: final String clusterName,
4: final String brokerAddr,
5: final String brokerName,
6: final long brokerId,
7: final String haServerAddr,
8: final TopicConfigSerializeWrapper topicConfigWrapper,
9: final List<String> filterServerList,
10: final boolean oneway,
11: final int timeoutMills) {
12: RegisterBrokerResult registerBrokerResult = null;
13:
14: List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
15: if(nameServerAddressList ! =null) {
16: for (String namesrvAddr : nameServerAddressList) { // Loop through multiple Namesrv
17: try {
18: RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
19: haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
20: if(result ! =null) {
21: registerBrokerResult = result;
22:}23:
24: log.info("register broker to name server {} OK", namesrvAddr);
25:}catch (Exception e) {
26: log.warn("registerBroker Exception, {}", namesrvAddr, e);
27:}28:}29:}30:
31: return registerBrokerResult;
32:}Copy the code
2.2 Producer and Consumer access Namesrv
- 📌 Producer and Consumer select a connectable from the Namesrv list to communicate with each other.
1: // ⬇️⬇️⬇️【 nettyremotingclient.java 】
2: private Channel getAndCreateNameserverChannel(a) throws InterruptedException {
3: // Return selected, connectable Namesrv
4: String addr = this.namesrvAddrChoosed.get();
5: if(addr ! =null) {
6: ChannelWrapper cw = this.channelTables.get(addr);
7: if(cw ! =null && cw.isOK()) {
8: return cw.getChannel();
9:}10:}11: //
12: final List<String> addrList = this.namesrvAddrList.get();
13: if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
14: try {
15: // Returns the selected, connectable Namesrv
16: addr = this.namesrvAddrChoosed.get();
17: if(addr ! =null) {
18: ChannelWrapper cw = this.channelTables.get(addr);
19: if(cw ! =null && cw.isOK()) {
20: return cw.getChannel();
21:}22:}23: // Select a connection return from the Namesrv list
24: if(addrList ! =null && !addrList.isEmpty()) {
25: for (int i = 0; i < addrList.size(); i++) {
26: int index = this.namesrvIndex.incrementAndGet();
27: index = Math.abs(index);
28: index = index % addrList.size();
29: String newAddr = addrList.get(index);
30:
31: this.namesrvAddrChoosed.set(newAddr);
32: Channel channelNew = this.createChannel(newAddr);
33: if(channelNew ! =null)
34: return channelNew;
35:}36:}37:}catch (Exception e) {
38: log.error("getAndCreateNameserverChannel: create name server channel exception", e);
39:}finally {
40: this.lockNamesrvChannel.unlock();
41:}42:}else {
43: log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
44:}45:
46: return null;
47:}Copy the code
3. Broker is highly available
Start groups of brokers to form clusters for high availability. Broker group = Master xN + Slave xN Similar to MySQL, the Master node provides read and write services, while the Slave node only provides read services.
3.2 the Broker master-slave
- For each group,
Master
The node keeps sending new onesCommitLog
给Slave
Node.Slave
The node continuously reports the local informationCommitLog
The location that has been synchronized toMaster
Node. The Broker grouping
与The Broker grouping
There is no relationship between, no communication and data synchronization.- Consumption schedule is currently not supported
Master
/Slave
Synchronization.
In a cluster, there are two types of Master nodes: Master_SYNC and Master_ASYNC. When the Producer sends messages, the former waits for the Slave node to complete the storage and then returns the sending result. The latter does not wait.
3.1.1 configuration
Currently, there are three official configurations:
- 2m-2s-async
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
- 2m-2s-sync
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | SYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | SYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
- 2m-noslave
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
3.1.2 components
Before we look at the implementation code, let’s look at the components that the Master/Slave node contains:
Master
nodeAcceptSocketService
: receiveSlave
Node connection.HAConnection
ReadSocketService
:readfromSlave
Node data.WriteSocketService
:writeTo goSlave
Node data.
Slave
nodeHAClient
:Master
Nodes connect and read and write data.
3.1.3 Communication protocol
The communication protocols of the Master node and Slave node are as follows:
object | use | How many bits | field | The data type | The number of bytes | instructions |
---|---|---|---|---|---|---|
Slave=>Master | Report CommitLogalreadySynchronized to thephysicallocation | |||||
0 | maxPhyOffset | Long | 8 | CommitLog Maximum physical location | ||
Master=>Slave | The new transmissionCommitLog data |
|||||
0 | fromPhyOffset | Long | 8 | CommitLog Start physical location | ||
1 | size | Int | 4 | CommitLog data transmission length | ||
2 | body | Bytes | size | Transfer CommitLog data |
3.1.4 Slave
- Slave The Slave Master loop, which continuously transfers CommitLog data from the Master and uploads the physical location of the Master’s local CommitLog.
1: // ⬇️⬇️⬇️【 haclient.java 】
2: public void run(a) {
3: log.info(this.getServiceName() + " service started");
4:
5: while (!this.isStopped()) {
6: try {
7: if (this.connectMaster()) {
8: // If the reporting interval is met, the Master progress is reported
9: if (this.isTimeToReportOffset()) {
10: boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
11: if(! result) {12: this.closeMaster();
13:}14:}15:
16: this.selector.select(1000);
17:
18: // Handle read events
19: boolean ok = this.processReadEvent();
20: if(! ok) {21: this.closeMaster();
22:}23:
24: // If the progress changes, the progress is reported to the Master
25: if(! reportSlaveMaxOffsetPlus()) {26: continue;
27:}28:
29: // The Master does not return data for a long time
30: long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
31: if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
32: .getHaHousekeepingInterval()) {
33: log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
34: + "] expired, " + interval);
35: this.closeMaster();
36: log.warn("HAClient, master not response some time, so close connection");
37:}38:}else {
39: this.waitForRunning(1000 * 5);
40:}41:}catch (Exception e) {
42: log.warn(this.getServiceName() + " service has exception. ", e);
43: this.waitForRunning(1000 * 5);
44:}45:}46:
47: log.info(this.getServiceName() + " service end");
48:}Copy the code
- Lines 8 to 14:Fixed interval (5s by default)to
Master
reportSlave
localCommitLog
Physical location that has been synchronized. This operation also hasThe heartbeatThe role of. - Lines 16 to 22: Processing
Master
transmissionSlave
的CommitLog
The data.
- Let’s see
#dispatchReadRequest(...)
与#reportSlaveMaxOffset(...)
How it’s done.
1: / / 【 HAClient. Java 】
2: /** 2: /* 3: /* 3: /* 4: /* 4: / * 1. The data offset transmitted by the Master is not equal to the maximum offset of the Slave CommitLog data. Description Failed to report the progress to the Master. 8: * 9: *@return10: */
11: private boolean dispatchReadRequest(a) {
12: final int msgHeaderSize = 8 + 4; // phyoffset + size
13: int readSocketPos = this.byteBufferRead.position();
14:
15: while (true) {
16: // The request is read
17: int diff = this.byteBufferRead.position() - this.dispatchPostion;
18: if (diff >= msgHeaderSize) {
19: // Read masterPhyOffset, bodySize. The reason for using dispatchPostion is that "sticky packets" are processed, resulting in incomplete data reads.
20: long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
21: int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
22: // Check whether the data offset transmitted by the Master is the same as the maximum offset of the Slave CommitLog data.
23: long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
24: if(slavePhyOffset ! =0) {
25: if(slavePhyOffset ! = masterPhyOffset) {26: log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
27: + slavePhyOffset + " MASTER: " + masterPhyOffset);
28: return false;
29:}30:}31: // The message is read
32: if (diff >= (msgHeaderSize + bodySize)) {
33: / / write CommitLog
34: byte[] bodyData = new byte[bodySize];
35: this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
36: this.byteBufferRead.get(bodyData);
37: HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
38: // Set the location to be processed
39: this.byteBufferRead.position(readSocketPos);
40: this.dispatchPostion += msgHeaderSize + bodySize;
41: // Report progress to Master
42: if(! reportSlaveMaxOffsetPlus()) {43: return false;
44:}45: // Continue the loop
46: continue;
47:}48:}49:
50: // The space is full, and the space is reallocated
51: if (!this.byteBufferRead.hasRemaining()) {
52: this.reallocateByteBuffer();
53:}54:
55: break;
56:}57:
58: return true;
59:}60:
61: /** 62: * Reporting progress 63: * 64: *@paramMaxOffset Progress 65: *@returnCheck whether the report is successful. 66: */
67: private boolean reportSlaveMaxOffset(final long maxOffset) {
68: this.reportOffset.position(0);
69: this.reportOffset.limit(8);
70: this.reportOffset.putLong(maxOffset);
71: this.reportOffset.position(0);
72: this.reportOffset.limit(8);
73:
74: for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
75: try {
76: this.socketChannel.write(this.reportOffset);
77:}catch (IOException e) {
78: log.error(this.getServiceName()
79: + "reportSlaveMaxOffset this.socketChannel.write exception", e);
80: return false;
81:}82:}83:
84: return !this.reportOffset.hasRemaining();
85:}Copy the code
3.1.5 Master
ReadSocketService
Logic withHAClient#processReadEvent(...)
Pretty much the same. Let’s just look at the code.
1: // ⬇️⬇️⬇️ readSocketservice.java
2: private boolean processReadEvent(a) {
3: int readSizeZeroTimes = 0;
4:
5: / / empty byteBufferRead
6: if (!this.byteBufferRead.hasRemaining()) {
7: this.byteBufferRead.flip();
8: this.processPostion = 0;
9:}10:
11: while (this.byteBufferRead.hasRemaining()) {
12: try {
13: int readSize = this.socketChannel.read(this.byteBufferRead);
14: if (readSize > 0) {
15: readSizeZeroTimes = 0;
16:
17: // Set the last read time
18: this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
19:
20: if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
21: // Read the maximum CommitLog location from the Slave request
22: int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
23: long readOffset = this.byteBufferRead.getLong(pos - 8);
24: this.processPostion = pos;
25:
26: // Set the maximum location of the Slave CommitLog
27: HAConnection.this.slaveAckOffset = readOffset;
28:
29: // Sets the location of the first Slave request
30: if (HAConnection.this.slaveRequestOffset < 0) {
31: HAConnection.this.slaveRequestOffset = readOffset;
32: log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
33:}34:
35: // Notify current Slave progress. This mode is used when the Master node is of the synchronous type.
36: HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
37:}38:}else if (readSize == 0) {
39: if (++readSizeZeroTimes >= 3) {
40: break;
41:}42:}else {
43: log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
44: return false;
45:}46:}catch (IOException e) {
47: log.error("processReadEvent exception", e);
48: return false;
49:}50:}51:
52: return true;
53:}Copy the code
WriteSocketService
To calculateSlave
Start synchronizing the position after the continuous directionSlave
The new transmissionCommitLog
The data.
1: // ⬇️⬇️⬇️【 writesocketservice.java 】
2: @Override
3: public void run(a) {
4: HAConnection.log.info(this.getServiceName() + " service started");
5:
6: while (!this.isStopped()) {
7: try {
8: this.selector.select(1000);
9:
10: // Failed to get the Slave read progress request.
11: if (-1 == HAConnection.this.slaveRequestOffset) {
12: Thread.sleep(10);
13: continue;
14:}15:
16: // Compute to initialize nextTransferFromWhere
17: if (-1= =this.nextTransferFromWhere) {
18: if (0 == HAConnection.this.slaveRequestOffset) {
19: long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
20: masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
21: if (masterOffset < 0) {
22: masterOffset = 0;
23:}24:
25: this.nextTransferFromWhere = masterOffset;
26:}else {
27: this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
28:}29:
30: log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
31: + "], and slave request " + HAConnection.this.slaveRequestOffset);
32:}33:
34: if (this.lastWriteOver) {
35: long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
36: if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { / / the heart
37:
38: // Build Header
39: this.byteBufferHeader.position(0);
40: this.byteBufferHeader.limit(headerSize);
41: this.byteBufferHeader.putLong(this.nextTransferFromWhere);
42: this.byteBufferHeader.putInt(0);
43: this.byteBufferHeader.flip();
44:
45: this.lastWriteOver = this.transferData();
46: if (!this.lastWriteOver)
47: continue;
48:}49:}else { // Continue transmission until transmission is complete
50: this.lastWriteOver = this.transferData();
51: if (!this.lastWriteOver)
52: continue;
53:}54:
55: // Select new CommitLog data for transmission
56: SelectMappedBufferResult selectResult =
57: HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
58: if(selectResult ! =null) {
59: int size = selectResult.getSize();
60: if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
61: size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
62:}63:
64: long thisOffset = this.nextTransferFromWhere;
65: this.nextTransferFromWhere += size;
66:
67: selectResult.getByteBuffer().limit(size);
68: this.selectMappedBufferResult = selectResult;
69:
70: // Build Header
71: this.byteBufferHeader.position(0);
72: this.byteBufferHeader.limit(headerSize);
73: this.byteBufferHeader.putLong(thisOffset);
74: this.byteBufferHeader.putInt(size);
75: this.byteBufferHeader.flip();
76:
77: this.lastWriteOver = this.transferData();
78:}else { // No new message, suspend and wait
79: HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
80:}81:}catch (Exception e) {
82:
83: HAConnection.log.error(this.getServiceName() + " service has exception.", e);
84: break;
85:}86:}87:
88: // Disconnect & suspend the writer thread & Suspend the reader thread & release the CommitLog
89: if (this.selectMappedBufferResult ! =null) {
90: this.selectMappedBufferResult.release();
91:}92:
93: this.makeStop();
94:
95: readSocketService.makeStop();
96:
97: haService.removeConnection(HAConnection.this);
98:
99: SelectionKey sk = this.socketChannel.keyFor(this.selector);
100: if(sk ! =null) {
101: sk.cancel();
102:}103:
104: try {
105: this.selector.close();
106: this.socketChannel.close();
107:}catch (IOException e) {
108: HAConnection.log.error("", e);
109:}110:
111: HAConnection.log.info(this.getServiceName() + " service end");
112:}113:
114: /** 115: * Transfer data 116: */
117: private boolean transferData(a) throws Exception {
118: int writeSizeZeroTimes = 0;
119: // Write Header
120: while (this.byteBufferHeader.hasRemaining()) {
121: int writeSize = this.socketChannel.write(this.byteBufferHeader);
122: if (writeSize > 0) {
123: writeSizeZeroTimes = 0;
124: this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
125:}else if (writeSize == 0) {
126: if (++writeSizeZeroTimes >= 3) {
127: break;
128:}129:}else {
130: throw new Exception("ha master write header error < 0");
131:}132:}133:
134: if (null= =this.selectMappedBufferResult) {
135: return !this.byteBufferHeader.hasRemaining();
136:}137:
138: writeSizeZeroTimes = 0;
139:
140: // Write Body
141: if (!this.byteBufferHeader.hasRemaining()) {
142: while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
143: int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
144: if (writeSize > 0) {
145: writeSizeZeroTimes = 0;
146: this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
147:}else if (writeSize == 0) {
148: if (++writeSizeZeroTimes >= 3) {
149: break;
150:}151:}else {
152: throw new Exception("ha master write body error < 0");
153:}154:}155:}156:
157: boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
158:
159: if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
160: this.selectMappedBufferResult.release();
161: this.selectMappedBufferResult = null;
162:}163:
164: return result;
165:}Copy the code
3.1.6 Master_SYNC
Producer
When you send a message,Master_SYNC
The node will waitSlave
The node returns the result after the storage is complete.
The core code is as follows:
1: // ⬇️⬇️⬇️【 commitlog.java 】
2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
3: / /... Omit processing send code
4: // Synchronous write double In the case of synchronizing Master, synchronize to the slave node
5: if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
6: HAService service = this.defaultMessageStore.getHaService();
7: if (msg.isWaitStoreMsgOK()) {
8: // Determine whether to wait
9: if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
10: if (null == request) {
11: request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
12:}13: service.putRequest(request);
14:
15: / / wake WriteSocketService
16: service.getWaitNotifyObject().wakeupAll();
17:
18: boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
19: if(! flushOK) {20: log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
21: + msg.getTags() + " client address: " + msg.getBornHostString());
22: putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
23:}24:}25: // Slave problem
26: else {
27: // Tell the producer, slave not available
28: putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
29:}30:}31:}32:
33: return putMessageResult;
34:}Copy the code
- Line 16: Wake up
WriteSocketService
.- After wake up,
WriteSocketService
Suspend to wait for a new message to end,Master
transmissionSlave
The newCommitLog
The data. Slave
After receiving the data,immediatelyUp to dateCommitLog
Synchronize progress toMaster
.ReadSocketService
Wake up theLine 18:request#waitForFlush(...)
.
- After wake up,
Let’s look at the core logic code of GroupTransferService:
1: / / ⬇ ️ ⬇ ️ ⬇ ️ GroupTransferService. Java 】 【
2: private void doWaitTransfer(a) {
3: synchronized (this.requestsRead) {
4: if (!this.requestsRead.isEmpty()) {
5: for (CommitLog.GroupCommitRequest req : this.requestsRead) {
6: // Wait for the Slave upload progress
7: boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
8: for (int i = 0; ! transferOK && i <5; i++) {
9: this.notifyTransferObject.waitForRunning(1000); / / wake
10: transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
11:}12:
13: if(! transferOK) {14: log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
15:}16:
17: // Wake up the request and set whether the Slave is successfully synchronized
18: req.wakeupCustomer(transferOK);
19:}20:
21: this.requestsRead.clear();
22:}23:}24:}Copy the code
3.2 Producer Sends Messages
Producer
When a message is sent, theBroker
Select all queues in the cluster.
The core code is as follows:
1: / / ⬇ ️ ⬇ ️ ⬇ ️ DefaultMQProducerImpl. Java 】 【
2: private SendResult sendDefaultImpl(/ /3: Message msg, //
4: final CommunicationMode communicationMode, //
5: final SendCallback sendCallback, //
6: final long timeout//
7:) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
8: / /... 3. To process the verification logic.
9: // Get Topic routing information
10: TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
11: if(topicPublishInfo ! =null && topicPublishInfo.ok()) {
12: MessageQueue mq = null; // Finally select the queue to which the message will be sent
13: Exception exception = null;
14: SendResult sendResult = null; // Send the result for the last time
15: int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // Synchronize multiple calls
16: int times = 0; // The number of times to send
17: String[] brokersSent = new String[timesTotal]; // Stores the broker name selected each time a message is sent
18: // Loop calls to send messages until success
19: for (; times < timesTotal; times++) {
20: String lastBrokerName = null == mq ? null : mq.getBrokerName();
21: MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // Select the queue to which the message is sent
22: if(tmpmq ! =null) {
23: mq = tmpmq;
24: brokersSent[times] = mq.getBrokerName();
25: try {
26: beginTimestampPrev = System.currentTimeMillis();
27: // Call the send message core method
28: sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
29: endTimestamp = System.currentTimeMillis();
30: // Update Broker availability information
31: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
32: / /... 7. To process the sending of returned results.
33:}34:}catch (e) { / /... 3. To handle an exception.
35:
36:}37:}else {
38: break;
39:}40:}41: / /... 7. To process the sending of returned results.
42:}43: / /... Omitted: Processing [message route not found]
44:}Copy the code
Debug #sendDefaultImpl(…) As a result of TopicPublishInfo, Producer obtains the message queues of broker-A and Broker-B:
3.3 Consumer messages
Consumer
When consuming messages, will be onBroker
Select all queues in the cluster.