This is the 21st day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021
This series code address: github.com/JoJoTec/spr…
To implement what we mentioned in the last section:
- You need to add some logs to the retries and interrupts for later optimization
- Retry exceptions need to be defined and, in conjunction with a circuit breaker, non-2XX response codes are also encapsulated as specific exceptions
- Data updates similar to the load balancing in FeignClient need to be added to the circuit breaker related Operator to make the load balancing more intelligent
We need to make some modifications to the bonding library provided by Resilience4J itself, which is mainly the modification of the Project reactor Operator implemented by Resilience4J.
About the modification of circuit breaker
First of all, the return object of WebClient can only be of ClientResponse type, so the modified Operator does not need to take a parameter. It only needs to be for ClientResponse, that is:
public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {
...
}
Copy the code
In the original breaker logic, we need to add retry logic for GET methods and previously defined retry path matching, which requires us to GET the original REQUEST URL information. But there is no interface to expose this information in ClientResponse, which implements DefaultClientResponse by default (as long as we don’t add special modification logic to WebClient ourselves, The request() method in DefaultClientResponse gets the request HttpRequest, which contains URL information. But this class also has methods that are package-private, so we need to reflect them:
ClientResponseCircuitBreakerSubscriber
private static final Class<?> aClass;
private static final Method request;
static {
try {
aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");
request = ReflectionUtils.findMethod(aClass, "request");
request.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Copy the code
After that, the logic for logging the circuit breaker once the ClientResponse is obtained needs to be added to the retries modification mentioned above, as well as the load balancer log:
ClientResponseCircuitBreakerSubscriber
protected void hookOnNext(ClientResponse clientResponse) { if (! isDisposed()) { if (singleProducer && successSignaled.compareAndSet(false, true)) { int rawStatusCode = clientResponse.rawStatusCode(); HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode); try { HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse); If (httprequest.getmethod ()! Httprequest.getmethod ()! = HttpMethod.GET && ! WebClientProperties. RetryablePathsMatch (httpRequest. GetURI () getPath ())) {/ / if you can't try again, Directly returns the circuitBreaker, onResult (circuitBreaker getCurrentTimestamp () - start, circuitBreaker, getTimestampUnit (), clientResponse); } else { if (httpStatus ! . = null && httpStatus is2xxSuccessful ()) {/ / if successful, Directly returns the circuitBreaker, onResult (circuitBreaker getCurrentTimestamp () - start, circuitBreaker, getTimestampUnit (), clientResponse); } else {/** * Refer to the code for DefaultClientResponse for exception encapsulation * @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException */ Exception exception; if (httpStatus ! = null) { exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null); } else { exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null); } circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception); downstreamSubscriber.onError(exception); return; } } } catch (Exception e) { log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e); circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse); } } eventWasEmitted.set(true); downstreamSubscriber.onNext(clientResponse); }}Copy the code
Also, add load balancing data to the original log logic of complete, cancel, and fail:
ClientResponseCircuitBreakerSubscriber
@Override protected void hookOnComplete() { if (successSignaled.compareAndSet(false, true)) { serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true); circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit()); } downstreamSubscriber.onComplete(); } @Override public void hookOnCancel() { if (! successSignaled.get()) { serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true); if (eventWasEmitted.get()) { circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit()); } else { circuitBreaker.releasePermission(); } } } @Override protected void hookOnError(Throwable e) { serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false); circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e); downstreamSubscriber.onError(e); }Copy the code
Glue WebClient and Resilience4J together to override retry logic
Because in front of the circuit breaker, we can try again not 2 xx response encapsulated into WebClientResponseException. So in the retries, we need to add a retry for this exception.
Also, you need to put the retries before the load balancer, because each retry will fetch a new instance from the load balancer. Also, the circuit breaker needs to be placed after the load balancer, because only after that can the instance of this call be retrieved. Our circuit breaker is for instance method level:
WebClientDefaultConfiguration.java
@Bean
public WebClient getWebClient(
ReactorLoadBalancerExchangeFilterFunction lbFunction,
WebClientConfigurationProperties webClientConfigurationProperties,
Environment environment,
RetryRegistry retryRegistry,
CircuitBreakerRegistry circuitBreakerRegistry,
ServiceInstanceMetrics serviceInstanceMetrics
) {
String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);
Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();
if (configs == null || configs.size() == 0) {
throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");
}
WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);
if (webClientProperties == null) {
throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);
}
String serviceName = webClientProperties.getServiceName();
//如果没填写微服务名称,就使用配置 key 作为微服务名称
if (StringUtils.isBlank(serviceName)) {
serviceName = name;
}
String baseUrl = webClientProperties.getBaseUrl();
//如果没填写 baseUrl,就使用微服务名称填充
if (StringUtils.isBlank(baseUrl)) {
baseUrl = "http://" + serviceName;
}
Retry retry = null;
try {
retry = retryRegistry.retry(serviceName, serviceName);
} catch (ConfigurationNotFoundException e) {
retry = retryRegistry.retry(serviceName);
}
//覆盖其中的异常判断
retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
//WebClientResponseException 会重试,因为在这里能 catch 的 WebClientResponseException 只对可以重试的请求封装了 WebClientResponseException
//参考 ClientResponseCircuitBreakerSubscriber 的代码
if (throwable instanceof WebClientResponseException) {
log.info("should retry on {}", throwable.toString());
return true;
}
//断路器异常重试,因为请求没有发出去
if (throwable instanceof CallNotPermittedException) {
log.info("should retry on {}", throwable.toString());
return true;
}
if (throwable instanceof WebClientRequestException) {
WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;
HttpMethod method = webClientRequestException.getMethod();
URI uri = webClientRequestException.getUri();
//判断是否为响应超时,响应超时代表请求已经发出去了,对于非 GET 并且没有标注可以重试的请求则不能重试
boolean isResponseTimeout = false;
Throwable cause = throwable.getCause();
//netty 的读取超时一般是 ReadTimeoutException
if (cause instanceof ReadTimeoutException) {
log.info("Cause is a ReadTimeoutException which indicates it is a response time out");
isResponseTimeout = true;
} else {
//对于其他一些框架,使用了 java 底层 nio 的一般是 SocketTimeoutException,message 为 read time out
//还有一些其他异常,但是 message 都会有 read time out 字段,所以通过 message 判断
String message = throwable.getMessage();
if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {
log.info("Throwable message contains readtimeout which indicates it is a response time out");
isResponseTimeout = true;
}
}
//如果请求是 GET 或者标注了重试,则直接判断可以重试
if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {
log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
return true;
} else {
//否则,只针对请求还没有发出去的异常进行重试
if (isResponseTimeout) {
log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());
} else {
log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
return true;
}
}
}
return false;
}).build());
HttpClient httpClient = HttpClient
.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis())
.doOnConnected(connection ->
connection
.addHandlerLast(new ReadTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
.addHandlerLast(new WriteTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
);
Retry finalRetry = retry;
String finalServiceName = serviceName;
return WebClient.builder()
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer
.defaultCodecs()
//最大 body 占用 16m 内存
.maxInMemorySize(16 * 1024 * 1024))
.build())
.clientConnector(new ReactorClientHttpConnector(httpClient))
//Retry在负载均衡前
.filter((clientRequest, exchangeFunction) -> {
return exchangeFunction
.exchange(clientRequest)
.transform(ClientResponseRetryOperator.of(finalRetry));
})
//负载均衡器,改写url
.filter(lbFunction)
//实例级别的断路器需要在负载均衡获取真正地址之后
.filter((clientRequest, exchangeFunction) -> {
ServiceInstance serviceInstance = getServiceInstance(clientRequest);
serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
CircuitBreaker circuitBreaker;
//这时候的url是经过负载均衡器的,是实例的url
//需要注意的一点是,使用异步 client 的时候,最好不要带路径参数,否则这里的断路器效果不好
//断路器是每个实例每个路径一个断路器
String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();
try {
//使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}
log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instancId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));
return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));
}).baseUrl(baseUrl)
.build();
}
private ServiceInstance getServiceInstance(ClientRequest clientRequest) {
URI url = clientRequest.url();
DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
defaultServiceInstance.setHost(url.getHost());
defaultServiceInstance.setPort(url.getPort());
return defaultServiceInstance;
}
Copy the code
Thus, we have implemented the configuration-based WebClient that we have encapsulated
Wechat search “my programming meow” public account, a daily brush, easy to improve skills, won a variety of offers: