Preface

When there are multiple service providers in our Dubbo application, how does the service consumer choose which one to invoke? This involves load balancing algorithms.

LoadBalance is responsible for "balancing" network requests, or other forms of load, across different machines. This prevents some servers in the cluster from being overloaded while others sit idle. With load balancing, each server receives a load that matches its processing capacity. Heavily loaded servers are relieved and idle resources are not wasted, which kills two birds with one stone.

Dubbo provides four load balancing implementations:

  • RandomLoadBalance, based on the weighted random algorithm

  • LeastActiveLoadBalance, based on the least active calls algorithm

  • ConsistentHashLoadBalance, based on consistent hashing

  • RoundRobinLoadBalance, based on the weighted round-robin algorithm

I. LoadBalance

In Dubbo, all load balancing implementation classes inherit from AbstractLoadBalance, which implements the LoadBalance interface.

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
    /**
     * select one invoker in list.
     *
     * @param invokers   invokers.
     * @param url        refer url
     * @param invocation invocation.
     * @return selected invoker.
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

As you can see, the @SPI annotation on the interface specifies RandomLoadBalance as the default implementation. Before we get to it, though, let's look at the logic in the abstract class.

1. Select a service

Let's start with select, the load balancing entry method, whose logic is fairly simple. It first checks whether the list of service providers is null or empty. If the invokers list contains only one Invoker, it returns that Invoker directly without load balancing. If there are several, it calls doSelect, which each subclass implements.

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	// No service provider: nothing to select
	if (invokers == null || invokers.isEmpty())
		return null;
	// If there is only one service provider, return it directly without load balancing
	if (invokers.size() == 1)
		return invokers.get(0);
	// Otherwise, delegate to the subclass implementation
	return doSelect(invokers, url, invocation);
}
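The entry method is a template method: select handles the trivial cases and leaves the actual strategy to doSelect. The following is a minimal sketch of that shape. It uses plain list elements instead of Dubbo's Invoker, URL and Invocation types. SimpleLoadBalance and LastChoice are hypothetical names used only for illustration.

```java
import java.util.List;

// Sketch of the select/doSelect template; not Dubbo's AbstractLoadBalance.
public class LoadBalanceTemplateSketch {

    static abstract class SimpleLoadBalance<T> {
        // Entry point: handles the trivial cases, then delegates to the strategy.
        public T select(List<T> candidates) {
            if (candidates == null || candidates.isEmpty()) {
                return null; // nothing to choose from
            }
            if (candidates.size() == 1) {
                return candidates.get(0); // no balancing needed
            }
            return doSelect(candidates); // subclass-specific strategy
        }

        protected abstract T doSelect(List<T> candidates);
    }

    // Trivial strategy (always the last element), only to show the delegation.
    static class LastChoice<T> extends SimpleLoadBalance<T> {
        @Override
        protected T doSelect(List<T> candidates) {
            return candidates.get(candidates.size() - 1);
        }
    }

    public static void main(String[] args) {
        SimpleLoadBalance<String> lb = new LastChoice<>();
        System.out.println(lb.select(List.of("A")));      // A (single provider)
        System.out.println(lb.select(List.of("A", "B"))); // B (delegated to doSelect)
        System.out.println(lb.select(List.of()));         // null
    }
}
```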

2. Get weights

This involves two pieces of logic. The first reads the configured weight, which defaults to 100. The second adjusts that weight based on how long the service has been running.

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
	// Configured weight for this method; the default is 100
	int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);
	if (weight > 0) {
		// Timestamp at which the provider started
		long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L);
		if (timestamp > 0L) {
			// Current time - start time = running time
			int uptime = (int) (System.currentTimeMillis() - timestamp);
			// Warm-up time; the default is 10 minutes
			int warmup = invoker.getUrl().getParameter("warmup", 600000);
			// The service has been running for less than the warm-up time
			if (uptime > 0 && uptime < warmup) {
				// Recalculate (reduce) the weight
				weight = calculateWarmupWeight(uptime, warmup, weight);
			}
		}
	}
	return weight;
}

The code above gets the configured weight of the service. It then checks whether the service has been running for less than its warm-up time and, if so, recalculates the weight. The default warm-up time is 10 minutes. The general process is as follows:

  • Gets the weight value of the configuration, which defaults to 100
  • Gets the timestamp when the service started
  • Current time – Service start time = Service running duration
  • Obtain the service preheating time. The default value is 10 minutes
  • Determine whether the service running time is less than the preheating time. If the condition is true, recalculate the weight

Recalculation of weight is actually the process of weight reduction.

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
	int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
	return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

The code looks simple but is not easy to read at first. It is essentially the formula (uptime / warmup) * weight, that is, the fraction of the warm-up period completed multiplied by the weight, clamped to the range [1, weight].
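Here is the formula with the clamping that calculateWarmupWeight applies. The int cast truncates, which for these non-negative values is the same as taking the floor.

```latex
w_{\text{warm}} = \min\left(\max\left(\left\lfloor \frac{\text{uptime}}{\text{warmup} / \text{weight}} \right\rfloor,\ 1\right),\ \text{weight}\right) \approx \frac{\text{uptime}}{\text{warmup}} \times \text{weight}
```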

Suppose we set the weight to 100 and the warm-up time to 10 minutes. So:

Running time    Formula          Calculated weight
1 minute        1/10 * 100       10
2 minutes       2/10 * 100       20
5 minutes       5/10 * 100       50
10 minutes      10/10 * 100      100
As the table shows, the weight is reduced until the warm-up time has elapsed. Why does Dubbo do this?

While a service has been running for less than its warm-up time, lowering its weight keeps it from being hit with a heavy load right after startup. Service warm-up is an optimization technique, much like JVM warm-up. The service runs at "low power" for a while after it starts and gradually ramps up to full capacity.

II. Weighted random algorithm

RandomLoadBalance implements the weighted random algorithm and is Dubbo's default load balancing strategy. The idea is to first divide a number line into intervals according to the servers' weights. For example:

Suppose there are three servers [A, B, C] with weights [1, 3, 6]. The total weight is 10.

This gives the following intervals:

Interval    Server
[0, 1)      A
[1, 4)      B
[4, 10)     C

The rest is simple: compute totalWeight, generate a random number in [0, totalWeight), and find the interval it falls into.

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	int length = invokers.size();
	// Total weight
	int totalWeight = 0;
	// Whether all Invokers have the same weight
	boolean sameWeight = true;
	// Loop over the providers: sum the weights and check whether they are all equal
	for (int i = 0; i < length; i++) {
		int weight = getWeight(invokers.get(i), invocation);
		// Accumulate the total weight
		totalWeight += weight;
		// Check whether this weight differs from the previous one
		if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
			sameWeight = false;
		}
	}
	if (totalWeight > 0 && !sameWeight) {
		// Random number in [0, totalWeight)
		int offset = random.nextInt(totalWeight);
		// Find the interval the random number falls in and return its Invoker
		for (int i = 0; i < length; i++) {
			offset -= getWeight(invokers.get(i), invocation);
			if (offset < 0) {
				return invokers.get(i);
			}
		}
	}
	// All weights are equal (or the total is 0): pick uniformly at random
	return invokers.get(random.nextInt(length));
}

Let's walk through the code above with the same example:

  1. The number of service providers is 3
  2. Summing the weights gives a total weight of 10
  3. Check whether the weights are all equal: they are not (1, 3, 6)
  4. Draw a random number in [0, 10); assume offset = 6
  5. First iteration: offset = 6 - 1 = 5, not less than 0, so continue
  6. Second iteration: offset = 5 - 3 = 2, not less than 0, so continue
  7. Third iteration: offset = 2 - 6 = -4, less than 0, so server C is returned

Finally, if all the weights are equal (or the total weight is 0), an Invoker is simply chosen uniformly at random.
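The interval walk can be sketched independently of Dubbo. This version uses plain int weights instead of Invokers. pickIndex is a hypothetical helper split out so that a fixed offset can be tested. With offset = 6 it returns index 2 (server C), as in the walkthrough.

```java
import java.util.List;
import java.util.Random;

// Sketch of weighted random selection over plain int weights.
public class WeightedRandomSketch {

    // Returns the index whose interval [start, start + weight) contains offset.
    static int pickIndex(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset outside [0, totalWeight)");
    }

    static int select(int[] weights, Random random) {
        int totalWeight = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            totalWeight += weights[i];
            if (sameWeight && i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            return pickIndex(weights, random.nextInt(totalWeight));
        }
        return random.nextInt(weights.length); // equal weights: uniform pick
    }

    public static void main(String[] args) {
        int[] weights = {1, 3, 6}; // servers A, B, C
        List<String> names = List.of("A", "B", "C");
        int[] hits = new int[3];
        Random random = new Random(42);
        for (int n = 0; n < 100_000; n++) {
            hits[select(weights, random)]++;
        }
        for (int i = 0; i < 3; i++) {
            // Expect roughly 10% / 30% / 60% of the picks.
            System.out.println(names.get(i) + ": " + hits[i]);
        }
    }
}
```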

III. Least active algorithm

The least active algorithm is implemented by LeastActiveLoadBalance. The idea is that a provider with fewer active (in-flight) calls is likely to be faster and able to handle more requests per unit of time, so it should receive requests first.

Dubbo keeps an active count for each service provider Invoker. The count is incremented before a call and decremented after the call completes. Some providers respond quickly and some slowly. The faster a provider finishes its calls, the lower its active count stays and the more likely it is to be chosen.

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	int length = invokers.size();
	// Least active count seen so far (-1 = none yet)
	int leastActive = -1;
	// Number of Invokers sharing the least active count
	int leastCount = 0;
	// Indexes of the Invokers sharing the least active count
	int[] leastIndexs = new int[length];
	// Total weight of those Invokers
	int totalWeight = 0;
	// Weight of the first such Invoker, used to check whether all weights are equal
	int firstWeight = 0;
	boolean sameWeight = true;
	// Loop to compare the Invokers' active counts
	for (int i = 0; i < length; i++) {
		Invoker<T> invoker = invokers.get(i);
		// Active count of this Invoker
		int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
		// Configured weight, default 100
		int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100);
		// A smaller active count was found: reset
		if (leastActive == -1 || active < leastActive) {
			leastActive = active;
			leastCount = 1;
			leastIndexs[0] = i;
			totalWeight = weight;
			firstWeight = weight;
			sameWeight = true;
		// The active count equals the current minimum
		} else if (active == leastActive) {
			leastIndexs[leastCount++] = i;
			totalWeight += weight;
			if (sameWeight && i > 0 && weight != firstWeight) {
				sameWeight = false;
			}
		}
	}
	// Only one Invoker has the least active count: return it
	if (leastCount == 1) {
		return invokers.get(leastIndexs[0]);
	}
	// Several Invokers share the least active count but have different weights
	if (!sameWeight && totalWeight > 0) {
		int offsetWeight = random.nextInt(totalWeight);
		for (int i = 0; i < leastCount; i++) {
			int leastIndex = leastIndexs[i];
			offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
			if (offsetWeight <= 0)
				return invokers.get(leastIndex);
		}
	}
	// Otherwise pick one of the least-active Invokers at random
	return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}

The code above has two parts. The first finds the Invokers with the least active count, and the second chooses among them by weight. Step by step:

  • Define the variables: the least active count, the number of Invokers sharing it, their indexes, the total weight and the first weight

  • Loop through the invokers list, reading each Invoker's active count and weight

  • If the current Invoker's active count is smaller than the least seen so far, reset the least active count and the candidate set. If it is equal, add the Invoker to the candidates, accumulate its weight, and check whether the weights are still all the same

  • After the loop, if only one Invoker has the least active count, return it directly

  • If several Invokers share the least active count but their weights differ, choose among them by weight

  • Otherwise, pick one of those Invokers uniformly at random
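The same steps can be sketched over plain arrays. Here active[i] holds provider i's in-flight call count and weight[i] its weight. LeastActiveSketch is a hypothetical name, not Dubbo's class. One deliberate difference: the weighted walk uses offset < 0, as RandomLoadBalance does. The listing above uses offsetWeight <= 0, which gives the first tied candidate one extra slot out of totalWeight and takes it from the last one.

```java
import java.util.Random;

// Sketch of least-active selection over plain arrays.
public class LeastActiveSketch {

    static int select(int[] active, int[] weight, Random random) {
        int length = active.length;
        int leastActive = -1;
        int leastCount = 0;
        int[] leastIndexes = new int[length];
        int totalWeight = 0;
        int firstWeight = 0;
        boolean sameWeight = true;
        for (int i = 0; i < length; i++) {
            if (leastActive == -1 || active[i] < leastActive) {
                // New minimum found: restart the candidate set.
                leastActive = active[i];
                leastCount = 1;
                leastIndexes[0] = i;
                totalWeight = weight[i];
                firstWeight = weight[i];
                sameWeight = true;
            } else if (active[i] == leastActive) {
                // Tie with the current minimum: add to the candidate set.
                leastIndexes[leastCount++] = i;
                totalWeight += weight[i];
                if (sameWeight && weight[i] != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        if (leastCount == 1) {
            return leastIndexes[0];
        }
        if (!sameWeight && totalWeight > 0) {
            // Weighted random among the tied candidates.
            int offset = random.nextInt(totalWeight);
            for (int i = 0; i < leastCount; i++) {
                offset -= weight[leastIndexes[i]];
                if (offset < 0) {
                    return leastIndexes[i];
                }
            }
        }
        // Equal weights (or total 0): uniform pick among the tied candidates.
        return leastIndexes[random.nextInt(leastCount)];
    }

    public static void main(String[] args) {
        Random random = new Random(7);
        // Provider 1 has the fewest active calls, so it always wins.
        System.out.println(select(new int[] {3, 1, 2}, new int[] {100, 100, 100}, random));
        // Providers 0 and 1 tie at 1 active call; provider 1 should win about 75% of the time.
        int[] hits = new int[3];
        for (int n = 0; n < 10_000; n++) {
            hits[select(new int[] {1, 1, 5}, new int[] {100, 300, 100}, random)]++;
        }
        System.out.println(hits[0] + " / " + hits[1] + " / " + hits[2]);
    }
}
```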

By now you may be wondering: where is the active count incremented and decremented?

That happens in Dubbo's filters, specifically the ActiveLimitFilter class, which contains code like this:

long begin = System.currentTimeMillis();
// Increment the active count before the call
RpcStatus.beginCount(url, methodName);
Result result = invoker.invoke(invocation);
// Decrement the active count after the call (elapsed time, success = true)
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
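The excerpt only shows the success path. The sketch below does the same bookkeeping with a single counter per "url + method" key, stored in a ConcurrentHashMap. ActiveCountSketch and invokeCounted are hypothetical names, not RpcStatus's API. The finally block makes sure the count is also decremented when the call throws.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the active-count bookkeeping around a call.
public class ActiveCountSketch {

    private static final ConcurrentMap<String, AtomicInteger> ACTIVE = new ConcurrentHashMap<>();

    static int getActive(String key) {
        AtomicInteger counter = ACTIVE.get(key);
        return counter == null ? 0 : counter.get();
    }

    // Increment before the call, decrement after it, even on exceptions.
    static <R> R invokeCounted(String key, Supplier<R> call) {
        AtomicInteger counter = ACTIVE.computeIfAbsent(key, k -> new AtomicInteger());
        counter.incrementAndGet(); // plays the role of beginCount
        try {
            return call.get();
        } finally {
            counter.decrementAndGet(); // plays the role of endCount
        }
    }

    public static void main(String[] args) {
        String key = "dubbo://192.168.1.1:20880/DemoService#sayHello";
        String result = invokeCounted(key, () -> {
            System.out.println("active during call: " + getActive(key)); // 1
            return "hello";
        });
        System.out.println(result + ", active after call: " + getActive(key)); // 0
    }
}
```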



IV. Consistent hash algorithm

The consistent hash algorithm was proposed by Karger of MIT and his collaborators in 1997. The algorithm was originally proposed for load balancing of large-scale cache systems.

Its principle is roughly as follows:

Construct an integer ring covering 2^32 values (the consistent hash ring), and place each server node on it according to the hash of the node name (positions range from 0 to 2^32 - 1). Then compute the hash of each data key and walk clockwise around the ring from that position. The first server node reached is the one the key maps to.
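Here is a minimal sketch of that ring lookup. Node positions are assigned by hand instead of by hashing node names, and HashRingSketch is a hypothetical name. TreeMap.ceilingEntry finds the first node at or after the key's hash, and firstEntry handles the wrap-around past 2^32 - 1.

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal consistent-hash ring with explicit node positions in [0, 2^32).
public class HashRingSketch {

    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(long position, String node) {
        ring.put(position, node);
    }

    // Walk clockwise: first node at or after keyHash, else wrap to the start.
    String locate(long keyHash) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(keyHash);
        if (entry == null) {
            entry = ring.firstEntry(); // past the last node: wrap around
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        HashRingSketch r = new HashRingSketch();
        r.addNode(1_000_000_000L, "A");
        r.addNode(2_000_000_000L, "B");
        r.addNode(3_000_000_000L, "C");
        System.out.println(r.locate(1_500_000_000L)); // B
        System.out.println(r.locate(3_500_000_000L)); // A (wraps around)
    }
}
```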

If you are not familiar with consistent hashing, it is worth reading up on it before continuing.

Dubbo introduces virtual nodes to address data skew, that is, an uneven distribution of requests across providers.

All virtual nodes of the same service provider carry its name, for example Invoker1-1, Invoker1-2 … Invoker1-160. In other words, each Invoker creates 160 virtual nodes by default, so the ring holds 160 × (number of providers) virtual nodes.

Now let's look at ConsistentHashLoadBalance's doSelect implementation.

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	// Service key + method name, e.g. com.viewscenes.netsupervisor.service.InfoUserService.sayHello
	String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
	// Identity hash code of the current invokers list
	int identityHashCode = System.identityHashCode(invokers);
	ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
	// If there is no selector yet, or the invokers list's identity hash code has changed,
	// the list of service providers may have changed, so create a new ConsistentHashSelector
	if (selector == null || selector.identityHashCode != identityHashCode) {
		selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
		selector = (ConsistentHashSelector<T>) selectors.get(key);
	}
	// Select an Invoker
	return selector.select(invocation);
}

The code above obtains a ConsistentHashSelector and calls its select method to choose an Invoker. Note that the selector is cached per service method together with System.identityHashCode of the invokers list. A change in the provider list normally arrives as a new list object, so the identity hash code no longer matches and the ConsistentHashSelector is recreated. That raises the question: how is a ConsistentHashSelector created?

1. Create ConsistentHashSelector

This class has several properties, so let’s look at them first.

private static final class ConsistentHashSelector<T> {
	// TreeMap storing the virtual node Invokers
	private final TreeMap<Long, Invoker<T>> virtualInvokers;
	// Number of virtual nodes per Invoker
	private final int replicaNumber;
	// Identity hash code of the service provider list
	private final int identityHashCode;
	// Indexes of the arguments used to compute the hash
	private final int[] argumentIndex;
}

Its constructor mainly creates the virtual node Invokers and stores them in virtualInvokers.

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
	// Initialize the TreeMap
	this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
	// Identity hash code of the invokers list
	this.identityHashCode = identityHashCode;
	URL url = invokers.get(0).getUrl();
	// Number of virtual nodes, 160 by default
	this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
	// By default only the first argument is used to compute the hash
	String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
	argumentIndex = new int[index.length];
	for (int i = 0; i < index.length; i++) {
		argumentIndex[i] = Integer.parseInt(index[i]);
	}
	// Loop to create the virtual node Invokers
	for (Invoker<T> invoker : invokers) {
		String address = invoker.getUrl().getAddress();
		for (int i = 0; i < replicaNumber / 4; i++) {
			// MD5 of address + i gives a 16-byte digest
			byte[] digest = md5(address + i);
			// Each 4-byte slice of the digest yields one position on the ring
			for (int h = 0; h < 4; h++) {
				long m = hash(digest, h);
				virtualInvokers.put(m, invoker);
			}
		}
	}
}

The focus of the above code is to create the virtual node Invoker.

First, it obtains the provider's address, such as 192.168.1.1:20880. Then, for each i from 0 to replicaNumber / 4 - 1, it computes the MD5 digest of address + i, which is a 16-byte array. Each 4-byte slice of the digest is hashed into a non-negative long, giving four ring positions per digest. Finally, each hash-to-Invoker mapping is stored in the TreeMap.
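The following standalone sketch builds the virtual nodes. It treats md5 and hash as described above: an MD5 digest of address + i, then four little-endian 32-bit slices of it. The helper bodies are not shown in the text, so this byte layout is an assumption. VirtualNodeSketch and buildRing are hypothetical names. With three addresses and 160 replicas it should produce 480 ring entries, barring the very unlikely case of two positions colliding.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.TreeMap;

// Sketch of virtual node creation: replicaNumber / 4 digests per address, 4 positions each.
public class VirtualNodeSketch {

    static byte[] md5(String value) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            return md.digest(value.getBytes(StandardCharsets.UTF_8)); // 16 bytes
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Bytes [4*number .. 4*number+3] read as an unsigned little-endian 32-bit value.
    static long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    static TreeMap<Long, String> buildRing(List<String> addresses, int replicaNumber) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String address : addresses) {
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(address + i);
                for (int h = 0; h < 4; h++) {
                    ring.put(hash(digest, h), address);
                }
            }
        }
        return ring;
    }

    public static void main(String[] args) {
        List<String> addresses = List.of("192.168.1.1:20880", "192.168.1.2:20880", "192.168.1.3:20880");
        TreeMap<Long, String> ring = buildRing(addresses, 160);
        System.out.println("virtual nodes: " + ring.size());
    }
}
```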

So with three service providers, how many virtual nodes are there? No calculator needed, just do it in your head: 3 × 160 = 480.

2. Select an Invoker

Once ConsistentHashSelector is created, it’s time to call its method to select an Invoker.

public Invoker<T> select(Invocation invocation) {
	String key = toKey(invocation.getArguments());
	byte[] digest = md5(key);
	return selectForKey(hash(digest, 0));
}

The above code is very simple, so let’s look at it in two parts.

2.1. Convert the arguments

The argument list is converted to a string by the toKey method. This looks simple, but there is a detail: by default only the first argument is used, because the argument positions come from the hash.arguments parameter, which defaults to "0". Let's look at toKey.

private String toKey(Object[] args) {
	StringBuilder buf = new StringBuilder();
	for (int i : argumentIndex) {
		if (i >= 0 && i < args.length) {
			buf.append(args[i]);
		}
	}
	return buf.toString();
}

After obtaining the key, Dubbo computes its MD5 digest and uses hash to turn it into a non-negative long. In short, by default this step maps the value of the first argument to a long. Identical argument values always produce the same hash, so this load balancing depends only on the argument values. Requests with the same value are routed to the same service provider, as long as the provider list does not change.
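Here is a small sketch of that argument-to-hash mapping. It uses the same MD5-plus-slice hash as the virtual node construction above (an assumption, since the helper bodies are not shown). ArgumentKeySketch and keyHash are hypothetical names. With the default argument index {0}, calls that share a first argument get the same ring position regardless of the other arguments.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch: arguments -> key string -> MD5 -> position on the ring.
public class ArgumentKeySketch {

    // Same shape as toKey: concatenate the arguments at the configured positions.
    static String toKey(Object[] args, int[] argumentIndex) {
        StringBuilder buf = new StringBuilder();
        for (int i : argumentIndex) {
            if (i >= 0 && i < args.length) {
                buf.append(args[i]);
            }
        }
        return buf.toString();
    }

    static byte[] md5(String value) {
        try {
            return MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // First 4 digest bytes as an unsigned little-endian 32-bit value.
    static long hash0(byte[] d) {
        return (((long) (d[3] & 0xFF) << 24)
                | ((long) (d[2] & 0xFF) << 16)
                | ((long) (d[1] & 0xFF) << 8)
                | (d[0] & 0xFF))
                & 0xFFFFFFFFL;
    }

    static long keyHash(Object[] args, int[] argumentIndex) {
        return hash0(md5(toKey(args, argumentIndex)));
    }

    public static void main(String[] args) {
        int[] defaultIndex = {0}; // hash.arguments defaults to "0"
        long h1 = keyHash(new Object[] {"alice", 1}, defaultIndex);
        long h2 = keyHash(new Object[] {"alice", 2}, defaultIndex);
        System.out.println(h1 == h2); // true: only the first argument counts
    }
}
```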

2.2. Locate the node on the ring

Once the hash is computed, the rest is easy. Consistent hashing searches clockwise on the ring for the first node at or after the key's hash. In Dubbo, tailMap on the virtualInvokers TreeMap returns the entries whose keys are greater than or equal to the hash, and the first of them is taken. If there is none, the hash lies past the last node, so the lookup wraps around to the first entry of the map.

private Invoker<T> selectForKey(long hash) {
	// First virtual node whose key is >= hash
	Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
	// None found: wrap around to the first node on the ring
	if (entry == null) {
		entry = virtualInvokers.firstEntry();
	}
	return entry.getValue();
}
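tailMap(hash, true).firstEntry() returns the entry with the smallest key greater than or equal to hash. TreeMap.ceilingEntry(hash) does exactly the same, without going through a submap view. The following sketch compares the two forms, including the wrap-around case. RingLookupSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: two equivalent ways to find the first ring entry at or after a hash.
public class RingLookupSketch {

    // The form used in selectForKey: view of keys >= hash, then its first entry.
    static <V> V viaTailMap(TreeMap<Long, V> ring, long hash) {
        Map.Entry<Long, V> entry = ring.tailMap(hash, true).firstEntry();
        if (entry == null) {
            entry = ring.firstEntry(); // wrap around the ring
        }
        return entry.getValue();
    }

    // Equivalent form: the least key >= hash, looked up directly.
    static <V> V viaCeiling(TreeMap<Long, V> ring, long hash) {
        Map.Entry<Long, V> entry = ring.ceilingEntry(hash);
        if (entry == null) {
            entry = ring.firstEntry(); // wrap around the ring
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(100L, "invoker-1");
        ring.put(200L, "invoker-2");
        ring.put(300L, "invoker-3");
        for (long hash : new long[] {50L, 100L, 250L, 301L}) {
            // 50 -> invoker-1, 100 -> invoker-1, 250 -> invoker-3, 301 -> invoker-1 (wrap)
            System.out.println(hash + " -> " + viaTailMap(ring, hash) + " / " + viaCeiling(ring, hash));
        }
    }
}
```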

V. Weighted round-robin algorithm

Round-robin needs little introduction: requests go to each server in turn, in order, evenly and impartially. If your servers have roughly the same performance, round-robin is ideal because it is simple and efficient.

What about weighted round-robin?

If the servers differ in performance, plain round-robin falls short. A less powerful server cannot carry the same load, so it should be given a lower weight and receive proportionally fewer requests.

Suppose we have servers [A, B, C] with weights [1, 2, 3]. For six requests, the load balancing result is [A, B, C, B, C, C].

This algorithm is implemented by RoundRobinLoadBalance. Before we start, let's look at two of its fields.

sequences

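sequences is a map of counters that record how many times each service method has been called. Each value is an AtomicPositiveInteger. A counter is looked up by service key + method name (the service key is essentially the fully qualified interface name) and created if it does not exist yet.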

AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
	sequences.putIfAbsent(key, new AtomicPositiveInteger());
	sequence = sequences.get(key);
}

Before each invocation of the service, the counter is then incremented to obtain the current call number:

int currentSequence = sequence.getAndIncrement();

IntegerWrapper

This is simply a mutable wrapper around an int. Its main addition is the decrement method.

private static final class IntegerWrapper {
	private int value;

	public IntegerWrapper(int value) {
		this.value = value;
	}
	public int getValue() {
		return value;
	}
	public void setValue(int value) {
		this.value = value;
	}
	public void decrement() {
		this.value--;
	}
}

Now let's look at the doSelect method, which we'll take apart piece by piece to make it easier to follow.

1. Get weights

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	// Service key + method name
	String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
	int length = invokers.size();
	// Maximum weight
	int maxWeight = 0;
	// Minimum weight
	int minWeight = Integer.MAX_VALUE;
	final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
	int weightSum = 0;
	// Find the maximum and minimum weights and compute the total weight
	for (int i = 0; i < length; i++) {
		int weight = getWeight(invokers.get(i), invocation);
		maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
		minWeight = Math.min(minWeight, weight); // Choose the minimum weight
		if (weight > 0) {
			invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
			weightSum += weight;
		}
	}
	// ...
}

The code above mainly reads each Invoker's weight and computes the total weight. The key step is putting each Invoker with a positive weight, together with an IntegerWrapper holding that weight, into invokerToWeightMap.

2. Get the service invocation number

The sequence is incremented before each call to obtain the service call number. Note that the key to obtain the sequence is the fully qualified class name + method name.

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	// Service key + method name
	String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
	// ...
	AtomicPositiveInteger sequence = sequences.get(key);
	if (sequence == null) {
		sequences.putIfAbsent(key, new AtomicPositiveInteger());
		sequence = sequences.get(key);
	}
	// Call number for this invocation
	int currentSequence = sequence.getAndIncrement();
	// ...
}

3. Select by weight

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	// ...
	int currentSequence = sequence.getAndIncrement();
	// The weights are not all equal
	if (maxWeight > 0 && minWeight < maxWeight) {
		int mod = currentSequence % weightSum;
		// Loop at most maxWeight times
		for (int i = 0; i < maxWeight; i++) {
			// Iterate over invokerToWeightMap
			for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
				// Current Invoker
				final Invoker<T> k = each.getKey();
				// Remaining weight of the current Invoker
				final IntegerWrapper v = each.getValue();
				// Return this Invoker when mod has reached 0 and its remaining weight is greater than 0
				if (mod == 0 && v.getValue() > 0) {
					return k;
				}
				// Otherwise, if its remaining weight is greater than 0, decrement both the weight and mod
				if (v.getValue() > 0) {
					v.decrement();
					mod--;
				}
			}
		}
	}
	// ...
}

The code above selects an Invoker by weighted round-robin and is a little hard to follow, but stepping through it in a debugger helps. Let's simulate it with the earlier example: servers [A, B, C] with weights [1, 2, 3], a total weight of 6, and a maximum weight of 3.

mod = 0: the condition is met immediately, so server A is returned

mod = 1: after one decrement (A), server B is returned

mod = 2: after two decrements (A and B), server C is returned

mod = 3: after the first full pass the weights are [0, 1, 2] and mod is 0, so server B is returned

mod = 4: after the first pass the weights are [0, 1, 2] and mod is 1; decrementing B gives [0, 0, 2] and mod 0, so server C is returned

mod = 5: only server C still has weight left, so C is returned

The result is [A, B, C, B, C, C].

On the seventh call, the call number is 6 and the total weight is 6, so mod is 0 and the cycle starts over.
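The walkthrough can be reproduced with a small simulation. It uses server names and int weights instead of Invokers, and a one-element int[] in place of IntegerWrapper. WeightedRoundRobinSketch is a hypothetical name. As in the listing, the remaining weights are rebuilt on every call, so the decrements never carry over between requests.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulation of the mod-and-decrement weighted round-robin walked through above.
public class WeightedRoundRobinSketch {

    static String select(List<String> servers, int[] weights, int currentSequence) {
        int maxWeight = 0;
        int minWeight = Integer.MAX_VALUE;
        int weightSum = 0;
        // Fresh remaining-weight counters on every call, in insertion order.
        Map<String, int[]> remaining = new LinkedHashMap<>();
        for (int i = 0; i < servers.size(); i++) {
            maxWeight = Math.max(maxWeight, weights[i]);
            minWeight = Math.min(minWeight, weights[i]);
            if (weights[i] > 0) {
                remaining.put(servers.get(i), new int[] {weights[i]});
                weightSum += weights[i];
            }
        }
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum;
            for (int round = 0; round < maxWeight; round++) {
                for (Map.Entry<String, int[]> each : remaining.entrySet()) {
                    int[] w = each.getValue();
                    if (mod == 0 && w[0] > 0) {
                        return each.getKey();
                    }
                    if (w[0] > 0) {
                        w[0]--;
                        mod--;
                    }
                }
            }
        }
        // Equal weights: plain round-robin.
        return servers.get(currentSequence % servers.size());
    }

    public static void main(String[] args) {
        List<String> servers = List.of("A", "B", "C");
        int[] weights = {1, 2, 3};
        List<String> picks = new ArrayList<>();
        for (int seq = 0; seq < 7; seq++) {
            picks.add(select(servers, weights, seq));
        }
        System.out.println(picks); // [A, B, C, B, C, C, A]
    }
}
```

The output is the six-request cycle followed by A again on the seventh call. Note that each selection may loop up to maxWeight times over all providers, so the cost of this approach grows with the size of the weights.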

4. Plain round-robin

Finally, if all the Invokers have the same weight (or the maximum weight is 0), there is nothing left to balance, and plain round-robin is used.

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
	// ...
	// Plain round-robin
	return invokers.get(currentSequence % length);
}