1. Overview of RocketMQ architecture
1.1 Logical Deployment Diagram
(Photo from Internet)
1.2 Core Components
As can be seen from the figure above, RocketMQ consists of four core components: NameServer, Broker, Producer, and Consumer. The following four core components are briefly described:
NameServer: NameServer acts as the routing information provider. Producers and consumers use NameServer to look up the list of Broker IP addresses for each Topic. Multiple NameServer instances form a cluster, but they are independent of each other and do not exchange information.
Broker: The role that stores and forwards messages. A Broker server in RocketMQ receives and stores messages sent by producers and serves consumers' pull requests. It also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue information.
Producer: Responsible for producing messages, usually from a business system. A producer sends messages generated in the business application to the Broker servers. RocketMQ supports several sending modes: synchronous, asynchronous, ordered (sequential), and one-way. Synchronous and asynchronous sending require the Broker to return an acknowledgement, while one-way sending does not.
Consumer: Responsible for consuming messages, usually in a backend system that processes them asynchronously. A consumer pulls messages from the Broker servers and hands them to the application. From the application's perspective, two consumption styles are provided: pull consumption and push consumption.
In addition to the four core components above, the concept of Topic will come up repeatedly below:
Topic: Represents a category of messages. Each Topic contains a number of messages, and each message belongs to exactly one Topic. The Topic is RocketMQ's basic unit of message subscription. A Topic can be sharded across multiple Broker clusters, and each Topic shard contains multiple queues, as shown in the following figure:
1.3 Design Concept
RocketMQ uses a topic-based publish/subscribe model. Its core functions are message sending, message storage, and message consumption. The overall design favors simplicity and puts performance first:
- NameServer replaces ZooKeeper (ZK) as the registry. NameServer nodes in a cluster do not communicate with each other, and the cluster tolerates routing inconsistencies lasting up to minutes. This keeps NameServer lightweight.
- A memory-mapped file mechanism is used to achieve efficient storage IO and high throughput.
- The design tolerates imperfection: an ACK mechanism ensures that each message is consumed at least once, but if an ACK is lost the message may be consumed again. Duplicate consumption is allowed by design, and handling it is left to the consumer.
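Since delivery is at-least-once, consumer logic should be idempotent. The following is a minimal sketch of one common approach, in-memory deduplication by a message key. The class and method names are hypothetical and not part of the RocketMQ client API, and a real system would usually persist the processed keys (for example, behind a database unique index) so they survive restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper (not a RocketMQ API): runs the business handler at most once per message key,
// so a redelivery caused by a lost ACK does no harm.
class IdempotentHandler {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    IdempotentHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the message was processed now, false if it was a duplicate. */
    boolean handle(String messageKey, String body) {
        // add() returns false when the key is already present, i.e. the message is a redelivery
        if (!processedKeys.add(messageKey)) {
            return false;
        }
        try {
            businessLogic.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Processing failed: forget the key so that a later redelivery can be retried
            processedKeys.remove(messageKey);
            throw e;
        }
    }
}
```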
This article focuses on NameServer. Let’s take a look at how NameServer is started and how routing is managed.
2. NameServer architecture design
In Chapter 1 we briefly introduced how NameServer replaces ZK as a more lightweight registry that provides routing information. So how does it manage that routing information? Let's start with the following figure:
The figure above describes the core principles of NameServer for route registration, route culling, and route discovery.
Route registration: When a Broker starts, it sends a heartbeat to every NameServer in the cluster, and then sends another heartbeat every 30 seconds to tell the NameServers that it is alive. When a NameServer receives a heartbeat packet from a Broker, it records the Broker's information together with the time the latest heartbeat was received.
Route culling: NameServer keeps a long-lived connection to each Broker and receives a heartbeat packet from it every 30 seconds. Every 10 seconds it scans brokerLiveTable; if a Broker's last heartbeat is more than 120 seconds older than the current time, the Broker is considered unavailable and its information is removed from the routing table.
Route discovery: Route discovery is not real-time. When routes change, NameServer does not push the new routes to clients; instead, clients such as producers periodically pull the latest routing information. This keeps the NameServer implementation simple, and a fault-tolerance mechanism on the sending side keeps message sending highly available while routes change. (This will be covered in a follow-up article on Producer message sending.)
High availability: NameServer achieves high availability by deploying multiple NameServer servers that do not communicate with each other. As a result, the routing data held by different NameServers may temporarily differ when routing information changes, but the sender-side fault-tolerance mechanism still keeps message sending highly available. This trade-off reflects NameServer's goal of simplicity and efficiency.
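The route-culling rule can be modeled in a few lines. The sketch below is a simplified, hypothetical model rather than RouteInfoManager itself: it keeps only brokerAddr → last-heartbeat time, uses the 120-second expiry described above, and takes the current time as a parameter so it is easy to test.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified liveness table: brokerAddr -> time of the last heartbeat (ms).
class BrokerLivenessTable {
    static final long BROKER_CHANNEL_EXPIRED_TIME_MS = 120_000L; // 120 s, as described above

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Called whenever a heartbeat (registration request) arrives from a Broker. */
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    /** Called periodically (every 10 s on NameServer); returns how many Brokers were removed. */
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + BROKER_CHANNEL_EXPIRED_TIME_MS < nowMillis) {
                // NameServer additionally closes the Broker's connection and cleans the other routing tables
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```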
3. Startup process
How does NameServer get started?
Since this article is a source code walkthrough, let's start from the entry point org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args), which actually calls the main0() method.
The code is as follows:
public static NamesrvController main0(String[] args) {
try {
// Create the NamesrvController
NamesrvController controller = createNamesrvController(args);
// Initializes and starts NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
Starting NameServer from the main method involves two main steps: first create the NamesrvController, then initialize and start it. Let's break these down.
3.1 Sequence diagram
Before we read the code in detail, let’s use a sequence diagram to get an idea of the overall process, as shown below:
3.2 Creating the NamesrvController
Let’s start with the core code, as follows:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// Set the version number to the current version
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// Build org.apache.commons.cli.Options and add the -h and -n options: -h prints help information, -n specifies namesrvAddr
Options options = ServerUtil.buildCommandlineOptions(new Options());
// Initialize the commandLine and add the -c -p parameter to options. -c specifies the nameserver configuration file path and -p specifies the configuration information to be printed
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// Nameserver configuration class, business parameters
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// Netty server configuration class, network parameters
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Set the port number of nameserver
nettyServerConfig.setListenPort(9876);
// If -c is specified on the command line, read the configuration file at that path and assign its settings to NamesrvConfig and NettyServerConfig
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// Copy the properties onto the config objects via reflection
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// Set the configuration file path
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// -p prints the properties of NamesrvConfig and NettyServerConfig, e.g. ./mqnamesrv -c configFile -p prints the currently loaded configuration
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
// Print parameters command does not need to start the Nameserver service, just print parameters
System.exit(0);
}
// Parse command line arguments and load them into namesrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// Check ROCKETMQ_HOME, cannot be empty
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// Initialize the logback log factory. Rocketmq uses logback as log output by default
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// Create the NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// Copy the global properties into NamesrvController's Configuration.allConfigs
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
From the comments above for each line of code, you can see that the process of creating a NamesrvController is divided into two main steps:
Step1: Parse the command-line options (and, with -c, the configuration file) and assign the values to NamesrvConfig and NettyServerConfig.
Step2: Construct an instance of NamesrvController according to the configuration classes NamesrvConfig and NettyServerConfig.
NamesrvConfig holds NameServer's business parameters, and NettyServerConfig holds its network parameters.
(Figure: fields of NamesrvConfig)
(Figure: fields of NettyServerConfig)
Apache Commons CLI is an open-source command-line parsing library that helps developers quickly define startup commands, parse their arguments, print usage help, and so on.
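The -c branch above relies on MixAll.properties2Object to copy Properties entries onto the config objects by reflection. Below is a minimal sketch of that idea, not RocketMQ's implementation: it assumes JavaBean-style setXxx methods taking String, int, long, or boolean, and DemoConfig is a hypothetical stand-in for NamesrvConfig or NettyServerConfig.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical config class standing in for NamesrvConfig / NettyServerConfig.
class DemoConfig {
    private int listenPort = 8888;
    private String kvConfigPath = "";
    private boolean clusterTest = false;

    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public void setKvConfigPath(String kvConfigPath) { this.kvConfigPath = kvConfigPath; }
    public void setClusterTest(boolean clusterTest) { this.clusterTest = clusterTest; }
    public int getListenPort() { return listenPort; }
    public String getKvConfigPath() { return kvConfigPath; }
    public boolean isClusterTest() { return clusterTest; }
}

class PropertiesBinder {
    /** For every public one-argument setXxx method, look up property "xxx" and convert its value. */
    static void bind(Properties props, Object target) {
        for (Method method : target.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() <= 3 || method.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // property absent: keep the default value
            }
            Class<?> type = method.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class || type == Integer.class) {
                value = Integer.parseInt(raw.trim());
            } else if (type == long.class || type == Long.class) {
                value = Long.parseLong(raw.trim());
            } else if (type == boolean.class || type == Boolean.class) {
                value = Boolean.parseBoolean(raw.trim());
            } else {
                continue; // unsupported parameter type: skip
            }
            try {
                method.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }
}
```

The command-line values are then bound the same way, after converting the parsed options into a Properties object (ServerUtil.commandLine2Properties in the code above).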
3.3 Initialization and Startup
After the NamesrvController instance is created, initialize and start the NameServer.
The code entry is NamesrvController#initialize.
public boolean initialize() {
// Load kvConfig.json from kvConfigPath and put its entries into KVConfigManager#configTable
this.kvConfigManager.load();
// Initialize a Netty server according to nettyServerConfig.
// brokerHousekeepingService is created in the NamesrvController constructor. It implements ChannelEventListener and handles Broker connection events, mainly to maintain brokerLiveTable in RouteInfoManager
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// Initialize the thread pool responsible for processing Netty network interaction data. The default number of threads is 8
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// Register the Netty server's business processor: if clusterTest is enabled, the processor is ClusterTestRequestProcessor; otherwise it is DefaultRequestProcessor
this.registerProcessor();
// Schedule a task that starts after 5 seconds and then scans RouteInfoManager#brokerLiveTable every 10 seconds to remove inactive Brokers
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// Schedule a task that starts after 1 minute and then prints the KV configuration every 10 minutes
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// RocketMQ can improve the security of data transmission by enabling TLS. If enabled, a listener needs to be registered to reload SslContext
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
The code above is the NameServer initialization process. As the comments show, it consists of five main steps:
- Step1: Load the KV configuration and write it to the configTable property of KVConfigManager;
- Step2: Initialize the Netty server;
- Step3: Initialize the thread pool that processes Netty network interaction data;
- Step4: Schedule the Broker liveness check, which starts after 5 seconds and runs every 10 seconds;
- Step5: Schedule the printing of the KV configuration, which starts after 1 minute and runs every 10 minutes.
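Steps 4 and 5 use ScheduledExecutorService#scheduleAtFixedRate(command, initialDelay, period, unit). The sketch below shows the same pattern around a hypothetical task; the delays are parameters so a demo can use milliseconds instead of 5 s and 10 s. One design point worth knowing: if one run throws, the JDK suppresses all later runs, so the sketch catches and logs exceptions inside the task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing the fixed-rate scheduling pattern used for the liveness scan.
class PeriodicScanner {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "route-scan-scheduler");
                t.setDaemon(true); // do not keep the JVM alive just for the scanner
                return t;
            });

    /** Runs task once after initialDelay, then every period. */
    ScheduledFuture<?> start(Runnable task, long initialDelay, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                // An exception escaping here would silently cancel all future runs
                System.err.println("scan failed: " + e);
            }
        }, initialDelay, period, unit);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

For NameServer's values, the call would be start(scanTask, 5, 10, TimeUnit.SECONDS).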
A common programming technique used by the RocketMQ development team is to shut NameServer down gracefully with a JVM shutdown hook, so that the shutdown logic runs before the JVM process exits:
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
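A shutdown hook is an unstarted Thread passed to Runtime.getRuntime().addShutdownHook(...); the JVM starts it on normal exit or on SIGTERM, but not on kill -9. The class below is a simplified, hypothetical stand-in for RocketMQ's ShutdownHookThread: it wraps a Callable (which for NameServer would call controller.shutdown()) and guarantees the callback runs at most once.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified shutdown hook; not RocketMQ's ShutdownHookThread.
class GracefulShutdownHook extends Thread {
    private final AtomicBoolean hasShutdown = new AtomicBoolean(false);
    private final Callable<Void> callback;

    GracefulShutdownHook(Callable<Void> callback) {
        super("graceful-shutdown-hook");
        this.callback = callback;
    }

    @Override
    public void run() {
        // compareAndSet lets only the first invocation run the callback
        if (!hasShutdown.compareAndSet(false, true)) {
            return;
        }
        try {
            callback.call();
        } catch (Exception e) {
            System.err.println("shutdown callback failed: " + e);
        }
    }

    /** Registers this hook with the JVM so it runs when the process exits. */
    GracefulShutdownHook register() {
        Runtime.getRuntime().addShutdownHook(this);
        return this;
    }
}
```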
NameServer is then started by the start function. The code is relatively simple: it starts the Netty server created during initialization and, if TLS is enabled, the FileWatchService. The internals of remotingServer.start() belong to Netty and are not covered here.
public void start() throws Exception {
// Start the Netty server
this.remotingServer.start();
// If TLS is enabled, start watching the certificate files
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
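The FileWatchService listener registered in initialize() encodes a small rule: a changed trust certificate triggers an immediate SSL context reload, whereas the server certificate and private key are reloaded only after both have changed, so a half-rotated key pair is never loaded. The class below isolates that rule as a hypothetical sketch; the file paths are plain strings and the reload action is passed in as a Runnable.

```java
// Hypothetical extraction of the certificate-reload rule from the FileWatchService listener.
class CertReloadPolicy {
    private final String certPath;
    private final String keyPath;
    private final String trustCertPath;
    private final Runnable reloadSslContext;
    private boolean certChanged = false;
    private boolean keyChanged = false;

    CertReloadPolicy(String certPath, String keyPath, String trustCertPath, Runnable reloadSslContext) {
        this.certPath = certPath;
        this.keyPath = keyPath;
        this.trustCertPath = trustCertPath;
        this.reloadSslContext = reloadSslContext;
    }

    /** Called by the file watcher with the path of a file whose content changed. */
    synchronized void onChanged(String path) {
        if (path.equals(trustCertPath)) {
            reloadSslContext.run(); // a new trust certificate can be applied immediately
        }
        if (path.equals(certPath)) {
            certChanged = true;
        }
        if (path.equals(keyPath)) {
            keyChanged = true;
        }
        if (certChanged && keyChanged) {
            // Both halves of the key pair were replaced: reset the flags and reload once
            certChanged = false;
            keyChanged = false;
            reloadSslContext.run();
        }
    }
}
```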
4. Route management
We learned at the beginning of Chapter 2 that NameServer is a lightweight registry that provides routing information for Topic producers and consumers, and manages routing information and Broker nodes, including route registration, route culling, and route discovery.
This chapter analyzes how NameServer manages routing information from the perspective of the source code. The core code is mainly implemented in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager.
4.1 Routing metadata
Before we learn about routing information management, we need to know what routing meta information NameServer stores and what the data structure is.
Looking at the code, we can see that routing meta-information is maintained mainly through five attributes, as follows:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Let’s expand on each of these five attributes in turn.
4.1.1 TopicQueueTable
Description: Topic message queue routing information. When messages are sent, they are load-balanced across queues according to this routing table.
Data structure: A HashMap whose key is the Topic name and whose value is a list of QueueData objects. As we saw in Chapter 1, a Topic has multiple queues. QueueData contains brokerName, readQueueNums, writeQueueNums, perm, and topicSynFlag, as the example below shows.
Example data structure:
topicQueueTable:{
"topic1": [
{"brokerName": "broker-a", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0},
{"brokerName": "broker-b", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0}
]
}
4.1.2 BrokerAddrTable
Description: Basic Broker information, including the BrokerName, the cluster name, and the addresses of the master and slave Brokers.
Data structure: A HashMap whose key is the BrokerName and whose value is a BrokerData object. BrokerData contains cluster, brokerName, and brokerAddrs, a map from brokerId to Broker address in which brokerId 0 is the master and any other value is a slave (read it together with the Broker master/slave logical diagram below):
Broker master/slave logical diagram:
Example data structure:
brokerAddrTable:{
"broker-a": {"cluster": "c1", "brokerName": "broker-a", "brokerAddrs": {0: "192.168.1.1:10000", 1: "192.168.1.2:10000"}},
"broker-b": {"cluster": "c1", "brokerName": "broker-b", "brokerAddrs": {0: "192.168.1.3:10000", 1: "192.168.1.4:10000"}}
}
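In brokerAddrs, key 0 (MixAll.MASTER_ID) is the master and any other brokerId is a slave. The hypothetical helper below shows how a lookup on that map shape might prefer the master and fall back to a slave; it is a sketch, not RocketMQ's BrokerData class.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model of one brokerAddrTable value (not RocketMQ's BrokerData).
final class BrokerDataModel {
    static final long MASTER_ID = 0L; // same meaning as MixAll.MASTER_ID

    final String cluster;
    final String brokerName;
    final Map<Long, String> brokerAddrs; // brokerId -> "ip:port"

    BrokerDataModel(String cluster, String brokerName, Map<Long, String> brokerAddrs) {
        this.cluster = cluster;
        this.brokerName = brokerName;
        this.brokerAddrs = new TreeMap<>(brokerAddrs); // sorted by brokerId so the fallback is deterministic
    }

    Optional<String> masterAddr() {
        return Optional.ofNullable(brokerAddrs.get(MASTER_ID));
    }

    /** The master if present, otherwise the slave with the smallest brokerId. */
    Optional<String> preferredAddr() {
        Optional<String> master = masterAddr();
        if (master.isPresent()) {
            return master;
        }
        return brokerAddrs.values().stream().findFirst();
    }
}
```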
4.1.3 ClusterAddrTable
Description: Broker cluster information, storing the names of all Brokers in each cluster.
Data structure: HashMap structure, key is ClusterName, value is Set structure that stores BrokerName.
Example data structure:
clusterAddrTable:{
"c1": ["broker-a", "broker-b"]
}
4.1.4 BrokerLiveTable
Description: Broker status information. NameServer replaces this information every time it receives a heartbeat packet.
Data structure: A HashMap whose key is the Broker address and whose value is a BrokerLiveInfo object holding lastUpdateTimestamp, dataVersion, channel, and haServerAddr, as the example below shows.
Example data structure:
brokerLiveTable:{
"192.168.1.1:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": versionObj1,
"channel": channelObj,
"haServerAddr": ""
},
"192.168.1.2:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": versionObj1,
"channel": channelObj,
"haServerAddr": "192.168.1.1:10000"
},
"192.168.1.3:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": versionObj1,
"channel": channelObj,
"haServerAddr": ""
},
"192.168.1.4:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion": versionObj1,
"channel": channelObj,
"haServerAddr": "192.168.1.3:10000"
}
}
4.1.5 filterServerTable
Description: The list of FilterServers on each Broker. FilterServers are used for class-mode message filtering: a Consumer using this mode pulls messages through a FilterServer rather than directly from the Broker.
Data structure: The HashMap structure, where the key is the Broker address and the value is the List of filterServer addresses.
4.2 Route Registration
Route registration is implemented through the heartbeat function between Broker and NameServer. There are two main steps:
Step1: After the Broker starts, it sends heartbeat packets to all NameServers in the cluster every 30 seconds (the default; the interval is kept between 10 and 60 seconds).
Step2: When a NameServer receives a heartbeat packet, it updates topicQueueTable, brokerAddrTable, brokerLiveTable, clusterAddrTable, and filterServerTable.
Let’s analyze these two steps separately.
4.2.1 Broker sends heartbeat packets
The core logic for sending heartbeat packets is part of the Broker startup code, whose entry point is org.apache.rocketmq.broker.BrokerController#start. Since this article focuses on heartbeat sending, only the core code is listed:
1) Create a scheduled task that registers the Broker. It first runs 10 seconds after startup and then repeats every 30 seconds by default: the period comes from BrokerConfig.getRegisterNameServerPeriod() (default 30 seconds) and is clamped to the range of 10 to 60 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
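The period argument in the code above, Math.max(10000, Math.min(period, 60000)), clamps the configured value into the range [10 s, 60 s]. As a standalone helper (the class and method names are hypothetical):

```java
// Hypothetical helper reproducing the clamping expression passed to scheduleAtFixedRate.
final class RegisterPeriod {
    static final int MIN_PERIOD_MS = 10_000;
    static final int MAX_PERIOD_MS = 60_000;

    /** Clamp the configured registerNameServerPeriod into [10 s, 60 s]. */
    static int effectivePeriodMillis(int configuredMillis) {
        return Math.max(MIN_PERIOD_MS, Math.min(configuredMillis, MAX_PERIOD_MS));
    }
}
```

So a misconfigured period of 5 seconds still yields a 10-second heartbeat, and 2 minutes is capped at 60 seconds, which stays well inside NameServer's 120-second expiry window.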
2) After packaging the Topic configuration and version number, the actual route registration takes place. It is implemented in org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll; the core code is as follows:
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
// Get nameserver address list
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// Start: build the request header, which carries the broker's information
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
// Encapsulate the requestBody, including topic and filterServerList information
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// End of request header and body construction
// Enable multithreading to register with each nameserver
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Actually register the method
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
// Collect the result returned by the NameServer
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
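The code above is fairly simple: it first builds the request header and the request body, then uses multiple threads to register with every NameServer in parallel.
The request header type is RegisterBrokerRequestHeader, which carries brokerAddr, brokerId, brokerName, clusterName, haServerAddr, compressed, and bodyCrc32 (the CRC32 checksum of the encoded body).
The request body type is RegisterBrokerBody, which carries topicConfigSerializeWrapper (the Broker's Topic configuration) and filterServerList.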
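The fan-out pattern in registerBrokerAll (one task per NameServer, a CountDownLatch, a bounded await, and a thread-safe result list) needs only the JDK. In this sketch the network call is replaced by a hypothetical Function<String, String>; a NameServer that fails simply contributes no result, mirroring how a failed registerBroker call is only logged. One deliberate difference: the original swallows InterruptedException in an empty catch block, while the sketch restores the thread's interrupt flag.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of the parallel registration pattern used by registerBrokerAll.
final class FanOutRegistrar {
    /** Calls registerOne for every address in parallel and waits at most timeoutMillis. */
    static List<String> registerAll(List<String> nameServerAddrs,
                                    Function<String, String> registerOne,
                                    ExecutorService executor,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>(); // written concurrently by worker threads
        CountDownLatch latch = new CountDownLatch(nameServerAddrs.size());
        for (String addr : nameServerAddrs) {
            executor.execute(() -> {
                try {
                    String result = registerOne.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (RuntimeException e) {
                    System.err.println("register to " + addr + " failed: " + e);
                } finally {
                    latch.countDown(); // always count down, or await() would wait for the full timeout
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt instead of swallowing it
        }
        return results;
    }
}
```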
3) The actual registration with each NameServer is implemented by the registerBroker method; the core code is as follows:
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
// Create the request. Note the request code RequestCode.REGISTER_BROKER: the NameServer's network processor handles each request according to its code
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
// Network transmission based on Netty
if (oneway) {
// If the call is one-way, no value is returned, nameserver does not return the result
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// Synchronous call: register with the NameServer and wait for its response
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
// Get the returned responseHeader
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
// Rewrap the returned result, updating masterAddr and haServerAddr
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
The Broker and NameServer communicate over Netty. When a Broker sends a registration request to NameServer, it sets the request code RequestCode.REGISTER_BROKER on the request. RocketMQ defines a request code for every type of request, and the server-side network processor dispatches each request to the corresponding business logic according to its code.
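The dispatch-by-code idea can be illustrated with a small registry that maps an int code to a handler. The codes, class, and method names here are hypothetical; real codes are defined in RocketMQ's RequestCode class, and NameServer's DefaultRequestProcessor#processRequest uses a switch statement rather than a map, as the next section shows.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical request-code dispatcher illustrating the idea (not RocketMQ's remoting API).
final class RequestDispatcher {
    // Hypothetical codes for this demo only
    static final int REGISTER_BROKER = 1;
    static final int UNREGISTER_BROKER = 2;

    private final Map<Integer, Function<String, String>> handlers = new ConcurrentHashMap<>();

    void register(int requestCode, Function<String, String> handler) {
        handlers.put(requestCode, handler);
    }

    /** Routes the request body to the handler registered for its code. */
    String dispatch(int requestCode, String body) {
        Function<String, String> handler = handlers.get(requestCode);
        if (handler == null) {
            return "REQUEST_CODE_NOT_SUPPORTED: " + requestCode;
        }
        return handler.apply(body);
    }
}
```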
4.2.2 NameServer Processing Heartbeat Packets
After the Broker sends a registered heartbeat packet, NameServer processes it according to the requestCode in the heartbeat packet. The default network processor for NameServer is DefaultRequestProcessor.
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
// ...
// If the code is RequestCode.REGISTER_BROKER, register the broker
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
// ...
default:
break;
}
return null;
}
The processor checks the requestCode; for RequestCode.REGISTER_BROKER it runs the Broker registration logic, choosing the method according to the Broker's version number. Taking versions V3_0_11 and later as an example, registerBrokerWithFilterServer performs the registration in three main steps:
Step1: Parse the requestHeader and verify the request body against its CRC32 checksum to make sure the data is intact;
Step2: Decode the request body to obtain the Topic configuration;
Step3: Call RouteInfoManager#registerBroker to register the Broker.
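Step1's integrity check works because the Broker puts a CRC32 of the encoded body into the request header (requestHeader.setBodyCrc32 in registerBrokerAll) and NameServer recomputes it on arrival. A minimal sketch with java.util.zip.CRC32; the class name is hypothetical, and masking the value to a non-negative int is an assumption made here so it fits an int header field.

```java
import java.util.zip.CRC32;

// Hypothetical checksum helper illustrating the sender/receiver CRC32 check.
final class BodyChecksum {
    /** CRC32 of the body, masked to a non-negative int so it fits an int header field. */
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    /** Receiver side: recompute the checksum and compare it with the value from the header. */
    static boolean verify(byte[] body, int expectedCrc32) {
        return crc32(body) == expectedCrc32;
    }
}
```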
The core registration logic is implemented by RouteInfoManager#registerBroker. The core code is as follows:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// Acquire the write lock to prevent concurrent writes to the routing tables in RouteInfoManager
this.lock.writeLock().lockInterruptibly();
// Get all broker names from clusterAddrTable based on clusterName
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
// If the cluster has no Broker set yet, create one; then add brokerName to it
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
// Try to get brokerData from brokerAddrTable by brokerName
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
// If no brokerData exists, create one, put it into brokerAddrTable, and set registerFirst to true
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// Update brokerAddrs in brokerData
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
// brokerId will be 0 in case the master hangs and the slave becomes master, and the old brokerAddr needs to be removed
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
// Update brokerAddrs; the returned oldAddr tells whether this broker is registering for the first time
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// If the Broker is Master and the Topic configuration of the Broker changes or is registered for the first time, the Topic routing metadata needs to be created or updated to populate the topicQueueTable
if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if(tcTable ! =null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// Create or update Topic routing metadata
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// Update BrokerLiveInfo, which is the key basis for route deletion
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
// Register the Broker's filterServer address list
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// If the Broker is a slave, look up the master's information and set masterAddr and haServerAddr in the result
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
From the above source code analysis, we can break down Broker registration into seven main steps:
- Step1: Acquire the write lock to prevent concurrent writes to the routing tables in RouteInfoManager.
- Step2: Check whether the Broker's cluster exists in clusterAddrTable. If not, create it; then add the Broker name to the cluster.
- Step3: Maintain BrokerData.
- Step4: If the Broker is the Master and its Topic configuration has changed or it is registering for the first time, create or update the Topic routing metadata and populate topicQueueTable.
- Step5: Update BrokerLiveInfo.
- Step6: Register the Broker's filterServer address list.
- Step7: If the Broker is a Slave, look up its Master, set the masterAddr (and HA server address) in the result, and return it to the Broker.
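The seven steps can be condensed into a small, self-contained Java sketch. It is not RocketMQ's RouteInfoManager: the hypothetical `RegistrySketch` class reduces each table to a plain map (queue counts, HA addresses, and Step6's filter servers are left out), and a per-address version number stands in for DataVersion. What it does show is one write lock guarding every table, a Master's topics being applied only on first registration or after a version change, and a Slave learning its Master's address.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the registration steps; not RocketMQ's RouteInfoManager.
class RegistrySketch {
    static final long MASTER_ID = 0L; // mirrors MixAll.MASTER_ID: brokerId 0 is the Master

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, Set<String>> topicQueueTable = new HashMap<>();       // topic -> brokerNames (queue counts omitted)
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // addr -> last heartbeat millis
    final Map<String, Long> dataVersionTable = new HashMap<>();             // addr -> last topic-config version

    /** Registers a broker; returns the Master address when a Slave registers, otherwise null. */
    String register(String cluster, String brokerName, long brokerId, String brokerAddr,
                    long dataVersion, Set<String> topics) {
        lock.writeLock().lock(); // Step1: one writer at a time
        try {
            // Step2: create the cluster entry if missing, then add the broker name to it
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            // Step3: maintain BrokerData (brokerId -> address)
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            boolean registerFirst = addrs.put(brokerId, brokerAddr) == null;
            // Step4: a Master's topics are applied on first registration or when its version changed
            boolean changed = !Objects.equals(dataVersionTable.get(brokerAddr), dataVersion);
            if (brokerId == MASTER_ID && (registerFirst || changed)) {
                for (String topic : topics) {
                    topicQueueTable.computeIfAbsent(topic, k -> new HashSet<>()).add(brokerName);
                }
            }
            // Step5: refresh liveness information (Step6, filter servers, is omitted)
            brokerLiveTable.put(brokerAddr, System.currentTimeMillis());
            dataVersionTable.put(brokerAddr, dataVersion);
            // Step7: a Slave learns its Master's address, if the Master is registered
            return brokerId != MASTER_ID ? addrs.get(MASTER_ID) : null;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Registering a Master with brokerId 0 and then a Slave with the same brokerName returns the Master's address to the Slave, mirroring Step7; re-registering the Master with an unchanged version leaves topicQueueTable alone, mirroring the isBrokerTopicConfigChanged check.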
4.3 Route Deletion
4.3.1 Trigger conditions
There are two trigger conditions for route deletion:
NameServer scans brokerLiveTable every 10 s; if it has received no heartbeat from a Broker for 120 s, it removes that Broker and closes its socket connection.
Route deletion is also triggered when a Broker shuts down.
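The first trigger amounts to a periodic liveness scan. The sketch below assumes a map from Broker address to last heartbeat time; `BrokerLivenessScanner` and the `onExpired` callback are hypothetical names, and the callback stands in for closing the channel and running the cleanup analyzed next. The 10 s period and 120 s timeout come from the text above; the 5 s initial delay is an arbitrary choice.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical liveness scanner; not RocketMQ's implementation.
class BrokerLivenessScanner {
    static final long BROKER_EXPIRED_MILLIS = 120_000L; // 120 s without a heartbeat

    final Map<String, Long> brokerLiveTable = new ConcurrentHashMap<>(); // brokerAddr -> last heartbeat millis
    private final Consumer<String> onExpired;

    BrokerLivenessScanner(Consumer<String> onExpired) {
        this.onExpired = onExpired;
    }

    void heartbeat(String brokerAddr, long nowMillis) {
        brokerLiveTable.put(brokerAddr, nowMillis);
    }

    /** Evicts brokers whose last heartbeat is older than 120 s; returns how many were evicted. */
    int scanNotActiveBroker(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_EXPIRED_MILLIS < nowMillis) {
                it.remove();
                onExpired.accept(e.getKey()); // stands in for closing the channel and the route cleanup
                removed++;
            }
        }
        return removed;
    }

    /** Runs the scan every 10 s; the caller must shut the returned executor down. */
    ScheduledExecutorService start() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> scanNotActiveBroker(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS);
        return ses;
    }
}
```

scanNotActiveBroker takes the current time as a parameter so the expiry rule can be checked without waiting two minutes.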
4.3.2 Source code analysis
Both trigger points share the same removal logic, implemented in RouteInfoManager#onChannelDestroy.
The core code is as follows:
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // Acquire the read lock
                this.lock.readLock().lockInterruptibly();
                // Find the Broker address that owns this channel in brokerLiveTable
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                // Release the read lock
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
    // If the Broker has already been removed from the live Broker table, use remoteAddr directly
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // Acquire the write lock
                this.lock.writeLock().lockInterruptibly();
                // Remove this Broker address from brokerLiveTable and filterServerTable
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // Traverse brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // Find the BrokerData that holds this address and remove the address from it
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }
                    // If the BrokerData has no addresses left after removal, remove the entire BrokerData
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName());
                    }
                }
                if (brokerNameFound != null && removeBrokerName) {
                    // Traverse clusterAddrTable
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        // Remove the brokerName found in Step 3
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            // If the set is now empty, remove the entire cluster from clusterAddrTable
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }
                            break;
                        }
                    }
                }
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    // Traverse topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            // Remove this Topic's QueueData for the removed brokerName
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData);
                            }
                        }
                        // If the Topic has no QueueData left (the removed Broker was its only Broker), remove the Topic from the table
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic);
                        }
                    }
                }
            } finally {
                // Release the write lock
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
The route deletion logic breaks down into six steps:
- Step1: Acquire the read lock, find the Broker address that owns the channel in brokerLiveTable, and release the read lock. If the Broker has already been removed from the live Broker table, use remoteAddr directly.
- Step2: Acquire the write lock and remove the Broker address from brokerLiveTable and filterServerTable.
- Step3: Traverse brokerAddrTable, find the BrokerData that holds the Broker address, and remove the address from it. If the BrokerData has no addresses left, remove the entire BrokerData.
- Step4: If Step3 removed the entire BrokerData, traverse clusterAddrTable and remove the brokerName found in Step3. If a cluster's set is empty after removal, remove the entire cluster from clusterAddrTable.
- Step5: In the same case, traverse topicQueueTable and remove each Topic's QueueData for that brokerName. If a Topic has no QueueData left (that is, the removed Broker was its only Broker), remove the Topic from the table.
- Step6: Release the write lock.
As the steps show, route deletion is fairly simple: it only manipulates the routing metadata structures. To understand this code better, read it alongside the routing metadata structures described in section 4.1.
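To make the cascade concrete, here is a hypothetical `RouteRemovalSketch` over plain maps (brokerName to brokerId to address, cluster to brokerNames, topic to brokerNames), with locking left out so the data flow stays visible. It follows Steps 2 to 5: the cluster and topic tables are only touched once the last address of a brokerName is gone, which is why losing a Master while its Slave is still alive leaves the Topic routes in place.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, lock-free sketch of the removal cascade; not RocketMQ's implementation.
class RouteRemovalSketch {
    final Map<String, Long> brokerLiveTable = new HashMap<>();              // addr -> last heartbeat
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, Set<String>> topicQueueTable = new HashMap<>();       // topic -> brokerNames

    void removeBroker(String brokerAddr) {
        // Step2: drop liveness information for the address
        brokerLiveTable.remove(brokerAddr);

        // Step3: remove the address from its BrokerData; drop the BrokerData once it is empty
        String brokerNameFound = null;
        boolean removeBrokerName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> itAddr = brokerAddrTable.entrySet().iterator();
        while (itAddr.hasNext() && brokerNameFound == null) {
            Map.Entry<String, Map<Long, String>> e = itAddr.next();
            if (e.getValue().values().remove(brokerAddr)) {
                brokerNameFound = e.getKey();
                if (e.getValue().isEmpty()) {
                    removeBrokerName = true;
                    itAddr.remove();
                }
            }
        }
        if (!removeBrokerName) {
            return; // another address (for example a Slave) still serves this brokerName
        }

        // Step4: remove the brokerName from its cluster; drop the cluster once it is empty
        Iterator<Map.Entry<String, Set<String>>> itCluster = clusterAddrTable.entrySet().iterator();
        while (itCluster.hasNext()) {
            Set<String> names = itCluster.next().getValue();
            if (names.remove(brokerNameFound)) {
                if (names.isEmpty()) {
                    itCluster.remove();
                }
                break;
            }
        }

        // Step5: remove the brokerName from every topic; drop topics left without brokers
        Iterator<Map.Entry<String, Set<String>>> itTopic = topicQueueTable.entrySet().iterator();
        while (itTopic.hasNext()) {
            Set<String> names = itTopic.next().getValue();
            names.remove(brokerNameFound);
            if (names.isEmpty()) {
                itTopic.remove();
            }
        }
    }
}
```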
4.4 Route Discovery
When routing information changes, NameServer does not actively push routing information to the client. Instead, NameServer waits for the client to periodically pull the latest routing information from NameServer. This design approach reduces the complexity of NameServer implementation.
4.4.1 Producer actively pulls
Once started, the producer launches a series of scheduled tasks, one of which periodically fetches Topic routing information from NameServer. The entry point is MQClientInstance#startScheduledTask(), and the core code is as follows:
private void startScheduledTask() {
    ...
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // Update topic routing information from NameServer
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    ...
}

/**
 * Get topic routing information from NameServer
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    ...
    // Send a request to NameServer with request code RequestCode.GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
    ...
}
The producer communicates with NameServer over Netty, and the requests it sends to NameServer carry the request code RequestCode.GET_ROUTEINFO_BY_TOPIC.
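The pull model can be sketched with a plain ScheduledExecutorService. `RoutePoller` and its `fetchRoute` function are hypothetical stand-ins for MQClientInstance and the GET_ROUTEINFO_BY_TOPIC request, and a String stands in for TopicRouteData; the sketch refreshes a local cache and replaces an entry only when the returned route differs.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical client-side route poller; not RocketMQ's MQClientInstance.
class RoutePoller {
    final Map<String, String> routeCache = new ConcurrentHashMap<>(); // topic -> route
    private final Function<String, String> fetchRoute;                // stands in for the NameServer request

    RoutePoller(Function<String, String> fetchRoute) {
        this.fetchRoute = fetchRoute;
    }

    /** One polling round; returns how many topics got a new route. */
    int updateAll() {
        int changed = 0;
        for (String topic : routeCache.keySet()) {
            String latest = fetchRoute.apply(topic);
            if (latest != null && !Objects.equals(latest, routeCache.get(topic))) {
                routeCache.put(topic, latest);
                changed++;
            }
        }
        return changed;
    }

    /** Polls at a fixed interval; the caller must shut the returned executor down. */
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> {
            try {
                updateAll();
            } catch (RuntimeException e) {
                // swallow so that one failed round does not cancel all later rounds
            }
        }, 10, intervalMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

The try/catch inside the task matters: per the ScheduledExecutorService contract, if any execution of a periodic task throws, subsequent executions are suppressed, so one failed request would otherwise stop route updates for good. The source code above wraps updateTopicRouteInfoFromNameServer in a try/catch, presumably for the same reason.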
4.4.2 NameServer Returns routing information
After receiving a request from the producer, NameServer handles it according to the requestCode it carries. This request code is also handled in the default network processor, DefaultRequestProcessor, and the work is ultimately done by RouteInfoManager#pickupTopicRouteData.
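Handling a request according to its code is essentially a switch over the code carried by each request. In the sketch below, the `RequestCode` enum, the String payloads, and the response strings are illustrative assumptions rather than RocketMQ's RemotingCommand API; only the shape of the dispatch is the point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical dispatch-by-request-code sketch; names and responses are illustrative only.
class NameServerDispatchSketch {
    enum RequestCode { GET_ROUTEINFO_BY_TOPIC, GET_ALL_TOPIC_LIST_FROM_NAMESERVER, UNSUPPORTED }

    final Map<String, String> routes = new HashMap<>(); // topic -> route (stands in for TopicRouteData)

    String processRequest(RequestCode code, String payload) {
        switch (code) {
            case GET_ROUTEINFO_BY_TOPIC: {
                String route = routes.get(payload);
                return route != null ? "SUCCESS:" + route : "TOPIC_NOT_EXIST";
            }
            case GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                // sorted so the response is deterministic
                return "SUCCESS:" + String.join(",", new TreeSet<>(routes.keySet()));
            default:
                return "REQUEST_CODE_NOT_SUPPORTED";
        }
    }
}
```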
TopicRouteData structure
Before parsing the source code, let's look at the data structure NameServer returns to the producer: TopicRouteData. Its main fields are orderTopicConf, queueDatas, brokerDatas, and filterServerTable.
QueueData, BrokerData, and filterServerTable were described in section 4.1 together with the routing metadata.
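As a rough picture of that structure, the classes below model the four fields with plain Java types. They are simplified stand-ins, not RocketMQ's definitions: the real QueueData and BrokerData carry more fields (for example queue permissions), and `totalWriteQueues()` is a hypothetical helper added only to show how the queue layout can be consumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Simplified stand-ins for RocketMQ's route classes (fields trimmed); not the real definitions.
class QueueData {
    final String brokerName;
    final int readQueueNums;
    final int writeQueueNums;

    QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
        this.brokerName = brokerName;
        this.readQueueNums = readQueueNums;
        this.writeQueueNums = writeQueueNums;
    }
}

class BrokerData {
    final String cluster;
    final String brokerName;
    final HashMap<Long, String> brokerAddrs = new HashMap<>(); // brokerId -> address; 0 is the Master

    BrokerData(String cluster, String brokerName) {
        this.cluster = cluster;
        this.brokerName = brokerName;
    }
}

class TopicRouteData {
    String orderTopicConf;                                                  // ordered-message config; may be null
    final List<QueueData> queueDatas = new ArrayList<>();                   // queue layout per broker
    final List<BrokerData> brokerDatas = new ArrayList<>();                 // addresses of brokers hosting the topic
    final HashMap<String, List<String>> filterServerTable = new HashMap<>(); // brokerAddr -> filter servers

    /** Hypothetical helper: total number of writable queues across the topic's brokers. */
    int totalWriteQueues() {
        int total = 0;
        for (QueueData qd : queueDatas) {
            total += qd.writeQueueNums;
        }
        return total;
    }
}
```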
Source code analysis
Now that we know the structure of the TopicRouteData returned to the producer, let's look at how RouteInfoManager#pickupTopicRouteData builds it.
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);
    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);
    try {
        try {
            // Acquire the read lock
            this.lock.readLock().lockInterruptibly();
            // Get the queue list for the topic from the topicQueueTable metadata
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                // Write the queue list into topicRouteData's queueDatas
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
                // Iterate over the brokerNames extracted from the QueueData list
                for (String brokerName : brokerNameSet) {
                    // Fetch BrokerData from brokerAddrTable by brokerName
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        // Clone the BrokerData and add it to topicRouteData's brokerDatas
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        // Traverse brokerAddrs
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            // Fetch the filterServerList for brokerAddr and put it into topicRouteData's filterServerTable
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            // Release the read lock
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }
    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }
    return null;
}
This method fills the queueDatas, brokerDatas, and filterServerTable fields of TopicRouteData, but not orderTopicConf. Moving one level up, the caller of RouteInfoManager#pickupTopicRouteData is DefaultRequestProcessor#getRouteInfoByTopic:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    ...
    // Call the method analyzed above to get the topicRouteData object
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    if (topicRouteData != null) {
        // Check whether the NameServer's orderMessageEnable option is enabled
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            // If enabled, read this topic's ordered-message configuration from kvConfig by namespace and topic name
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            // Set orderTopicConf
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    // If no topic route is found, the responseCode is TOPIC_NOT_EXIST
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
Putting the two methods together, looking up a Topic's route takes three steps:
Call RouteInfoManager#pickupTopicRouteData to read topicQueueTable, brokerAddrTable, and filterServerTable, and fill queueDatas, brokerDatas, and filterServerTable respectively.
If NameServer's orderMessageEnable option is enabled, read the Topic's ordered-message configuration from KVConfig and fill it into orderTopicConf.
If no routing information is found, return the response code ResponseCode.TOPIC_NOT_EXIST.
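The three steps can be sketched on plain maps. `RouteLookupSketch` is hypothetical: broker data is a map of brokerId to address, a null return plays the role of TOPIC_NOT_EXIST, and the read lock used by pickupTopicRouteData is omitted. Copying each address map mirrors the clone() call, so callers cannot mutate NameServer's own tables.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the three lookup steps on plain maps; locking omitted for brevity.
class RouteLookupSketch {
    static final class Route {
        final Map<String, Map<Long, String>> brokerDatas = new LinkedHashMap<>(); // brokerName -> (brokerId -> addr)
        final Map<String, List<String>> filterServerTable = new HashMap<>();      // brokerAddr -> filter servers
        List<String> queueBrokerNames;                                            // stands in for queueDatas
        String orderTopicConf;
    }

    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> brokerNames with queues
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (brokerId -> addr)
    final Map<String, List<String>> filterServerTable = new HashMap<>();    // brokerAddr -> filter servers
    final Map<String, String> orderTopicKv = new HashMap<>();               // topic -> ordered-message config
    boolean orderMessageEnable;

    /** Returns the route, or null (the TOPIC_NOT_EXIST case) when queue or broker data is missing. */
    Route lookup(String topic) {
        // Step 1: queue data, then broker data and filter servers for each broker name
        List<String> brokerNames = topicQueueTable.get(topic);
        if (brokerNames == null) {
            return null;
        }
        Route route = new Route();
        route.queueBrokerNames = brokerNames;
        for (String name : new LinkedHashSet<>(brokerNames)) {
            Map<Long, String> addrs = brokerAddrTable.get(name);
            if (addrs == null) {
                continue;
            }
            Map<Long, String> copy = new HashMap<>(addrs); // defensive copy, like clone()
            route.brokerDatas.put(name, copy);
            for (String addr : copy.values()) {
                route.filterServerTable.put(addr, filterServerTable.get(addr));
            }
        }
        if (route.brokerDatas.isEmpty()) {
            return null; // Step 3: no broker data found
        }
        // Step 2: attach the ordered-message config only when the switch is on
        if (orderMessageEnable) {
            route.orderTopicConf = orderTopicKv.get(topic);
        }
        return route;
    }
}
```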
V. Summary
This article has examined RocketMQ NameServer from a source code perspective, covering its startup process, route registration, route deletion, and route discovery. Knowing how NameServer is designed, we can also look back at the techniques in it that are worth learning. Here are two:
- The startup process registers a JVM shutdown hook for graceful shutdown. This is a technique worth reusing: when an application runs thread pools or resident background threads, a shutdown hook can release resources or finish pending work before the JVM exits.
- Updating the routing table requires a lock to prevent concurrent modification. NameServer uses a read-write lock, which has finer lock granularity: many message senders can read routes concurrently, which keeps message sending highly concurrent, while NameServer processes only one Broker heartbeat at a time, so multiple heartbeat requests are handled serially. This is a classic use case for a read-write lock.
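Both tips can be tried with JDK classes alone. The hypothetical `GracefulRouteService` below guards a route map with a ReentrantReadWriteLock, so readers share the lock while a writer holds it exclusively, and registers a shutdown hook that stops its background scheduler before the JVM exits. The class name and the 3 s grace period are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical service illustrating a read-write lock and a JVM shutdown hook.
class GracefulRouteService {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> routes = new HashMap<>();
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    String get(String topic) {
        lock.readLock().lock(); // many readers may hold the read lock at the same time
        try {
            return routes.get(topic);
        } finally {
            lock.readLock().unlock();
        }
    }

    void put(String topic, String route) {
        lock.writeLock().lock(); // writers run one at a time and exclude all readers
        try {
            routes.put(topic, route);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Registers a hook that stops the scheduler when the JVM begins shutting down. */
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            scheduler.shutdown();
            try {
                if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
                    scheduler.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduler.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }, "route-service-shutdown"));
    }
}
```

Shutdown hooks run on normal exit, on System.exit, and on termination signals such as SIGTERM, but there is no guarantee they run if the process is killed abruptly (for example with SIGKILL), so they complement rather than replace other cleanup.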
Author: Ye Wenhao, Vivo Internet Server Team