Preface

We have written about Nacos health checks before, for example in “Microservices: What if Nacos didn’t react when the Service hung too crisp?”, which also showed how to tune the health check parameters. So how do Nacos health checks and heartbeats actually work? And can we borrow the Nacos health check mechanism and apply it elsewhere in our own projects?

In this article, we lift the veil on the Nacos health check mechanism.

Nacos health check

Ephemeral (temporary) instances in Nacos stay alive by reporting heartbeats. The basic health check process is as follows. The Nacos client runs a scheduled task that sends a heartbeat request every 5 seconds to show that the instance is alive. If the Nacos server receives no heartbeat from the instance for 15 seconds, it marks the instance unhealthy. If it receives none for 30 seconds, it removes the ephemeral instance.
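To make the timeline concrete, here is a minimal Java sketch that classifies an instance from the time of its last heartbeat. It assumes the default thresholds (15 s to unhealthy, 30 s to removal) and uses strict "greater than" comparisons, like the server code analyzed later. The names HeartbeatTimeline and InstanceStatus are hypothetical and are not part of Nacos.

```java
// Hypothetical illustration of the default ephemeral-instance timeline;
// these types are not part of Nacos.
enum InstanceStatus { HEALTHY, UNHEALTHY, REMOVED }

final class HeartbeatTimeline {
    static final long HEART_BEAT_TIMEOUT_MS = 15_000L; // silent for more than 15 s -> unhealthy
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;  // silent for more than 30 s -> removed

    private HeartbeatTimeline() {
    }

    /** Classifies an instance by how long ago its last heartbeat arrived. */
    static InstanceStatus statusOf(long lastBeatMillis, long nowMillis) {
        long elapsed = nowMillis - lastBeatMillis;
        if (elapsed > IP_DELETE_TIMEOUT_MS) {
            return InstanceStatus.REMOVED;
        }
        if (elapsed > HEART_BEAT_TIMEOUT_MS) {
            return InstanceStatus.UNHEALTHY;
        }
        return InstanceStatus.HEALTHY;
    }
}
```

With the default 5-second interval, an instance must stay silent for more than three heartbeat periods before it is reported unhealthy. This presumably tolerates an occasional lost heartbeat.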

The principle is simple. Below, we walk through the code-level implementation step by step.

Client Heartbeat

Since instances stay alive by reporting heartbeats, there must be code that sends those heartbeats. Our analysis therefore starts with the client-side heartbeat implementation.

Spring Cloud provides a standard interface, ServiceRegistry, and the Nacos implementation class is NacosServiceRegistry. When a Spring Cloud project starts, it instantiates NacosServiceRegistry and invokes its register method to register the instance.

@Override
public void register(Registration registration) {
    // ...
    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();
    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    } catch (Exception e) {
        // ...
    }
}

Two places in this method deserve attention. The first is getNacosInstanceFromRegistration, which builds the Instance and fills in its metadata. Health check parameters can be passed to the server through this metadata. For example, the following Spring Cloud configuration is sent to the Nacos server as metadata when the service registers.

spring:
  application:
    name: user-service-provider
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
        heart-beat-interval: 5000
        heart-beat-timeout: 15000
        ip-delete-timeout: 30000

The health check parameters heart-beat-interval, heart-beat-timeout, and ip-delete-timeout are reported to the server through the instance metadata.
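Conceptually, the integration copies each configured value into the instance metadata under a reserved key before registering. The sketch below illustrates that idea with a plain map. The key preserved.heart.beat.interval appears in the client code analyzed later. The keys preserved.heart.beat.timeout and preserved.ip.delete.timeout are assumed to follow the same naming, as in Nacos's PreservedMetadataKeys. The HeartbeatMetadata class itself is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: mimics how discovery settings end up as instance metadata.
final class HeartbeatMetadata {
    static final String HEART_BEAT_INTERVAL = "preserved.heart.beat.interval";
    static final String HEART_BEAT_TIMEOUT = "preserved.heart.beat.timeout";
    static final String IP_DELETE_TIMEOUT = "preserved.ip.delete.timeout";

    private HeartbeatMetadata() {
    }

    /** Copies only the settings that were configured (non-null) into a metadata map. */
    static Map<String, String> build(Integer interval, Integer timeout, Integer deleteTimeout) {
        Map<String, String> metadata = new HashMap<>();
        if (interval != null) {
            metadata.put(HEART_BEAT_INTERVAL, interval.toString());
        }
        if (timeout != null) {
            metadata.put(HEART_BEAT_TIMEOUT, timeout.toString());
        }
        if (deleteTimeout != null) {
            metadata.put(IP_DELETE_TIMEOUT, deleteTimeout.toString());
        }
        return metadata;
    }
}
```

A property that is not configured produces no metadata entry, so the defaults apply.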

The second point of interest is the call to NamingService#registerInstance, which registers the instance. NamingService is provided by the Nacos client SDK. In other words, the heartbeat is implemented by the Nacos client itself, not by Spring Cloud.

registerInstance eventually calls the following method:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

BeatReactor#addBeatInfo is the entry point for heartbeat processing. This only happens when the current instance is ephemeral (a temporary instance).

The corresponding method is as follows:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

As the penultimate line shows, the client drives the heartbeat with a scheduled task, and BeatTask sends the actual heartbeat request. The task's period is stored in BeatInfo. Tracing back, BeatInfo's period comes from Instance#getInstanceHeartBeatInterval(), which is implemented as follows:

public long getInstanceHeartBeatInterval() {
    return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL);
}

So the scheduled task's interval comes from the preserved.heart.beat.interval entry in the metadata. This is the same setting as the heart-beat-interval configuration shown above, and it defaults to 5 seconds.
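The lookup follows a "metadata value if usable, otherwise default" pattern. Below is a sketch of that pattern. The digit check, the null handling, and the MetadataLookup class are assumptions made for illustration. They are not a copy of Instance#getMetaDataByKeyWithDefault.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating "numeric metadata value if present, else default".
final class MetadataLookup {
    static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L);

    private MetadataLookup() {
    }

    static long longOrDefault(Map<String, String> metadata, String key, long defaultValue) {
        if (metadata == null) {
            return defaultValue;
        }
        String value = metadata.get(key);
        // Accept only 1 to 18 digits so Long.parseLong can never overflow;
        // anything else (missing, blank, non-numeric) falls back to the default.
        if (value != null && value.matches("\\d{1,18}")) {
            return Long.parseLong(value);
        }
        return defaultValue;
    }

    static long heartBeatInterval(Map<String, String> metadata) {
        return longOrDefault(metadata, "preserved.heart.beat.interval", DEFAULT_HEART_BEAT_INTERVAL);
    }
}
```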

The BeatTask class is implemented as follows:

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                    JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
        }
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

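In the run method, the heartbeat request is sent through NamingProxy#sendBeat. At the end of run, a new one-shot task is scheduled, so heartbeats repeat periodically. Two details are worth noting. First, the server's response can override the next delay through clientBeatInterval. Second, a RESOURCE_NOT_FOUND code makes the client re-register the instance.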
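The same "one-shot task that re-schedules itself" pattern can be reused outside Nacos with a plain ScheduledExecutorService. This is a minimal sketch under stated assumptions. The LongSupplier stands in for NamingProxy#sendBeat and returns the server-suggested interval, where 0 or less means "keep the local period". The class name SelfSchedulingBeat is hypothetical, and re-registration on RESOURCE_NOT_FOUND is left out.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

// Hypothetical re-implementation of the BeatTask scheduling pattern; not Nacos code.
final class SelfSchedulingBeat implements Runnable {
    private final ScheduledExecutorService executor;
    private final LongSupplier sendBeat; // returns the server-suggested interval in ms, <= 0 for "none"
    private final long periodMs;         // local default period, like BeatInfo#getPeriod
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    SelfSchedulingBeat(ScheduledExecutorService executor, LongSupplier sendBeat, long periodMs) {
        this.executor = executor;
        this.sendBeat = sendBeat;
        this.periodMs = periodMs;
    }

    void start() {
        executor.schedule(this, periodMs, TimeUnit.MILLISECONDS);
    }

    /** Comparable to BeatInfo#setStopped(true): the chain ends at the next run. */
    void stop() {
        stopped.set(true);
    }

    @Override
    public void run() {
        if (stopped.get()) {
            return;
        }
        long nextTime = periodMs;
        try {
            long interval = sendBeat.getAsLong();
            if (interval > 0) {
                nextTime = interval; // the server may override the client's period
            }
        } catch (RuntimeException e) {
            // A failed beat is not fatal: keep the default period and try again later.
        }
        if (!executor.isShutdown()) {
            executor.schedule(this, nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
```

Re-scheduling a one-shot task, rather than using a fixed-rate schedule, lets each response change the delay before the next beat. Checking a stopped flag at the start of run is how a replaced heartbeat (compare the setStopped call in addBeatInfo) quietly ends its chain.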

NamingProxy#sendBeat

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

In other words, the client calls the /nacos/v1/ns/instance/beat endpoint provided by the Nacos server, using HTTP PUT.
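To show what actually goes over the wire, the sketch below assembles the beat URL with java.net.URLEncoder. It assumes a server at 127.0.0.1:8848, as in the configuration earlier. It also assumes the query parameter names namespaceId, serviceName, clusterName, ip, and port, which we take to be the values of the CommonParams constants. The BeatRequest class is hypothetical and only builds the string. The real client sends the request with HTTP PUT and, unless light beat is enabled, puts the JSON-serialized BeatInfo under the beat key of the request body.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that only builds the beat URL; it sends nothing.
final class BeatRequest {
    private BeatRequest() {
    }

    static String url(String server, String namespaceId, String serviceName,
                      String clusterName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>(); // keep a stable parameter order
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("clusterName", clusterName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        // URL-encode each value (for example '@' becomes %40) and join as a query string.
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return "http://" + server + "/nacos/v1/ns/instance/beat?" + query;
    }
}
```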

The default parameters for the heartbeat are defined in the client constant class:

static {
    DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
    DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
    DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L);
}

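These values match the three timings of the Nacos health check described at the beginning: a 5-second heartbeat interval, a 15-second heartbeat timeout, and a 30-second deletion timeout.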

The server receives the heartbeat

The client analysis showed that the heartbeat request targets /nacos/v1/ns/instance/beat. On the Nacos server, this endpoint is implemented in InstanceController in the naming module.

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    // ...
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    if (instance == null) {
        // ...
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);
    // ...
    service.processClientBeat(clientBeat);
    // ...
    return result;
}

When the server receives a heartbeat request, it does two main things. First, if the instance sending the heartbeat does not exist, the server registers it. Second, it calls the processClientBeat method of the corresponding Service to process the heartbeat.
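The "register if absent, then record the beat" flow is a classic upsert. Here is a minimal in-memory sketch using ConcurrentHashMap#computeIfAbsent, so concurrent beats for the same unknown instance register it only once. Keying by ip:port is a hypothetical simplification, since Nacos also scopes instances by namespace, service, and cluster. BeatRegistry is not a Nacos class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry keyed by "ip:port"; not Nacos code.
final class BeatRegistry {
    static final class Inst {
        final String ip;
        final int port;
        volatile long lastBeat;

        Inst(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
    }

    final Map<String, Inst> instances = new ConcurrentHashMap<>();

    /** Registers the sender if unknown, then records the beat; returns true on first registration. */
    boolean onBeat(String ip, int port, long now) {
        boolean[] created = {false};
        Inst inst = instances.computeIfAbsent(ip + ":" + port, key -> {
            created[0] = true; // runs at most once per absent key
            return new Inst(ip, port);
        });
        inst.lastBeat = now;
        return created[0];
    }
}
```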

The processClientBeat method is implemented as follows:

public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor is also a Runnable task. It is executed immediately via the scheduleNow method of HealthCheckReactor.

The scheduleNow method is implemented as follows:

public static ScheduledFuture<?> scheduleNow(Runnable task) {
    return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
}

Now let's look at the task itself, the run method of ClientBeatProcessor:

@Override
public void run() {
    Service service = this.service;
    // logging
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            // logging
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    // logging
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}

The run method looks for the instance whose IP and port match the heartbeat sender and updates its last heartbeat time. If that instance is not marked and is currently unhealthy, it is switched back to healthy. The change is then announced through PushService#serviceChanged, which publishes a ServiceChangeEvent via Spring's ApplicationContext.
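The state transition in ClientBeatProcessor#run can be isolated into a small function. In this sketch, the class and field names are hypothetical. The lastBeat timestamp is always refreshed for the matching IP and port. Only an unmarked, unhealthy instance flips to healthy, and only that transition triggers the change callback, which stands in for PushService#serviceChanged.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the core of ClientBeatProcessor#run; not Nacos code.
final class BeatProcessor {
    static final class Inst {
        final String ip;
        final int port;
        final boolean marked; // marked instances are never flipped automatically
        long lastBeat;
        boolean healthy;

        Inst(String ip, int port, boolean healthy, boolean marked) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
            this.marked = marked;
        }
    }

    private BeatProcessor() {
    }

    static void process(List<Inst> instances, String ip, int port, long now, Consumer<Inst> onChange) {
        for (Inst inst : instances) {
            if (inst.ip.equals(ip) && inst.port == port) {
                inst.lastBeat = now; // always refresh the heartbeat time
                if (!inst.marked && !inst.healthy) {
                    inst.healthy = true;
                    onChange.accept(inst); // only a real transition is announced
                }
            }
        }
    }
}
```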

After this processing, the instance's health status and last heartbeat time on the Nacos server are up to date. But how does the server detect that heartbeats have stopped arriving?

Heartbeat check on the server

The client sends heartbeats, and the server periodically checks whether each instance's last heartbeat time is still recent enough.

The server-side heartbeat check is started when a service instance is registered. The registration endpoint is also in InstanceController, and register is implemented as follows:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // ...
    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

ServiceManager#registerInstance is implemented as follows:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // ...
}

The heartbeat-related logic lives in the first step, creating an empty Service, which eventually calls the following method:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

The Service is initialized in the putServiceAndInit method:

private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Service#init

public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

HealthCheckReactor#scheduleCheck

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

The first check runs after an initial delay of 5 seconds, and subsequent checks run every 5 seconds.
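Calling computeIfAbsent on futureMap ensures a service gets at most one periodic check task, however many times init runs. Here is a sketch of that pattern with a plain ScheduledExecutorService. It assumes GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS) behaves like scheduleWithFixedDelay. CheckScheduler is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "at most one periodic health check per task key"; not Nacos code.
final class CheckScheduler {
    private final ScheduledExecutorService executor;
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    CheckScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    ScheduledFuture<?> scheduleCheck(String taskKey, Runnable task) {
        // The first call for a key schedules the task (5 s initial delay, then 5 s between runs);
        // later calls for the same key return the existing future.
        return futureMap.computeIfAbsent(taskKey,
                key -> executor.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
}
```

With a fixed delay, the 5-second gap is measured from the end of one check to the start of the next, so a slow check never overlaps with the following one.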

As the first line of init shows, the health check task is ClientBeatCheckTask. The core of its run method is as follows:

@Override
public void run() {
    // ...        
    List<Instance> instances = service.allIPs(true);
    
    // first set health status of instances:
    for (Instance instance : instances) {
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
            if (!instance.isMarked()) {
                if (instance.isHealthy()) {
                    instance.setHealthy(false);
                    // logging...
                    getPushService().serviceChanged(service);
                    ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                }
            }
        }
    }
    
    if (!getGlobalConfig().isExpireInstance()) {
        return;
    }
    
    // then remove obsolete instances:
    for (Instance instance : instances) {
        
        if (instance.isMarked()) {
            continue;
        }
        
        if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
            // delete instance
            deleteIp(instance);
        }
    }
}

The first for loop checks whether the time since each instance's last heartbeat exceeds the heartbeat timeout. If an instance has timed out, is not marked, and is currently healthy, its status is set to unhealthy. The state change is then pushed, and an InstanceHeartbeatTimeoutEvent is published.

Before the second loop, the task returns early if instance expiration is disabled in the global configuration. The second for loop skips (continue, not break) instances that are marked. An unmarked instance is deleted if the time since its last heartbeat exceeds the IP deletion timeout.
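Both passes can be expressed as one pure function, which is handy when reusing this design elsewhere. This sketch makes several simplifications. BeatCheck and its fields are hypothetical. The timeouts are passed in rather than read from instance metadata. Event publishing is reduced to a comment. Deletion is returned as a list instead of calling deleteIp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two passes of ClientBeatCheckTask#run; not Nacos code.
final class BeatCheck {
    static final class Inst {
        final String id;
        final long lastBeat;
        final boolean marked; // marked instances are left alone by both passes
        boolean healthy = true;

        Inst(String id, long lastBeat, boolean marked) {
            this.id = id;
            this.lastBeat = lastBeat;
            this.marked = marked;
        }
    }

    private BeatCheck() {
    }

    /** Flips timed-out instances to unhealthy in place and returns the ones to delete. */
    static List<Inst> check(List<Inst> instances, long now, long heartBeatTimeout,
                            long ipDeleteTimeout, boolean expireInstance) {
        // Pass 1: unmarked, healthy instances silent for too long become unhealthy.
        for (Inst inst : instances) {
            if (now - inst.lastBeat > heartBeatTimeout && !inst.marked && inst.healthy) {
                inst.healthy = false; // Nacos also pushes the change and publishes an event here
            }
        }
        List<Inst> toDelete = new ArrayList<>();
        if (!expireInstance) {
            return toDelete; // expiration disabled: nothing is ever deleted
        }
        // Pass 2: skip marked instances; collect those silent past the delete timeout.
        for (Inst inst : instances) {
            if (inst.marked) {
                continue;
            }
            if (now - inst.lastBeat > ipDeleteTimeout) {
                toDelete.add(inst);
            }
        }
        return toDelete;
    }
}
```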

Summary

In this article, we started from Spring Cloud and traced the heartbeat task in the Nacos client. We then followed the Nacos server as it receives heartbeats and checks whether instances are healthy. After walking through the source code end to end, you should have a clear picture of how Nacos heartbeats work. Follow me for more in-depth Nacos content.

Nacos series

  • Source code analysis: how does Spring Cloud integrate Nacos service discovery?
  • Want to learn microservice service discovery? Let’s start with some background knowledge.
  • Nacos, the Soul Ferryman of Microservices: a complete overview of its principles.
  • Interested in reading source code too? Here is how I read the Nacos source code.
  • “Learning Nacos? Let’s get the service running first: a practical tutorial”
  • Microservices: What if Nacos didn’t react when the Service hung too crisp?
  • Microservices: Poking fun at the crazy log output of Nacos
  • An easy example of Spring Cloud integration with Nacos

About the blogger: Author of the technology book SpringBoot Inside Technology, loves to delve into technology and writes technical articles.

Public account: “Program New Vision”, the blogger’s public account. You are welcome to follow it.

Technical exchange: please contact Zhuan2quan on Weibo.