  • 1. Overview
  • 2. Namesrv high availability
    • 2.1 Broker registers with Namesrv
    • 2.2 Producer and Consumer access Namesrv
  • 3. Broker high availability
    • 3.1 Broker master-slave
      • 3.1.1 Configuration
      • 3.1.2 Components
      • 3.1.3 Communication protocol
      • 3.1.4 Slave
      • 3.1.5 Master
      • 3.1.6 Master_SYNC
    • 3.2 Producer sends messages
    • 3.3 Consumer consumes messages
  • 4. Summary

1. Overview

This article focuses on how Namesrv and Broker achieve high availability, and how Producer and Consumer communicate with them to ensure high availability.

2. Namesrv high availability

Namesrv achieves high availability by running multiple instances. Compared with ZooKeeper, Consul, and etcd, Namesrv is an ultra-lightweight registry that provides naming services.

2.1 Broker registers with Namesrv

  • 📌 Namesrv instances are independent of one another: there are no roles such as ZooKeeper's Leader and Follower, and the instances neither communicate with each other nor synchronize data. Instead, each Broker registers with every Namesrv in a loop.
  // ⬇️⬇️⬇️【 BrokerOuterAPI.java 】
  public RegisterBrokerResult registerBrokerAll(
      final String clusterName,
      final String brokerAddr,
      final String brokerName,
      final long brokerId,
      final String haServerAddr,
      final TopicConfigSerializeWrapper topicConfigWrapper,
      final List<String> filterServerList,
      final boolean oneway,
      final int timeoutMills) {
      RegisterBrokerResult registerBrokerResult = null;

      List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
      if (nameServerAddressList != null) {
          for (String namesrvAddr : nameServerAddressList) { // Register with each Namesrv in turn
              try {
                  RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                      haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                  if (result != null) {
                      registerBrokerResult = result;
                  }

                  log.info("register broker to name server {} OK", namesrvAddr);
              } catch (Exception e) {
                  // A failure on one Namesrv does not stop registration with the others
                  log.warn("registerBroker Exception, {}", namesrvAddr, e);
              }
          }
      }

      return registerBrokerResult;
  }

2.2 Producer and Consumer access Namesrv

  • 📌 Producer and Consumer each pick one connectable Namesrv from the Namesrv list and communicate with it.
  // ⬇️⬇️⬇️【 NettyRemotingClient.java 】
  private Channel getAndCreateNameserverChannel() throws InterruptedException {
      // Return the already selected Namesrv if its connection is still usable
      String addr = this.namesrvAddrChoosed.get();
      if (addr != null) {
          ChannelWrapper cw = this.channelTables.get(addr);
          if (cw != null && cw.isOK()) {
              return cw.getChannel();
          }
      }

      final List<String> addrList = this.namesrvAddrList.get();
      if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
          try {
              // Check again under the lock: another thread may have selected a Namesrv already
              addr = this.namesrvAddrChoosed.get();
              if (addr != null) {
                  ChannelWrapper cw = this.channelTables.get(addr);
                  if (cw != null && cw.isOK()) {
                      return cw.getChannel();
                  }
              }

              // Pick a connectable Namesrv from the list and return its channel
              if (addrList != null && !addrList.isEmpty()) {
                  for (int i = 0; i < addrList.size(); i++) {
                      int index = this.namesrvIndex.incrementAndGet();
                      index = Math.abs(index);
                      index = index % addrList.size();
                      String newAddr = addrList.get(index);

                      this.namesrvAddrChoosed.set(newAddr);
                      Channel channelNew = this.createChannel(newAddr);
                      if (channelNew != null)
                          return channelNew;
                  }
              }
          } catch (Exception e) {
              log.error("getAndCreateNameserverChannel: create name server channel exception", e);
          } finally {
              this.lockNamesrvChannel.unlock();
          }
      } else {
          log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
      }

      return null;
  }
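
The round-robin step inside the loop above can be isolated as a small sketch. The names `NamesrvPicker` and `nextAddr` are hypothetical and are not part of RocketMQ. The sketch also swaps `Math.abs(index) % size` for `Math.floorMod`, because `Math.abs(Integer.MIN_VALUE)` is still negative in Java and would produce a negative index once the counter overflows; treat that swap as an assumption of this example, not as the library's behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not a RocketMQ class: rotates through a Namesrv address list. */
final class NamesrvPicker {
    private final AtomicInteger index;

    NamesrvPicker(int start) {
        this.index = new AtomicInteger(start);
    }

    /** Returns the next address in round-robin order, or null if the list is empty. */
    String nextAddr(List<String> addrList) {
        if (addrList == null || addrList.isEmpty()) {
            return null;
        }
        // floorMod keeps the result in [0, size) even after the counter wraps to a negative value
        int i = Math.floorMod(index.incrementAndGet(), addrList.size());
        return addrList.get(i);
    }

    public static void main(String[] args) {
        NamesrvPicker picker = new NamesrvPicker(-1);
        List<String> addrs = Arrays.asList("10.0.0.1:9876", "10.0.0.2:9876");
        for (int n = 0; n < 3; n++) {
            System.out.println(picker.nextAddr(addrs));
        }
    }
}
```

The addresses in `main` are made up for illustration. In the real client the chosen address is additionally cached in `namesrvAddrChoosed`, so the rotation only advances when the current connection is no longer usable.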

3. Broker high availability

Broker achieves high availability by running multiple Broker groups that together form a cluster. Broker group = Master node x 1 + Slave node x N. As with MySQL, the Master node provides both read and write services, while a Slave node provides only read services.

3.1 Broker master-slave

  • Within each Broker group, the Master node continuously sends new CommitLog data to the Slave nodes, and each Slave node continuously reports to the Master node the position up to which its local CommitLog has been synchronized.
  • Broker groups have no relationship with one another: they neither communicate nor synchronize data.
  • Consumption progress is currently not synchronized between Master and Slave.

A Master node is one of two types: Master_SYNC or Master_ASYNC. When a Producer sends a message, the former waits until the Slave node has stored it before returning the send result; the latter does not wait.

3.1.1 Configuration

Currently, there are three official configurations:

  • 2m-2s-async
    brokerClusterName  brokerName  brokerRole    brokerId
    DefaultCluster     broker-a    ASYNC_MASTER  0
    DefaultCluster     broker-a    SLAVE         1
    DefaultCluster     broker-b    ASYNC_MASTER  0
    DefaultCluster     broker-b    SLAVE         1
  • 2m-2s-sync
    brokerClusterName  brokerName  brokerRole    brokerId
    DefaultCluster     broker-a    SYNC_MASTER   0
    DefaultCluster     broker-a    SLAVE         1
    DefaultCluster     broker-b    SYNC_MASTER   0
    DefaultCluster     broker-b    SLAVE         1
  • 2m-noslave
    brokerClusterName  brokerName  brokerRole    brokerId
    DefaultCluster     broker-a    ASYNC_MASTER  0
    DefaultCluster     broker-b    ASYNC_MASTER  0

3.1.2 Components

Before looking at the implementation code, let's look at the components that the Master and Slave nodes contain:

Figure: HA components

  • Master node
    • AcceptSocketService: accepts connections from Slave nodes.
    • HAConnection
      • ReadSocketService: reads data sent by the Slave node.
      • WriteSocketService: writes data to the Slave node.
  • Slave node
    • HAClient: connects to the Master node, then reads and writes data.

3.1.3 Communication protocol

The communication protocol between the Master node and the Slave node is as follows:

  • Slave => Master: reports the physical position up to which the CommitLog has been synchronized.
    Index  Field          Data type  Bytes  Description
    0      maxPhyOffset   Long       8      Maximum physical position of the CommitLog
  • Master => Slave: transmits new CommitLog data.
    Index  Field          Data type  Bytes  Description
    0      fromPhyOffset  Long       8      Starting physical position of the CommitLog data
    1      size           Int        4      Length of the transmitted CommitLog data
    2      body           Bytes      size   The transmitted CommitLog data
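
As a rough illustration of the two layouts in the protocol table, here is a minimal sketch that builds both messages with `java.nio.ByteBuffer`. `HaFrameCodec`, `encodeReport`, and `encodeTransfer` are hypothetical names introduced for this example; RocketMQ itself writes the header and the body from separate buffers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the two HA message layouts; not RocketMQ code. */
final class HaFrameCodec {
    static final int HEADER_SIZE = 8 + 4; // fromPhyOffset (long) + size (int)

    /** Slave => Master: a single 8-byte maxPhyOffset. */
    static ByteBuffer encodeReport(long maxPhyOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxPhyOffset);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Master => Slave: header (offset + size) followed by the body bytes. */
    static ByteBuffer encodeTransfer(long fromPhyOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(fromPhyOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        byte[] body = "commitlog-bytes".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encodeTransfer(1024L, body);

        // Decode in the same order the fields were written
        long offset = frame.getLong();
        int size = frame.getInt();
        byte[] decoded = new byte[size];
        frame.get(decoded);
        System.out.println(offset + " " + size + " " + new String(decoded, StandardCharsets.UTF_8));
    }
}
```

A frame with `size` equal to 0 and no body is what the Master sends as a heartbeat, as the WriteSocketService code later in this article shows.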

3.1.4 Slave

HAClient sequence diagram


  • The Slave's main loop continuously receives the CommitLog data transmitted by the Master and reports to the Master the physical position up to which its own local CommitLog has been synchronized.
   1: // ⬇️⬇️⬇️【 HAClient.java 】
   2: public void run() {
   3:     log.info(this.getServiceName() + " service started");
   4: 
   5:     while (!this.isStopped()) {
   6:         try {
   7:             if (this.connectMaster()) {
   8:                 // If the reporting interval has elapsed, report progress to the Master
   9:                 if (this.isTimeToReportOffset()) {
  10:                     boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
  11:                     if (!result) {
  12:                         this.closeMaster();
  13:                     }
  14:                 }
  15: 
  16:                 this.selector.select(1000);
  17: 
  18:                 // Handle read events
  19:                 boolean ok = this.processReadEvent();
  20:                 if (!ok) {
  21:                     this.closeMaster();
  22:                 }
  23: 
  24:                 // If the progress has changed, report it to the Master
  25:                 if (!reportSlaveMaxOffsetPlus()) {
  26:                     continue;
  27:                 }
  28: 
  29:                 // The Master has not sent any data for too long
  30:                 long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
  31:                 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
  32:                     .getHaHousekeepingInterval()) {
  33:                     log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
  34:                         + "] expired, " + interval);
  35:                     this.closeMaster();
  36:                     log.warn("HAClient, master not response some time, so close connection");
  37:                 }
  38:             } else {
  39:                 this.waitForRunning(1000 * 5);
  40:             }
  41:         } catch (Exception e) {
  42:             log.warn(this.getServiceName() + " service has exception. ", e);
  43:             this.waitForRunning(1000 * 5);
  44:         }
  45:     }
  46: 
  47:     log.info(this.getServiceName() + " service end");
  48: }
  • Lines 8 to 14: At a fixed interval (5s by default), the Slave reports to the Master the physical position up to which its local CommitLog has been synchronized. This report also serves as a heartbeat.
  • Lines 16 to 22: Process the CommitLog data that the Master transmits to the Slave.

  • Let's see how #dispatchReadRequest(...) and #reportSlaveMaxOffset(...) are implemented.
  // ⬇️⬇️⬇️【 HAClient.java 】
  /**
   * Reads the CommitLog data transmitted by the Master and appends it to the local CommitLog.
   * Returns false in either of these cases:
   *   1. The data offset transmitted by the Master is not equal to the maximum offset of the Slave's CommitLog data.
   *   2. Reporting the progress to the Master failed.
   *
   * @return false on either failure above, true otherwise
   */
  private boolean dispatchReadRequest() {
      final int msgHeaderSize = 8 + 4; // phyoffset + size
      int readSocketPos = this.byteBufferRead.position();

      while (true) {
          // A complete header has been read
          int diff = this.byteBufferRead.position() - this.dispatchPostion;
          if (diff >= msgHeaderSize) {
              // Read masterPhyOffset and bodySize. dispatchPostion is used to handle TCP "sticky packets",
              // where a read may end in the middle of a message.
              long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
              int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
              // Check that the offset transmitted by the Master equals the maximum offset of the Slave's CommitLog data
              long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
              if (slavePhyOffset != 0) {
                  if (slavePhyOffset != masterPhyOffset) {
                      log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                          + slavePhyOffset + " MASTER: " + masterPhyOffset);
                      return false;
                  }
              }
              // A complete message (header + body) has been read
              if (diff >= (msgHeaderSize + bodySize)) {
                  // Write to the CommitLog
                  byte[] bodyData = new byte[bodySize];
                  this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                  this.byteBufferRead.get(bodyData);
                  HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                  // Restore the read position and advance the dispatched position
                  this.byteBufferRead.position(readSocketPos);
                  this.dispatchPostion += msgHeaderSize + bodySize;
                  // Report progress to the Master
                  if (!reportSlaveMaxOffsetPlus()) {
                      return false;
                  }
                  // Continue the loop
                  continue;
              }
          }

          // The buffer is full: reallocate it
          if (!this.byteBufferRead.hasRemaining()) {
              this.reallocateByteBuffer();
          }

          break;
      }

      return true;
  }

  /**
   * Reports progress to the Master.
   *
   * @param maxOffset the progress to report
   * @return whether the report was sent successfully
   */
  private boolean reportSlaveMaxOffset(final long maxOffset) {
      this.reportOffset.position(0);
      this.reportOffset.limit(8);
      this.reportOffset.putLong(maxOffset);
      this.reportOffset.position(0);
      this.reportOffset.limit(8);

      // Try at most 3 writes to push out all 8 bytes
      for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
          try {
              this.socketChannel.write(this.reportOffset);
          } catch (IOException e) {
              log.error(this.getServiceName()
                  + "reportSlaveMaxOffset this.socketChannel.write exception", e);
              return false;
          }
      }

      return !this.reportOffset.hasRemaining();
  }
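
The dispatch-position idea in #dispatchReadRequest(...) can be shown in isolation. The sketch below is a simplification under stated assumptions: `FrameSplitter` and `onBytes` are hypothetical names, the buffer is a fixed 1024 bytes and is never reallocated (the real code calls `reallocateByteBuffer()`), and bodies are returned to the caller instead of being appended to a CommitLog.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical frame splitter; mirrors the dispatch-position idea, not the RocketMQ implementation. */
final class FrameSplitter {
    private static final int HEADER_SIZE = 8 + 4; // phyOffset (long) + size (int)

    // Fixed capacity for the sketch: put() throws BufferOverflowException once it is exceeded
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private int dispatchPosition = 0; // start of the first frame not yet dispatched

    /** Appends bytes as they arrive and returns the bodies of all frames that are now complete. */
    List<byte[]> onBytes(byte[] chunk) {
        readBuffer.put(chunk);
        List<byte[]> bodies = new ArrayList<>();
        while (true) {
            int diff = readBuffer.position() - dispatchPosition;
            if (diff < HEADER_SIZE) {
                break; // header incomplete: wait for more bytes
            }
            // Absolute read: the buffer's write position is left untouched
            int bodySize = readBuffer.getInt(dispatchPosition + 8);
            if (diff < HEADER_SIZE + bodySize) {
                break; // body incomplete: wait for more bytes
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                body[i] = readBuffer.get(dispatchPosition + HEADER_SIZE + i);
            }
            bodies.add(body);
            dispatchPosition += HEADER_SIZE + bodySize;
        }
        return bodies;
    }
}
```

Feeding the first 10 bytes of a 15-byte frame returns an empty list; feeding the remaining 5 bytes returns the one completed body. Keeping a separate dispatch position is what lets a frame that arrives in several socket reads be handled exactly once.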

3.1.5 Master

  • The logic of ReadSocketService is much the same as that of HAClient#processReadEvent(...), so let's go straight to the code.
  // ⬇️⬇️⬇️【 ReadSocketService.java 】
  private boolean processReadEvent() {
      int readSizeZeroTimes = 0;

      // Reset byteBufferRead once it is full
      if (!this.byteBufferRead.hasRemaining()) {
          this.byteBufferRead.flip();
          this.processPostion = 0;
      }

      while (this.byteBufferRead.hasRemaining()) {
          try {
              int readSize = this.socketChannel.read(this.byteBufferRead);
              if (readSize > 0) {
                  readSizeZeroTimes = 0;

                  // Set the last read time
                  this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

                  if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                      // Read the maximum CommitLog position from the Slave's latest complete request
                      int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                      long readOffset = this.byteBufferRead.getLong(pos - 8);
                      this.processPostion = pos;

                      // Set the maximum CommitLog position acknowledged by the Slave
                      HAConnection.this.slaveAckOffset = readOffset;

                      // Record the position of the Slave's first request
                      if (HAConnection.this.slaveRequestOffset < 0) {
                          HAConnection.this.slaveRequestOffset = readOffset;
                          log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                      }

                      // Notify waiters of the Slave's current progress. Used when the Master node is of the synchronous type.
                      HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                  }
              } else if (readSize == 0) {
                  if (++readSizeZeroTimes >= 3) {
                      break;
                  }
              } else {
                  log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                  return false;
              }
          } catch (IOException e) {
              log.error("processReadEvent exception", e);
              return false;
          }
      }

      return true;
  }
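
The `pos - (pos % 8)` arithmetic above is easy to misread, so here is a minimal sketch of it on its own. The Slave may have sent several 8-byte reports since the last read, and only the newest complete one matters. `AckOffsetReader` and `latestAckOffset` are hypothetical names for this example.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper illustrating the "latest complete 8-byte offset" arithmetic. */
final class AckOffsetReader {
    /**
     * Returns the last fully received 8-byte offset in buf, which is assumed to have been
     * written from index 0 up to its current position, or -1 if fewer than 8 bytes have arrived.
     */
    static long latestAckOffset(ByteBuffer buf) {
        int pos = buf.position() - (buf.position() % 8); // round down to a multiple of 8
        if (pos < 8) {
            return -1;
        }
        return buf.getLong(pos - 8); // absolute read: does not move the position
    }
}
```

With two complete reports followed by 3 stray bytes of a third, the position is 19, `pos` rounds down to 16, and the value read is the second report; the incomplete third report is ignored until its remaining bytes arrive.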

  • WriteSocketService computes the position from which the Slave starts synchronizing, then continuously transmits new CommitLog data to the Slave.
HA.WriteSocketService flow chart

  // ⬇️⬇️⬇️【 WriteSocketService.java 】
  @Override
  public void run() {
      HAConnection.log.info(this.getServiceName() + " service started");

      while (!this.isStopped()) {
          try {
              this.selector.select(1000);

              // The Slave's first progress request has not arrived yet: sleep and wait
              if (-1 == HAConnection.this.slaveRequestOffset) {
                  Thread.sleep(10);
                  continue;
              }

              // Initialize nextTransferFromWhere
              if (-1 == this.nextTransferFromWhere) {
                  if (0 == HAConnection.this.slaveRequestOffset) {
                      long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                      masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
                      if (masterOffset < 0) {
                          masterOffset = 0;
                      }

                      this.nextTransferFromWhere = masterOffset;
                  } else {
                      this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                  }

                  log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                      + "], and slave request " + HAConnection.this.slaveRequestOffset);
              }

              if (this.lastWriteOver) {
                  long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                  if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // Heartbeat

                      // Build Header
                      this.byteBufferHeader.position(0);
                      this.byteBufferHeader.limit(headerSize);
                      this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                      this.byteBufferHeader.putInt(0);
                      this.byteBufferHeader.flip();

                      this.lastWriteOver = this.transferData();
                      if (!this.lastWriteOver)
                          continue;
                  }
              } else { // Continue the unfinished transmission until it completes
                  this.lastWriteOver = this.transferData();
                  if (!this.lastWriteOver)
                      continue;
              }

              // Select new CommitLog data for transmission
              SelectMappedBufferResult selectResult =
                  HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
              if (selectResult != null) {
                  int size = selectResult.getSize();
                  if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                      size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                  }

                  long thisOffset = this.nextTransferFromWhere;
                  this.nextTransferFromWhere += size;

                  selectResult.getByteBuffer().limit(size);
                  this.selectMappedBufferResult = selectResult;

                  // Build Header
                  this.byteBufferHeader.position(0);
                  this.byteBufferHeader.limit(headerSize);
                  this.byteBufferHeader.putLong(thisOffset);
                  this.byteBufferHeader.putInt(size);
                  this.byteBufferHeader.flip();

                  this.lastWriteOver = this.transferData();
              } else { // No new message: suspend and wait
                  HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
              }
          } catch (Exception e) {

              HAConnection.log.error(this.getServiceName() + " service has exception.", e);
              break;
          }
      }

      // Disconnect, stop the write thread, stop the read thread, and release the CommitLog buffer
      if (this.selectMappedBufferResult != null) {
          this.selectMappedBufferResult.release();
      }

      this.makeStop();

      readSocketService.makeStop();

      haService.removeConnection(HAConnection.this);

      SelectionKey sk = this.socketChannel.keyFor(this.selector);
      if (sk != null) {
          sk.cancel();
      }

      try {
          this.selector.close();
          this.socketChannel.close();
      } catch (IOException e) {
          HAConnection.log.error("", e);
      }

      HAConnection.log.info(this.getServiceName() + " service end");
  }

  /**
   * Transfers data: first the header, then the body.
   */
  private boolean transferData() throws Exception {
      int writeSizeZeroTimes = 0;
      // Write Header
      while (this.byteBufferHeader.hasRemaining()) {
          int writeSize = this.socketChannel.write(this.byteBufferHeader);
          if (writeSize > 0) {
              writeSizeZeroTimes = 0;
              this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
          } else if (writeSize == 0) {
              if (++writeSizeZeroTimes >= 3) {
                  break;
              }
          } else {
              throw new Exception("ha master write header error < 0");
          }
      }

      // Heartbeat: there is no body to send
      if (null == this.selectMappedBufferResult) {
          return !this.byteBufferHeader.hasRemaining();
      }

      writeSizeZeroTimes = 0;

      // Write Body
      if (!this.byteBufferHeader.hasRemaining()) {
          while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
              int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
              if (writeSize > 0) {
                  writeSizeZeroTimes = 0;
                  this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
              } else if (writeSize == 0) {
                  if (++writeSizeZeroTimes >= 3) {
                      break;
                  }
              } else {
                  throw new Exception("ha master write body error < 0");
              }
          }
      }

      boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();

      if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
          this.selectMappedBufferResult.release();
          this.selectMappedBufferResult = null;
      }

      return result;
  }
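
The initialization of nextTransferFromWhere is the least obvious step in the code above: when the Slave's first request carries offset 0, the Master does not replay its CommitLog from the beginning but starts at the beginning of its last CommitLog file. A minimal sketch of that arithmetic, using the hypothetical names `TransferStart` and `initialTransferOffset`:

```java
/** Hypothetical helper showing how the first transfer position is derived; not RocketMQ code. */
final class TransferStart {
    /**
     * @param slaveRequestOffset offset carried by the Slave's first request (0 means the Slave has no data)
     * @param masterMaxOffset    current maximum CommitLog offset on the Master
     * @param mappedFileSize     size in bytes of one CommitLog file
     * @return the offset from which the Master starts transmitting
     */
    static long initialTransferOffset(long slaveRequestOffset, long masterMaxOffset, int mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset; // resume where the Slave left off
        }
        // Round down to the start of the Master's last CommitLog file
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }
}
```

For example, with a file size of 1024 and a Master maximum offset of 2500, an empty Slave starts at 2048, while a Slave that requests offset 777 simply resumes at 777. The file size of 1024 is chosen only to keep the numbers readable.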

3.1.6 Master_SYNC

  • When a Producer sends a message, the Master_SYNC node waits until the Slave node has stored the message before returning the send result.

The core code is as follows:

   1: // ⬇️⬇️⬇️【 CommitLog.java 】
   2: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
   3:     // ... Omitted: code that handles sending
   4:     // Synchronous double write: on a SYNC_MASTER, synchronize to the Slave node
   5:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
   6:         HAService service = this.defaultMessageStore.getHaService();
   7:         if (msg.isWaitStoreMsgOK()) {
   8:             // Determine whether to wait
   9:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
  10:                 if (null == request) {
  11:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
  12:                 }
  13:                 service.putRequest(request);
  14: 
  15:                 // Wake up WriteSocketService
  16:                 service.getWaitNotifyObject().wakeupAll();
  17: 
  18:                 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
  19:                 if (!flushOK) {
  20:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
  21:                         + msg.getTags() + " client address: " + msg.getBornHostString());
  22:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
  23:                 }
  24:             }
  25:             // Slave problem
  26:             else {
  27:                 // Tell the producer, slave not available
  28:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
  29:             }
  30:         }
  31:     }
  32: 
  33:     return putMessageResult;
  34: }
  • Line 16: Wake up WriteSocketService.
    • Once woken, WriteSocketService stops waiting for new messages, and the Master transmits the new CommitLog data to the Slave.
    • After receiving the data, the Slave immediately reports its latest CommitLog synchronization progress to the Master. ReadSocketService then wakes up request#waitForFlush(...) on line 18.

Let's look at the core logic of GroupTransferService:

  // ⬇️⬇️⬇️【 GroupTransferService.java 】
  private void doWaitTransfer() {
      synchronized (this.requestsRead) {
          if (!this.requestsRead.isEmpty()) {
              for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                  // Wait for the Slave to report its progress
                  boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                  for (int i = 0; !transferOK && i < 5; i++) {
                      this.notifyTransferObject.waitForRunning(1000); // Wait up to 1s to be woken
                      transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                  }

                  if (!transferOK) {
                      log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                  }

                  // Wake up the request and set whether the Slave synchronized successfully
                  req.wakeupCustomer(transferOK);
              }

              this.requestsRead.clear();
          }
      }
  }

3.2 Producer sends messages

  • When sending a message, the Producer selects from all queues across the Broker cluster.

The core code is as follows:

  // ⬇️⬇️⬇️【 DefaultMQProducerImpl.java 】
  private SendResult sendDefaultImpl(
      Message msg,
      final CommunicationMode communicationMode,
      final SendCallback sendCallback,
      final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      // ... Omitted: validation logic
      // Get the Topic routing information
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
      if (topicPublishInfo != null && topicPublishInfo.ok()) {
          MessageQueue mq = null; // The queue finally selected for sending the message
          Exception exception = null;
          SendResult sendResult = null; // Result of the last send
          int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // Synchronous mode may send several times
          int times = 0; // Which attempt this is
          String[] brokersSent = new String[timesTotal]; // Broker name selected on each attempt
          // Keep sending until success or until the attempts are used up
          for (; times < timesTotal; times++) {
              String lastBrokerName = null == mq ? null : mq.getBrokerName();
              MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // Select the queue to send to
              if (tmpmq != null) {
                  mq = tmpmq;
                  brokersSent[times] = mq.getBrokerName();
                  try {
                      beginTimestampPrev = System.currentTimeMillis();
                      // Call the core send method
                      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                      endTimestamp = System.currentTimeMillis();
                      // Update Broker availability information
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      // ... Omitted: handling of the send result
                  } catch (Exception e) { // ... Omitted: exception handling

                  }
              } else {
                  break;
              }
          }
          // ... Omitted: handling of the send result
      }
      // ... Omitted: handling of [message route not found]
  }
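
The retry loop above passes lastBrokerName into the queue selection, but the selection itself is not shown in this article. One plausible reading is that a retry should prefer a queue on a different Broker from the one that just failed. The sketch below illustrates that idea only; `QueueSelector`, `Queue`, and `selectOne` are hypothetical, and the real selection logic in RocketMQ may differ.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical queue selector; a simplification, not the RocketMQ implementation. */
final class QueueSelector {
    /** Minimal stand-in for a message queue: broker name plus queue id. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger counter = new AtomicInteger(-1);

    /** Round-robins over queues, preferring one whose Broker differs from lastBrokerName. */
    Queue selectOne(List<Queue> queues, String lastBrokerName) {
        if (queues == null || queues.isEmpty()) {
            return null;
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(counter.incrementAndGet(), queues.size()));
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue lives on the Broker that just failed: fall back to plain round-robin
        return queues.get(Math.floorMod(counter.incrementAndGet(), queues.size()));
    }
}
```

With queues on broker-a and broker-b, a first call with `lastBrokerName == null` returns the first queue in rotation, and a retry after a failure on broker-a skips the remaining broker-a queues and lands on broker-b. This is how a Producer can keep sending while one Broker group is unavailable.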

Debugging #sendDefaultImpl(...) shows the resulting TopicPublishInfo: the Producer has obtained the message queues of both broker-a and broker-b:

Figure: Producer.TopicPublishInfo debugging screenshot

3.3 Consumer consumes messages

  • When consuming messages, the Consumer selects from all queues across the Broker cluster.

4. Summary

Figure: HA summary diagram (HA.jpeg)