2.2 NameServer
2.2.1 Architecture design
Message-oriented middleware is generally designed around a topic-based publish/subscribe mechanism. A producer sends messages for a topic to the message server, which persists them, and consumers subscribe to the topics they are interested in. Based on the subscription (routing) information, the message server either pushes messages to consumers (Push mode) or lets consumers actively pull messages from it (Pull mode), which decouples message producers from message consumers. To avoid a single point of failure, multiple message servers are usually deployed to share message storage. How, then, does a producer know which message server to send a message to? And if one of the message servers goes down, how can producers detect it without restarting the service?
NameServer is designed to solve these problems.
When a Broker message server starts, it registers with every NameServer. Before sending a message, a Producer fetches the list of Broker addresses from a NameServer and then selects one Broker from the list according to a load-balancing algorithm.
NameServer keeps a long-lived connection to each Broker and checks every 10 seconds whether each Broker is still alive; if it detects that a Broker is down, it removes the Broker from the routing registry. However, route changes are not pushed to message producers immediately. This design keeps the NameServer implementation simple and relies on a fault-tolerance mechanism on the sender side to keep message sending available.
NameServer itself achieves high availability by deploying multiple NameServer instances that do not communicate with each other. As a result, the data held by different NameServers may not be identical at a given moment, but this has no impact on message sending. This is one of the highlights of the NameServer design and reflects RocketMQ's goal of being simple and efficient.
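The producer-side selection step described above can be sketched as a simple round-robin over the Broker address list. This is only an illustration: RoundRobinBrokerSelector is a hypothetical helper, not a RocketMQ class, and the text does not say which load-balancing algorithm RocketMQ actually uses.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of RocketMQ): picks a broker address round-robin.
class RoundRobinBrokerSelector {
    private final AtomicInteger counter = new AtomicInteger();

    String select(List<String> brokerAddrs) {
        if (brokerAddrs == null || brokerAddrs.isEmpty()) {
            throw new IllegalArgumentException("no broker available");
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), brokerAddrs.size());
        return brokerAddrs.get(index);
    }
}
```

Successive calls with the same list walk through the addresses in order and wrap around at the end.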
2.2.2 Startup process
Startup class: org.apache.rocketmq.namesrv.NamesrvStartup
Step 1
Parse the configuration file, populate the NamesrvConfig and NettyServerConfig property values, and create the NamesrvController.
Code: NamesrvStartup#createNamesrvController
// Create NamesrvConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// Create NettyServerConfig
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Set the listening port
nettyServerConfig.setListenPort(9876);
// Parse the -c startup argument (configuration file)
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);
        namesrvConfig.setConfigStorePath(file);
        System.out.printf("load config properties file OK, %s%n", file);
        in.close();
    }
}
// Parse the -p startup argument (print the configuration and exit)
if (commandLine.hasOption('p')) {
    InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
    MixAll.printObjectProperties(console, namesrvConfig);
    MixAll.printObjectProperties(console, nettyServerConfig);
    System.exit(0);
}
// Fill the command-line parameters into namesrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// Create the NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
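To make the "properties to config object" step more concrete, here is a minimal sketch of how key/value properties could be copied onto an object through its setters using reflection. PropertiesBinder and DemoConfig are hypothetical names introduced for illustration; the sketch does not claim to reproduce how MixAll.properties2Object is implemented.

```java
import java.lang.reflect.Method;
import java.util.Properties;

class PropertiesBinder {
    // Hypothetical config class standing in for NamesrvConfig / NettyServerConfig.
    public static class DemoConfig {
        private int listenPort = 8888;
        private String configStorePath;
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public int getListenPort() { return listenPort; }
        public void setConfigStorePath(String path) { this.configStorePath = path; }
        public String getConfigStorePath() { return configStorePath; }
    }

    // Copies every property that has a matching one-argument public setter onto the target.
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            Class<?> type = m.getParameterTypes()[0];
            try {
                if (type == int.class) m.invoke(target, Integer.parseInt(value));
                else if (type == long.class) m.invoke(target, Long.parseLong(value));
                else if (type == boolean.class) m.invoke(target, Boolean.parseBoolean(value));
                else if (type == String.class) m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }
}
```

With this approach a value from the configuration file simply overrides the field's default, which matches the order of operations in the snippet above: defaults first, then the file given with -c, then the command-line parameters.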
NamesrvConfig properties
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
rocketmqHome: the RocketMQ home directory
kvConfigPath: the path where NameServer persists its KV configuration
configStorePath: the default path of the NameServer configuration file
orderMessageEnable: whether ordered messages are supported
NettyServerConfig properties
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
private boolean useEpollNativeSelector = false;
listenPort: the NameServer listening port; it is set to 9876 during startup
serverWorkerThreads: the number of threads in the Netty business thread pool
serverCallbackExecutorThreads: the number of threads in the Netty public task thread pool. Netty's networking design allows a separate thread pool to be created for each business type, such as message sending, message consumption, and heartbeat detection. If a business type has no registered thread pool, its requests are executed by the public thread pool.
serverSelectorThreads: the number of threads in the I/O thread pool. On NameServer and Broker, these threads handle network requests: they parse the request packet, forward it to the business thread pool to perform the actual operation, and then return the result to the caller.
serverOnewaySemaphoreValue: the maximum concurrency of one-way message requests (a Broker-side parameter)
serverAsyncSemaphoreValue: the maximum concurrency of asynchronous message sending
serverChannelMaxIdleTimeSeconds: the maximum idle time of a network connection, 120 seconds by default
serverSocketSndBufSize: the size of the network socket send buffer
serverSocketRcvBufSize: the size of the network socket receive buffer
serverPooledByteBufAllocatorEnable: whether to enable pooled ByteBuf allocation
useEpollNativeSelector: whether to enable the Epoll I/O model
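The fallback to a public thread pool mentioned for serverCallbackExecutorThreads can be sketched as a lookup table keyed by request code. ExecutorRegistry and the request codes used below are hypothetical and chosen only for illustration; they are not RocketMQ APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Hypothetical registry: maps a request code to the executor that should handle it.
class ExecutorRegistry {
    private final Map<Integer, Executor> byRequestCode = new HashMap<>();
    private final Executor publicExecutor;

    ExecutorRegistry(Executor publicExecutor) {
        this.publicExecutor = publicExecutor;
    }

    void register(int requestCode, Executor executor) {
        byRequestCode.put(requestCode, executor);
    }

    // Falls back to the public executor when no dedicated pool was registered.
    Executor executorFor(int requestCode) {
        return byRequestCode.getOrDefault(requestCode, publicExecutor);
    }
}
```

A request type with a dedicated pool runs there; every other request type ends up on the shared public executor.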
Step 2
Create and initialize a NamesrvController instance from the startup properties. The NamesrvController instance is the core controller of NameServer. Doesn't this look like Kafka? Both have the idea of a controller.
Code: NamesrvController#initialize
public boolean initialize() {
    // Load the KV configuration
    this.kvConfigManager.load();
    // Create the NettyRemotingServer network handling object
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Create the thread pool that handles remoting requests, then register the request processor
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    // Scheduled task: after an initial delay of 5 seconds, scan brokers every 10 seconds and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Scheduled task: after an initial delay of 1 minute, print the KV configuration every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    return true;
}
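The two scheduled tasks above rely on ScheduledExecutorService.scheduleAtFixedRate(task, initialDelay, period, unit). The following self-contained sketch shows the same pattern with millisecond values so that it finishes quickly; PeriodicScanDemo is a hypothetical class, and the latch exists only so that the demo can stop after a fixed number of runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicScanDemo {
    // Runs the task at a fixed rate until it has executed `times` times,
    // and reports whether all runs completed within a generous timeout.
    static boolean runPeriodically(Runnable task, int times, long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                task.run();
                done.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            // Initial delay plus all periods, plus slack for slow machines
            return done.await(initialDelayMs + periodMs * times + 2000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Note that if a task passed to scheduleAtFixedRate throws, subsequent executions are suppressed, so a periodic scan should catch its own exceptions if it must keep running.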
Step 3
Register a JVM shutdown hook so that the thread pools are shut down before the JVM process exits and resources are released promptly.
Code: NamesrvStartup#start
// Register the JVM shutdown hook
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        // Release resources
        controller.shutdown();
        return null;
    }
}));
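The shutdown-hook pattern used above can be tried in isolation with plain JDK classes. In this sketch, Controller is a hypothetical stand-in for NamesrvController that only records whether shutdown() was called.

```java
class ShutdownHookDemo {
    // Hypothetical stand-in for NamesrvController: only tracks whether shutdown() ran.
    static class Controller {
        volatile boolean stopped;
        void shutdown() { stopped = true; }
    }

    // Registers a hook that calls controller.shutdown() when the JVM exits,
    // and returns the hook thread so that callers can deregister it.
    static Thread registerShutdownHook(Controller controller) {
        Thread hook = new Thread(controller::shutdown, "ShutdownHook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

The hook runs when the JVM terminates normally or on signals such as SIGTERM; it does not run if the process is killed forcibly (for example with kill -9).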
2.2.3 Route Management
NameServer’s main role is to provide message producers and message consumers with routing information about topics. Therefore, NameServer needs to store basic routing information and manage Broker nodes, including route registration and route deletion.
2.2.3.1 Route meta information
Code: RouteInfoManager
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
topicQueueTable: routing information of the message queues for each topic; load balancing at send time is based on this table
brokerAddrTable: basic Broker information, including the brokerName, the cluster name, and the master and slave Broker addresses
clusterAddrTable: Broker cluster information, storing the names of all Brokers in each cluster
brokerLiveTable: Broker liveness information; NameServer replaces the entry each time it receives a heartbeat packet
filterServerTable: the list of FilterServers on each Broker, used for class-mode message filtering
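To see how these tables relate to each other, the following simplified sketch fills four of them for a small cluster. It is an illustration under assumptions: RouteTablesDemo is hypothetical, the value types are reduced to strings and timestamps instead of QueueData, BrokerData, and BrokerLiveInfo, and brokerId 0 is treated as the master.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RouteTablesDemo {
    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> brokerNames serving it
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> address)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // clusterName -> brokerNames
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // address -> last heartbeat (ms)

    void register(String cluster, String brokerName, long brokerId, String addr, String topic, long nowMillis) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        brokerLiveTable.put(addr, nowMillis);
        if (brokerId == 0L) { // in this sketch only the master (id 0) publishes topic routes
            List<String> brokers = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        }
    }
}
```

Registering a master and a slave under the same brokerName produces one brokerAddrTable entry with two addresses, while brokerLiveTable gets one entry per address.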
2.2.3.2 Route Registration
1) Send heartbeat packets
RocketMQ implements route registration through the heartbeat between Broker and NameServer. On startup, the Broker registers with every NameServer in the cluster and then sends a heartbeat packet to each of them every 30 seconds. When NameServer receives a heartbeat packet, it updates the lastUpdateTimestamp of the corresponding BrokerLiveInfo in the brokerLiveTable cache. NameServer scans brokerLiveTable every 10 seconds; if no heartbeat packet has been received from a Broker for 120 seconds, NameServer removes that Broker’s routing information and closes the socket connection.
Code: BrokerController#start
// Register Broker information
this.registerBrokerAll(true, false, true);
// Report Broker information to NameServer periodically (every 30 seconds by default)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
    TimeUnit.MILLISECONDS);
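The period expression in the call above clamps the configured heartbeat interval to the range from 10 to 60 seconds. A small worked example, using a hypothetical helper class HeartbeatPeriod, makes the effect explicit:

```java
class HeartbeatPeriod {
    // Same expression as in the snippet above: Math.max(10000, Math.min(period, 60000)).
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10_000L, Math.min(configuredMillis, 60_000L));
    }
}
```

A configured value of 1,000 ms is raised to 10,000 ms, a value of 30,000 ms is used unchanged, and a value of 120,000 ms is capped at 60,000 ms.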
Code: BrokerOuterAPI#registerBrokerAll
// Get the NameServer address list
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
// Iterate over all NameServers
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    // Build the request header
    final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    requestHeader.setCompressed(compressed);
    // Build the request body
    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    final byte[] body = requestBody.encode(compressed);
    final int bodyCrc32 = UtilAll.crc32(body);
    requestHeader.setBodyCrc32(bodyCrc32);
    final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Register with each NameServer
                    RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                    if (result != null) {
                        registerBrokerResultList.add(result);
                    }
                    log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                } finally {
                    countDownLatch.countDown();
                }
            }
        });
    }
    try {
        countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
}
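The registration loop above is a fan-out: one task per NameServer, with a CountDownLatch that lets the caller wait for all of them up to a timeout. The pattern can be sketched on its own as follows. FanOutRegistration is hypothetical, and the register function stands in for the remote call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class FanOutRegistration {
    // Calls `register` for every address in parallel and waits up to timeoutMs for all calls to finish.
    static List<String> registerAll(List<String> addrs, Function<String, String> register, long timeoutMs) {
        List<String> results = new CopyOnWriteArrayList<>();
        if (addrs.isEmpty()) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(4, addrs.size()));
        CountDownLatch latch = new CountDownLatch(addrs.size());
        for (String addr : addrs) {
            pool.execute(() -> {
                try {
                    String result = register.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    // A failure on one NameServer must not block the others
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            });
        }
        try {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in the finally block is what prevents a single failing NameServer from making the caller wait for the full timeout.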
Code: BrokerOuterAPI#registerBroker
if (oneway) {
    try {
        this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
    } catch (RemotingTooMuchRequestException e) {
        // Ignore
    }
    return null;
}
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
2) Processing heartbeat packets
The network processing class org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor parses the request type. If the request type is REGISTER_BROKER, the request is ultimately forwarded to RouteInfoManager#registerBroker.
Code: DefaultRequestProcessor#processRequest
// Handle Broker registration requests
case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        // Register Broker information
        return this.registerBroker(ctx, request);
    }
Code: DefaultRequestProcessor#registerBroker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
    requestHeader.getClusterName(),
    requestHeader.getBrokerAddr(),
    requestHeader.getBrokerName(),
    requestHeader.getBrokerId(),
    requestHeader.getHaServerAddr(),
    topicConfigWrapper,
    null,
    ctx.channel()
);
RouteInfoManager#registerBroker maintains the routing information:
// Acquire the write lock
this.lock.writeLock().lockInterruptibly();
// Maintain clusterAddrTable
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
// Maintain brokerAddrTable
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// First registration: create the brokerData
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}
// Not the first registration: update the Broker addresses.
// If the same address is already registered under a different brokerId, remove the stale entry first
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
    Entry<Long, String> item = it.next();
    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
        it.remove();
    }
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// Maintain topicQueueTable
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
        || registerFirst) {
        ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
        if (tcTable != null) {
            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                this.createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }
    }
}
Code: RouteInfoManager#createAndUpdateQueueData
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    // Create the QueueData
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
    // Get the queue list of this topic from topicQueueTable
    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    // If the topic has no queue list yet, create one and add queueData directly
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        // Check whether this is a new queue
        boolean addNewOne = true;
        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            // Same brokerName: this broker already has an entry for the topic
            if (qd.getBrokerName().equals(brokerName)) {
                if (qd.equals(queueData)) {
                    // Unchanged, nothing to add
                    addNewOne = false;
                } else {
                    // Changed: remove the old entry so that the new one replaces it
                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData);
                    it.remove();
                }
            }
        }
        // If the queue is new (or changed), add it to queueDataList
        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}
// Maintain brokerLiveTable
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo(
    System.currentTimeMillis(),
    topicConfigWrapper.getDataVersion(),
    channel,
    haServerAddr));
// Maintain filterServerTable
if (filterServerList != null) {
    if (filterServerList.isEmpty()) {
        this.filterServerTable.remove(brokerAddr);
    } else {
        this.filterServerTable.put(brokerAddr, filterServerList);
    }
}
// For a slave Broker, return the master's address and HA server address in the result
if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}
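The brokerAddrTable update in registerBroker is the least obvious step of the walkthrough above: before the new (brokerId, address) pair is stored, any entry that maps a different brokerId to the same address is dropped. The following sketch isolates that logic on a plain map. BrokerAddrsUpdate is a hypothetical helper, and the scenario in the usage note (an address re-registering under a new id) is an assumed example.

```java
import java.util.Map;

class BrokerAddrsUpdate {
    // Stores brokerId -> brokerAddr and returns true when brokerId had no address before,
    // i.e. when this is the first registration for that id.
    static boolean update(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        // Drop entries that map a different id to the same address
        brokerAddrs.entrySet().removeIf(e -> brokerAddr.equals(e.getValue()) && e.getKey() != brokerId);
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }
}
```

For example, if 10.0.0.2:10911 was registered with id 1 and later registers with id 0, the entry for id 1 is removed and id 0 now points to 10.0.0.2:10911, so the same address never appears under two ids.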
2.2.3.3 Deleting routes
The Broker sends a heartbeat packet to NameServer every 30 seconds. The heartbeat packet contains the brokerId, Broker address, Broker name, cluster name, and the list of FilterServers associated with the Broker. But if a Broker goes down and NameServer stops receiving its heartbeat packets, how does NameServer weed out the failed Broker? NameServer scans the brokerLiveTable status table every 10 seconds. If the lastUpdateTimestamp of a BrokerLiveInfo is more than 120 seconds older than the current time, the Broker is considered failed: it is removed, its connection is closed, and topicQueueTable, brokerAddrTable, brokerLiveTable, and filterServerTable are updated accordingly.
RocketMQ has two triggers for deleting routing information:
- NameServer periodically scans brokerLiveTable, compares the timestamp of the last heartbeat packet with the current system time, and removes the Broker if the difference exceeds 120 seconds.
- The unregisterBroker command is executed when a Broker shuts down normally.
Both triggers delete routes in the same way: the information related to the Broker is removed from the relevant routing tables.
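The timeout-based trigger can be sketched with a map from Broker address to last heartbeat time. This is a simplified illustration: LivenessScanner is hypothetical, it tracks only timestamps, and the current time is passed in explicitly so that the behavior is easy to verify.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LivenessScanner {
    static final long EXPIRE_MILLIS = 120_000L; // 120 seconds, as stated in the text
    final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Records (or refreshes) the heartbeat time of a broker.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Removes every broker whose last heartbeat is older than EXPIRE_MILLIS
    // and returns the addresses that were removed.
    List<String> scan(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MILLIS < nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

Because the scan runs every 10 seconds while the expiry threshold is 120 seconds, a failed Broker may stay in the routing tables for up to roughly 130 seconds after its last heartbeat.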
Code: NamesrvController#initialize
// Scan for inactive brokers every 10 seconds
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);
Code: RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
    // Get an iterator over brokerLiveTable
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    // Traverse brokerLiveTable
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // If more than 120 seconds have passed since the last heartbeat packet
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // Close the connection
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // Remove the broker
            it.remove();
            // Maintain the routing tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
Code: RouteInfoManager#onChannelDestroy
// Acquire the write lock, then remove the broker from brokerLiveTable and filterServerTable by its address
this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
// Maintain brokerAddrTable
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator();
// Traverse brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();
    // Traverse the broker addresses
    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry<Long, String> entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        // Remove the entry whose address matches brokerAddrFound
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break;
        }
    }
    // If the brokerName has no addresses left, remove it from brokerAddrTable
    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName());
    }
}
// Maintain clusterAddrTable
if (brokerNameFound != null && removeBrokerName) {
    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    // Traverse clusterAddrTable
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        // Get the cluster name
        String clusterName = entry.getKey();
        // Get the brokerName set of the cluster
        Set<String> brokerNames = entry.getValue();
        // Remove brokerNameFound from brokerNames
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);
            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                // If the cluster no longer contains any broker, remove the cluster
                it.remove();
            }
            break;
        }
    }
}
// Maintain topicQueueTable
if (removeBrokerName) {
    // Traverse topicQueueTable
    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
        this.topicQueueTable.entrySet().iterator();
    while (itTopicQueueTable.hasNext()) {
        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
        // The topic name
        String topic = entry.getKey();
        // The queue list of the topic
        List<QueueData> queueDataList = entry.getValue();
        // Traverse the queue list of the topic
        Iterator<QueueData> itQueueData = queueDataList.iterator();
        while (itQueueData.hasNext()) {
            // Remove the queue data that belongs to the removed broker
            QueueData queueData = itQueueData.next();
            if (queueData.getBrokerName().equals(brokerNameFound)) {
                itQueueData.remove();
                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);
            }
        }
        // If the queue list of the topic is empty, remove the topic
        if (queueDataList.isEmpty()) {
            itTopicQueueTable.remove();
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
        }
    }
}
// Release the write lock
finally {
    this.lock.writeLock().unlock();
}
2.2.3.4 Route Discovery
RocketMQ route discovery is non-real-time. When the Topic route changes, NameServer does not actively push it to the client. Instead, the client periodically pulls the latest route of the Topic.
Code: DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // Call RouteInfoManager to fill TopicRouteData's List<QueueData>, List<BrokerData>, and filterServer table
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    // If routing information is found for the topic and ordered messages are enabled,
    // fill in the ordered-message configuration from the NameServer KVConfig
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
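Route registration and deletion above modify the routing tables under a write lock, while route lookups only need to read them. A minimal sketch of that read/write split with ReentrantReadWriteLock is shown below. RouteLookup is hypothetical, it stores only broker names per topic, and it assumes that lookups take the read lock, which the text above does not show.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RouteLookup {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, List<String>> topicToBrokers = new HashMap<>();

    // Writers (registration, deletion) take the exclusive write lock.
    void register(String topic, String brokerName) {
        lock.writeLock().lock();
        try {
            List<String> brokers = topicToBrokers.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!brokers.contains(brokerName)) {
                brokers.add(brokerName);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock. Returns a copy of the broker list,
    // or null when the topic is unknown (the caller would answer TOPIC_NOT_EXIST).
    List<String> pickup(String topic) {
        lock.readLock().lock();
        try {
            List<String> brokers = topicToBrokers.get(topic);
            return brokers == null ? null : new ArrayList<>(brokers);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Many clients can pull routes concurrently under the read lock, and they are blocked only for the short time a heartbeat or a route deletion holds the write lock.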