sequence
This paper mainly studies HostReactor of NACOS-SDK-Go
HostReactor
Nacos – SDK – go – v0.3.2 / clients/naming_client/host_reator. Go
type HostReactor struct {
serviceInfoMap cache.ConcurrentMap
cacheDir string
updateThreadNum int
serviceProxy NamingProxy
pushReceiver PushReceiver
subCallback SubscribeCallback
updateTimeMap cache.ConcurrentMap
updateCacheWhenEmpty bool
}
Copy the code
- HostReactor defines serviceInfoMap, cacheDir, updateThreadNum, serviceProxy, pushReceiver, subCallback, updateTimeMap, and updateCacheWhe NEmpty properties
NewHostReactor
Nacos – SDK – go – v0.3.2 / clients/naming_client/host_reator. Go
func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
if updateThreadNum <= 0 {
updateThreadNum = Default_Update_Thread_Num
}
hr := HostReactor{
serviceProxy: serviceProxy,
cacheDir: cacheDir,
updateThreadNum: updateThreadNum,
serviceInfoMap: cache.NewConcurrentMap(),
subCallback: subCallback,
updateTimeMap: cache.NewConcurrentMap(),
updateCacheWhenEmpty: updateCacheWhenEmpty,
}
pr := NewPushRecevier(&hr)
hr.pushReceiver = *pr
if! notLoadCacheAtStart { hr.loadCacheFromDisk() } go hr.asyncUpdateService()return hr
}
Copy the code
- The NewHostReactor method creates HostReactor, then NewPushRecevier creates pushReceiver, loadCacheFromDisk if notLoadCacheAtStart is false, Then asyncUpdateService is executed asynchronously
loadCacheFromDisk
Nacos – SDK – go – v0.3.2 / clients/naming_client/host_reator. Go
func (hr *HostReactor) loadCacheFromDisk() {
serviceMap := cache.ReadServicesFromFile(hr.cacheDir)
if serviceMap == nil || len(serviceMap) == 0 {
return
}
for k, v := range serviceMap {
hr.serviceInfoMap.Set(k, v)
}
}
Copy the code
- LoadCacheFromDisk method through the cache. ReadServicesFromFile serviceMap (hr) cacheDir)
asyncUpdateService
Nacos – SDK – go – v0.3.2 / clients/naming_client/host_reator. Go
func (hr *HostReactor) asyncUpdateService() {
sema := utils.NewSemaphore(hr.updateThreadNum)
for {
for _, v := range hr.serviceInfoMap.Items() {
service := v.(model.Service)
lastRefTime, ok := hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))
if! ok { lastRefTime = uint64(0) }if uint64(utils.CurrentMillis())-lastRefTime.(uint64) > service.CacheMillis {
sema.Acquire()
go func() {
hr.updateServiceNow(service.Name, service.Clusters)
sema.Release()
}()
}
}
time.Sleep(1 * time.Second)
}
}
Copy the code
- AsyncUpdateService traverses serviceInfoMap, Obtain lastRefTime using hr.updateTimemap. Get(utils.getServicecacheKey (service.name, service.clusters)). Then determine whether the number of services.CacheMillis exceeds the haul level, sema.acquire (), and asynchronize hr.Updateservicenow (service.Name, service.Clusters) is executed. Sema.release ()
updateServiceNow
Nacos – SDK – go – v0.3.2 / clients/naming_client/host_reator. Go
func (hr *HostReactor) updateServiceNow(serviceName string, clusters string) {
result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
iferr ! = nil { log.Printf("[ERROR]:query list return error!servieName:%s cluster:%s err:%s \n", serviceName, clusters, err.Error())
return
}
if result == "" {
log.Printf("[ERROR]:query list is empty! servieName:%s cluster:%s \n", serviceName, clusters)
return
}
hr.ProcessServiceJson(result)
}
Copy the code
- UpdateServiceNow method through hr. ServiceProxy. QueryList (serviceName, clusters, hr. PushReceiver. Port, false) get json, The JSON is then parsed through hr.processServicejson (result)
ProcessServiceJson
Nacos – SDK – go – v0.3.2 / clients/naming_client/host_reator. Go
func (hr *HostReactor) ProcessServiceJson(result string) {
service := utils.JsonToService(result)
if service == nil {
return
}
cacheKey := utils.GetServiceCacheKey(service.Name, service.Clusters)
oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
ifok && ! hr.updateCacheWhenEmpty { //if instance list is empty,not to update cache
if service.Hosts == nil || len(service.Hosts) == 0 {
log.Printf("[ERROR]:do not have useful host, ignore it, name:%s \n", service.Name)
return
}
}
hr.updateTimeMap.Set(cacheKey, uint64(utils.CurrentMillis()))
hr.serviceInfoMap.Set(cacheKey, *service)
if! ok || ok && ! reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {if! ok { log.Println("[INFO] service not found in cache " + cacheKey)
} else {
log.Printf("[INFO] service key:%s was updated to:%s \n", cacheKey, utils.ToJsonString(service))
}
cache.WriteServicesToFile(*service, hr.cacheDir)
hr.subCallback.ServiceChanged(service)
}
}
Copy the code
- The ProcessServiceJson method parses json to model.service via utils.jsontoService (result), GetServiceCacheKey(service.name, service.clusters) is used to build cacheKey, and hr.updatetimemap and hr.ServiceInfomap are updated. For cache does not exist or cache exists changes are executed cache. WriteServicesToFile (* service, hr. CacheDir), and then trigger the hr. SubCallback. ServiceChanged (service)
summary
HostReactor defines serviceInfoMap, cacheDir, updateThreadNum, serviceProxy, pushReceiver, subCallback, updateTimeMap, and updateCacheWhe NEmpty properties
doc
- host_reator