This is the 9th day of my participation in the August More Text Challenge.
Preface
Ribbon is an open source project from Netflix that is now part of Spring Cloud. It is a client-side load balancer for HTTP and TCP. When Ribbon is used with Eureka, it retrieves the server list from the Eureka registry and distributes requests across those servers, for example with round-robin. The client-side load balancer keeps the server list up to date through a heartbeat mechanism, working together with the service registry.
What is load balancing?
Load balancing is one of the main ways to handle high concurrency, relieve network pressure, and scale out server capacity. When people say "load balancing", they usually mean server-side load balancing. In fact there are two kinds: server-side load balancing and client-side load balancing.
What is the difference between Ribbon and Nginx?
Ribbon is a client-side load balancer; Nginx is a server-side load balancer.
With client-side load balancing, the client itself holds the list of service instances it can call. For example, Eureka or Nacos stores information about each service instance. Ribbon is integrated with Eureka/Nacos and picks one instance from that known list according to a load balancing policy. In other words, the load balancing decision is made on the client.
With server-side load balancing, the client does not know which server instance will handle its request. The client sends the request, and the server side (for example Nginx) chooses one of several backend servers using its own load balancing algorithm. In other words, the load balancing decision is made on the server.
The essential difference between the two is where the service list is stored. With client-side load balancing, every client keeps its own list of server addresses.
Ribbon load balancing policies
The following diagram shows the seven load balancing policies:
The load balancing interface is com.netflix.loadbalancer.IRule. The abstract class AbstractLoadBalancerRule implements IRule, and each concrete policy extends AbstractLoadBalancerRule, either directly or through an intermediate class.
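The sketch below shows the same strategy-pattern layout in a minimal, self-contained form. The names Rule, AbstractRule and FirstServerRule are hypothetical stand-ins for IRule, AbstractLoadBalancerRule and a concrete policy. Servers are plain strings, and none of this is Ribbon's actual API.

```java
import java.util.List;

// Hypothetical, simplified mirror of the IRule hierarchy; NOT Ribbon's API.
// The client already holds the server list, and a rule picks one of them.
interface Rule {
    String choose(List<String> servers);
}

// Plays the role of AbstractLoadBalancerRule: a common base for strategies.
abstract class AbstractRule implements Rule {
    // Shared helper that every concrete strategy can use.
    protected boolean hasServers(List<String> servers) {
        return servers != null && !servers.isEmpty();
    }
}

// A trivial concrete strategy: always pick the first server.
class FirstServerRule extends AbstractRule {
    @Override
    public String choose(List<String> servers) {
        return hasServers(servers) ? servers.get(0) : null;
    }
}

class RuleHierarchyDemo {
    public static void main(String[] args) {
        // Swapping in another AbstractRule subclass changes the policy
        // without touching the calling code.
        Rule rule = new FirstServerRule();
        System.out.println(rule.choose(List.of("10.0.0.1:8080", "10.0.0.2:8080")));
    }
}
```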
Round-robin policy: RoundRobinRule
Let's look at how the RoundRobinRule class is implemented.
```java
public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }
        Server server = null;
        // Counts how many times the load balancer has tried to obtain an available server
        int count = 0;
        // At most 10 attempts are made; after that, selection fails
        while (server == null && count++ < 10) {
            // Get all reachable servers
            List<Server> reachableServers = lb.getReachableServers();
            // Get all servers
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();
            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
            // Compute the index of the next server with a CAS retry loop
            int nextServerIndex = incrementAndGetModulo(serverCount);
            // Fetch the server at that index
            server = allServers.get(nextServerIndex);
            // If the slot is empty (e.g. the list changed concurrently), yield the CPU and try again
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }
            // Next.
            server = null;
        }
        // Log a warning if no server was found after 10 attempts
        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    // Increment the counter modulo the server count, updating it with CAS
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            // Read the current counter value
            int current = nextServerCyclicCounter.get();
            // Take the modulus
            int next = (current + 1) % modulo;
            // CAS: succeeds only if no other thread changed the counter in between
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }
}
```
Round-robin computes the index of the next server with a modulo operation and uses that index to pick a server from the list of all servers. An AtomicInteger updated through CAS (compare-and-set) records the position of the last selected server, which keeps the counter thread-safe without locks.
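The index arithmetic can be tried out on its own. The sketch below reuses the CAS loop from incrementAndGetModulo on a plain AtomicInteger. The class name RoundRobinSketch and the string "servers" are illustrative, and the sketch leaves out Ribbon's alive and ready-to-serve checks.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative round-robin selector using the same CAS loop as the excerpt.
class RoundRobinSketch {
    // Starts at 0; because it is incremented before use, the first pick is index 1.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Retry the compare-and-set until this thread's update wins.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    <T> T choose(List<T> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(incrementAndGetModulo(servers.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<String> servers = List.of("A", "B", "C");
        for (int i = 0; i < 4; i++) {
            System.out.print(rr.choose(servers) + " "); // B C A B
        }
        System.out.println();
    }
}
```

Because only the thread whose compareAndSet succeeds advances the counter, concurrent callers each get a distinct step of the cycle and never have to block.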
Random policy: RandomRule
The random policy picks a server instance at random. It uses ThreadLocalRandom to generate the random numbers, which is thread-safe and avoids contention on a shared Random instance.
```java
public class RandomRule extends AbstractLoadBalancerRule {

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;
        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            // Obtain the reachable servers and all servers
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }
            // Pick a random index. Note: the bound is allList's size, but the index
            // is used on upList, which can be shorter, so get() may throw
            // IndexOutOfBoundsException.
            int index = chooseRandomInt(serverCount);
            server = upList.get(index);
            if (server == null) {
                Thread.yield();
                continue;
            }
            if (server.isAlive()) {
                return (server);
            }
            server = null;
            Thread.yield();
        }
        return server;
    }

    // Return a random number in [0, serverCount)
    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }
}
```
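Here is a stand-alone sketch of the same idea. It assumes the caller passes in only reachable servers, and RandomChoiceSketch is a hypothetical name. One difference from the excerpt is deliberate: RandomRule bounds the random index by allList.size() but reads from upList. The sketch instead bounds the index by the size of the list it actually reads.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative random selector; assumes `reachable` contains only usable servers.
class RandomChoiceSketch {
    static <T> T choose(List<T> reachable) {
        if (reachable.isEmpty()) {
            return null;
        }
        // Bound the index by the list we read from, so it is always in range.
        int index = ThreadLocalRandom.current().nextInt(reachable.size());
        return reachable.get(index);
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of("A", "B", "C")));
    }
}
```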
Retry policy: RetryRule
RetryRule obtains a service instance through a sub-rule, which is the round-robin policy by default. If no available instance is obtained, it keeps retrying within a configured time window (500 ms by default), calling the round-robin policy repeatedly.
It creates an InterruptTask, which uses a daemon Timer thread to interrupt the current thread once the remaining retry time has elapsed. Until then, the loop keeps invoking the sub-rule. If no available server is found before the deadline, null is returned.
```java
public class RetryRule extends AbstractLoadBalancerRule {

    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + maxRetryMillis;
        Server answer = null;
        // Call the sub-rule (round-robin by default)
        answer = subRule.choose(key);
        // If no server was obtained, or it is not alive, and the deadline has not passed
        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {
            // Schedule a task that interrupts this thread when the remaining retry time runs out
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());
            // Until the thread is interrupted, keep calling the sub-rule
            while (!Thread.interrupted()) {
                answer = subRule.choose(key);
                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } else {
                    break;
                }
            }
            task.cancel();
        }
        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }
}
```
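The deadline logic can be sketched without Ribbon. This is a simplified illustration, not RetryRule itself. Instead of an InterruptTask interrupting the thread, the loop checks the clock. The sub-rule is any Supplier and liveness is any Predicate. The names RetrySketch and chooseWithRetry are hypothetical.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Illustrative retry-until-deadline loop in the spirit of RetryRule.
class RetrySketch {
    static <T> T chooseWithRetry(Supplier<T> subRule, Predicate<T> isAlive, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = subRule.get();
        // Retry while the answer is missing or dead and time remains.
        while ((answer == null || !isAlive.test(answer))
                && System.currentTimeMillis() < deadline) {
            Thread.yield(); // pause briefly, hoping the failure is transient
            answer = subRule.get();
        }
        // Like RetryRule, never hand back a dead server.
        return (answer == null || !isAlive.test(answer)) ? null : answer;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical flaky sub-rule: fails twice, then returns "B".
        Supplier<String> flaky = () -> ++calls[0] < 3 ? null : "B";
        System.out.println(chooseWithRetry(flaky, s -> true, 500)); // B
    }
}
```

Checking the clock keeps the sketch single-threaded. RetryRule's timer-based interrupt also stops a sub-rule that itself loops, because those loops check Thread.interrupted().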
Weighted response time policy: WeightedResponseTimeRule
The WeightedResponseTimeRule class extends the round-robin policy class RoundRobinRule.
During initialization it starts a timer that recomputes each instance's weight from its response time every 30 seconds. The longer an instance's response time, the lower its weight and the lower its chance of being selected. The shorter the response time, the higher the weight and the higher the chance. To select an instance, the rule draws a random number between 0 and the total weight. It then picks the first instance whose cumulative weight is greater than or equal to that number.
```java
public class WeightedResponseTimeRule extends RoundRobinRule {

    // Recompute the weight of each service every 30 s by default
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;

    // Cumulative weights, one entry per server
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

    // Initialization
    void initialize(ILoadBalancer lb) {
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        // Periodically recompute the weight of each service
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do a initial run
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
                serverWeightTimer.cancel();
            }
        }));
    }

    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;
        while (server == null) {
            List<Double> currentWeights = accumulatedWeights;
            // Stop if the thread has been interrupted
            if (Thread.interrupted()) {
                return null;
            }
            // Get the list of servers
            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }
            int serverIndex = 0;
            // The last element of currentWeights is the sum of all weights
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
            // Weights not initialized yet (or server count changed): fall back to round-robin
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server = super.choose(getLoadBalancer(), key);
                if (server == null) {
                    return server;
                }
            } else {
                // Draw a random number between 0 and the sum of all weights
                double randomWeight = random.nextDouble() * maxTotalWeight;
                int n = 0;
                // Select the first instance whose cumulative weight is >= the random number
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }
                server = allList.get(serverIndex);
            }
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            if (server.isAlive()) {
                return (server);
            }
            // Next.
            server = null;
        }
        return server;
    }

    // Inner class
    class ServerWeight {

        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }
            if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                return;
            }
            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }
                double totalResponseTime = 0;
                // Sum the average response times of all service instances
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // Running total of the weights
                Double weightSoFar = 0.0;
                // Cumulative weights for all services
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    // Weight = sum of all average response times - this service's average response time
                    // So the longer the response time, the smaller the weight and the less likely it is selected
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }
        }
    }
}
```
For example, suppose there are three service instances with the following average response times:
- A: 100 ms
- B: 200 ms
- C: 300 ms
The average response times add up to 100 + 200 + 300 = 600 ms, so the cumulative weights are:
- A: 600-100 = 500
- B: 500+600-200 = 900
- C: 900+600-300 = 1200
The random number is drawn from [0, 1200). If it falls in [0, 500], service A is selected. If it falls in (500, 900], service B is selected, and if it falls in (900, 1200), service C is selected. If the weights have not been computed yet, or the number of servers no longer matches the number of weights, the rule falls back to the round-robin result.
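The arithmetic of this example, together with the selection loop from choose(), can be reproduced in a few lines. The sketch is illustrative: WeightedResponseTimeSketch, accumulatedWeights and pick are made-up names. It takes the average response times as a plain array instead of reading them from LoadBalancerStats.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative reproduction of maintainWeights() arithmetic and choose() selection.
class WeightedResponseTimeSketch {
    // Cumulative weights: weight_i = total - avg_i, accumulated in list order.
    static List<Double> accumulatedWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> weights = new ArrayList<>();
        double weightSoFar = 0;
        for (double t : avgResponseTimes) {
            weightSoFar += total - t;
            weights.add(weightSoFar);
        }
        return weights;
    }

    // Index of the first cumulative weight >= randomWeight (0 if none, as in choose()).
    static int pick(List<Double> accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.size(); i++) {
            if (accumulated.get(i) >= randomWeight) {
                return i;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        List<Double> w = accumulatedWeights(new double[] {100, 200, 300});
        System.out.println(w); // [500.0, 900.0, 1200.0]
        double max = w.get(w.size() - 1);
        double r = ThreadLocalRandom.current().nextDouble() * max;
        System.out.println("random weight " + r + " -> server " + "ABC".charAt(pick(w, r)));
    }
}
```

For instance, a random weight of 700 skips A's bound of 500 and lands on B's bound of 900, matching the ranges above.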
Best available policy: BestAvailableRule
If no load balancer statistics are available (loadBalancerStats is null), an instance is selected with the round-robin policy.
Otherwise, the rule inspects the service instances one by one and skips those whose circuit breaker has tripped. From the remaining instances it selects the one with the fewest concurrent requests. If no instance qualifies, it falls back to round-robin.
```java
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    @Override
    public Server choose(Object key) {
        // Fall back to round-robin when no load balancer statistics are available
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        // Get the list of all servers
        List<Server> serverList = getLoadBalancer().getAllServers();
        // Minimum number of concurrent connections seen so far
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        // Iterate through the list of servers
        for (Server server : serverList) {
            // Get the statistics of this server
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // Skip instances whose circuit breaker has tripped
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                // Keep the instance with the fewest concurrent requests
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        // If nothing was chosen, fall back to round-robin
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }
}
```
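The selection loop reduces to a least-active-requests scan that skips tripped instances. In this sketch, ServerStat is a hypothetical holder for the two statistics the rule reads: circuit-breaker state and active request count. Returning null stands in for the fallback to round-robin.

```java
import java.util.List;

// Illustrative model of BestAvailableRule's scan; NOT Ribbon's API.
class BestAvailableSketch {

    // Stand-in for the two ServerStats values the rule reads.
    static final class ServerStat {
        final String name;
        final boolean circuitBreakerTripped;
        final int activeRequests;

        ServerStat(String name, boolean circuitBreakerTripped, int activeRequests) {
            this.name = name;
            this.circuitBreakerTripped = circuitBreakerTripped;
            this.activeRequests = activeRequests;
        }
    }

    // Non-tripped server with the fewest active requests, or null (caller falls back).
    static String choose(List<ServerStat> stats) {
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        String chosen = null;
        for (ServerStat s : stats) {
            if (!s.circuitBreakerTripped && s.activeRequests < minimalConcurrentConnections) {
                minimalConcurrentConnections = s.activeRequests;
                chosen = s.name;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<ServerStat> stats = List.of(
                new ServerStat("A", false, 5),
                new ServerStat("B", true, 0),   // skipped: circuit breaker tripped
                new ServerStat("C", false, 2));
        System.out.println(choose(stats)); // C
    }
}
```

Because the comparison is strict (<), ties go to the instance that appears first in the list.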
Availability filtering policy: AvailabilityFilteringRule
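This policy extends the abstract PredicateBasedRule class.
It first selects an instance with round-robin and checks it against the filter predicate. If the instance fails the check, it selects the next one with round-robin and checks again, for about 10 retries (the loop condition count++ <= 10 allows up to 11 checks in total). If no instance passes, it delegates to PredicateBasedRule. That class filters the whole server list first and then picks one of the remaining instances with round-robin.
Filter condition: an instance is filtered out if its circuit breaker has tripped or its number of concurrent requests has reached the threshold.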
```java
public class AvailabilityFilteringRule extends PredicateBasedRule {

    @Override
    public Server choose(Object key) {
        int count = 0;
        // Select an instance with round-robin
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            // Return the instance if it satisfies the predicate
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            // Otherwise select the next instance with round-robin
            server = roundRobinRule.choose(key);
        }
        // If no instance passed, use the superclass PredicateBasedRule strategy
        return super.choose(key);
    }
}
```
Next, look at the selection logic of the superclass PredicateBasedRule:
```java
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // Filter the servers by the predicate, then pick one of the rest with round-robin
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}
```
To see what the predicate checks, look at the AvailabilityPredicate class:
```java
public class AvailabilityPredicate extends AbstractServerPredicate {

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }

    private boolean shouldSkipServer(ServerStats stats) {
        // Filter out an instance if either condition holds:
        // 1. Circuit-breaker filtering is enabled and its circuit breaker has tripped
        // 2. Its number of concurrent requests >= the threshold
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
```
Zone avoidance policy: ZoneAvoidanceRule
ZoneAvoidanceRule also extends PredicateBasedRule. Its constructor combines two filter conditions into a composite predicate:
```java
public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();

    private CompositePredicate compositePredicate;

    public ZoneAvoidanceRule() {
        super();
        // Two filter conditions
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }
}
```
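The first of the two predicates, ZoneAvoidancePredicate, keeps an instance only if its zone is among the available zones, unless zone filtering does not apply: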
```java
public class ZoneAvoidancePredicate extends AbstractServerPredicate {

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) {
            return true;
        }
        String serverZone = input.getServer().getZone();
        if (serverZone == null) {
            // there is no zone information from the server, we do not want to filter
            // out this server
            return true;
        }
        LoadBalancerStats lbStats = getLBStats();
        if (lbStats == null) {
            // no stats available, do not filter
            return true;
        }
        if (lbStats.getAvailableZones().size() <= 1) {
            // only one zone is available, do not filter
            return true;
        }
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        if (!zoneSnapshot.keySet().contains(serverZone)) {
            // The server zone is unknown to the load balancer, do not filter it out
            return true;
        }
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        // Obtain the available zones
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null) {
            return availableZones.contains(input.getServer().getZone());
        } else {
            return false;
        }
    }
}
```
The second predicate, AvailabilityPredicate, is the same filter used by the AvailabilityFilteringRule policy, whose source is listed in that section. It skips an instance if its circuit breaker has tripped or its number of concurrent requests has reached the threshold.
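The effect of the composite predicate can be sketched with java.util.function.Predicate.and. The sketch assumes the set of available zones is already known. Ribbon derives that set from ZoneSnapshot statistics through getAvailableZones, which is not modeled here. Ribbon's CompositePredicate can also fall back to looser predicates when too few servers pass, and this sketch leaves that out as well. The Server class and all method names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative model of ZoneAvoidanceRule's combined filter; NOT Ribbon's API.
class ZoneAvoidanceSketch {

    static final class Server {
        final String name;
        final String zone; // null means "no zone information"
        final boolean circuitBreakerTripped;
        final int activeRequests;

        Server(String name, String zone, boolean circuitBreakerTripped, int activeRequests) {
            this.name = name;
            this.zone = zone;
            this.circuitBreakerTripped = circuitBreakerTripped;
            this.activeRequests = activeRequests;
        }
    }

    // Zone filter: servers without zone info are kept, as in ZoneAvoidancePredicate.
    static Predicate<Server> zonePredicate(Set<String> availableZones) {
        return s -> s.zone == null || availableZones.contains(s.zone);
    }

    // Availability filter: same conditions as AvailabilityPredicate.
    static Predicate<Server> availabilityPredicate(int activeConnectionsLimit) {
        return s -> !(s.circuitBreakerTripped || s.activeRequests >= activeConnectionsLimit);
    }

    // Names of the servers that pass the combined predicate, in list order.
    static List<String> eligible(List<Server> servers, Predicate<Server> composite) {
        return servers.stream().filter(composite).map(s -> s.name).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Server> servers = List.of(
                new Server("A", "zone-1", false, 0),
                new Server("B", "zone-2", false, 0),  // zone not available
                new Server("C", "zone-1", true, 0),   // circuit breaker tripped
                new Server("D", null, false, 0));     // no zone info
        Predicate<Server> composite =
                zonePredicate(Set.of("zone-1")).and(availabilityPredicate(50));
        System.out.println(eligible(servers, composite)); // [A, D]
    }
}
```

A server must pass both filters, and PredicateBasedRule then picks round-robin among the survivors.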
Summary
This article analyzed Ribbon's seven load balancing policies from the perspective of their source code. If you are interested in MySQL, Spring, and related topics, stay tuned for more articles.