This is the 11th day of my participation in the Gwen Challenge in November. Check out the details: The last Gwen Challenge in 2021
This series code address: github.com/JoJoTec/spr…
In the previous section, we reviewed the implementation of Feign circuit breakers and thread isolation. In this section, we will not look at the source code implementation (because the source code will include improvements to the load balancing algorithm), but discuss how to optimize the current load balancing algorithm.
The previous load balancing algorithm
- Get the list of service instances and sort the list by IP port. If not, position may represent a previously invoked instance even if it is next
- Based on the traceId in the request, obtain an atomic variable position from the local cache with traceId as the key and an initial value of a random number. This prevents all requests from being called from the first instance, followed by the second and third.
- Position atom is incremented by one, then the number of instances is mod, and the instance with the corresponding subscript is returned to be called
Where the request contains traceId, we use spring-cloud-sleUTH link tracing. Based on this mechanism, we can ensure that the request will not be retried to the instance that has been called before. The source code is:
/ / must be must be implemented ReactorServiceInstanceLoadBalancer / / not ReactorLoadBalancer < ServiceInstance > / / because when registration is ReactorServiceInstanceLoadBalancer @ Log4j2 public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer { private final ServiceInstanceListSupplier serviceInstanceListSupplier; // Each request does not exceed 1 minute with retries // For requests that exceed 1 minute, the request must be heavy, Private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1) TimeUnit. MINUTES) / / random initial value, prevent begins with the first call. Every time the build (k - > new AtomicInteger (ThreadLocalRandom. The current () nextInt (0, 1000))); private final String serviceId; private final Tracer tracer; public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) { this.serviceInstanceListSupplier = serviceInstanceListSupplier; this.serviceId = serviceId; this.tracer = tracer; } // Each time you retry, Override public Mono<Response<ServiceInstance>> Choose (Request Request) {return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances)); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } return getInstanceResponseByRoundRobin(serviceInstances); } private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } // In order to solve the problem that different calls to the original algorithm may cause a request to retry the same instance // Get the context of the current request from sleuth's Tracer. If (currentSpan == null) {currentSpan = tracer.newtrace (); } // Get the request traceId from the request context, which uniquely identifies a request long l = currentspan.context ().traceid (); AtomicInteger seed = positionCache.get(l); int s = seed.getAndIncrement(); int pos = s % serviceInstances.size(); log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size()); Return new DefaultResponse(serviceInstance.stream () // First take again for sorting sorted (Comparator.com paring (ServiceInstance: : getInstanceId)). Collect (Collectors. ToList ()). The get (pos)); }}Copy the code
However, this load balancing algorithm still caused us problems in the case of a large increase in requests.
First of all, we did not expand this time, so the performance pressure of this time is very sensitive to the balanced distribution of pressure. Micro service, for example, assume that A nine instances, peaks in the business, the most ideal situation is to ensure that whenever the nine are completely balanced load pressure, but because we use the initial value for the random number of atomic variable position, although from the total amount of day, is responsible for the equilibrium pressure must be balanced, But in a short period of time, it’s very likely that the pressure will all go to a few instances, causing them to collapse and fuse, and then all go to a few other instances, which will collapse and fuse, and so on.
Then, we deployed with a K8S deployment, where there might be many microservice pods running on the same virtual machine. In some cases, multiple pods of the same microservice may run on the same VM Node, as can be seen from the IP segment of the POD: For example, a microservice has the following seven instances: 10.238.13.12:8181,10.238. 13.24:8181,10.238. 15.12:8181,10.238. 17.12:8181,10.238. 20.220:8181,10.238. 21.31:8181,10.238. 21.1 21:8181, then 10.238.13.12:8181 and 10.238.13.24:8181 are likely to be on the same Node, 10.238.21.31:8181 and 10.238.21.121:8181 are likely to be on the same Node. We need to retry the instance on a Node that is different from the one we tried before, because if one of the instances on the same Node has a problem or is under too much pressure, the others will also have a problem or is under too much pressure.
Finally, if calling an instance consistently fails, that instance needs to take precedence over other normal instances. Released this to reduce quickly refresh (suddenly started many stopped several old instance after instance, number of instances is greater than retries configuration) to the user, and a usable area sudden abnormal cause multiple instances offline impact on users, as well as business pressure has been in the past, the pressure decreases after, need to turn off the instance is no longer needed, The impact on users when a large number of instances are migrated is significant.
The optimization scheme for the above problems
We propose an optimized solution to the above three problems:
- For each request, record:
- Which instances have been invoked by this request -> the instance cache that the request invoked
- Instance called, how many requests are currently being processed -> Number of requests the instance runs
- Instance called, recent request error rate -> instance request error rate
- Shuffling the list of instances randomly prevents requests from always being sent to the same instance when all three metrics are the same.
- In order that the current request has not been called before -> The lower the error rate, the higher the order -> The smaller the number of instances running requests, the higher the order
- Take the first instance in the sorted list as the load balancing instance
The following code comes from: github.com/JoJoTec/spr…
We use dependencies:
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
Copy the code
Caching classes for logging instance data:
@Log4j2 public class ServiceInstanceMetrics { private static final String CALLING = "-Calling"; private static final String FAILED = "-Failed"; private MetricRegistry metricRegistry; ServiceInstanceMetrics() { } public ServiceInstanceMetrics(MetricRegistry metricRegistry) { this.metricRegistry = metricRegistry; } / records invocation instance * * * * @ param serviceInstance * / public void recordServiceInstanceCall (serviceInstance serviceInstance) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); metricRegistry.counter(key + CALLING).inc(); } /** * Record the end of the instance * @param serviceInstance * @param isSuccess */ public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); metricRegistry.counter(key + CALLING).dec(); if (! Metricregistry.meter (key + FAILED).mark(); @param serviceInstance @return */ public long getCalling(serviceInstance serviceInstance) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); long count = metricRegistry.counter(key + CALLING).getCount(); log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count); return count; } /** * Get the number of call failures in the last minute. @param serviceInstance @return */ public double getFailedInRecentOneMin(serviceInstance serviceInstance) {public double getFailedInRecentOneMin(serviceInstance serviceInstance) { String key = serviceInstance.getHost() + ":" + serviceInstance.getPort(); double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate(); log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate); return rate; }}Copy the code
Load balancing core code:
private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder() .expireAfterAccess(3, TimeUnit.MINUTES) .build(k -> Sets.newConcurrentHashSet()); private final String serviceId; private final Tracer tracer; private final ServiceInstanceMetrics serviceInstanceMetrics; // Each time you retry, @override public Mono<Response<ServiceInstance>> Choose (Request Request) {Span Span = tracer.currentSpan(); Return serviceInstanceListSupplier. The get (). Next (). The map (serviceInstances - > {/ / keep the span and calls to choose span as a try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) { return getInstanceResponse(serviceInstances); }}); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) { if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); TraceId Span currentSpan = tracer.currentSPAN (); if (currentSpan == null) { currentSpan = tracer.newTrace(); } long l = currentSpan.context().traceId(); return getInstanceResponseByRoundRobin(l, serviceInstances); } @VisibleForTesting public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {// First randomly shuffles the order of instances in the List collections.shuffle (serviceInstances); // If the comparator calls multiple times, And parameters may change during sorting (the request statistics for the instance are changing concurrently) Map<ServiceInstance, Integer> used = maps.newhashmap (); Map<ServiceInstance, Long> callings = Maps.newHashMap(); Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap(); ServiceInstances = serviceInstances.stream(). Sorted (Comparator) <ServiceInstance>comparingInt(ServiceInstance -> {return used.computeIfAbsent(ServiceInstance, k -> { return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> { return serviceInstance.getHost().contains(prefix); })? 1:0; }); }). / / the current error rate at least thenComparingDouble (serviceInstance - > {return failedInRecentOneMin.com puteIfAbsent (serviceInstance, k -> { double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance); Return ((int) (value * 100)) / 100.0; return ((int) (value * 100)) / 100.0; }); }) // thenComparingLong(serviceInstance -> {return callings.computeIfAbsent(serviceInstance, k -> serviceInstanceMetrics.getCalling(serviceInstance) ); }) ).collect(Collectors.toList()); if (serviceInstances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } ServiceInstance serviceInstance = serviceInstances.get(0); Calledipprefixes.get (traceId).add(serviceinstance.gethost ().substring(0, serviceInstance.getHost().lastIndexOf("."))); Positioncache.get (traceId).getAndIncrement(); return new DefaultResponse(serviceInstance); }Copy the code
The cache that records when instance data is updated is in FeignClient’s glue retry, disconnect, and thread isolation code, which we’ll see in the next section.
Some Q&A about the project design within the group
1. Why not use a cache shared by all microservices to hold call data to make it more accurate?
Options for shared caches include putting these data records into an in-memory grid like Redis or Apache Ignite. But there are two problems:
- If data records are placed in additional storage such as Redis, all load balancing cannot be performed if Redis is not available. If you add Apache Ignite, if the corresponding node goes offline, then the corresponding load balancing cannot be performed. None of this is acceptable.
- Assuming microservice A needs to call microservice B, it may be that one instance of A has A problem calling one instance of B, but other instances of A have no problem calling this instance of B, for example, when one availability zone is congested with another. If the same cache Key is used to record the data of all instances of A calling instance of B, it is obviously inaccurate.
Each microservice uses a local cache to record its own calls to other instances, which in our view is not only easier to implement, but also more accurate.
2. Use EMA instead of request window to calculate the latest error rate
For example, if we calculate the error rate in the last minute, we will cache the requests in the last minute. When reading, we will add the cached request data together and take the average. However, this method can take up a lot of memory to cache requests when they surge. At the same time, as the number of cache requests increases, more CPU will be consumed to calculate the error rate. It’s not worth it.
EMA is a sliding average that is commonly used in a variety of performance monitoring statistics scenarios, such as dynamic calculation of TLAB size in the JVM, scaling of G1 GC Region size, and many other areas where the JVM needs to dynamically produce appropriate values. Instead of caching requests, he simply multiplies the latest value by a ratio plus the old value by (1 – this ratio), which is usually higher than 0.5, indicating that EMA is more relevant to the current latest value.
But there is another problem with EMA. We will see that as the program runs the number of decimal places becomes very large and we will see values like the following: 0.00000000123, 0.120000001, 0.120000003, in order to ignore the effect of too fine a difference (which is also due to a long time ago error request), we only keep two decimal digits for sorting.
Wechat search “my programming meow” public account, a daily brush, easy to improve skills, won a variety of offers