Series of articles
Source analysis of Broker yuanzhicun.blog.csdn.net/article/det…
RocketMQ source analysis of the NameServer yuanzhicun.blog.csdn.net/article/det…
Preface
Previous articles analyzed how the NameServer and the Broker are created, along with four core objects. This article analyzes how the Broker interacts with the NameServer after startup, mainly through the registration heartbeat it sends.
The Broker's source code is easier to follow if you keep a few points from this diagram in mind:
1. The Broker has to communicate with the NameServer and receive requests from producers and consumers, so it needs a network layer. It therefore integrates both a NettyServer and a NettyClient.
2. The Broker cannot block synchronously on every client request, so requests are handed to thread pools for processing.
3. The Broker has several distinct responsibilities, such as high availability, DLedger-managed CommitLog, and ConsumeQueue, so each responsibility is implemented by its own component.
4. Like most applications, the Broker runs scheduled tasks, such as sending heartbeats, cleaning up expired data, and flushing data to disk periodically. These run on background thread pools.
Keep these four points in mind as you study the diagram and follow the code with me.
The code analysis below refers back to the diagram above.
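Point 2 above, handing requests to a thread pool instead of blocking the network thread, can be sketched roughly as follows. This is only an illustration: `RequestDispatcherSketch`, its methods, and the request code used in the usage note are hypothetical names, not RocketMQ classes.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical dispatcher: maps a request code to a handler and the pool that runs it. */
class RequestDispatcherSketch {
    /** A handler paired with the executor it should run on. */
    private static final class Registration {
        final Function<String, String> handler;
        final ExecutorService executor;

        Registration(Function<String, String> handler, ExecutorService executor) {
            this.handler = handler;
            this.executor = executor;
        }
    }

    private final Map<Integer, Registration> table = new ConcurrentHashMap<>();

    /** Associates a request code with a handler and the thread pool that executes it. */
    void register(int requestCode, Function<String, String> handler, ExecutorService executor) {
        table.put(requestCode, new Registration(handler, executor));
    }

    /** Called from the network thread: submits the work and returns immediately. */
    Future<String> dispatch(int requestCode, String payload) {
        Registration r = table.get(requestCode);
        if (r == null) {
            throw new IllegalArgumentException("no processor for request code " + requestCode);
        }
        Callable<String> task = () -> r.handler.apply(payload);
        return r.executor.submit(task);
    }

    /** Convenience wrapper for demos: dispatches and waits for the reply. */
    String dispatchAndWait(int requestCode, String payload) {
        try {
            return dispatch(requestCode, payload).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

For example, registering `s -> "ack:" + s` under request code 1 on a two-thread pool and calling `dispatchAndWait(1, "hello")` returns `"ack:hello"`. Giving each request type its own pool means a slow handler delays only its own kind of request.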
Broker startup
We have already looked at how the Broker creates its controller during startup. Now we look at how the controller's services are started once it has been created: controller.start() uses a scheduled task to send registration heartbeats to the NameServer.
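The periodic heartbeat can be pictured with a plain `ScheduledExecutorService`. This is a minimal sketch under assumptions: `HeartbeatSchedulerSketch` is a hypothetical class, and the delay and period values are placeholders rather than the Broker's real configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the Broker's periodic "register with NameServer" task. */
class HeartbeatSchedulerSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Runs registerAction repeatedly; both timing values are placeholders. */
    void start(long initialDelayMillis, long periodMillis, Runnable registerAction) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                registerAction.run();
            } catch (Throwable t) {
                // Catch everything: an exception escaping the task cancels all later runs.
                System.err.println("register failed: " + t);
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    /** Demo helper: waits until `beats` heartbeats were sent or the timeout passes. */
    static int runFor(int beats, long periodMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(beats);
        HeartbeatSchedulerSketch sketch = new HeartbeatSchedulerSketch();
        sketch.start(0, periodMillis, latch::countDown);
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.shutdown();
        }
        return beats - (int) latch.getCount();
    }
}
```

The try/catch inside the task matters: with `scheduleAtFixedRate`, a task that throws is never run again, so a single failed registration would otherwise stop the heartbeat for good.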
Register with NameServer
2. The method that actually performs the registration
brokerOuterAPI.registerBrokerAll
```java
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    // Holds the registration results
    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    // Get the list of NameServer addresses
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        // Build the request header
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        // Build the request body
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);

        // CountDownLatch makes the calling thread wait until the worker threads finish registering
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Perform the registration
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        // Decrement the latch count
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}
```
The code above collects the Broker's information, packs it into a request header and body, sends it to every NameServer through the NettyClient, and gathers the responses. I won't take you further into registerBroker, because it is mostly Netty API usage. This level of understanding is enough.
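The core pattern in registerBrokerAll, fanning out one task per NameServer and waiting a bounded time with a CountDownLatch, can be isolated in a small standalone sketch. `FanOutRegistrationSketch` and its `register` function are hypothetical stand-ins for the real network call, and the sketch uses a `CopyOnWriteArrayList` as one way to let several worker threads add results safely.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch of "register with every NameServer in parallel, wait with a timeout". */
class FanOutRegistrationSketch {
    /**
     * Calls register once per address on the pool and waits at most timeoutMillis
     * for all calls to finish. Returns whatever results arrived in time.
     */
    static List<String> registerAll(List<String> addresses,
                                    Function<String, String> register,
                                    ExecutorService pool,
                                    long timeoutMillis) {
        // Thread-safe list: several worker threads may add results concurrently.
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(addresses.size());
        for (String addr : addresses) {
            pool.execute(() -> {
                try {
                    String result = register.apply(addr);
                    if (result != null) {
                        results.add(result);
                    }
                } catch (Exception e) {
                    System.err.println("register to " + addr + " failed: " + e);
                } finally {
                    // Always count down, so one failing address cannot stall the caller.
                    latch.countDown();
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Because the count-down sits in a `finally` block, a NameServer that is down costs the caller nothing beyond its own failed attempt, and the timeout caps the total wait even if one call hangs.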
How does NameServer handle Broker requests
RemotingServer is the NettyServer object that the NameServer creates to listen for connections. That covers the basic flow. What remains is for you to trace the code yourself, following the steps I have described, until you understand it.
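As a rough mental model of the receiving side, the sketch below checks the body against the CRC32 value carried in the request header (the Broker code above sets bodyCrc32) and then records the Broker as registered. Everything here is an assumption for illustration: `NameServerRegistrySketch` is a hypothetical class, it uses the JDK's `java.util.zip.CRC32` directly rather than RocketMQ's `UtilAll.crc32`, and the real NameServer keeps far richer routing data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.CRC32;

/** Hypothetical, heavily simplified view of how a registration request could be handled. */
class NameServerRegistrySketch {
    /** brokerAddr -> time of the last accepted registration, in milliseconds. */
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Plain JDK CRC32 over the whole body. */
    static long crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return crc.getValue();
    }

    /** Accepts a registration only if the body matches the checksum from the header. */
    boolean handleRegister(String brokerAddr, byte[] body, long bodyCrc32, long nowMillis) {
        if (crc32(body) != bodyCrc32) {
            return false; // body was corrupted in transit: reject the request
        }
        lastHeartbeat.put(brokerAddr, nowMillis);
        return true;
    }

    /** True if at least one registration from this address has been accepted. */
    boolean isRegistered(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

A request whose checksum does not match leaves the table untouched, so a damaged body never overwrites the previously registered state.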
Final words, and thanks for the likes
You are welcome to follow my WeChat official account [Village of the Apes]
To talk about Java interviews or to keep learning together, search WeChat for [codeYuanzhicunup] to add me. If you have related technical questions, feel free to leave a comment. The official account is mainly for sharing technical content, including analysis of common interview questions, source code walkthroughs, microservice frameworks, and trending technical topics.