The fourth article in 2021
Description of content
This article focuses on the NameServer, the routing center of RocketMQ, and tries to understand its design from a source-code perspective.
NameServer, the Routing Center
The NameServer is RocketMQ's routing center. It exists to store the information of all Brokers and Topics, so that Brokers can register and refresh routing information via heartbeat, and producers and consumers can look that routing information up.
In fact, the NameServer functions much like the registry in a microservices or distributed SOA architecture. For example, older versions of Kafka register cluster information with ZooKeeper. Instead of relying on a third-party component, RocketMQ implemented the NameServer as its own routing service. NameServers can also be deployed as a cluster, but the nodes are stateless, and there is no Master/Slave concept.
When talking about the routing center, we need to understand several roles:
- NameServer, the routing center
- Broker, the message server
- Producer, the message producer
- Consumer, the message consumer
The following picture shows the relationship between them
The arrows indicate the direction in which each role acts on the others:
- First, the Broker registers itself with the NameServer
- The Producer gets the Broker's address from the NameServer and accesses the Broker according to the routing rules
- The Consumer likewise gets the Broker's address from the NameServer and accesses the Broker according to the routing rules
As the manager of the Brokers, the NameServer checks their availability at regular intervals. Note that message producers and consumers are not notified directly when routing changes; this keeps the overall architecture simple.
To stay highly available, the NameServer can also be deployed as multiple instances. In addition, NameServer nodes do not communicate with each other, so the information held by two nodes may differ at a given moment, but this does not affect message sending. The architecture is simple yet available: this is a highlight of RocketMQ's design, and one we can learn from!
Below, using the source code, I will give a preliminary explanation of the following:
- The design of the NameServer
- Broker management, including joining, removal, and heartbeat
- How the NameServer is kept highly available
Starting the NameServer
In the RocketMQ source tree there is a namesrv module, which is the NameServer source package. The corresponding entry point is NamesrvStartup, the startup class. Its main function parses the command-line arguments, parses the namesrv and Netty configuration, configures the logging context, configures the NamesrvController, and finally starts it.
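For orientation, the startup entry is roughly shaped like this (a simplified sketch of NamesrvStartup#main0 based on the 4.x source; details may differ across versions):

public static NamesrvController main0(String[] args) {
    try {
        // Parse the arguments and build the controller
        NamesrvController controller = createNamesrvController(args);
        // initialize() the controller, register the shutdown hook, and start it
        start(controller);
        System.out.printf("The Name Server boot success. serializeType=%s%n",
            RemotingCommand.getSerializeTypeConfigInThisServer());
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}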
The first method of NamesrvStartup to focus on is createNamesrvController. This method does the parameter configuration, such as obtaining the startup command-line arguments and reading the default configuration from the configuration file.
The first step is to build the command-line Options and parse them into a CommandLine object:
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
Then fill in the corresponding NamesrvConfig object and NettyServerConfig object based on the parameters.
// Handle the -c option (config file)
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
// Handle the -p option (print config and exit)
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
// Turn properties into objects
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
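MixAll.properties2Object copies each property onto the matching setter of the target object via reflection. A minimal sketch of the idea (my simplification, not the RocketMQ implementation):

import java.lang.reflect.Method;
import java.util.Properties;

public class Properties2ObjectSketch {
    public static void properties2Object(Properties props, Object target) throws Exception {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().startsWith("set") && m.getName().length() > 3 && m.getParameterCount() == 1) {
                // "setListenPort" -> property key "listenPort"
                String key = Character.toLowerCase(m.getName().charAt(3)) + m.getName().substring(4);
                String value = props.getProperty(key);
                if (value == null) continue;
                Class<?> t = m.getParameterTypes()[0];
                // Convert the string value to the setter's parameter type
                if (t == int.class || t == Integer.class) m.invoke(target, Integer.parseInt(value));
                else if (t == long.class || t == Long.class) m.invoke(target, Long.parseLong(value));
                else if (t == boolean.class || t == Boolean.class) m.invoke(target, Boolean.parseBoolean(value));
                else if (t == String.class) m.invoke(target, value);
            }
        }
    }
}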
The RocketMQ home directory is then read to locate the logback configuration and set up logging:
// instantiate the log context
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
// Internal loggerFactory
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// Prints parameters
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// Instantiate and configure NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// Register the merged configuration so none of it is lost
controller.getConfiguration().registerConfig(properties);
After configuration, the NamesrvController is started. First comes initialization. NamesrvController#initialize is a bit more involved; it mainly does the following:
- KVConfigManager load
- RemotingServer load and start
- Register the request handler (the logic that processes requests arriving at this node)
- Start two timers: one scans for inactive brokers, the other prints the KVConfigManager parameters
- Start the FileWatchService for SSL/TLS
The KVConfigManager#load method loads the stored KV parameters:
// ①
content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
if (content != null) {
    KVConfigSerializeWrapper kvConfigSerializeWrapper =
        KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
    // ②
    if (null != kvConfigSerializeWrapper) {
        this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
        log.info("load KV config table OK");
    }
}
At ①, the KV config path is obtained from NamesrvConfig; at ②, the file content is read and the parameters are added to the KV store.
Next, the RemotingServer is loaded and started. The RemotingServer is responsible for receiving network requests such as Broker registrations. It lives in remoting, RocketMQ's generic communication component package.
// Instantiate the remote server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// instantiate the thread pool
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
Then the two timers are scheduled:
// Periodically scan for inactive Brokers
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        // Scan for brokers that are no longer active
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

// Periodically print the KV config
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.kvConfigManager.printAllPeriodically();
    }
}, 1, 10, TimeUnit.MINUTES);
The principle of scanning for inactive/disconnected brokers is simple: the broker's last heartbeat time is compared with the current time, and if the gap exceeds a threshold (120 seconds by default), the broker's Channel is closed and its routing information removed.
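A minimal, self-contained sketch of this liveness check (my simplification; the real logic lives in RouteInfoManager#scanNotActiveBroker, and the 120-second constant is the default in the version I read):

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LivenessScanSketch {
    // Assumed default: RouteInfoManager's BROKER_CHANNEL_EXPIRED_TIME is 120 seconds
    private static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final Map<String /* brokerAddr */, Long /* lastUpdateTimestamp */> lastHeartbeat = new ConcurrentHashMap<>();

    public void scanNotActiveBroker() {
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // Expired if the last heartbeat is older than the threshold
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < System.currentTimeMillis()) {
                // The real code also closes the channel and calls onChannelDestroy
                it.remove();
            }
        }
    }
}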
The last thing initialize does is deal with SSL/TLS. To clarify, RocketMQ needs the paths tlsServerCertPath, tlsServerKeyPath, and tlsServerTrustCertPath to be defined and writable; an error is reported if the permissions are missing.
fileWatchService = new FileWatchService(
    new String[] {
        TlsSystemConfig.tlsServerCertPath,
        TlsSystemConfig.tlsServerKeyPath,
        TlsSystemConfig.tlsServerTrustCertPath
    },
    new FileWatchService.Listener() {
        // Whether the certificate / private key file has changed
        boolean certChanged, keyChanged = false;

        @Override
        public void onChanged(String path) {
            // The trust certificate can be reloaded immediately
            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                log.info("The trust certificate changed, reload the ssl context");
                reloadServerSslContext();
            }
            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                certChanged = true;
            }
            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                keyChanged = true;
            }
            // Reload the context only after both the cert and the key have changed
            if (certChanged && keyChanged) {
                log.info("The certificate and private key changed, reload the ssl context");
                certChanged = keyChanged = false;
                reloadServerSslContext();
            }
        }

        // Reload the SSL context on the remoting server
        private void reloadServerSslContext() {
            ((NettyRemotingServer) remotingServer).loadSslContext();
        }
    });
That completes NamesrvController#initialize; next, NamesrvController#start is called.
// Start netty's remoting server
this.remotingServer.start();
// Start the file-watch service if TLS is configured
if (this.fileWatchService != null) {
    this.fileWatchService.start();
}
Since the RemotingServer was configured during initialization, it can be started directly. The bootstrap is a standard Netty server setup; the focus is the ServerBootstrap configuration:
ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        // ①
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        // ②
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        // ③
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        serverHandler
                    );
            }
        });
At ①, SO_BACKLOG, SO_REUSEADDR, and SO_KEEPALIVE are configured. At ②, TCP_NODELAY, SO_SNDBUF, and SO_RCVBUF are configured; these parameters tune TCP sending and receiving. Enabling TCP_NODELAY disables the Nagle algorithm, so small packets are sent immediately rather than being buffered; this improves latency at the cost of less efficient use of the buffer. At ③, the handlers carrying the processing logic are added: handshakeHandler, the encoder, NettyDecoder, an IdleStateHandler, connectionManageHandler, and serverHandler (the core request-processing logic).
To improve performance, Netty's pooled ByteBuf allocator can be enabled:
// Controlled by the serverPooledByteBufAllocatorEnable setting
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
Finally, a timer is started that periodically scans for timed-out requests. The principle is to compare each request's start time with the current time; if the maximum threshold is exceeded, the expired entry is removed from the response table and its callback is executed asynchronously.
this.timer.scheduleAtFixedRate(new TimerTask() {

    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
Once the RemotingServer is started, we move on to the fileWatchService.
if (this.fileWatchService != null) {
    this.fileWatchService.start();
}
FileWatchService watches the specified files:
this.watchFiles = new ArrayList<>();
this.fileCurrentHash = new ArrayList<>();
// Record each watched file together with the hash of its current content
for (int i = 0; i < watchFiles.length; i++) {
    if (StringUtils.isNotEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
        this.watchFiles.add(watchFiles[i]);
        this.fileCurrentHash.add(hash(watchFiles[i]));
    }
}
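The hash fingerprints a file's content so a change can be detected by comparison. A minimal sketch of such a hash function (my assumption: an MD5 digest of the file bytes, hex-encoded):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;

public class FileHashSketch {
    // Digest the file content and hex-encode it; equal strings mean the file is unchanged
    public static String hash(String filePath) throws Exception {
        byte[] bytes = Files.readAllBytes(Paths.get(filePath));
        byte[] digest = MessageDigest.getInstance("MD5").digest(bytes);
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}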
Comparing hashes tells us whether a file has changed. The change detection runs in FileWatchService#run:
while (!this.isStopped()) {
    try {
        // Wait for the watch interval
        this.waitForRunning(WATCH_INTERVAL);
        // Loop over the list of watched files
        for (int i = 0; i < watchFiles.size(); i++) {
            String newHash;
            try {
                newHash = hash(watchFiles.get(i));
            } catch (Exception ignored) {
                log.warn(this.getServiceName() + " service has exception when calculate the file hash. ", ignored);
                continue;
            }
            // If the hash differs, the file has changed
            if (!newHash.equals(fileCurrentHash.get(i))) {
                fileCurrentHash.set(i, newHash);
                listener.onChanged(watchFiles.get(i));
            }
        }
    } catch (Exception e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }
}
If the file changes, the onChanged function is called
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
    log.info("The trust certificate changed, reload the ssl context");
    reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
    certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
    keyChanged = true;
}
// Reload the context only after both the cert and the key have changed
if (certChanged && keyChanged) {
    log.info("The certificate and private key changed, reload the ssl context");
    certChanged = keyChanged = false;
    reloadServerSslContext();
}
At this point, the NameServer has been successfully started.
Broker management
Broker management includes Broker information, registration, and removal.
Broker information
RouteInfoManager is the core class with which the NameServer manages Brokers. It lives in the routeinfo package of the namesrv module. Here are the class's member fields:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
The member fields are explained as follows:
- topicQueueTable: message-queue routing information per Topic; message sending is load-balanced according to this routing table
- brokerAddrTable: base Broker information, i.e. the cluster name, brokerName, and the master and slave Broker addresses
- clusterAddrTable: Broker cluster information, storing the names of all Brokers in each cluster
- brokerLiveTable: Broker liveness information, updated by heartbeat packets
- filterServerTable: the list of FilterServers per Broker, used for class-mode message filtering
Three data structures appear in these member fields: QueueData, BrokerData, and BrokerLiveInfo. They can be described as follows (a small sketch of the three structures follows the list):
- A Topic has multiple message queues. By default, a Broker creates four read queues and four write queues for each Topic.
- Multiple Brokers in a cluster that share the same BrokerName form a master-slave group.
- lastUpdateTimestamp in BrokerLiveInfo holds the time of the last heartbeat.
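A minimal sketch of the three structures (field names follow the source; the types are simplified, so treat the details as assumptions):

import java.util.HashMap;

class QueueData {
    String brokerName;   // which broker group serves this topic
    int readQueueNums;   // default 4
    int writeQueueNums;  // default 4
    int perm;            // read/write permission flags
    int topicSynFlag;
}

class BrokerData {
    String cluster;
    String brokerName;
    HashMap<Long, String> brokerAddrs; // brokerId (0 = master) -> broker address
}

class BrokerLiveInfo {
    long lastUpdateTimestamp; // time of the last heartbeat
    Object dataVersion;       // DataVersion in the real source
    Object channel;           // io.netty.channel.Channel in the real source
    String haServerAddr;
}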
Suppose we have a RocketMQ cluster with two masters and two slaves (2m-2s). At run time, the corresponding data looks as follows.
The first master-slave group:
{
cluster: c1,
brokerName: broker-a,
brokerId: 0
}
{
cluster: c1,
brokerName: broker-a,
brokerId: 1
}
The second master-slave group:
{
cluster: c1,
brokerName: broker-b,
brokerId: 0
}
{
cluster: c1,
brokerName: broker-b,
brokerId: 1
}
Then when run, the corresponding topicQueueTable is:
topicQueueTable: {
topic1: [
{
brokerName: "broker-a",
readQueueNums: 4,
writeQueueNums: 4,
perm: 6,
topicSynFlag: false
},
{
brokerName: "broker-b",
readQueueNums: 4,
writeQueueNums: 4,
perm: 6,
topicSynFlag: false
}
],
topic2: ...
}
Then the corresponding brokerAddrTable is:
brokerAddrTable: {
    "broker-a": {
        cluster: "c1",
        brokerName: "broker-a",
        brokerAddrs: {
            0: "192.168.1.1",
            1: "192.168.1.2"
        }
    },
    "broker-b": {
        cluster: "c1",
        brokerName: "broker-b",
        brokerAddrs: {
            0: "192.168.1.3",
            1: "192.168.1.4"
        }
    }
}
Then the corresponding brokerLiveTable is:
brokerLiveTable: {
    "192.168.1.1": {
        lastUpdateTimestamp: 1623587820994,
        dataVersion: 0xxie,
        channel: channelObj,
        haServerAddr: "192.168.1.2"
    },
    "192.168.1.2": {
        lastUpdateTimestamp: 1623587123994,
        dataVersion: 0xxi1,
        channel: channelObj,
        haServerAddr: "192.168.1.2"
    },
    "192.168.1.3": {
        lastUpdateTimestamp: 1624558123994,
        dataVersion: 0xxes,
        channel: channelObj,
        haServerAddr: "192.168.1.4"
    },
    "192.168.1.4": {
        lastUpdateTimestamp: 1624558232343,
        dataVersion: 0xevs,
        channel: channelObj,
        haServerAddr: "192.168.1.4"
    }
}
Broker registration
The Broker registers with the NameServer at startup and then periodically afterwards. Registration is a two-step process:
- The Broker sends a registration request
- The namesrv processes the registration request
The Broker startup entry point is org.apache.rocketmq.broker.BrokerStartup#start. To skip some tedious configuration code, here is the call chain:
BrokerStartup#start
BrokerController#start
BrokerController#registerBrokerAll
BrokerController#doRegisterBrokerAll
The doRegisterBrokerAll method essentially sends the registration request. It first gets the namesrv address list and constructs the request header:
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    // Construct the request header
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);
    // Construct the request body with the broker's topic config and filter server list
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
It then loops over the namesrv address list, sending a registration request to each:
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
    brokerOuterExecutor.execute(new Runnable() {

        @Override
        public void run() {
            try {
                // Register with each NameServer address in turn
                RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                if (result != null) {
                    registerBrokerResultList.add(result);
                }
                log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
            } catch (Exception e) {
                // ... error handling elided
            } finally {
                countDownLatch.countDown();
            }
        }
    });
}
Above, the broker sends the registration request. Next, the namesrv must handle it.
DefaultRequestProcessor is the request processor; it decides what to do based on the code of the incoming request. When requestCode = 103 (RequestCode.REGISTER_BROKER), either DefaultRequestProcessor#registerBrokerWithFilterServer or DefaultRequestProcessor#registerBroker is called; which one is chosen depends on the broker's version, with the former additionally carrying the broker's filter-server list.
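A simplified sketch of that dispatch, trimmed to the request codes discussed in this article (the real switch in DefaultRequestProcessor#processRequest handles many more codes):

// Simplified sketch of DefaultRequestProcessor#processRequest
switch (request.getCode()) {
    case RequestCode.REGISTER_BROKER:        // 103
        MQVersion.Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.UNREGISTER_BROKER:      // 104
        return this.unregisterBroker(ctx, request);
    case RequestCode.GET_ROUTEINFO_BY_TOPIC: // 105
        return this.getRouteInfoByTopic(ctx, request);
    default:
        break;
}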
The registerBroker method first verifies the request's checksum:
if (!checksum(ctx, request, requestHeader)) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("crc32 not match");
    return response;
}
It then gets the namesrv's RouteInfoManager and writes the broker information into it:
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
Finally, the HA server address and master address are set on the response header, and the response is filled in and returned:
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
// Return the order-topic KV config in the response body
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Let's focus on the RouteInfoManager#registerBroker method. Because it writes routing information, the write lock is acquired first:
this.lock.writeLock().lockInterruptibly();
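RouteInfoManager guards all of its tables with a single ReentrantReadWriteLock: route queries take the read lock, while registration and removal take the write lock. A minimal sketch of the pattern (my illustration, not the full class):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RouteTableLockSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public void register() throws InterruptedException {
        try {
            // Writers block readers and other writers
            this.lock.writeLock().lockInterruptibly();
            // ... mutate the routing tables ...
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    public void pickupRoute() throws InterruptedException {
        try {
            // Many readers may hold the read lock concurrently
            this.lock.readLock().lockInterruptibly();
            // ... read the routing tables ...
        } finally {
            this.lock.readLock().unlock();
        }
    }
}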
It then checks whether broker information for this brokerName already exists; if not, a new entry is created:
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}
Whether this is the broker's first registration is judged by whether the broker entry existed and whether this broker's address was already recorded in it:
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
If the broker is a Master and its topic configuration has changed or it is registering for the first time, the topic routing metadata is created or updated to populate topicQueueTable. Routing information for the default topic (MixAll.DEFAULT_TOPIC) is registered automatically: when a producer sends to a topic that has not been created yet, and the Broker's autoCreateTopicEnable is true, the routing information of MixAll.DEFAULT_TOPIC is returned instead.
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
        || registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable =
            topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}
Next, the broker's liveness information in brokerLiveTable is updated:
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
    new BrokerLiveInfo(
        System.currentTimeMillis(),
        topicConfigWrapper.getDataVersion(),
        channel,
        haServerAddr));
if (null == prevBrokerLiveInfo) {
    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
Then the broker's FilterServer address list is registered; one broker can be associated with multiple FilterServer message-filtering servers:
if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else {
        this.filterServerTable.put(brokerAddr, filterServerList);
    }
}
If the broker is a slave node, the node information of its Master is looked up, and the masterAddr and haServerAddr of the returned result are filled in accordingly:
if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}
Route destruction
Route destruction happens in two ways:
- A periodic scan weeds out brokers that have stopped sending heartbeats
- The routing information is destroyed when a broker exits normally
The core of both is removing the routing information from the namesrv tables: topicQueueTable, brokerAddrTable, clusterAddrTable, brokerLiveTable, and filterServerTable. So the two paths share common code.
The entry to the timer scan is in the NamesrvController#initialize method
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        // Scan for brokers that are no longer active
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
The scanNotActiveBroker method closes the expired broker's channel and calls RouteInfoManager#onChannelDestroy to remove the channel from route management:
try {
    // ①
    this.lock.writeLock().lockInterruptibly();
    this.brokerLiveTable.remove(brokerAddrFound);
    this.filterServerTable.remove(brokerAddrFound);
    String brokerNameFound = null;
    boolean removeBrokerName = false;
    // ②
    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
        this.brokerAddrTable.entrySet().iterator();
    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
        BrokerData brokerData = itBrokerAddrTable.next().getValue();
        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> entry = it.next();
            Long brokerId = entry.getKey();
            String brokerAddr = entry.getValue();
            if (brokerAddr.equals(brokerAddrFound)) {
                brokerNameFound = brokerData.getBrokerName();
                it.remove();
                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
                break;
            }
        }
        if (brokerData.getBrokerAddrs().isEmpty()) {
            removeBrokerName = true;
            itBrokerAddrTable.remove();
            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
        }
    }
    // ③
    if (brokerNameFound != null && removeBrokerName) {
        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, Set<String>> entry = it.next();
            String clusterName = entry.getKey();
            Set<String> brokerNames = entry.getValue();
            boolean removed = brokerNames.remove(brokerNameFound);
            if (removed) {
                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                    brokerNameFound, clusterName);
                if (brokerNames.isEmpty()) {
                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                        clusterName);
                    it.remove();
                }
                break;
            }
        }
    }
    // ④
    if (removeBrokerName) {
        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
            this.topicQueueTable.entrySet().iterator();
        while (itTopicQueueTable.hasNext()) {
            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
            String topic = entry.getKey();
            List<QueueData> queueDataList = entry.getValue();
            Iterator<QueueData> itQueueData = queueDataList.iterator();
            while (itQueueData.hasNext()) {
                QueueData queueData = itQueueData.next();
                if (queueData.getBrokerName().equals(brokerNameFound)) {
                    itQueueData.remove();
                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);
                }
            }
            if (queueDataList.isEmpty()) {
                itTopicQueueTable.remove();
                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
            }
        }
    }
} finally {
    // ⑤
    this.lock.writeLock().unlock();
}
① Acquire the write lock, then remove the broker from brokerLiveTable and filterServerTable.
② Iterate brokerAddrTable to find the corresponding brokerData and remove the broker's address from its brokerAddrs; if brokerAddrs becomes empty, remove the whole brokerData entry.
③ Find the brokerName in clusterAddrTable and remove it; if the cluster's broker-name set becomes empty, remove the cluster entry as well.
④ Remove the QueueData of that brokerName from topicQueueTable; if a topic is left with no queues, remove the topic entry.
⑤ Release the write lock.
The entry for a broker's normal exit is also in DefaultRequestProcessor: when it receives RequestCode = 104 (UNREGISTER_BROKER), it calls DefaultRequestProcessor#unregisterBroker, which delegates to RouteInfoManager#unregisterBroker:
public void unregisterBroker(final String clusterName, final String brokerAddr,
    final String brokerName, final long brokerId) {
    try {
        try {
            // ①
            this.lock.writeLock().lockInterruptibly();
            BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
            log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
                brokerLiveInfo != null ? "OK" : "Failed",
                brokerAddr
            );
            // ②
            this.filterServerTable.remove(brokerAddr);
            // ③
            boolean removeBrokerName = false;
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null != brokerData) {
                String addr = brokerData.getBrokerAddrs().remove(brokerId);
                log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
                    addr != null ? "OK" : "Failed",
                    brokerAddr
                );
                if (brokerData.getBrokerAddrs().isEmpty()) {
                    this.brokerAddrTable.remove(brokerName);
                    log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
                        brokerName
                    );
                    removeBrokerName = true;
                }
            }
            // ④
            if (removeBrokerName) {
                Set<String> nameSet = this.clusterAddrTable.get(clusterName);
                if (nameSet != null) {
                    boolean removed = nameSet.remove(brokerName);
                    log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
                        removed ? "OK" : "Failed",
                        brokerName);
                    if (nameSet.isEmpty()) {
                        this.clusterAddrTable.remove(clusterName);
                        log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
                            clusterName
                        );
                    }
                }
                this.removeTopicByBrokerName(brokerName);
            }
        } finally {
            // ⑤
            this.lock.writeLock().unlock();
        }
    } // ... exception handling elided
}
① Acquire the write lock and remove the broker's liveness data from brokerLiveTable.
② Remove the broker's filter-server data.
③ Get the corresponding brokerData and remove the address for this brokerId; if brokerAddrs is empty afterwards, the broker is no longer in service at all, so remove it from brokerAddrTable.
④ Remove the brokerName from its cluster; if the cluster is empty afterwards, remove the cluster entry, and finally remove the broker's topic data via removeTopicByBrokerName.
⑤ Release the write lock.
Routing discovery
The namesrv does not actively notify clients when Topic routing changes; instead, clients periodically pull the latest routing information. On the client side this goes through MQClientAPIImpl#getTopicRouteInfoFromNameServer; on the namesrv side the request is handled by DefaultRequestProcessor#getRouteInfoByTopic.
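On the client side, the refresh is just a scheduled task. A minimal sketch, assuming the default 30-second pollNameServerInterval (in the real client this lives in MQClientInstance#startScheduledTask):

// Periodically refresh topic routes from the NameServer (simplified sketch)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

On the namesrv side, the handler looks like this: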
// ①
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
    // ② if sequential consumption is enabled, attach the order-topic config
    if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
        String orderTopicConf =
            this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                requestHeader.getTopic());
        topicRouteData.setOrderTopicConf(orderTopicConf);
    }
    // ③
    byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
At ①, the topic's routing information is picked up via RouteInfoManager#pickupTopicRouteData. At ②, if the routing information is not empty and sequential consumption is enabled, the "order topic" configuration is fetched from the KVConfigManager. At ③, the result is encoded and backfilled into the response.
At the end of the article
In this article we took a first look at RocketMQ's NameServer from the source-code perspective, which should provide some ideas for future wheel-building of our own.
end