Due to space limits, this article is a sequel to the previous one. It covers the self-preservation mechanism and peer replication. Address of the previous article:

Self-preservation mechanism

The self-preservation mechanism is used to detect network anomalies, such as a network partition between the registry and its clients.

As noted at the beginning of the evict method, if the condition !isLeaseExpirationEnabled() is true, the eviction is skipped. Here is that method:

@Override
public boolean isLeaseExpirationEnabled() {
    // Always return true if the server configuration does not enable self-preservation
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }

    // Get the number of renewals in the last minute. If it is not greater than the renewal
    // threshold, the registry assumes that it has suffered a network partition.
    // numberOfRenewsPerMinThreshold is the renewal threshold per minute.
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

After looking at this method, two questions come up:

  1. getNumOfRenewsInLastMin(): how is the number of renewals in the last minute calculated?
  2. numberOfRenewsPerMinThreshold: where does it come from?

Let's start with the first question, how the number of renewals in the last minute is calculated.

The method is as follows:

private final MeasuredRate renewsLastMin;

@Override
public long getNumOfRenewsInLastMin() {
    return renewsLastMin.getCount();
}

The count comes from the MeasuredRate class. Let's analyze it to see what it does:

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class for getting a count in last X milliseconds.
 *
 * @author Karthik Ranganathan,Greg Kim
 */
public class MeasuredRate {
    private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);
    // Bucket of the previous interval: holds the count recorded during the previous interval
    private final AtomicLong lastBucket = new AtomicLong(0);
    // Bucket of the interval that is currently being counted
    private final AtomicLong currentBucket = new AtomicLong(0);

    // Sampling duration of each bucket (the interval)
    private final long sampleInterval;
    // A timer that switches the buckets
    private final Timer timer;

    // Whether the timer task has been started
    private volatile boolean isActive;

    /**
     * @param sampleInterval in milliseconds
     */
    public MeasuredRate(long sampleInterval) {
        this.sampleInterval = sampleInterval;
        this.timer = new Timer("Eureka-MeasureRateTimer", true);
        this.isActive = false;
    }

    public synchronized void start() {
        if (!isActive) {
            // Start a scheduled task
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Read the current bucket and reset it to 0,
                        // then store the value that was read in the previous bucket
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }

    public synchronized void stop() {
        if (isActive) {
            // Cancel the scheduled task
            timer.cancel();
            isActive = false;
        }
    }

    /**
     * Returns the count in the last sample interval.
     */
    public long getCount() {
        // Get the count of the previous interval
        return lastBucket.get();
    }

    /**
     * Increments the count in the current sample interval.
     */
    public void increment() {
        // Increase the count of the current interval
        currentBucket.incrementAndGet();
    }
}

The count is increased through the increment method, so the next question is where increment is called.

Looking up the callers of com.netflix.eureka.util.MeasuredRate#increment shows that it is called in the com.netflix.eureka.registry.AbstractInstanceRegistry#renew method. In other words, every renewal increases the count of the current interval.
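The two-bucket idea can be tried out in isolation. The following is a minimal sketch, not Eureka's class: TwoBucketCounter and its rotate method are hypothetical names, and the Timer is replaced by an explicit rotate() call so that the behavior is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for MeasuredRate.
// Assumption: rotate() is called by hand instead of by a TimerTask.
class TwoBucketCounter {
    private final AtomicLong lastBucket = new AtomicLong(0);
    private final AtomicLong currentBucket = new AtomicLong(0);

    // Called once per event (in Eureka: once per renewal).
    void increment() {
        currentBucket.incrementAndGet();
    }

    // Ends the current interval: moves the current count into lastBucket
    // and resets the current bucket to 0.
    void rotate() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // Count of the most recently completed interval.
    long getCount() {
        return lastBucket.get();
    }

    public static void main(String[] args) {
        TwoBucketCounter renews = new TwoBucketCounter();
        renews.increment();
        renews.increment();
        renews.increment();
        System.out.println(renews.getCount()); // the interval has not ended yet
        renews.rotate();
        System.out.println(renews.getCount()); // the three renewals are now visible
    }
}
```

A consequence of this design is that getCount() always reports a completed interval, so a reader never sees a half-filled bucket.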

The second question: where does numberOfRenewsPerMinThreshold come from?

Looking at where this field is assigned leads to the method com.netflix.eureka.registry.AbstractInstanceRegistry#updateRenewsPerMinThreshold:

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

expectedNumberOfClientsSendingRenews records the number of client instances (service instances) that are expected to send renewals.

serverConfig.getExpectedClientRenewalIntervalSeconds() is the expected client renewal interval, in seconds.

serverConfig.getRenewalPercentThreshold() is the renewal percentage threshold.

For example, if the number of service instances is 10, the renewal interval is 30 seconds (the default value), and the threshold is 0.85 (the default value):

# rounded down to an integer
numberOfRenewsPerMinThreshold = 10 * (60 / 30) * 0.85

So the renewal threshold is 17 renewals per minute.
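The same arithmetic can be checked with a small standalone sketch. The class name RenewalThreshold and the parameter names are illustrative only; the expression follows updateRenewsPerMinThreshold as quoted above.

```java
// Illustrative helper, not part of Eureka.
class RenewalThreshold {
    // Same expression as updateRenewsPerMinThreshold; the cast to int truncates the result.
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds, double percentThreshold) {
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * percentThreshold);
    }

    public static void main(String[] args) {
        // 10 instances, one renewal every 30 seconds, threshold 0.85
        System.out.println(renewsPerMinThreshold(10, 30, 0.85)); // 10 * 2 * 0.85 = 17
        // 3 instances: 3 * 2 * 0.85 = 5.1, truncated to 5
        System.out.println(renewsPerMinThreshold(3, 30, 0.85));
    }
}
```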

Conclusion: the self-preservation mechanism decides whether the registry is in a network partition by comparing the number of renewals received in the last minute with the expected renewal threshold.
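To make the decision concrete, here is a minimal sketch that feeds a few numbers into the same condition used by isLeaseExpirationEnabled. The class LeaseExpirationCheck and its parameters are hypothetical; in Eureka the values come from the server configuration and from MeasuredRate.

```java
// Hypothetical standalone version of the check, for experimenting with numbers.
class LeaseExpirationCheck {
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int renewsPerMinThreshold,
                                            long renewsLastMin) {
        if (!selfPreservationEnabled) {
            // Self-preservation is switched off: instances may always expire.
            return true;
        }
        // Expiration is allowed only while renewals stay above the threshold.
        return renewsPerMinThreshold > 0 && renewsLastMin > renewsPerMinThreshold;
    }

    public static void main(String[] args) {
        // Threshold 17 (10 instances, 30 s interval, 0.85)
        System.out.println(isLeaseExpirationEnabled(true, 17, 20)); // healthy: eviction may run
        System.out.println(isLeaseExpirationEnabled(true, 17, 5));  // too few renewals: eviction is skipped
    }
}
```

Note that a count equal to the threshold is not enough, because the comparison is strictly greater than.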

Peer-to-peer replication

Multiple Eureka Servers are peers of one another. Even if only one node is left, it can still provide service.

Peer replication synchronizes data among the Eureka Servers so that they hold consistent data and can each provide service independently. Peer replication involves complex network relationships. If it is not well designed, it may run into two problems: 1. a large number of requests on the network; 2. data conflicts.

Peer replication involves two sides: the initiator of the replication and the receiver of the replication.

The initiator of the replication

The initiating side is implemented in PeerAwareInstanceRegistryImpl, which overrides the handling of the relevant requests, for example renew:

public boolean renew(final String appName, final String id, final boolean isReplication) {
    // If the parent class processed the request successfully
    if (super.renew(appName, id, isReplication)) {
        // Handle peer replication
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}

In replicateToPeers:

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    // Time the operation
    Stopwatch tracer = action.getTimer().start();
    try {
        // If this request is itself a replication, increase the replication counter
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If there are no peer nodes, or the request is already a replication, return without replicating
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over all peer nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the node is this server itself, skip it
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate the action to the peer
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

In the replicateInstanceActionsToPeers method:

private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        // Choose the processing logic according to the action being replicated
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                // Get the latest information from the registry
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    } finally {
        CurrentRequestVersion.remove();
    }
}

Taking the replication of a registration as an example, node.register(info) wraps the information in a task and submits it to the task dispatcher, which later executes the tasks in batches.

public void register(final InstanceInfo info) throws Exception {
    // The expiration time of this task
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // Submit the task. The taskId identifies events of the same kind, so that an older
    // event of that kind can be replaced and useless replication is avoided
    batchingDispatcher.process(
        taskId("register", info),
        new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
            public EurekaHttpResponse<Void> execute() {
                // Invoke the registration through the replication client
                return replicationClient.register(info);
            }
        },
        expiryTime
    );
}

Batch execution of the replication tasks involves the following classes:

  • com.netflix.eureka.cluster.ReplicationTask: represents a replication task. The concrete task type is determined by the implementing class, which also implements the error-handling methods that deal with the special response (404 status code) returned by the server
  • com.netflix.eureka.util.batcher.TaskDispatchers: creates a TaskDispatcher and returns it as an instance of an anonymous inner class
  • com.netflix.eureka.util.batcher.TaskDispatcher: submits replication tasks to the AcceptorExecutor instance
  • com.netflix.eureka.cluster.HttpReplicationClient: the communication client used to send replication requests
  • com.netflix.eureka.util.batcher.TrafficShaper: traffic shaping. It adjusts the time of the next request based on the result of the previous client request (congestion or network error)
  • com.netflix.eureka.util.batcher.AcceptorExecutor: has an internal thread that runs an infinite loop and distributes tasks, adjusting the scheduling time according to TrafficShaper
  • com.netflix.eureka.util.batcher.TaskExecutors: executes the tasks. It holds a number of worker threads, which use com.netflix.eureka.util.batcher.AcceptorExecutor#requestWorkItems to request tasks from the AcceptorExecutor instance and execute them. When processing fails, it calls com.netflix.eureka.util.batcher.AcceptorExecutor#reprocess(com.netflix.eureka.util.batcher.TaskHolder<ID,T>, com.netflix.eureka.util.batcher.TaskProcessor.ProcessingResult) to have the task reprocessed and to register the error with TrafficShaper
  • com.netflix.eureka.cluster.protocol.ReplicationList: the multiple ReplicationTask instances previously submitted to TaskDispatcher are merged by com.netflix.eureka.cluster.ReplicationTaskProcessor#createReplicationListOf into one batch task, which is used as the request body of a batch request

The mechanisms above, batching and TrafficShaper, deal with the complexity of network communication: batching reduces the number of requests, and TrafficShaper handles congestion and network errors.

The status code returned by com.netflix.eureka.resources.InstanceResource#validateDirtyTimestamp lets the replication client decide, based on that code, how to resolve a data conflict.
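The effect of task ids, expiry times, and batching can be illustrated with a toy queue. This is a sketch under simplifying assumptions, not Eureka's AcceptorExecutor: BatchingQueue, Pending, and nextBatch are hypothetical names, payloads are plain strings, and there are no worker threads or traffic shaping.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded batching queue for illustration only.
class BatchingQueue {
    private static final class Pending {
        final String payload;
        final long expiryTime;

        Pending(String payload, long expiryTime) {
            this.payload = payload;
            this.expiryTime = expiryTime;
        }
    }

    // Keyed by task id; iteration follows the order in which ids were first inserted.
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    // A newer task with the same id replaces the older one, so stale events are never sent.
    void process(String taskId, String payload, long expiryTime) {
        pending.put(taskId, new Pending(payload, expiryTime));
    }

    // Removes tasks from the queue until maxBatchSize unexpired payloads are collected.
    // Expired tasks are dropped and do not count toward the batch size.
    List<String> nextBatch(int maxBatchSize, long now) {
        List<String> batch = new ArrayList<>();
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext() && batch.size() < maxBatchSize) {
            Pending task = it.next();
            it.remove();
            if (task.expiryTime > now) {
                batch.add(task.payload);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchingQueue queue = new BatchingQueue();
        queue.process("heartbeat#a", "hb-a-1", 100);
        queue.process("register#b", "reg-b", 100);
        queue.process("heartbeat#a", "hb-a-2", 100); // replaces hb-a-1
        queue.process("heartbeat#c", "hb-c", 5);     // already expired at time 10
        System.out.println(queue.nextBatch(10, 10)); // one batch instead of four requests
    }
}
```

Four submitted tasks end up as a single batch with two entries, which is the traffic reduction that batching is meant to achieve.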

The receiver of the replication

Looking back at the logic for registration, renewal, and cancellation (going offline) described earlier, each of these requests carries an isReplication value that indicates whether the request is a replication request. Each handler does some processing based on this variable, mainly taking different values and maintaining different statistics. Most importantly, if the request is a replication request, PeerAwareInstanceRegistryImpl will not replicate it to the other peer nodes (to prevent repeated replication).

Batch requests are received by com.netflix.eureka.resources.PeerReplicationResource#batchReplication, which dispatches each task, according to its type, to the method that handles the corresponding single request.

com.netflix.eureka.resources.InstanceResource#validateDirtyTimestamp tries to detect conflicting data and returns an informative status code.
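Why the isReplication flag is enough to stop replication loops can be seen in a toy model. The sketch below is hypothetical (ToyNode is not an Eureka class, and the network calls are replaced by direct method calls): a node forwards a request to its peers only when the request came from a client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a peer node; direct method calls stand in for HTTP requests.
class ToyNode {
    final String name;
    final List<ToyNode> peers = new ArrayList<>();
    final List<String> renewed = new ArrayList<>(); // instance ids renewed on this node
    int replicationsSent = 0;

    ToyNode(String name) {
        this.name = name;
    }

    void renew(String instanceId, boolean isReplication) {
        renewed.add(instanceId);
        if (isReplication) {
            return; // the request is already a replication: do not forward it again
        }
        for (ToyNode peer : peers) {
            if (peer == this) {
                continue; // skip ourselves
            }
            replicationsSent++;
            peer.renew(instanceId, true);
        }
    }

    public static void main(String[] args) {
        ToyNode a = new ToyNode("a");
        ToyNode b = new ToyNode("b");
        ToyNode c = new ToyNode("c");
        for (ToyNode node : List.of(a, b, c)) {
            node.peers.addAll(List.of(a, b, c)); // every node knows the full peer list
        }
        a.renew("instance-1", false); // a client renews against node a
        System.out.println(a.replicationsSent + " " + b.replicationsSent + " " + c.replicationsSent);
    }
}
```

Only the node that received the client request sends replications; the receivers apply the change locally and stop there.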
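As an illustration of how a status code can carry conflict information, here is a minimal sketch of one possible timestamp comparison. It is not the code of validateDirtyTimestamp: the class DirtyTimestampCheck, its parameters, and the exact rule are assumptions made for this example. In particular, the use of 409 for "the receiver has newer data" is an assumption; the text above only mentions 404 as a special response.

```java
// Hypothetical conflict check based on "last dirty" timestamps; all names are illustrative.
class DirtyTimestampCheck {
    static final int OK = 200;
    static final int NOT_FOUND = 404;
    static final int CONFLICT = 409;

    // requestTimestamp: timestamp sent with the request (may be absent).
    // registryTimestamp: timestamp stored by the receiving node for the same instance.
    static int validate(Long requestTimestamp, long registryTimestamp, boolean isReplication) {
        if (requestTimestamp == null || requestTimestamp.longValue() == registryTimestamp) {
            return OK; // nothing to compare, or both sides agree
        }
        if (requestTimestamp.longValue() > registryTimestamp) {
            // Assumption: the sender has newer data, so it is asked to register again.
            return NOT_FOUND;
        }
        // Assumption: the receiver has newer data. A replicating peer is told about the
        // conflict so that it can sync itself; a normal client request is simply accepted.
        return isReplication ? CONFLICT : OK;
    }

    public static void main(String[] args) {
        System.out.println(validate(200L, 100L, true));  // sender is newer
        System.out.println(validate(100L, 200L, true));  // receiver is newer, replication request
        System.out.println(validate(100L, 200L, false)); // receiver is newer, client request
    }
}
```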

For the summary and references, see the previous article.