Create NamesrvController
Only excerpts of the code are shown here, to outline the overall logic; for the specific details, read the source code itself.
NamesrvStartup
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
}
Copy the code
/**
 * Initializes the given controller and, if initialization succeeds, starts it.
 *
 * @param controller the name server controller to bring up
 * @return the same controller instance, after it has been started
 * @throws IllegalArgumentException if {@code controller} is {@code null}
 * @throws IllegalStateException if {@code controller.initialize()} reports failure
 * @throws Exception if {@code controller.start()} fails
 */
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // The initialization result used to be stored and then ignored; never start a
    // controller whose initialization reported failure.
    if (!controller.initialize()) {
        throw new IllegalStateException("NamesrvController initialization failed");
    }

    controller.start();
    return controller;
}
Copy the code
public void start(a) throws Exception {
// To actually start nameserver, this is the start that calls the remotingServer interface
this.remotingServer.start();
if (this.fileWatchService ! =null) {
this.fileWatchService.start(); }}Copy the code
RemotingService
public interface RemotingService {
// The start implementation is divided into two classes, one is the client side, one is the server side, both with the help of netty to complete
void start(a);
void shutdown(a);
void registerRPCHook(RPCHook rpcHook);
}
Copy the code
Initialize NamesrvController
public boolean initialize(a) {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
// A scan is performed every 10 seconds to detect all brokers that have gone offline, with the first delay of five seconds
@Override
public void run(a) {
NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }},5.10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run(a) {
NamesrvController.this.kvConfigManager.printAllPeriodically(); }},1.10, TimeUnit.MINUTES);
return true;
}
Copy the code
RouteInfoManager
// The expiration time is two minutes
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Scan out all brokers that have fallen offline
public void scanNotActiveBroker(a) {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); }}}Copy the code
Broker is registered with nameserver
NamesrvController
private void registerProcessor(a) {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
// Nameservier's default requests are registered and handed over to NettyServer for processing
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); }}Copy the code
DefaultRequestProcessor
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
// Register broker requests with nameserver
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
// Register broker to nameserver
return this.registerBroker(ctx, request);
}
Copy the code
// Call namesrvController.getRouteInfoManager().registerBroker(...) to actually register the broker with the nameserver
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
Copy the code
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// Only one thread can access it at a time
this.lock.writeLock().lockInterruptibly();
ClusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName = clusterName
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// Add brokers to a cluster
brokerNames.add(brokerName);
boolean registerFirst = false;
Brokerdata is retrieved according to BrokerName
BrokerAddrTable holds the detailed routing information for all brokers
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
If the broker registers for the first time, brokerDate is null, and a BrokerData is new, putting routing information into brokerAddrTable
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);
if (null! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if(tcTable ! =null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue()); }}}}// This is the core processing logic of the broker heartbeat
// By default a new BrokerLiveInfo is put to brokerLiveTable every 30 seconds, overriding the last heartbeat time
//BrokerLiveInfo This System.currentTimemillis (), the current timestamp is the broker's latest heartbeat time
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if(filterServerList ! =null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList); }}if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! =null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if(brokerLiveInfo ! =null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); }}}}finally {
this.lock.writeLock().unlock(); }}catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
Copy the code