Preface

This article mainly studies the HostReactor of nacos-sdk-go.

HostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

type HostReactor struct {
	serviceInfoMap       cache.ConcurrentMap
	cacheDir             string
	updateThreadNum      int
	serviceProxy         NamingProxy
	pushReceiver         PushReceiver
	subCallback          SubscribeCallback
	updateTimeMap        cache.ConcurrentMap
	updateCacheWhenEmpty bool
}
  • HostReactor defines the serviceInfoMap, cacheDir, updateThreadNum, serviceProxy, pushReceiver, subCallback, updateTimeMap, and updateCacheWhenEmpty properties.

NewHostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
	if updateThreadNum <= 0 {
		updateThreadNum = Default_Update_Thread_Num
	}
	hr := HostReactor{
		serviceProxy:         serviceProxy,
		cacheDir:             cacheDir,
		updateThreadNum:      updateThreadNum,
		serviceInfoMap:       cache.NewConcurrentMap(),
		subCallback:          subCallback,
		updateTimeMap:        cache.NewConcurrentMap(),
		updateCacheWhenEmpty: updateCacheWhenEmpty,
	}
	pr := NewPushRecevier(&hr)
	hr.pushReceiver = *pr
	if !notLoadCacheAtStart {
		hr.loadCacheFromDisk()
	}
	go hr.asyncUpdateService()
	return hr
}
  • The NewHostReactor method falls back to Default_Update_Thread_Num when updateThreadNum is not positive, creates the HostReactor, creates pushReceiver via NewPushRecevier, calls loadCacheFromDisk if notLoadCacheAtStart is false, and then runs asyncUpdateService in a separate goroutine.

loadCacheFromDisk

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) loadCacheFromDisk() {
	serviceMap := cache.ReadServicesFromFile(hr.cacheDir)
	if serviceMap == nil || len(serviceMap) == 0 {
		return
	}
	for k, v := range serviceMap {
		hr.serviceInfoMap.Set(k, v)
	}
}
  • The loadCacheFromDisk method reads serviceMap via cache.ReadServicesFromFile(hr.cacheDir) and, if the result is not empty, copies every entry into hr.serviceInfoMap.

asyncUpdateService

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) asyncUpdateService() {
	sema := utils.NewSemaphore(hr.updateThreadNum)
	for {
		for _, v := range hr.serviceInfoMap.Items() {
			service := v.(model.Service)
			lastRefTime, ok := hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))
			if !ok {
				lastRefTime = uint64(0)
			}
			if uint64(utils.CurrentMillis())-lastRefTime.(uint64) > service.CacheMillis {
				sema.Acquire()
				go func() {
					hr.updateServiceNow(service.Name, service.Clusters)
					sema.Release()
				}()
			}
		}
		time.Sleep(1 * time.Second)
	}

}
  • The asyncUpdateService method loops forever: once per second it traverses serviceInfoMap, obtains lastRefTime via hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters)) (defaulting to 0 if absent), and checks whether the time elapsed since lastRefTime exceeds service.CacheMillis. If so, it calls sema.Acquire(), runs hr.updateServiceNow(service.Name, service.Clusters) in a goroutine, and calls sema.Release() once the update finishes.

updateServiceNow

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) updateServiceNow(serviceName string, clusters string) {
	result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
	if err != nil {
		log.Printf("[ERROR]:query list return error!servieName:%s cluster:%s  err:%s \n", serviceName, clusters, err.Error())
		return
	}
	if result == "" {
		log.Printf("[ERROR]:query list is empty! servieName:%s cluster:%s \n", serviceName, clusters)
		return
	}
	hr.ProcessServiceJson(result)
}
  • The updateServiceNow method fetches the JSON via hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false), logs an error and returns if the call fails or the result is empty, and otherwise parses the JSON via hr.ProcessServiceJson(result).

ProcessServiceJson

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) ProcessServiceJson(result string) {
	service := utils.JsonToService(result)
	if service == nil {
		return
	}
	cacheKey := utils.GetServiceCacheKey(service.Name, service.Clusters)

	oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
	if ok && !hr.updateCacheWhenEmpty { //if instance list is empty,not to update cache
		if service.Hosts == nil || len(service.Hosts) == 0 {
			log.Printf("[ERROR]:do not have useful host, ignore it, name:%s \n", service.Name)
			return
		}
	}
	hr.updateTimeMap.Set(cacheKey, uint64(utils.CurrentMillis()))
	hr.serviceInfoMap.Set(cacheKey, *service)
	if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
		if !ok {
			log.Println("[INFO] service not found in cache " + cacheKey)
		} else {
			log.Printf("[INFO] service key:%s was updated to:%s \n", cacheKey, utils.ToJsonString(service))
		}
		cache.WriteServicesToFile(*service, hr.cacheDir)
		hr.subCallback.ServiceChanged(service)
	}
}
  • The ProcessServiceJson method parses the JSON into a model.Service via utils.JsonToService(result) and builds cacheKey via utils.GetServiceCacheKey(service.Name, service.Clusters). If the key is already cached, updateCacheWhenEmpty is false, and the new host list is empty, the result is ignored. Otherwise hr.updateTimeMap and hr.serviceInfoMap are updated; when the service was not cached before, or its Hosts have changed, the method calls cache.WriteServicesToFile(*service, hr.cacheDir) and then triggers hr.subCallback.ServiceChanged(service).
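The decision logic of this method can be isolated in a small sketch. Host, Service, store, and apply are hypothetical, simplified stand-ins for the SDK types, and the file write and subscriber callback are reduced to a returned changed flag.

```go
package main

import (
	"fmt"
	"reflect"
)

// Host and Service are simplified, hypothetical stand-ins for the SDK models.
type Host struct {
	IP   string
	Port uint64
}

type Service struct {
	Name  string
	Hosts []Host
}

// store holds the cached services plus the flag that controls whether an
// empty host list may overwrite an existing entry.
type store struct {
	services        map[string]Service
	updateWhenEmpty bool
}

// apply mirrors the branching described above. stored reports whether the
// cache was updated; changed reports whether subscribers would be notified.
func (s *store) apply(key string, svc Service) (stored, changed bool) {
	old, ok := s.services[key]
	// len of a nil slice is 0, so this also covers Hosts == nil.
	if ok && !s.updateWhenEmpty && len(svc.Hosts) == 0 {
		return false, false
	}
	s.services[key] = svc
	// "!ok || ok && !equal" is logically the same as "!ok || !equal".
	changed = !ok || !reflect.DeepEqual(svc.Hosts, old.Hosts)
	return true, changed
}

func main() {
	s := &store{services: map[string]Service{}}
	svc := Service{Name: "demo", Hosts: []Host{{IP: "10.0.0.1", Port: 8080}}}
	fmt.Println(s.apply("demo", svc))                   // first time the key is seen
	fmt.Println(s.apply("demo", svc))                   // identical host list
	fmt.Println(s.apply("demo", Service{Name: "demo"})) // empty host list
}
```

One detail worth keeping in mind with this comparison: reflect.DeepEqual treats a nil slice and an empty non-nil slice as different, so when updateCacheWhenEmpty is true a switch between the two would count as a change.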

Summary

HostReactor defines the serviceInfoMap, cacheDir, updateThreadNum, serviceProxy, pushReceiver, subCallback, updateTimeMap, and updateCacheWhenEmpty properties.

doc

  • host_reator