Namesrv trilogy
- RocketMQ Namesrv startup process analysis
- RocketMQ Namesrv metadata store
- RocketMQ Namesrv interaction flow
Introduction
- This article analyzes RocketMQ Namesrv based on rocketmq-all-4.6.1. The core functions of Namesrv are its startup process, metadata storage, and interaction flow.
- This article focuses on the metadata storage of Namesrv.
Namesrv positioning
- Namesrv is positioned as a registry that holds the routing information of broker nodes and simple K/V configuration information.
- Namesrv supports cluster mode, but the Namesrv nodes are independent and do not communicate with each other. Multi-node disaster recovery is handled on the client side: a producer or consumer polls the Namesrv list and moves on to the next node when the current one cannot be reached.
- As a registry, Namesrv receives periodic registrations from brokers and keeps them in memory. Namesrv does not persist this routing information; it lives in memory only.
- Namesrv provides an external interface through which producers and consumers obtain the routing information of brokers. The interface is implemented with Netty.
- Heartbeat mechanism: Namesrv, acting as the server for brokers, periodically receives broker heartbeats and removes a broker whose heartbeat has timed out. Connection anomaly detection: disconnections are detected through epoll event notifications.
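Because Namesrv nodes do not talk to each other, failover lives entirely in the client. The following is a minimal sketch of that idea, not RocketMQ's actual client code: the class name, the method `queryWithFailover`, and the addresses are all hypothetical, and the `lookup` function stands in for a real remoting call that throws when a node is unreachable.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical client-side failover across independent Namesrv nodes. */
final class NamesrvFailoverSketch {

    /**
     * Tries each address in order, starting at startIndex, and returns the first
     * successful result. A lookup that throws (or returns null) counts as a failure,
     * and the next address is tried.
     */
    static <T> Optional<T> queryWithFailover(List<String> addrs, int startIndex,
                                             Function<String, T> lookup) {
        for (int i = 0; i < addrs.size(); i++) {
            String addr = addrs.get((startIndex + i) % addrs.size());
            try {
                return Optional.of(lookup.apply(addr));
            } catch (RuntimeException e) {
                // This node could not serve the request; move on to the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up addresses: the first node is "down", the second answers.
        List<String> addrs = List.of("10.0.0.1:9876", "10.0.0.2:9876");
        Optional<String> route = queryWithFailover(addrs, 0, addr -> {
            if (addr.startsWith("10.0.0.1")) {
                throw new IllegalStateException("connection refused");
            }
            return "route from " + addr;
        });
        System.out.println(route.orElse("no Namesrv reachable"));
    }
}
```

Since every node holds its own full copy of the registrations, any reachable node can answer, which is why a simple round-robin retry is enough here.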
Namesrv metadata store
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvConfig namesrvConfig;
private final NettyServerConfig nettyServerConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
private final KVConfigManager kvConfigManager;
private final RouteInfoManager routeInfoManager;
private RemotingServer remotingServer;
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
}
- kvConfigManager (KVConfigManager) stores the KV configuration information.
- routeInfoManager (RouteInfoManager) stores the routing information.
KVConfigManager
public class KVConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// The outer Namespace represents the Topic dimension
private final HashMap<String/* Namespace */, HashMap<String/* key */, String/* Value */>> configTable =
new HashMap<String, HashMap<String, String>>();
public KVConfigManager(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
public void load() {
String content = null;
try {
content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
} catch (IOException e) {
log.warn("Load KV config table exception", e);
}
if (content != null) {
KVConfigSerializeWrapper kvConfigSerializeWrapper =
KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
if (null != kvConfigSerializeWrapper) {
this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
log.info("load KV config table OK");
}
}
}
public void putKVConfig(final String namespace, final String key, final String value) {
try {
this.lock.writeLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null == kvTable) {
kvTable = new HashMap<String, String>();
this.configTable.put(namespace, kvTable);
log.info("putKVConfig create new Namespace {}", namespace);
}
final String prev = kvTable.put(key, value);
if (null != prev) {
log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
} else {
log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putKVConfig InterruptedException", e);
}
this.persist();
}
public void persist() {
try {
this.lock.readLock().lockInterruptibly();
try {
KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
kvConfigSerializeWrapper.setConfigTable(this.configTable);
String content = kvConfigSerializeWrapper.toJson();
if (null != content) {
MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
}
} catch (IOException e) {
log.error("persist kvconfig Exception, "
+ this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
} finally {
this.lock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("persist InterruptedException", e);
}
}
public void deleteKVConfig(final String namespace, final String key) {
try {
this.lock.writeLock().lockInterruptibly();
try {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
String value = kvTable.remove(key);
log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("deleteKVConfig InterruptedException", e);
}
this.persist();
}
}
- KVConfigManager stores configuration as key/value pairs, grouped by namespace (the Topic dimension).
- KVConfigManager provides create, read, update, and delete operations and uses a read/write lock for thread safety.
- configTable holds the configuration, and the ReadWriteLock separates readers from writers.
- putKVConfig saves a key/value pair, deleteKVConfig removes one, and persist writes the whole table to the file at kvConfigPath.
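To make the locking pattern easier to see in isolation, here is a stripped-down sketch of a namespaced key/value table guarded by a ReadWriteLock. It is not the RocketMQ class: the name `NamespacedKvStoreSketch` is made up, file persistence is left out, and it uses plain `lock()` instead of `lockInterruptibly()` to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical, in-memory-only version of a namespace -> (key -> value) table. */
final class NamespacedKvStoreSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Map<String, String>> table = new HashMap<>();

    /** Writers take the write lock, so they exclude readers and other writers. */
    void put(String namespace, String key, String value) {
        lock.writeLock().lock();
        try {
            table.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Readers share the read lock, so lookups can run concurrently. */
    String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Returns the removed value, or null if the namespace or key was absent. */
    String delete(String namespace, String key) {
        lock.writeLock().lock();
        try {
            Map<String, String> kv = table.get(namespace);
            return kv == null ? null : kv.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        NamespacedKvStoreSketch store = new NamespacedKvStoreSketch();
        store.put("demo-namespace", "demo-key", "demo-value"); // illustrative names only
        System.out.println(store.get("demo-namespace", "demo-key"));
        System.out.println(store.delete("demo-namespace", "demo-key"));
        System.out.println(store.get("demo-namespace", "demo-key"));
    }
}
```

Note how the listing above takes only the read lock in persist: serializing the table does not modify it, so it can overlap with other readers.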
RouteInfoManager
- topicQueueTable holds queue information keyed by topic; each QueueData entry also names the broker that hosts the queues.
- brokerAddrTable holds broker instance information (BrokerData) keyed by brokerName; inside BrokerData, broker addresses are keyed by brokerId.
- clusterAddrTable holds cluster information keyed by clusterName; each value is the set of brokerNames in that cluster.
- brokerLiveTable holds live broker information keyed by broker address (IP:port).
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
public class QueueData implements Comparable<QueueData> {
private String brokerName;
private int readQueueNums;
private int writeQueueNums;
private int perm;
private int topicSynFlag;
}
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
private final Random random = new Random();
public BrokerData() {
}
public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs) {
this.cluster = cluster;
this.brokerName = brokerName;
this.brokerAddrs = brokerAddrs;
}
}
class BrokerLiveInfo {
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
String haServerAddr) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
this.dataVersion = dataVersion;
this.channel = channel;
this.haServerAddr = haServerAddr;
}
}
- The data structures behind RouteInfoManager's core variables are shown in the code above.
- The core variables are topicQueueTable, brokerAddrTable, clusterAddrTable, and brokerLiveTable.
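One way to see how these tables fit together is to resolve a topic to broker addresses by joining two of them. The sketch below is a simplification under stated assumptions: QueueData is reduced to just a brokerName string, BrokerData to its brokerId-to-address map, and the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical join of topic -> brokerNames with brokerName -> (brokerId -> address). */
final class RouteLookupSketch {

    /** Returns, for each broker that hosts the topic, its brokerId -> address map. */
    static Map<String, Map<Long, String>> resolve(
            String topic,
            Map<String, List<String>> topicToBrokerNames,
            Map<String, Map<Long, String>> brokerAddrTable) {
        Map<String, Map<Long, String>> result = new LinkedHashMap<>();
        List<String> brokerNames =
            topicToBrokerNames.getOrDefault(topic, Collections.<String>emptyList());
        for (String brokerName : brokerNames) {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            if (addrs != null) {
                // Skip broker names that no longer have a registered address.
                result.put(brokerName, addrs);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = new HashMap<>();
        topics.put("TBW102", Arrays.asList("broker-a"));

        Map<Long, String> addrs = new HashMap<>();
        addrs.put(0L, "192.168.0.8:10911");
        Map<String, Map<Long, String>> brokers = new HashMap<>();
        brokers.put("broker-a", addrs);

        System.out.println(resolve("TBW102", topics, brokers));
    }
}
```

The topic table only stores broker names, never addresses, so an address change touches brokerAddrTable alone.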
topicQueueTable
{
  "RMQ_SYS_TRANS_HALF_TOPIC": [{
    "brokerName": "broker-a",
    "perm": 6,
    "readQueueNums": 1,
    "topicSynFlag": 0,
    "writeQueueNums": 1
  }],
  "SELF_TEST_TOPIC": [{
    "brokerName": "broker-a",
    "perm": 6,
    "readQueueNums": 1,
    "topicSynFlag": 0,
    "writeQueueNums": 1
  }],
  "TBW102": [{
    "brokerName": "broker-a",
    "perm": 7,
    "readQueueNums": 8,
    "topicSynFlag": 0,
    "writeQueueNums": 8
  }],
  "BenchmarkTest": [{
    "brokerName": "broker-a",
    "perm": 6,
    "readQueueNums": 1024,
    "topicSynFlag": 0,
    "writeQueueNums": 1024
  }],
  "DefaultCluster": [{
    "brokerName": "broker-a",
    "perm": 7,
    "readQueueNums": 16,
    "topicSynFlag": 0,
    "writeQueueNums": 16
  }],
  "DefaultCluster_REPLY_TOPIC": [{
    "brokerName": "broker-a",
    "perm": 6,
    "readQueueNums": 1,
    "topicSynFlag": 0,
    "writeQueueNums": 1
  }],
  "OFFSET_MOVED_EVENT": [{
    "brokerName": "broker-a",
    "perm": 6,
    "readQueueNums": 1,
    "topicSynFlag": 0,
    "writeQueueNums": 1
  }]
}
- topicQueueTable holds per-topic information: the broker that hosts the topic, the number of read and write queues, the permission bits, and so on.
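The perm values in the dump (6 and 7) are bit masks. The sketch below decodes them, assuming the bit layout I understand RocketMQ's PermName class to use (read = 4, write = 2, inherit = 1); the class `PermSketch` and its output format are my own and not part of RocketMQ.

```java
/** Hypothetical decoder for the perm bit mask; bit values are assumed to mirror PermName. */
final class PermSketch {
    static final int PERM_READ = 1 << 2;    // 4 (assumed)
    static final int PERM_WRITE = 1 << 1;   // 2 (assumed)
    static final int PERM_INHERIT = 1;      // 1 (assumed)

    /** Renders the mask as three flags: R (read), W (write), X (inherit). */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder(3);
        sb.append((perm & PERM_READ) != 0 ? 'R' : '-');
        sb.append((perm & PERM_WRITE) != 0 ? 'W' : '-');
        sb.append((perm & PERM_INHERIT) != 0 ? 'X' : '-');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("perm 6 -> " + describe(6));
        System.out.println("perm 7 -> " + describe(7));
    }
}
```

Under that assumption, ordinary topics in the dump are readable and writable, while TBW102 and DefaultCluster additionally carry the inherit bit.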
brokerAddrTable
{
  "broker-a": {
    "brokerAddrs": {
      0: "192.168.0.8:10911"
    },
    "brokerName": "broker-a",
    "cluster": "DefaultCluster"
  }
}
- brokerAddrTable holds broker information: the key is the brokerName (for example, broker-a), and the value is that broker's BrokerData.
clusterAddrTable
{
  "DefaultCluster": ["broker-a"]
}
- clusterAddrTable stores cluster information: the key is the cluster name (for example, DefaultCluster), and the value is the set of names of all brokers in the cluster.
brokerLiveTable
{
  "192.168.0.8:10911": {
    "channel": {
      "active": false,
      "inputShutdown": false,
      "open": false,
      "outputShutdown": true,
      "registered": false,
      "writable": false
    },
    "dataVersion": {
      "counter": 1,
      "timestamp": 1588318700534
    },
    "haServerAddr": "192.168.0.8:10912",
    "lastUpdateTimestamp": 1588323761374
  }
}
- brokerLiveTable holds live broker information: the key is the broker address (for example, 192.168.0.8:10911), and the value is the BrokerLiveInfo of that broker.
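The lastUpdateTimestamp field, together with the two-minute BROKER_CHANNEL_EXPIRED_TIME constant, is what makes heartbeat timeouts detectable. Here is a rough sketch of such a scan. It assumes BrokerLiveInfo is reduced to its timestamp, and the names `BrokerExpirySketch` and `evictExpired` are hypothetical; the real Namesrv also closes the channel and cleans up the other tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical heartbeat-expiry scan over brokerAddr -> lastUpdateTimestamp. */
final class BrokerExpirySketch {
    // Same value as the constant in the listing above: two minutes in milliseconds.
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    /** Removes brokers whose last heartbeat is older than the expiry window. */
    static List<String> evictExpired(Map<String, Long> lastUpdateByAddr, long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdateByAddr.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME < nowMillis) {
                removed.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<String, Long> live = new HashMap<>();
        live.put("192.168.0.8:10911", now - 3 * 60 * 1000); // silent for three minutes
        live.put("192.168.0.9:10911", now - 10 * 1000);     // heartbeat ten seconds ago
        System.out.println("evicted: " + evictExpired(live, now));
        System.out.println("still live: " + live.keySet());
    }
}
```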
RouteInfoManager core operations
registerBroker
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// Topic configuration is registered only when this broker is the master (brokerId=0) under its brokerName
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData);
it.remove();
}
}
}
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
}
- registerBroker runs under the write lock of a ReadWriteLock, which keeps the operation thread-safe.
- It checks whether entries exist in clusterAddrTable, brokerAddrTable (BrokerData), and topicQueueTable, creating or updating them as needed.
- topicQueueTable is updated only when the registering broker is the master (brokerId=0).
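One detail in registerBroker that is easy to miss is the slave-to-master switch: before the new brokerId is stored, any other brokerId that already maps to the same address is dropped, so an address appears only once. The sketch below isolates that step on a bare brokerId-to-address map; `BrokerAddrSwitchSketch` and the sample addresses are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical isolation of the "one record per IP:PORT" rule in registerBroker. */
final class BrokerAddrSwitchSketch {

    /**
     * Registers addr under brokerId after removing any other brokerId that maps
     * to the same addr. Returns true when brokerId had no previous address.
     */
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String addr) {
        brokerAddrs.entrySet().removeIf(
            e -> addr.equals(e.getValue()) && brokerId != e.getKey());
        return brokerAddrs.put(brokerId, addr) == null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        register(addrs, 0L, "10.0.0.1:10911"); // master
        register(addrs, 1L, "10.0.0.2:10911"); // slave
        // The slave is promoted and re-registers with brokerId 0:
        // its old <1, addr> record is removed before <0, addr> is stored.
        register(addrs, 0L, "10.0.0.2:10911");
        System.out.println(addrs);
    }
}
```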
unregisterBroker
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
public void unregisterBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId) {
try {
try {
this.lock.writeLock().lockInterruptibly();
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
log.info("unregisterBroker, remove from brokerLiveTable {}, {}", brokerLiveInfo != null ? "OK" : "Failed",
brokerAddr
);
this.filterServerTable.remove(brokerAddr);
boolean removeBrokerName = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
String addr = brokerData.getBrokerAddrs().remove(brokerId);
log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}", addr != null ? "OK" : "Failed",
brokerAddr
);
if (brokerData.getBrokerAddrs().isEmpty()) {
this.brokerAddrTable.remove(brokerName);
log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
brokerName
);
removeBrokerName = true;
}
}
if (removeBrokerName) {
Set<String> nameSet = this.clusterAddrTable.get(clusterName);
if (nameSet != null) {
boolean removed = nameSet.remove(brokerName);
log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
removed ? "OK" : "Failed",
brokerName);
if (nameSet.isEmpty()) {
this.clusterAddrTable.remove(clusterName);
log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
clusterName
);
}
}
this.removeTopicByBrokerName(brokerName);
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("unregisterBroker Exception", e);
}
}
private void removeTopicByBrokerName(final String brokerName) {
Iterator<Entry<String, List<QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
while (itMap.hasNext()) {
Entry<String, List<QueueData>> entry = itMap.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, qd);
it.remove();
}
}
if (queueDataList.isEmpty()) {
log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
itMap.remove();
}
}
}
}
- unregisterBroker also runs under the write lock of a ReadWriteLock.
- It removes the broker from brokerLiveTable, filterServerTable, brokerAddrTable, and clusterAddrTable, in that order.
- Broker removal cascades: if removing a brokerId leaves no addresses under its brokerName, the brokerName is removed as well, and removing the brokerName in turn removes all of that broker's topic queue entries from topicQueueTable.
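The cascade can be sketched on simplified tables. In the version below, BrokerData is reduced to its brokerId-to-address map and QueueData to a brokerName string; locking, logging, brokerLiveTable, and filterServerTable are left out, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, lock-free illustration of the removal cascade in unregisterBroker. */
final class UnregisterCascadeSketch {
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    final Map<String, List<String>> topicQueueTable = new HashMap<>();

    void unregister(String clusterName, String brokerName, long brokerId) {
        Map<Long, String> addrs = brokerAddrTable.get(brokerName);
        if (addrs == null) {
            return;
        }
        addrs.remove(brokerId);
        if (!addrs.isEmpty()) {
            return; // another instance still serves this brokerName; stop here
        }
        // Last address gone: drop the brokerName, then its cluster membership.
        brokerAddrTable.remove(brokerName);
        Set<String> names = clusterAddrTable.get(clusterName);
        if (names != null) {
            names.remove(brokerName);
            if (names.isEmpty()) {
                clusterAddrTable.remove(clusterName);
            }
        }
        // Finally drop the broker's queue entries, and any topic left with none.
        Iterator<Map.Entry<String, List<String>>> it = topicQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            List<String> brokers = it.next().getValue();
            brokers.removeIf(brokerName::equals);
            if (brokers.isEmpty()) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        UnregisterCascadeSketch tables = new UnregisterCascadeSketch();
        Map<Long, String> addrs = new HashMap<>();
        addrs.put(0L, "10.0.0.1:10911");
        addrs.put(1L, "10.0.0.2:10911");
        tables.brokerAddrTable.put("broker-a", addrs);
        tables.clusterAddrTable.put("DefaultCluster", new HashSet<>(List.of("broker-a")));
        tables.topicQueueTable.put("TopicTest", new ArrayList<>(List.of("broker-a")));

        tables.unregister("DefaultCluster", "broker-a", 1L); // slave leaves: nothing cascades
        System.out.println(tables.topicQueueTable);
        tables.unregister("DefaultCluster", "broker-a", 0L); // master leaves: everything goes
        System.out.println(tables.topicQueueTable);
    }
}
```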
RocketMQ Broker configuration information
2m-2s configuration
- All brokers in one RocketMQ cluster must use the same brokerClusterName.
- A master and its slaves share the same brokerName. brokerId=0 marks the master, and slaves take brokerId 1, 2, 3, and so on.
- Knowing how a RocketMQ cluster is configured makes the contents of RouteInfoManager easier to understand.
##broker-a.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
##broker-a-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
##broker-b.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
##broker-b-s.properties
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
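To connect the four property files back to the routing tables, the sketch below feeds the same brokerClusterName, brokerName, and brokerId values into simplified versions of clusterAddrTable and brokerAddrTable. The broker addresses are made up (the property files do not contain them), and the class `TwoMasterTwoSlaveSketch` is only an illustration of the resulting shape, not Namesrv code.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical view of the tables after a 2m-2s cluster has registered. */
final class TwoMasterTwoSlaveSketch {
    final Map<String, Set<String>> clusterAddrTable = new TreeMap<>();
    final Map<String, Map<Long, String>> brokerAddrTable = new TreeMap<>();

    /** Records one broker under its cluster, brokerName, and brokerId. */
    void register(String cluster, String brokerName, long brokerId, String addr) {
        clusterAddrTable.computeIfAbsent(cluster, c -> new TreeSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, n -> new TreeMap<>()).put(brokerId, addr);
    }

    public static void main(String[] args) {
        TwoMasterTwoSlaveSketch tables = new TwoMasterTwoSlaveSketch();
        // Cluster, name, and id come from the property files; addresses are invented.
        tables.register("DefaultCluster", "broker-a", 0, "10.0.0.1:10911");
        tables.register("DefaultCluster", "broker-a", 1, "10.0.0.2:10911");
        tables.register("DefaultCluster", "broker-b", 0, "10.0.0.3:10911");
        tables.register("DefaultCluster", "broker-b", 1, "10.0.0.4:10911");
        System.out.println(tables.clusterAddrTable);
        System.out.println(tables.brokerAddrTable);
    }
}
```

Four broker processes collapse into one cluster entry and two brokerName entries, each holding a master (id 0) and a slave (id 1).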