The core function of the Ribbon is to load balance requests. The basic principle is shown in the following figure. When the Ribbon is integrated with the client, the Ribbon performs load balancing calculations based on the configured address list of service providers. After obtaining a target address, the Ribbon sends a request.
Next, let's analyze how Ribbon works from two aspects:
- How the @LoadBalanced annotation gives an ordinary RestTemplate load-balancing capability
- How OpenFeign integrates Ribbon
@LoadBalanced annotation parsing process
When we use RestTemplate, we add the @LoadBalanced annotation so that the RestTemplate performs client-side load balancing on each request.
@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}
Then, if we open the @LoadBalanced annotation, we see that it simply declares an @Qualifier annotation.
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
Role of the @Qualifier annotation
When we inject a Bean with annotations, we use @Autowired. As you probably know, @Autowired can also inject a List or a Map of beans. Here is an example (in a Spring Boot application).
Define a TestClass
@AllArgsConstructor
@Data
public class TestClass {
private String name;
}
Declare a configuration class that registers two TestClass beans
@Configuration
public class TestConfig {
    @Bean("testClass1")
    TestClass testClass() {
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2() {
        return new TestClass("testClass2");
    }
}
Define a Controller for testing; notice that we use @Autowired to inject a List
@RestController
public class TestController {
    @Autowired(required = false)
    List<TestClass> testClasses = Collections.emptyList();

    @GetMapping("/test")
    public Object test() {
        return testClasses;
    }
}
Now visit http://localhost:8080/test. The result is:
[
  {
    name: "testClass1"
  },
  {
    name: "testClass2"
  }
]
Modify TestConfig and TestController
@Configuration
public class TestConfig {
    @Bean("testClass1")
    @Qualifier
    TestClass testClass() {
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2() {
        return new TestClass("testClass2");
    }
}

@RestController
public class TestController {
    @Autowired(required = false)
    @Qualifier
    List<TestClass> testClasses = Collections.emptyList();

    @GetMapping("/test")
    public Object test() {
        return testClasses;
    }
}
Visit http://localhost:8080/test again. This time only the bean declared with @Qualifier is injected, so the result is:
[
  {
    name: "testClass1"
  }
]
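The filtering effect can be imitated without Spring. The sketch below is not how Spring resolves qualifiers internally; it only illustrates the idea that a marker annotation on a factory method decides whether its product is collected. The `Marked` annotation and the `QualifierSketch` class are hypothetical names introduced for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class QualifierSketch {

    // Hypothetical marker annotation playing the role of @Qualifier / @LoadBalanced.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Marked { }

    // Two "bean" factory methods; only the first one carries the marker.
    @Marked
    static String testClass1() {
        return "testClass1";
    }

    static String testClass2() {
        return "testClass2";
    }

    // Collects the products of all no-arg String factory methods. When markedOnly
    // is true, methods without @Marked are skipped, like a qualified injection point.
    static List<String> collect(boolean markedOnly) {
        List<String> beans = new ArrayList<>();
        for (Method method : QualifierSketch.class.getDeclaredMethods()) {
            if (method.getReturnType() != String.class || method.getParameterCount() != 0) {
                continue;
            }
            if (markedOnly && !method.isAnnotationPresent(Marked.class)) {
                continue;
            }
            try {
                beans.add((String) method.invoke(null));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        Collections.sort(beans); // getDeclaredMethods() has no guaranteed order
        return beans;
    }

    public static void main(String[] args) {
        System.out.println(collect(false)); // both beans
        System.out.println(collect(true));  // only the marked bean
    }
}
```

An unmarked injection point corresponds to `collect(false)`, a marked one to `collect(true)`.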
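How @LoadBalanced filters and intercepts
Now that you know what the @Qualifier annotation does, it is easy to go back to the @LoadBalanced annotation.
Because every RestTemplate that needs load balancing is declared with @LoadBalanced, the LoadBalancerAutoConfiguration class can inject exactly those RestTemplate instances by putting @LoadBalanced on its own injection point: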
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
}
As this code shows, LoadBalancerAutoConfiguration uses the same mechanism to collect every RestTemplate declared with @LoadBalanced into the restTemplates list.
Once these RestTemplate instances have been collected, the nested configuration class LoadBalancerInterceptorConfig adds an interceptor to each of them. The implementation is as follows:
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {
    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    // omitted...

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration(proxyBeanMethods = false)
    @Conditional(RetryMissingOrDisabledCondition.class)
    static class LoadBalancerInterceptorConfig {
        // Register a LoadBalancerInterceptor instance in the IoC container.
        @Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }

        // Customizer applied to every RestTemplate annotated with @LoadBalanced:
        // it appends the LoadBalancerInterceptor to the existing interceptors.
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }
    }

    // omitted...
}
RestTemplate call process
In our program, we use the following code to initiate a remote request:
restTemplate.getForObject(url, String.class);
The entire invocation chain is as follows.
RestTemplate.getForObject
-> AbstractClientHttpRequest.execute()
-> AbstractBufferingClientHttpRequest.executeInternal()
-> InterceptingClientHttpRequest.executeInternal()
-> InterceptingClientHttpRequest.execute()
The execute() method of InterceptingClientHttpRequest (declared in its inner class InterceptingRequestExecution) is as follows.
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        // Interceptors remain: hand the request to the next one and pass this
        // execution along, so that the interceptor can continue the chain.
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        // All interceptors have run: build the real request and send it.
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}
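The iterator-driven chain can be tried out in isolation. The following is a minimal sketch, assuming simplified stand-ins: `Interceptor`, `Execution` and `Chain` are hypothetical types that only mirror the shape of ClientHttpRequestInterceptor, ClientHttpRequestExecution and the execute method, and requests are plain strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class InterceptorChainSketch {

    // Hypothetical stand-in for ClientHttpRequestInterceptor.
    interface Interceptor {
        String intercept(String request, Execution execution);
    }

    // Hypothetical stand-in for ClientHttpRequestExecution.
    interface Execution {
        String execute(String request);
    }

    // Walks the interceptors with an iterator; when none are left, "sends" the request.
    static final class Chain implements Execution {
        private final Iterator<Interceptor> iterator;

        Chain(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                // Pass the chain itself so the interceptor can continue the walk.
                return iterator.next().intercept(request, this);
            }
            return "sent:" + request; // placeholder for the real HTTP call
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add((req, exec) -> exec.execute(req + "|logged"));
        // Plays the role of LoadBalancerInterceptor: swaps the service name for an address.
        interceptors.add((req, exec) -> exec.execute(req.replace("goods-service", "localhost:9091")));
        return new Chain(interceptors).execute(request);
    }

    public static void main(String[] args) {
        System.out.println(run("http://goods-service/goods"));
    }
}
```

Each interceptor decides whether and how to call `execute` again, which is why an interceptor such as LoadBalancerInterceptor can replace the request before it is sent.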
LoadBalancerInterceptor
LoadBalancerInterceptor is an interceptor. When a RestTemplate annotated with @LoadBalanced makes an HTTP request, the request is intercepted by the intercept method of LoadBalancerInterceptor.
Because we call the target through RestTemplate using the service name rather than a domain name, this method can read the service name directly from the URI's getHost method and then call the load balancer's execute method.
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    // The host part of the URI is the service name
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
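A small sketch of the host lookup, using only java.net.URI. The URLs are illustrative assumptions (the service name goods-service is borrowed from the log output later in this article). It also shows the case the Assert.state check guards against: an underscore is not valid in a hostname, and URI.getHost() returns null for it.

```java
import java.net.URI;

final class ServiceNameSketch {

    // Returns the host part of the URL, which RestTemplate callers fill with the service name.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    public static void main(String[] args) {
        System.out.println(serviceName("http://goods-service/goods/1"));
        // Underscore in the name: the URI still parses, but it has no valid host.
        System.out.println(serviceName("http://goods_service/goods/1"));
    }
}
```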
LoadBalancerClient is an interface. Its class diagram shows a single implementation class: RibbonLoadBalancerClient.
RibbonLoadBalancerClient.execute
The code of the RibbonLoadBalancerClient class is quite long. Let’s focus on its core method execute
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
The implementation logic of the above code is as follows:
- Obtain an ILoadBalancer instance for the serviceId; by default this is a ZoneAwareLoadBalancer
- Call the getServer method to select a service instance
- Check whether the Server is null. A Server here represents a concrete service node; the object stores the node's metadata, such as host and port
- Wrap the Server in a RibbonServer and pass it to the overloaded execute method
getServer
getServer obtains a specific service node. It is implemented as follows:
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}
As the code shows, getServer actually calls ILoadBalancer.chooseServer. ILoadBalancer is the load balancer interface:
public interface ILoadBalancer {
    // addServers adds service instances to the instance list maintained by the load balancer
    public void addServers(List<Server> newServers);
    // chooseServer selects a specific service instance from the load balancer according to some policy
    public Server chooseServer(Object key);
    // markServerDown tells the load balancer that a specific instance has stopped serving; otherwise the
    // load balancer assumes the instance is working until the next time it refreshes the instance list
    public void markServerDown(Server server);
    // getReachableServers returns the service instances that are currently working properly
    public List<Server> getReachableServers();
    // getAllServers returns all service instances, both working and stopped
    public List<Server> getAllServers();
}
The ILoadBalancer class diagram is as follows:
Looking at the class diagram as a whole, BaseLoadBalancer implements basic load balancing, while DynamicServerListLoadBalancer and ZoneAwareLoadBalancer extend it with additional features.
- AbstractLoadBalancer implements the ILoadBalancer interface. It defines an enumeration of server groups (ServerGroup), chooseServer (to pick a service instance), getServerList (to get the service instances in a group), and getLoadBalancerStats, which returns a LoadBalancerStats object holding state information for each service.
- BaseLoadBalancer implements the basic functions of a load balancer, such as maintaining the server list, monitoring server liveness, and selecting a server with the load balancing algorithm. It only covers the basics and cannot handle more complex scenarios such as dynamic server lists, server filtering, and zone awareness (calls between services should stay within the same zone where possible to reduce latency).
- DynamicServerListLoadBalancer is a subclass of BaseLoadBalancer that extends the basic load balancer. As its name suggests, it adds a dynamically updated server list.
- ZoneAwareLoadBalancer builds on DynamicServerListLoadBalancer and adds the ability to configure multiple load balancers, one per zone.
So in the getServer method, which concrete class implements loadBalancer.chooseServer? We can find out in the RibbonClientConfiguration class:
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    // A load balancer configured explicitly for this client takes precedence
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}
From this declaration we can see that, if no custom ILoadBalancer is configured, a ZoneAwareLoadBalancer is returned.
ZoneAwareLoadBalancer
A Zone is a geographical area. Large Internet companies generally deploy across regions, which gives users in different regions a nearby access node and reduces access latency.
ZoneAwareLoadBalancer is a zone-aware load balancer: it keeps a separate load balancer per zone, so the load balancing policies of the zones are isolated from each other. ZoneAwareLoadBalancer does not guarantee that requests from zone A are sent to servers in zone A. That requirement is actually met by ZonePreferenceServerListFilter/ZoneAffinityServerListFilter.
The core logic of ZoneAwareLoadBalancer is:
- If zone awareness is enabled and there is more than one zone, continue with the zone selection logic
- Call ZoneAvoidanceRule.getAvailableZones() to get the available zones (this drops zones that are completely unusable, as well as one zone that is usable but has the highest load)
- From the available zones, pick one with ZoneAvoidanceRule.randomChooseZone (weighted random: the more servers a zone contains, the more likely it is to be selected)
- Within the selected zone, choose a server according to that zone's rule
@Override
public Server chooseServer(Object key) {
    // ENABLED: whether zone-aware server selection is used. Default is true.
    // If zone awareness is disabled, or there is only one zone, use the parent class's chooseServer
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }
        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        // Compute the available zones from the two thresholds
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
            // Pick one of the available zones at random; the more servers a zone has, the more likely it is chosen
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                // Get the load balancer of the chosen zone and let it select a server with its own rule
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}
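The weighted random zone choice described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the Ribbon source: zones are reduced to a map from zone name to instance count, and the zone names and sizes are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

final class WeightedZoneSketch {

    // Returns the zone whose cumulative instance count first reaches index (1-based).
    static String pick(Map<String, Integer> instancesPerZone, int index) {
        int sum = 0;
        String selected = null;
        for (Map.Entry<String, Integer> entry : instancesPerZone.entrySet()) {
            selected = entry.getKey();
            sum += entry.getValue();
            if (index <= sum) {
                break;
            }
        }
        return selected;
    }

    // Draws index uniformly from 1..total, so a zone's chance is proportional to its instance count.
    static String pickRandom(Map<String, Integer> instancesPerZone, Random random) {
        int total = 0;
        for (int count : instancesPerZone.values()) {
            total += count;
        }
        if (total <= 0) {
            return null;
        }
        return pick(instancesPerZone, random.nextInt(total) + 1);
    }

    public static void main(String[] args) {
        Map<String, Integer> zones = new LinkedHashMap<>();
        zones.put("zone-a", 1); // hypothetical zone names and sizes
        zones.put("zone-b", 3);
        System.out.println(pickRandom(zones, new Random()));
    }
}
```

With one instance in zone-a and three in zone-b, index values 2 to 4 all land in zone-b, so it is chosen about three times as often as zone-a.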
BaseLoadBalancer.chooseServer
Assuming we do not use a multi-zone deployment, the load balancing strategy falls through to BaseLoadBalancer.chooseServer:
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            // Delegate the actual selection to the configured rule
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}
The service node is selected according to the configured load balancing rule. BaseLoadBalancer's own default rule is round robin (RoundRobinRule).
rule.choose
rule is the load balancing rule. IRule has many implementations.
With Spring Cloud's default configuration, the rule is a ZoneAvoidanceRule. It is defined in the configuration class RibbonClientConfiguration:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        // A rule configured explicitly for this client takes precedence
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
}
So when BaseLoadBalancer.chooseServer calls rule.choose(key), it actually enters the choose method of ZoneAvoidanceRule (inherited from its parent class PredicateBasedRule):
@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer(); // Obtain the load balancer
    // Filter all servers with the predicate, then pick one by round robin
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }
}
The predicate judges each server's health and availability before one is selected.
The main thing to analyze is the chooseRoundRobinAfterFiltering method.
chooseRoundRobinAfterFiltering
As the method name suggests, it first filters the target service cluster with the predicate and then applies round robin to the remaining servers.
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
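The filter-then-round-robin idea can be sketched in a few lines. This is a simplified illustration, not the Ribbon source: servers are plain strings, the predicate is a java.util.function.Predicate, and the counter uses a compare-and-set loop as one plausible way to implement incrementAndGetModulo (an assumption about the shape of the real method). The address localhost:9071 is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class RoundRobinSketch {

    private final AtomicInteger nextIndex = new AtomicInteger();

    // Returns the current index and advances the counter modulo the list size.
    // The compare-and-set loop keeps the counter consistent under concurrent callers.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    // Filters the servers first, then picks the next eligible one in rotation.
    String choose(List<String> servers, Predicate<String> filter) {
        List<String> eligible = new ArrayList<>();
        for (String server : servers) {
            if (filter.test(server)) {
                eligible.add(server);
            }
        }
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("localhost:9081", "localhost:9091", "localhost:9071");
        Predicate<String> healthy = s -> !s.endsWith(":9071"); // pretend 9071 is filtered out
        for (int i = 0; i < 3; i++) {
            System.out.println(rule.choose(servers, healthy));
        }
    }
}
```

Because the filtered-out server never enters the eligible list, the rotation alternates between the two remaining servers only.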
CompositePredicate.getEligibleServers
Filter all instances with the primary predicate and return the filtered list:
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    // Filter with the primary predicate first
    List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
    // Try the fallback predicates in the order they are stored in fallbacks
    // (for ZoneAvoidanceRule: AvailabilityPredicate, then an always-true predicate)
    Iterator<AbstractServerPredicate> i = fallbacks.iterator();
    while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
            && i.hasNext()) {
        AbstractServerPredicate predicate = i.next();
        // Each fallback filters the original server list, not the previous result
        result = predicate.getEligibleServers(servers, loadBalancerKey);
    }
    return result;
}
If the result of the primary predicate is not sufficient, the fallback predicates are tried in turn:
- Two conditions are checked after the primary predicate and after each fallback predicate
- Only when both conditions are met is the current result returned for round-robin selection; otherwise the next fallback predicate is applied to the original server list
- Condition 1: number of filtered instances >= minimum number of filtered instances (default: 1)
- Condition 2: number of filtered instances > total number of instances * minimum filtered percentage (default: 0)
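The fallback loop can be illustrated with plain Java predicates. This is a minimal sketch, assuming servers are strings and predicates are java.util.function.Predicate; the class and method names are hypothetical, and only the loop condition is taken from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

final class FallbackFilterSketch {

    static List<String> filter(List<String> servers, Predicate<String> predicate) {
        List<String> result = new ArrayList<>();
        for (String server : servers) {
            if (predicate.test(server)) {
                result.add(server);
            }
        }
        return result;
    }

    // Applies the primary predicate; while the result is too small, retries the
    // original list with the next fallback predicate.
    static List<String> eligible(List<String> servers, Predicate<String> primary,
            List<Predicate<String>> fallbacks, int minServers, float minPercentage) {
        List<String> result = filter(servers, primary);
        Iterator<Predicate<String>> it = fallbacks.iterator();
        while (!(result.size() >= minServers && result.size() > (int) (servers.size() * minPercentage))
                && it.hasNext()) {
            result = filter(servers, it.next());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("s1", "s2", "s3");
        Predicate<String> strict = s -> false;            // primary rejects everything
        Predicate<String> relaxed = s -> s.equals("s2");  // fallback accepts one server
        System.out.println(eligible(servers, strict, Collections.singletonList(relaxed), 1, 0f));
    }
}
```

The fallback predicates are meant to be progressively more lenient, so that an overly strict primary predicate does not leave the caller without any server.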
getEligibleServers
The implementation iterates over the server list and calls this.apply on each server. Servers that pass the check are added to the results list, which is then returned.
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server : servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;
    }
}
this.apply goes into the CompositePredicate.apply method, as follows.
//CompositePredicate.apply
@Override
public boolean apply(@Nullable PredicateKey input) {
return delegate.apply(input);
}
The delegate is an AbstractServerPredicate instance created by ofKeyPredicate. The code is as follows:
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
    return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
        public boolean apply(PredicateKey input) {
            // Delegate to the wrapped predicate
            return p.apply(input);
        }
    };
}
In other words, filtering goes through AbstractServerPredicate.apply, where input represents one specific node of the target server cluster.
Here p is an instance of AndPredicate, a composite predicate that combines several predicates:
private static class AndPredicate<T> implements Predicate<T>, Serializable {
    private final List<? extends Predicate<? super T>> components;
    private static final long serialVersionUID = 0L;

    private AndPredicate(List<? extends Predicate<? super T>> components) {
        this.components = components;
    }

    public boolean apply(@Nullable T t) {
        // Walk through the predicates one by one; the first failure rejects the input
        for (int i = 0; i < this.components.size(); ++i) {
            if (!((Predicate) this.components.get(i)).apply(t)) {
                return false;
            }
        }
        return true;
    }
}
In the code above, components consists of two predicates:
- AvailabilityPredicate, which filters out servers whose circuit breaker is tripped and servers with too many concurrent connections.
- ZoneAvoidancePredicate, which filters out servers in unavailable zones.
So the apply method of AndPredicate walks through these two predicates one by one.
AvailabilityPredicate
It filters out servers whose circuit breaker is tripped and servers with too many concurrent connections. The code is as follows:
@Override
public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}
The logic that decides whether to skip the target node is as follows.
private boolean shouldSkipServer(ServerStats stats) {
    // CIRCUIT_BREAKER_FILTERING: whether niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped is true
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) // the server's circuit breaker is tripped
            // or the number of in-flight requests to the server has reached the active connection limit
            || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
        return true;
    }
    return false;
}
How is it determined that a server's circuit breaker is tripped?
The logic is in ServerStats. We do not paste the detailed source code here and only describe the mechanism:
The circuit breaker is time-based. Each connection failure records the time of the last failure. Once the number of consecutive failures reaches the minimum failure count, the breaker check is triggered:
Compute the trip duration: 2^(number of consecutive failures beyond the threshold) * trip timeout factor, capped at the maximum trip duration. Then compare the current time with the last failure time plus the trip duration: if the current time is earlier, the circuit is tripped. Three configuration items are involved (replace default with the name of the microservice you are calling):
- niws.loadbalancer.default.connectionFailureCountThreshold, default 3: the minimum number of consecutive failures before the breaker check is triggered. That is, by default the check starts after three failures.
- niws.loadbalancer.default.circuitTripTimeoutFactorSeconds, default 10: the trip timeout factor, in seconds
- niws.loadbalancer.default.circuitTripMaxTimeoutSeconds, default 30: the maximum trip duration, in seconds
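The time-based breaker can be written down as a short calculation. This is a sketch under stated assumptions rather than the ServerStats source: it assumes the exponent counts the consecutive failures beyond the threshold and is capped to avoid overflow, and the class and method names are hypothetical. The defaults 3, 10 and 30 are the ones listed above.

```java
final class BlackoutSketch {

    // Trip duration in seconds for a given number of consecutive failures.
    static int blackoutSeconds(int failures, int threshold, int factorSeconds, int maxSeconds) {
        if (failures < threshold) {
            return 0; // the breaker check has not been triggered yet
        }
        // Assumption: the exponent is the number of failures beyond the threshold, capped at 16.
        int exponent = Math.min(failures - threshold, 16);
        return Math.min((1 << exponent) * factorSeconds, maxSeconds);
    }

    // The circuit is tripped while the current time is before lastFailure + trip duration.
    static boolean isTripped(long nowMillis, long lastFailureMillis, int blackoutSeconds) {
        return blackoutSeconds > 0 && nowMillis < lastFailureMillis + blackoutSeconds * 1000L;
    }

    public static void main(String[] args) {
        for (int failures = 2; failures <= 5; failures++) {
            System.out.println(failures + " failures -> " + blackoutSeconds(failures, 3, 10, 30) + "s");
        }
    }
}
```

Under these assumptions the duration doubles with each additional failure until it reaches the 30-second cap.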
ZoneAvoidancePredicate
ZoneAvoidancePredicate filters out servers in unavailable zones. The code is as follows:
@Override
public boolean apply(@Nullable PredicateKey input) {
    // If niws.loadbalancer.zoneAvoidanceRule.enabled is false (it defaults to true), do not filter
    if (!ENABLED.get()) {
        return true;
    }
    // Get the zone configured for the server; the default value is UNKNOWN
    String serverZone = input.getServer().getZone();
    if (serverZone == null) {
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    // Get the load balancer statistics
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    // Take a snapshot of the zone statistics; later calculations use the snapshot
    // so that they are not disturbed by concurrent changes
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) {
        // The server zone is unknown to the load balancer, do not filter it out
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    // Compute the available zones
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) {
        // Keep the server only if its zone is among the available zones
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
}
When the client's load balancer is initialized on the first call, the LoadBalancerStats status information is printed on the console as follows:
DynamicServerListLoadBalancer for client goods-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=goods-service,current list of Servers=[localhost:9091, localhost:9081],Load balancer stats=Zone stats: {unknown=[Zone:unknown; Instance count:2; Active connections count: 0; Circuit breaker tripped count: 0; Active connections per server: 0.0;]},Server stats: [[Server:localhost:9091; Zone:UNKNOWN; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0], [Server:localhost:9081; Zone:UNKNOWN; Total Requests:0; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:0.0; 90 percentile resp time:0.0; 95 percentile resp time:0.0; min resp time:0.0; max resp time:0.0; stddev resp time:0.0]]}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@74ddb59a
The getAvailableZones method, which computes the available zones, is as follows.
public static Set<String> getAvailableZones(
        Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
        double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) { // no snapshot information: return null
        return null;
    }
    // Start with every zone as a candidate
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) { // only one zone: return it directly
        return availableZones;
    }
    // Zones in the worst condition
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0; // highest per-server load seen among the zones
    // true: only some zones are available
    // false: all zones are available
    boolean limitedZoneAvailability = false;
    // Examine the zones one by one
    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
        int instanceCount = zoneSnapshot.getInstanceCount();
        if (instanceCount == 0) {
            // The zone has no instances and is completely unavailable: remove it
            // and record that not all zones are available
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer(); // average load of the zone
            // tripped instances / total instances has reached the threshold (0.99999 by default,
            // which in practice means all instances are tripped), or loadPerServer < 0,
            // which also indicates that all instances in the zone are tripped
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                    / instanceCount >= triggeringBlackoutPercentage
                    || loadPerServer < 0) {
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                // The zone is usable; track the zones with the highest load in worstZones
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                } else if (loadPerServer > maxLoadPerServer) {
                    // A new maximum: this zone becomes the only worst zone
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }
    // All zones are available and the maximum load is below the threshold: return all zones
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    // Otherwise pick one of the worst zones at random and remove it from the available zones
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;
}
The logic above is fairly involved, so let's explain it in plain words:
- If the zone snapshot is empty, there is no available zone, so null is returned.
- If there is only one zone, there is nothing to choose from, so that zone is returned directly.
- Set<String> worstZones records the zones in the worst condition, maxLoadPerServer records the highest per-server load among all zones, and limitedZoneAvailability indicates whether only some zones are available (true: only some zones are available, false: all zones are available). All zone snapshots are then traversed and judged one by one:
  - If the current zone's instanceCount is 0, the zone is removed and limitedZoneAvailability is set to true (partial availability). There is nothing more to say here.
  - Get the zone's average load loadPerServer. If (number of circuit-tripped instances / total number of instances) >= triggeringBlackoutPercentage, or loadPerServer < 0, the zone is considered faulty: it is removed and limitedZoneAvailability is set to true.
    - When (number of circuit-tripped instances / total number of instances) >= threshold, the zone is marked unavailable (removed), which makes sense. The threshold is 0.99999d, so the zone becomes unavailable only when practically all of its Server instances are tripped.
    - loadPerServer = -1 is the value used when all instances are tripped. The two conditions are similar; both determine whether the zone is available.
  - If the zone does not reach the threshold, its load is compared with the others to find the zones with the highest load (loads that differ by less than 0.000001d are treated as equal). These zones are added to worstZones, the set that holds the most heavily loaded zones.
- After the traversal, the set of valid zones to return is decided:
  - If the highest load maxLoadPerServer is still below the triggeringLoad threshold and limitedZoneAvailability=false (all zones are available), all zones are returned: availableZones. (That is, if the load of every zone is within the threshold and every zone still has live nodes, return them all.)
  - Otherwise, the maximum load has exceeded the threshold or some zones are unavailable, so one zone is randomly removed from worstZones, the zones with the highest load.
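The rules above can be traced with a small self-contained sketch. It is not Ribbon's code: the `Zone` class is a hypothetical stand-in for `ZoneSnapshot`, the zone names and load figures are invented, and the final step assumes a uniform random pick among the worst zones, which may differ from what `randomChooseZone` does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

class ZoneSelectionSketch {

    // Hypothetical stand-in for Ribbon's ZoneSnapshot, reduced to the fields used here.
    static final class Zone {
        final int instanceCount;
        final int circuitTrippedCount;
        final double loadPerServer;

        Zone(int instanceCount, int circuitTrippedCount, double loadPerServer) {
            this.instanceCount = instanceCount;
            this.circuitTrippedCount = circuitTrippedCount;
            this.loadPerServer = loadPerServer;
        }
    }

    static Set<String> availableZones(Map<String, Zone> snapshot,
                                      double triggeringLoad, double blackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null; // no zone at all
        }
        Set<String> available = new HashSet<>(snapshot.keySet());
        if (available.size() == 1) {
            return available; // nothing to choose from
        }
        Set<String> worstZones = new HashSet<>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;
        for (Map.Entry<String, Zone> entry : snapshot.entrySet()) {
            Zone zone = entry.getValue();
            boolean unusable = zone.instanceCount == 0
                    || (double) zone.circuitTrippedCount / zone.instanceCount >= blackoutPercentage
                    || zone.loadPerServer < 0;
            if (unusable) {
                available.remove(entry.getKey());
                limitedZoneAvailability = true;
            } else if (Math.abs(zone.loadPerServer - maxLoadPerServer) < 0.000001d) {
                worstZones.add(entry.getKey()); // ties with the current maximum
            } else if (zone.loadPerServer > maxLoadPerServer) {
                maxLoadPerServer = zone.loadPerServer; // new maximum, restart the worst set
                worstZones.clear();
                worstZones.add(entry.getKey());
            }
        }
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            return available; // every zone is healthy and below the load threshold
        }
        if (!worstZones.isEmpty()) {
            // Assumption: uniform random choice among the worst zones.
            List<String> candidates = new ArrayList<>(worstZones);
            available.remove(candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())));
        }
        return available;
    }

    public static void main(String[] args) {
        Map<String, Zone> snapshot = new LinkedHashMap<>();
        snapshot.put("zone-a", new Zone(3, 0, 0.1));
        snapshot.put("zone-b", new Zone(3, 0, 0.5));
        snapshot.put("zone-c", new Zone(2, 2, -1)); // every instance tripped
        System.out.println(availableZones(snapshot, 0.2, 0.99999d));
    }
}
```

With these sample values, zone-c is dropped because all of its instances are tripped, and zone-b is dropped as the single worst zone, so only zone-a remains.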
AbstractServerPredicate
Back to the code below: getEligibleServers determines the available service nodes, and if the result is not empty, the incrementAndGetModulo method is called to pick one by round-robin.
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
The incrementAndGetModulo method implements the round-robin selection. The code is as follows.
private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}
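To see the counter in action, here is a minimal standalone sketch of the same compare-and-set loop. The class name and the server addresses are made up for illustration; only the loop body follows the method shown above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Returns the current index and advances the shared counter with
    // compare-and-set, retrying if another thread changed it in between.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch roundRobin = new RoundRobinSketch();
        // Hypothetical server addresses, used only to show the rotation.
        String[] servers = {"localhost:8080", "localhost:8082", "localhost:8084"};
        for (int i = 0; i < 5; i++) {
            System.out.println(servers[roundRobin.incrementAndGetModulo(servers.length)]);
        }
    }
}
```

Five calls with three servers yield the indexes 0, 1, 2, 0, 1. The `current < modulo` check guards against a stale index when the eligible list has shrunk since the previous call.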
The loading process of the service list
In this example, we configured the list of services in the application.properties file, which means that at some point the list must be loaded and stored somewhere. So when is it loaded?
The RibbonClientConfiguration configuration class declares the following Bean (a conditional Bean, created only when no other one is defined), which defines the default load balancer.
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}
As analyzed earlier, its class diagram is as follows.
When ZoneAwareLoadBalancer is initialized, the constructor of its superclass DynamicServerListLoadBalancer is called. The code is as follows.
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
        ServerList<T> serverList, ServerListFilter<T> filter,
        ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}
restOfInit
The restOfInit method does two main things.
- The dynamic Server update function is enabled
- Update the Server List
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature(); // Enable dynamic Server update
    updateListOfServers(); // Update the Server list
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
updateListOfServers
Update the service list in full.
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);
        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}
The code above does the following:
- Because we configured a static list of service addresses in the application.properties file, serverListImpl is an instance of ConfigurationBasedServerList here, and calling getUpdatedListOfServers returns the list of services defined in application.properties.
- It checks whether a filter is set. If so, the service list is filtered through the filter.
Finally, updateAllServerList is called to update all servers to the local cache.
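The fetch-then-filter flow can be sketched without any Ribbon classes. Everything below is hypothetical: the comma-separated host:port format is an assumption about how the static list is written, and `parse` and `refresh` are illustrative helpers, not Ribbon APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ServerListRefreshSketch {

    // Assumption: the static list is a comma-separated string of host:port entries.
    static List<String> parse(String listOfServers) {
        List<String> servers = new ArrayList<>();
        if (listOfServers == null) {
            return servers;
        }
        for (String entry : listOfServers.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    // Mirrors the two steps described above: load the list, then filter it if a filter is set.
    static List<String> refresh(String listOfServers, Predicate<String> filter) {
        List<String> servers = parse(listOfServers);
        if (filter != null) {
            List<String> filtered = new ArrayList<>();
            for (String server : servers) {
                if (filter.test(server)) {
                    filtered.add(server);
                }
            }
            servers = filtered;
        }
        return servers;
    }

    public static void main(String[] args) {
        String configured = "localhost:8080, localhost:8082";
        System.out.println(refresh(configured, null));
        System.out.println(refresh(configured, server -> server.endsWith(":8082")));
    }
}
```

Passing `null` as the filter returns the configured list unchanged, which corresponds to the `filter != null` check in updateListOfServers.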
protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                                  // servers right away instead
                                  // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}
Dynamic Ping mechanism
In Ribbon, the list of target service addresses changes dynamically, based on the Ping mechanism. The implementation is in the DynamicServerListLoadBalancer.restOfInit method.
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature(); // Enable scheduled task dynamic update
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}
Notice that a scheduled task is launched, and the program executed by the scheduled task is updateAction, which is an anonymous inner class defined below.
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};
The scheduled task is started as follows. It first runs after an initial delay of 1 second and then repeats with a fixed delay of 30 seconds between runs.
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate(); // Perform specific tasks.
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,    // 1000
                refreshIntervalMs, // 30000
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
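The same start-once, fixed-delay pattern can be tried in isolation with the JDK's ScheduledExecutorService. The class below is a hypothetical sketch, not Ribbon's updater: the names are invented and the delays are shortened to milliseconds so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> scheduledFuture;

    // Returns true if this call started the task, false if it was already active.
    synchronized boolean start(Runnable updateAction, long initialDelayMs, long refreshIntervalMs) {
        if (!isActive.compareAndSet(false, true)) {
            return false; // already active, no-op
        }
        scheduledFuture = executor.scheduleWithFixedDelay(() -> {
            try {
                updateAction.run();
            } catch (Exception e) {
                // Swallow the failure so that later cycles still run.
                System.err.println("Failed one update cycle: " + e);
            }
        }, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }

    // Runs three refresh cycles with short delays and reports whether they completed in time.
    static boolean runDemo() {
        PollingUpdaterSketch updater = new PollingUpdaterSketch();
        CountDownLatch threeRuns = new CountDownLatch(3);
        updater.start(threeRuns::countDown, 10, 20);
        try {
            return threeRuns.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            updater.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("three refresh cycles completed: " + runDemo());
    }
}
```

Catching the exception inside the task matters with scheduleWithFixedDelay: if a run throws, the executor suppresses all subsequent runs, which is presumably why the original wraps doUpdate in a try/catch as well.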
When the doUpdate method is triggered 30 seconds later, the updateAllServerList method is finally entered
protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                                  // servers right away instead
                                  // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}
Here super.forceQuickPing() is called to perform a heartbeat health check.
public void forceQuickPing() {
    if (canSkipPing()) {
        return;
    }
    logger.debug("LoadBalancer [{}]: forceQuickPing invoking", name);
    try {
        new Pinger(pingStrategy).runPinger();
    } catch (Exception e) {
        logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
    }
}
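The article does not show what the Pinger does internally, so the following is only a rough sketch of the idea of a heartbeat check: ask each known server whether it is alive and keep the ones that answer. The `ping` predicate and the server names are hypothetical and do not correspond to Ribbon's IPing or IPingStrategy signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class PingerSketch {

    // Hypothetical ping strategy: a predicate that says whether a server answered.
    static List<String> runPinger(List<String> allServers, Predicate<String> ping) {
        List<String> upServers = new ArrayList<>();
        for (String server : allServers) {
            boolean alive;
            try {
                alive = ping.test(server);
            } catch (RuntimeException e) {
                alive = false; // a failing ping counts as "down"
            }
            if (alive) {
                upServers.add(server);
            }
        }
        return upServers;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("localhost:8080", "localhost:8082", "localhost:8084");
        // Pretend that the server on port 8082 does not respond.
        System.out.println(runPinger(servers, server -> !server.endsWith(":8082")));
    }
}
```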
RibbonLoadBalancerClient.execute
With the above analysis done, let's return to the RibbonLoadBalancerClient.execute method.
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
        throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
            isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
    return execute(serviceId, ribbonServer, request);
}
The line Server server = getServer(loadBalancer, hint); returns a specific target server.
Before the overloaded execute method is called, the server is wrapped in a RibbonServer object and passed on. The main function of that method is to record the load information of the request.
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
        LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {
        server = ((RibbonServer) serviceInstance).getServer();
    }
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonLoadBalancerContext context = this.clientFactory
            .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    try {
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal); // Record the request status
        return returnVal;
    }
    // catch IOException and rethrow so RestTemplate behaves correctly
    catch (IOException ex) {
        statsRecorder.recordStats(ex); // Record the request status
        throw ex;
    }
    catch (Exception ex) {
        statsRecorder.recordStats(ex);
        ReflectionUtils.rethrowRuntimeException(ex);
    }
    return null;
}
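The record-then-rethrow structure of this method can be reduced to a few lines. The sketch below is an illustration only: `Request` is a hypothetical stand-in for LoadBalancerRequest, and the two counters replace RibbonStatsRecorder, whose internals are not shown in this article.

```java
import java.io.IOException;

class StatsRecordingSketch {
    int successes;
    int failures;

    // Hypothetical stand-in for LoadBalancerRequest<T>.
    interface Request<T> {
        T apply(String server) throws Exception;
    }

    <T> T execute(String server, Request<T> request) throws IOException {
        try {
            T result = request.apply(server);
            successes++; // record the successful call
            return result;
        } catch (IOException ex) {
            failures++; // record the failure, then rethrow unchanged so callers see the I/O error
            throw ex;
        } catch (Exception ex) {
            failures++;
            if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
            }
            throw new IllegalStateException(ex); // wrap other checked exceptions
        }
    }

    public static void main(String[] args) throws IOException {
        StatsRecordingSketch client = new StatsRecordingSketch();
        System.out.println(client.execute("localhost:8080", server -> "ok from " + server));
        try {
            client.execute("localhost:8080", server -> {
                throw new IOException("connection refused");
            });
        } catch (IOException expected) {
            System.out.println("failed: " + expected.getMessage());
        }
        System.out.println(client.successes + " success, " + client.failures + " failure");
    }
}
```

Both outcomes are counted before control leaves the method, so statistics such as per-server load stay accurate whether the request succeeds or throws.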
request.apply
request is of type LoadBalancerRequest, an interface that provides an apply method. However, the code so far shows no class that implements this method, so where is it implemented?
Tracing back up the call chain, we find that the request object is passed in from the intercept method of LoadBalancerInterceptor.
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return (ClientHttpResponse) this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
}
The request passed along is created by this.requestFactory.createRequest(request, body, execution), so let's look at that method.
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
        final byte[] body, final ClientHttpRequestExecution execution) {
    return (instance) -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
        if (this.transformers != null) {
            // Apply each configured transformer to the request in turn
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
                serviceRequest = transformer.transformRequest(serviceRequest, instance);
            }
        }
        return execution.execute(serviceRequest, body);
    };
}
As the code shows, the LoadBalancerRequest is implemented with a lambda expression, in effect an anonymous implementation of the interface. Inside it, a ServiceRequestWrapper is created, which is a subclass of HttpRequestWrapper. ServiceRequestWrapper overrides the getURI() method of HttpRequestWrapper, and the overridden method rebuilds the URI to access by calling reconstructURI of the LoadBalancerClient interface.
InterceptingClientHttpRequest.execute
The execution.execute call in the code above then enters the InterceptingClientHttpRequest.execute method. The code is as follows.
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); // Notice here
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}
Note that the request object here is an HttpRequestWrapper instance (the ServiceRequestWrapper created in createRequest).
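The iterator-driven chain in execute is easier to follow in a stripped-down form. The sketch below uses plain strings instead of HTTP requests; the `Execution` and `Interceptor` interfaces, the header text, and the host names are all invented for illustration and are not Spring types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class InterceptorChainSketch {

    interface Execution {
        String execute(String request);
    }

    interface Interceptor {
        String intercept(String request, Execution next);
    }

    // Each call to execute hands the request to the next interceptor;
    // once the iterator is exhausted, the request is "sent".
    static final class Chain implements Execution {
        private final Iterator<Interceptor> iterator;

        Chain(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(request, this);
            }
            return "sent " + request; // end of chain: a real client would issue the HTTP call here
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add((req, next) -> next.execute(req + " +auth"));
        // Plays the role of the load-balancing interceptor: swap the service name for a real address.
        interceptors.add((req, next) -> next.execute(req.replace("order-service", "localhost:8082")));
        return new Chain(interceptors).execute(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET http://order-service/orders/1"));
    }
}
```

Because every interceptor receives the chain itself as `next`, calling `next.execute` simply advances the shared iterator, which is the same trick the `this` argument plays in `nextInterceptor.intercept(request, body, this)`.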
request.getURI()
When request.getURI() is called to get the target address for the HTTP request, it is the getURI() method of ServiceRequestWrapper that runs.
@Override
public URI getURI() {
    URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
    return uri;
}
In this method, the reconstructURI method in the RibbonLoadBalancerClient instance is called to generate the target service address based on the service-ID.
RibbonLoadBalancerClient.reconstructURI
public URI reconstructURI(ServiceInstance instance, URI original) {
    Assert.notNull(instance, "instance can not be null");
    String serviceId = instance.getServiceId(); // Get the service ID, i.e. the service name
    RibbonLoadBalancerContext context = this.clientFactory
            .getLoadBalancerContext(serviceId); // Get the RibbonLoadBalancerContext for this service from the Spring context
    URI uri;
    Server server;
    if (instance instanceof RibbonServer) { // If instance is a RibbonServer
        RibbonServer ribbonServer = (RibbonServer) instance;
        server = ribbonServer.getServer(); // Get the Server information of the target server
        uri = updateToSecureConnectionIfNeeded(original, ribbonServer); // Determine whether to update to a secure connection.
    }
    else { // Otherwise, build a Server from the generic ServiceInstance
        server = new Server(instance.getScheme(), instance.getHost(),
                instance.getPort());
        IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
        ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
        uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                serverIntrospector, server);
    }
    return context.reconstructURIWithServer(server, uri); // Call this method to assemble the real target server address.
}
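The effect of the final step, replacing the service name in the URI with a concrete host and port, can be illustrated with java.net.URI alone. This is a simplified sketch, not what reconstructURIWithServer actually does internally; the service name order-service and the address localhost:8082 are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ReconstructUriSketch {

    // Keeps scheme, path, query and fragment, but swaps in the chosen server's host and port.
    static URI reconstruct(String host, int port, URI original) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://order-service/orders/1?page=2");
        // Prints http://localhost:8082/orders/1?page=2
        System.out.println(reconstruct("localhost", 8082, original));
    }
}
```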
Copyright Notice: All articles on this blog are licensed under CC BY-NC-SA 4.0 unless otherwise stated. When reprinting, please credit the source: Mic to take you to learn structure!