Note: This series of source code analysis is based on RocketMq 4.8.0, gitee Repository link: gitee.com/funcy/rocke… .
Having analyzed NameServer earlier, we will begin with the Broker.
1. Start the entrance
Broker to start the class for org. Apache. Rocketmq. Broker. BrokerStartup, code is as follows:
public class BrokerStartup {...public static void main(String[] args) { start(createBrokerController(args)); }... }Copy the code
In the main() method, there is only one line of code, which contains two operations:
createBrokerController(...)
: createBrokerController
start(...)
: startingBroker
Now let’s analyze these two operations.
2. To createBrokerController
The method for creating BrokerController is BrokerStartup#createBrokerController with the following code:
/** * Create broker configuration parameters *@param args
* @return* /
public static BrokerController createBrokerController(String[] args) {...try {
// Parse command line arguments
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// Process the configuration
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// TLS is security related
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// Configure the port
nettyServerConfig.setListenPort(10911);
// Message store configuration
final MessageStoreConfig messageStoreConfig = newMessageStoreConfig(); .// Set the configuration from the command line into the brokerConfig object
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
// Check the environment variable: ROCKETMQ_HOME
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// Omit some configurations./ / create brokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
controller.getConfiguration().registerConfig(properties);
/ / initialization
boolean initResult = controller.initialize();
if(! initResult) { controller.shutdown(); System.exit(-3);
}
// Close the hook to handle some operations before closing it
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run(a) {
synchronized (this) {
if (!this.hasShutdown) {
...
// a logout message is sent to nameServercontroller.shutdown(); . }}}},"ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
Copy the code
The code for this method is a bit long, but it doesn’t have a lot of features.
- Processing configuration: Mainly processing
nettyServerConfig
withnettyClientConfig
Configuration, this is some configuration parsing operations, processing methods andNameServer
Very similar, but I won’t go into it here. - Create and initialize
controller
: Call methodcontroller.initialize()
We will analyze this later. - Register close hook: call
Runtime.getRuntime().addShutdownHook(...)
, you can do some things before the JVM process shuts down.
2.1 controller
instantiation
BrokerController is created and initialized in the BrokerStartup#createBrokerController method. Let’s first look at its constructor:
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// Four core configurations
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
// Manage the offset of the consumer consumption message
this.consumerOffsetManager = new ConsumerOffsetManager(this);
// Manage topic configuration
this.topicConfigManager = new TopicConfigManager(this);
// Handle the consumer pull message request
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
// Message delivery listener
this.messageArrivingListener
= new NotifyMessageArrivingListener(this.pullRequestHoldService); .// The component that sends messages out
this.brokerOuterAPI = newBrokerOuterAPI(nettyClientConfig); . }Copy the code
BrokerController’s constructor is long and basically a bunch of assignment operations, with key items listed in the code. These include:
- Core configuration assignment: mainly
brokerConfig
/nettyServerConfig
/nettyClientConfig
/messageStoreConfig
Four configuration ConsumerOffsetManager
Management:consumer
The offset of the location of the consumption message, which indicates that the consumer group consumes ittopic
The position of the message, after consumption, from this position after consumption, to avoid repeated consumption of the message, also avoid missing consumption of the message.topicConfigManager
:topic
Configuration manager is used to managetopic
Configured, for exampletopic
The name,topic
The queue numberpullMessageProcessor
: message handler, which handles consumer pull messagesmessageArrivingListener
: a listener for message delivery that listens when a message from a producer arrivesbrokerOuterAPI
: A component that sends messages out, such as toNameServer
Send a registration/deregistration message
The use of these components above, here first mix a face familiar, we later analysis.
2.2 the initializationcontroller
Let’s look again at BrokerController#initialize:
public boolean initialize(a) throws CloneNotSupportedException {
// Load the configuration in the configuration file
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
// Message storage management component that manages messages on disk
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig);
// If DLeger is enabled, create dleger-related components
if (messageStoreConfig.isEnableDLegerCommitLog()) {
...
}
// Broker statistics component
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(
new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e); }}// Load records on disk, such as commitLog write location, consumer topic/queue information
result = result && this.messageStore.load();
if (result) {
/ / nettyServer processing
this.remotingServer = new NettyRemotingServer(
this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(
fastConfig, this.clientHousekeepingService);
// create thread pool start... Multiple types of thread pools are created here.// The thread pool that handles the consumer pull operation
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_")); .// create thread pool end...
// Register the handler
this.registerProcessor();
// Start a scheduled task start... There are a lot of scheduled tasks that start.// Periodically persist the consumer's offset, that is, save the data to disk
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(a) {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e); }}},1000 * 10.this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); .// Start the scheduled task end....// Start some operations for DLeger
if(! messageStoreConfig.isEnableDLegerCommitLog()) { ... }// Handle TLS configuration
if(TlsSystemConfig.tlsMode ! = TlsMode.DISABLED) { ... }// Initialize some operations
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
Copy the code
This is still quite long, and the key parts are commented out. This method does the following:
- Load the configuration in the configuration file
- Assignment and initialization operations
- Creating a thread pool
- Register handler
- Starting a Scheduled Task
Here we look at the operation this.registerProcessor():
1. Registration processor:BrokerController#registerProcessor
BrokerController#registerProcessor the actual method called by this.registerprocessor () is BrokerController#registerProcessor:
public void registerProcessor(a) {
/** * SendMessageProcessor */
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,
this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
this.sendMessageExecutor); ./** * PullMessageProcessor */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/** * ReplyMessageProcessor */
ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); replyMessageProcessor.registerSendMessageHook(sendMessageHookList); . }Copy the code
There are many processors registered in this method. Only message-related content is listed here, such as sending messages, replying messages, and pulling messages. These processors will be used when processing producer/consumer messages.
2. remotingServer
Registration processor:NettyRemotingServer#registerProcessor
RemotingServer = remotingServer #registerProcessor
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {...@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor,
ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor,
ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair); }... }Copy the code
Finally, these handlers are registered in processorTable, which is a member variable of NettyRemotingAbstract and is defined as follows:
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>
Copy the code
This is a hashMap structure with a key of code and a value of Pair. There are two member variables in this class: The mapping between NettyRequestProcessor, ExecutorService, code, and NettyRequestProcessor is stored in the hashMap.
2.3 Register the closing hook:Runtime.getRuntime().addShutdownHook(...)
Now let’s look at the action of registering to close the hook:
// Close the hook to handle some operations before closing it
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run(a) {
synchronized (this) {
if (!this.hasShutdown) {
...
// a logout message is sent to nameServercontroller.shutdown(); . }}}},"ShutdownHook"));
Copy the code
Follow BrokerController#shutdown method:
public void shutdown(a) {
// Invoke the shutdown method for each component.// Send a logout message to NameServer
this.unregisterBrokerAll(); .// Persist the consumption offset of consumer
this.consumerOffsetManager.persist();
// Call the shutdown method for each component again.Copy the code
BrokerController#unregisterBrokerAll calls the shutdown() method for each component, sends a logout message to NameServer, and persists the consumer offset. BrokerController#unregisterBrokerAll:
private void unregisterBrokerAll(a) {
// Send a logout message to nameServer
this.brokerOuterAPI.unregisterBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId());
}
Copy the code
Continue into BrokerOuterAPI#unregisterBrokerAll:
public void unregisterBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) {
// Get all the Nameservers and iterate over the send logout message
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if(nameServerAddressList ! =null) {
for (String namesrvAddr : nameServerAddressList) {
try {
this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
} catch (Exception e) {
log.warn("unregisterBroker Exception, {}", namesrvAddr, e); }}}}Copy the code
BrokerOuterAPI#unregisterBroker method: BrokerOuterAPI#unregisterBroker method: BrokerOuterAPI#unregisterBroker
public void unregisterBroker(
final String namesrvAddr,
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
// Send the logout message: requestcode.unregister_broker
RemotingCommand request = RemotingCommand.createRequestCommand(
c, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
assertresponse ! =null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
Copy the code
The final call to RemotingClient#invokeSync is requestCode. UNREGISTER_BROKER, which corresponds to NameServer receiving the broker’s logout message.
3. StartBroker
:start(...)
Let’s look again at BrokerController#start: BrokerController#start
public void start(a) throws Exception {
// Start each component
// Start message storage-related components
if (this.messageStore ! =null) {
this.messageStore.start();
}
// Start remotingServer, which is a netty service that receives messages from Producer
if (this.remotingServer ! =null) {
this.remotingServer.start(); }...// The component of the broker that distributes messages. It is used when reporting survival messages to nameServer. It is also a Netty service
if (this.brokerOuterAPI ! =null) {
this.brokerOuterAPI.start(); }...// Broker core heartbeat registration task
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(a) {
try {
BrokerController.this.registerBrokerAll(true.false,
brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e); }}/ / brokerConfig getRegisterNameServerPeriod () value is 1000 * 30, finally calculated the default performs a 30 seconds
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); . }Copy the code
This method is mainly to start the components, here are some important components to start:
messageStore
: message store component, which starts message storage-related threads such as message delivery operations,commitLog
Of the fileflush
Operation,comsumeQueue
Of the fileflush
Operation etc.remotingServer
:netty
A service used to receive request messages, such asproducer
Incoming messagebrokerOuterAPI
: Also anetty
A service used to send messages externally, as tonameServer
Reporting heartbeat message- Start a scheduled task:
broker
tonameServer
Sending a registration message
Here we’ll focus on how a scheduled task sends a heartbeat.
The interval for processing registration messages is as follows:
Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)
Copy the code
The time interval can be self-configurable, but cannot be less than 10s, cannot be greater than 60s, the default is 30s.
BrokerController#registerBrokerAll(…) handles message registration. , the code is as follows:
public synchronized void registerBrokerAll(final boolean checkOrderConfig,
boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper
= this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// Handle topic-related configurations
if(! PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || ! PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
...
}
// This will determine whether registration is required
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// Perform registrationdoRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); }}Copy the code
BrokerController#needRegister this method is used to handle registration, but it validates that registration is required. BrokerController#needRegister validates that registration is required.
private boolean needRegister(final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper
= this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// Determine whether registration is required
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName,
brokerId, topicConfigWrapper, timeoutMills);
// If one of them has changed, it is necessary to register
boolean needRegister = false;
for (Boolean changed : changeList) {
if (changed) {
needRegister = true;
break; }}return needRegister;
}
Copy the code
This method calls the brokerOuterAPI. NeedRegister (…). To determine if the broker has changed. Any change on a NameServer indicates that a registration operation is required.
brokerOuterAPI.needRegister(…) How can you tell if a broker has changed? Follow up BrokerOuterAPI#needRegister:
public List<Boolean> needRegister(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final TopicConfigSerializeWrapper topicConfigWrapper,
final int timeoutMills) {
final List<Boolean> changedList = new CopyOnWriteArrayList<>();
// Get all nameserVers
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if(nameServerAddressList ! =null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// Iterate over all nameservers, sending requests one by one
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run(a) {
try {
QueryDataVersionRequestHeader requestHeader
= newQueryDataVersionRequestHeader(); .// To send a message to nameServer, the command is requestcode.query_data_version
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
// Send the current DataVersion to nameServer
request.setBody(topicConfigWrapper.getDataVersion().encode());
// Send a request to nameServer
RemotingCommand response = remotingClient
.invokeSync(namesrvAddr, request, timeoutMills);
DataVersion nameServerDataVersion = null;
Boolean changed = false;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryDataVersionResponseHeader queryDataVersionResponseHeader =
(QueryDataVersionResponseHeader) response
.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
changed = queryDataVersionResponseHeader.getChanged();
byte[] body = response.getBody();
if(body ! =null) {
/ / get the DataVersion
nameServerDataVersion = DataVersion.decode(body, D
ataVersion.class);
// Here is the key to judgment
if(! topicConfigWrapper.getDataVersion() .equals(nameServerDataVersion)) { changed =true; }}if (changed == null|| changed) { changedList.add(Boolean.TRUE); }}default:
break; }... }catch (Exception e) {
...
} finally{ countDownLatch.countDown(); }}}); }try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("query dataversion from nameserver countDownLatch await Exception", e); }}return changedList;
}
Copy the code
In this method, we first traverse all nameServer and send each nameServer a parameter with code requestcode. QUERY_DATA_VERSION, which is the data_version of the current broker. When nameServer receives the message, it returns the DataVersion saved in nameServer that corresponds to the current broker. When the two versions are not equal, it indicates that the current broker has changed and needs to be re-registered.
So what is DataVersion? Part of the code looks like this:
public class DataVersion extends RemotingSerializable {
/ / timestamp
private long timestamp = System.currentTimeMillis();
// Counter, which can be interpreted as the latest version number
private AtomicLong counter = new AtomicLong(0);
public void nextVersion(a) {
this.timestamp = System.currentTimeMillis();
this.counter.incrementAndGet();
}
/** * equals (timestamp and counter are equal) */
@Override
public boolean equals(final Object o) {
if (this == o)
return true;
if (o == null|| getClass() ! = o.getClass())return false;
final DataVersion that = (DataVersion) o;
if(timestamp ! = that.timestamp) {return false;
}
if(counter ! =null&& that.counter ! =null) {
return counter.longValue() == that.counter.longValue();
}
return (null == counter) && (null== that.counter); }... }Copy the code
From the equals() method of datSpanning, two DatSpanning objects are equal only if both timestamp and counter are equal. So where are these two values going to be modified? From the call to the DataVersion#nextVersion method, there are two main ways to cause these values to change:
broker
A new one has been created ontopic
topic
The change of the hair
In both cases, the DataVersion#nextVersion method is called, causing a change to the DataVersion. The DataVersion changes, indicating that the current broker needs to register with nameServer.
Let’s go back to BrokerController#registerBrokerAll(…) Methods:
public synchronized void registerBrokerAll(final boolean checkOrderConfig,
boolean oneway, boolean forceRegister) {...// This will determine whether registration is required
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
// Perform registrationdoRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); }}Copy the code
The method for handling registration is BrokerController#doRegisterBrokerAll. Take a look at the flow:
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
/ / register
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
// This object contains the current version of the broker
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister()); . }Copy the code
Keep going and the BrokerOuterAPI#registerBroker method is eventually called:
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// Build the request
RemotingCommand request = RemotingCommand
.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
// Handle the send operation: sendOneWay
if (oneway) {
try {
// Register operation
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null; . }... }Copy the code
So, when nameServer sends a message with the code requestCode.register_broker, the message contains the topic information of the current broker, version number, etc.
4. To summarize
This article mainly analyzes the startup process of the broker. In general, the startup process can be divided into three parts:
- Parses the configuration file, which parses the various configurations and assigns them to the corresponding objects
BrokerController
Create and initialize: CreatedBrokerController
Object and perform initialization operations, such as loading configuration files, creating thread pools, registering request handlers, starting scheduled tasks, and so onBrokerController
Startup: This step is startupbroker
Core components, such asmessageStore
(Message store),remotingServer
(netty
Service for processingproducer
withconsumer
Request),brokerOuterAPI
(netty
Service, used tonameServer
The current reportbroker
Information, etc.
In the analysis startup process, two types of messages are analyzed:
- in
ShutdownHook
,broker
Will send tonameServer
Send a logout message, which indicates thatbroker
Before closing,nameServer
Clears the currentbroker
Registration information of broker
After the system is started, a scheduled task is started to determine whether a data query is requirednameServer
Register, determine whether to register, will be tonameServer
sendcode
forQUERY_DATA_VERSION
The message fromnameServer
Get the currentbroker
The version number is different from the local version numberbroker
Re – registered, that is, send a registration message.
Limited by space, this article will stop here and continue to analyze the broker in the next article.
Limited to the author’s personal level, there are inevitable mistakes in the article, welcome to correct! Original is not easy, commercial reprint please contact the author to obtain authorization, non-commercial reprint please indicate the source.
This article was first published in the wechat public number Java technology exploration, if you like this article, welcome to pay attention to the public number, let us explore together in the world of technology!