Basic introduction
Resilience4j is a lightweight, easy-to-use fault tolerance library inspired by Netflix Hystrix, but designed for Java 8 and functional programming. It is lightweight because it depends only on Vavr and has no other external dependencies. In contrast, Netflix Hystrix has a compile dependency on Archaius, which in turn pulls in more external libraries, such as Guava and Apache Commons Configuration.
Using Resilience4j
To use Resilience4j, you don't need to import every module; just pick the ones you need. Resilience4j provides the following core modules and extension modules:
Core modules
- resilience4j-circuitbreaker: Circuit breaking
- resilience4j-ratelimiter: Rate limiting
- resilience4j-bulkhead: Bulkheading
- resilience4j-retry: Automatic retrying (sync and async)
- resilience4j-cache: Result caching
- resilience4j-timelimiter: Timeout handling
CircuitBreaker
CircuitBreaker is implemented as a finite state machine with three normal states (CLOSED, OPEN, and HALF_OPEN) and two special states (DISABLED and FORCED_OPEN).
- When the circuit breaker is closed, all requests pass through it.
- If the failure rate exceeds the configured threshold, the circuit breaker switches from closed to open and all requests are rejected.
- After a period of time, the circuit breaker changes from the open state to the half-open state. In this state only a limited number of requests are let through, and the failure rate is recalculated.
Ring Bit Buffer
Resilience4j records request status in a different data structure from Hystrix: Hystrix uses a sliding window for storage, while Resilience4j uses a Ring Bit Buffer.
The Ring Bit Buffer internally uses a BitSet-like data structure for storage. The structure of BitSet is shown in the figure below:
The success or failure status of each request takes up only one bit, which is more memory efficient than a boolean array. A BitSet stores this data in a long[] array, so an array of 16 long values (64 bits each) can store 1024 call states.
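The one-bit-per-call idea can be sketched in plain Java as follows. This is a simplified illustration, not Resilience4j's actual implementation; the class name `RingBitBufferSketch` and its methods are hypothetical.

```java
/** Hypothetical, simplified ring bit buffer: one bit per call, 1 = failure. */
class RingBitBufferSketch {
    private final long[] words;   // each long holds 64 call states
    private final int capacity;   // number of call states the ring holds
    private int nextIndex = 0;    // slot that the next call overwrites
    private int length = 0;       // calls recorded so far, capped at capacity
    private int failures = 0;     // number of bits currently set

    RingBitBufferSketch(int capacity) {
        this.capacity = capacity;
        this.words = new long[(capacity + 63) / 64];
    }

    /** Records one call outcome, overwriting the oldest one once the ring is full. */
    void record(boolean failed) {
        int word = nextIndex >>> 6;            // nextIndex / 64
        long mask = 1L << (nextIndex & 63);    // bit position inside that word
        if ((words[word] & mask) != 0) {
            failures--;                        // the overwritten outcome was a failure
        }
        if (failed) {
            words[word] |= mask;
            failures++;
        } else {
            words[word] &= ~mask;
        }
        nextIndex = (nextIndex + 1) % capacity;
        if (length < capacity) {
            length++;
        }
    }

    int failures() { return failures; }
    int length() { return length; }
    int wordCount() { return words.length; }

    public static void main(String[] args) {
        RingBitBufferSketch buffer = new RingBitBufferSketch(1024);
        System.out.println(buffer.wordCount() + " longs back 1024 call states");
    }
}
```

Keeping a running failure count means the failure rate can be read without scanning the bits on every call.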
Execution monitoring range
The failure rate can only be calculated once the ring buffer is full. If the size of the ring buffer is 10, at least 10 calls must be recorded before the failure rate is calculated. If only 9 requests are made, the circuit breaker will not open even if all 9 requests fail.
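A minimal sketch of that rule, assuming a hypothetical helper class `FailureRateSketch`. Returning -1 while the buffer is not yet full is this sketch's convention for "not enough data", and treating a rate that reaches the threshold as sufficient to open is likewise an assumption.

```java
/** Hypothetical helper: the failure rate is only meaningful once the buffer is full. */
class FailureRateSketch {

    /** Returns the failure rate in percent, or -1 while fewer than bufferSize calls are recorded. */
    static float failureRate(int recordedCalls, int failedCalls, int bufferSize) {
        if (recordedCalls < bufferSize) {
            return -1f; // not enough data yet
        }
        return failedCalls * 100.0f / recordedCalls;
    }

    /** Assumption of this sketch: the breaker opens once the rate reaches the threshold. */
    static boolean shouldOpen(int recordedCalls, int failedCalls, int bufferSize, float thresholdPercent) {
        float rate = failureRate(recordedCalls, failedCalls, bufferSize);
        return rate >= 0 && rate >= thresholdPercent;
    }

    public static void main(String[] args) {
        // 9 failures out of 9 calls with a buffer of 10: no decision yet
        System.out.println(shouldOpen(9, 9, 10, 60f));
        // the 10th call fills the buffer, so the rate is evaluated
        System.out.println(shouldOpen(10, 9, 10, 60f));
    }
}
```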
Request interception control
However, setting the buffer size to 10 in the CLOSED state does not mean that only 10 requests are admitted. All requests are let through until the circuit breaker opens.
State transition mechanism
- When the failure rate is higher than the threshold, the circuit breaker changes from CLOSED to OPEN. From then on every request throws a CallNotPermittedException.
- After a period of time, the circuit breaker changes from OPEN to HALF_OPEN. The HALF_OPEN state has its own Ring Bit Buffer for calculating the failure rate. If the rate is higher than the configured threshold, the state changes back to OPEN. If it is lower than the threshold, the state changes to CLOSED.
- Unlike the CLOSED buffer, the HALF_OPEN buffer size also limits the number of requests: only as many requests as the buffer can hold are let through.
- There are also two special states: DISABLED (always allow access) and FORCED_OPEN (always deny access). In these states no circuit breaker events are generated (apart from state transitions) and no successes or failures are recorded. The only way to leave either state is to trigger a state transition or reset the circuit breaker.
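The three normal transitions described above can be sketched as a small state machine. This is a simplified, single-threaded illustration with hypothetical names (`BreakerStateSketch`, `waitDurationElapsed`), not Resilience4j's implementation. The wait duration is modelled as an explicit method call, and the half-open request limit is implicit because the state changes as soon as the half-open buffer fills.

```java
/** Hypothetical single-threaded sketch of the CLOSED / OPEN / HALF_OPEN transitions. */
class BreakerStateSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int closedSize;
    private final int halfOpenSize;
    private final float thresholdPercent;
    private State state;
    private boolean[] ring;   // outcomes for the current state, true = failure
    private int next;         // slot that the next outcome overwrites
    private int recorded;     // outcomes recorded, capped at ring.length

    BreakerStateSketch(int closedSize, int halfOpenSize, float thresholdPercent) {
        this.closedSize = closedSize;
        this.halfOpenSize = halfOpenSize;
        this.thresholdPercent = thresholdPercent;
        enter(State.CLOSED);
    }

    /** Switches state and starts with a fresh, empty buffer. */
    private void enter(State newState) {
        state = newState;
        ring = new boolean[newState == State.HALF_OPEN ? halfOpenSize : closedSize];
        next = 0;
        recorded = 0;
    }

    State state() { return state; }

    /** Only the OPEN state rejects calls outright. */
    boolean isCallPermitted() { return state != State.OPEN; }

    /** Stands in for the configured wait duration elapsing while OPEN. */
    void waitDurationElapsed() {
        if (state == State.OPEN) {
            enter(State.HALF_OPEN);
        }
    }

    /** Records one call outcome and applies the transition rules once the buffer is full. */
    void record(boolean failed) {
        if (state == State.OPEN) {
            return; // rejected calls are not recorded
        }
        ring[next] = failed;
        next = (next + 1) % ring.length;
        if (recorded < ring.length) {
            recorded++;
        }
        if (recorded < ring.length) {
            return; // buffer not full yet, no failure rate available
        }
        int failures = 0;
        for (boolean f : ring) {
            if (f) failures++;
        }
        float rate = failures * 100f / ring.length;
        if (rate >= thresholdPercent) {
            enter(State.OPEN);
        } else if (state == State.HALF_OPEN) {
            enter(State.CLOSED);
        }
    }
}
```

With buffer sizes 5 and 2 and a threshold of 60, five failed calls open the breaker, and two successful calls in the half-open state close it again.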
Spring Boot integration
The resilience4j-spring-boot module integrates the CircuitBreaker, Retry, Bulkhead, and RateLimiter modules. Since we will cover the other modules later, we add the resilience4j-spring-boot dependency directly.
Maven configuration (pom.xml)
The IDE used for testing was IDEA, and the tests run on Spring Boot. First add the Maven dependency:
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot</artifactId>
    <version>0.9.0</version>
</dependency>
application.yml configuration
resilience4j:
  circuitbreaker:
    configs:
      default:
        ringBufferSizeInClosedState: 5 # ring buffer size in the closed state
        ringBufferSizeInHalfOpenState: 2 # ring buffer size in the half-open state
        waitDurationInOpenState: # time spent in the open state (the value is missing in the source)
        failureRateThreshold: 60 # failure rate threshold at which the circuit breaker opens
        eventConsumerBufferSize: 10 # event buffer size
        registerHealthIndicator: true # health monitoring
        automaticTransitionFromOpenToHalfOpenEnabled: false # whether to move from open to half-open automatically, without a triggering call
        recordFailurePredicate: com.example.resilience4j.exceptions.RecordFailurePredicate # predicate that decides whether an exception counts as a failure
        recordExceptions: # recorded exceptions
          - com.hyts.resilience4j.exceptions.Service1Exception
          - com.hyts.resilience4j.exceptions.Service2Exception
        ignoreExceptions: # ignored exceptions
          - com.example.resilience4j.exceptions.BusinessAException
    instances:
      service1:
        baseConfig: default
        waitDurationInOpenState: 5000
        failureRateThreshold: 20
      service2:
        baseConfig: default
Multiple circuit breaker instances can be configured. Each instance can use a different base configuration or override individual settings of its base configuration.
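Conceptually, an instance's effective settings are the base configuration with the instance's own values laid on top. The following plain-Java sketch illustrates only that overlay idea with a hypothetical `ConfigOverlaySketch` class; it is not how the Spring Boot starter resolves the configuration internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of "base config plus instance overrides". */
class ConfigOverlaySketch {

    /** Copies the base settings, then lets the instance's own settings replace them. */
    static Map<String, Object> effective(Map<String, ?> base, Map<String, ?> overrides) {
        Map<String, Object> merged = new LinkedHashMap<>(base);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new LinkedHashMap<>();
        base.put("ringBufferSizeInClosedState", 5);
        base.put("failureRateThreshold", 60);

        Map<String, Object> service1 = new LinkedHashMap<>();
        service1.put("waitDurationInOpenState", 5000);
        service1.put("failureRateThreshold", 20);

        // service1 keeps the base ring buffer size but uses its own threshold
        System.out.println(effective(base, service1));
    }
}
```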
The protected back-end service
Take a back-end service as an example and use a circuit breaker to protect it.
interface RemoteService {
    List<User> process() throws TimeoutException, InterruptedException;
}
The connector that invokes the service
This is the connector that calls the remote service. We reach the back-end service by calling the methods of the connector.
public class RemoteServiceConnector {
    @Autowired
    private RemoteService remoteService;

    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteService.process();
        return users;
    }
}
Monitoring circuit breaker state and events
The following utility class logs the state and metrics of a circuit breaker at a given moment and registers listeners for its events:
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class CircuitBreakerUtil {

    /**
     * Logs the state and metrics of the circuit breaker.
     */
    public static void getCircuitBreakerStatus(String time, CircuitBreaker circuitBreaker) {
        CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
        // Returns the failure rate in percentage.
        float failureRate = metrics.getFailureRate();
        // Returns the current number of buffered calls.
        int bufferedCalls = metrics.getNumberOfBufferedCalls();
        // Returns the current number of failed calls.
        int failedCalls = metrics.getNumberOfFailedCalls();
        // Returns the current number of successful calls.
        int successCalls = metrics.getNumberOfSuccessfulCalls();
        // Returns the max number of buffered calls.
        int maxBufferCalls = metrics.getMaxNumberOfBufferedCalls();
        // Returns the current number of not permitted calls.
        long notPermittedCalls = metrics.getNumberOfNotPermittedCalls();
        log.info(time + "state=" + circuitBreaker.getState() + " , metrics[ failureRate=" + failureRate +
                ", bufferedCalls=" + bufferedCalls +
                ", failedCalls=" + failedCalls +
                ", successCalls=" + successCalls +
                ", maxBufferCalls=" + maxBufferCalls +
                ", notPermittedCalls=" + notPermittedCalls +
                "]"
        );
    }

    /**
     * Listens for circuit breaker events.
     */
    public static void addCircuitBreakerListener(CircuitBreaker circuitBreaker) {
        circuitBreaker.getEventPublisher()
                .onSuccess(event -> log.info("Service call successful:" + event.toString()))
                .onError(event -> log.info("Service call failed:" + event.toString()))
                .onIgnoredError(event -> log.info("Service call failed but the exception was ignored:" + event.toString()))
                .onReset(event -> log.info("Circuit breaker reset:" + event.toString()))
                .onStateTransition(event -> log.info("Circuit breaker state change:" + event.toString()))
                .onCallNotPermitted(event -> log.info("Circuit breaker is open, call not permitted:" + event.toString()))
        ;
    }
}
Calling methods
CircuitBreaker supports both programmatic calls and AOP calls using annotations.
Call a method programmatically
In the service class, inject the registry and use it to look up the circuit breaker by name. If you don't need a fallback (degradation), you can call the circuit breaker's executeSupplier or executeCheckedSupplier method directly:
public class CircuitBreakerServiceImpl {
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    public List<User> circuitBreakerNotAOP() throws Throwable {
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("service1");
        CircuitBreakerUtil.getCircuitBreakerStatus("Before execution begins:", circuitBreaker);
        return circuitBreaker.executeCheckedSupplier(remoteServiceConnector::process);
    }
}
If you need a fallback, decorate the service call with the circuit breaker first and then use Try.of().recover() to degrade. You can also use different fallbacks for different exceptions:
@Log4j2
public class CircuitBreakerServiceImpl {
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    public List<User> circuitBreakerNotAOP() {
        // Get the circuit breaker instance from the registry
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker("service1");
        CircuitBreakerUtil.getCircuitBreakerStatus("Before execution begins:", circuitBreaker);
        // Decorate the connector call with the circuit breaker
        CheckedFunction0<List<User>> checkedSupplier = CircuitBreaker
                .decorateCheckedSupplier(circuitBreaker, remoteServiceConnector::process);
        // Run the call with Try.of() and degrade with recover()
        Try<List<User>> result = Try.of(checkedSupplier)
                .recover(CallNotPermittedException.class, throwable -> {
                    log.info("Circuit breaker is open, access to the protected method is denied");
                    CircuitBreakerUtil
                            .getCircuitBreakerStatus("Circuit breaker open:", circuitBreaker);
                    List<User> users = new ArrayList<>();
                    return users;
                })
                .recover(throwable -> {
                    log.info(throwable.getLocalizedMessage() + ", the method is degraded");
                    CircuitBreakerUtil
                            .getCircuitBreakerStatus("In the fallback:", circuitBreaker);
                    List<User> users = new ArrayList<>();
                    return users;
                });
        CircuitBreakerUtil.getCircuitBreakerStatus("After execution:", circuitBreaker);
        return result.get();
    }
}
AOP-style invocation
First put the @CircuitBreaker(name = "", fallbackMethod = "") annotation on the connector method, where name is the name of the circuit breaker to use and fallbackMethod is the fallback (degradation) method to use. The fallback method must be in the same class as the original method. Its return type must match that of the original method, and it takes the original parameters plus one additional exception parameter, like this:
@Log4j2
public class RemoteServiceConnector {
    @Autowired
    private RemoteService remoteService;
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    @CircuitBreaker(name = "backendA", fallbackMethod = "fallBack")
    public List<User> process() throws TimeoutException, InterruptedException {
        List<User> users;
        users = remoteService.process();
        return users;
    }

    private List<User> fallBack(Throwable throwable) {
        log.info(throwable.getLocalizedMessage() + ", the method is degraded");
        CircuitBreakerUtil.getCircuitBreakerStatus("In the fallback:", circuitBreakerRegistry.circuitBreaker("backendA"));
        List<User> users = new ArrayList<>();
        return users;
    }

    private List<User> fallBack(CallNotPermittedException e) {
        log.info("Circuit breaker is open, access to the protected method is denied");
        CircuitBreakerUtil.getCircuitBreakerStatus("Circuit breaker open:", circuitBreakerRegistry.circuitBreaker("backendA"));
        List<User> users = new ArrayList<>();
        return users;
    }
}
You can define several fallback methods as long as they share the same method name. When more than one of them matches, the one whose exception parameter is closest to the thrown exception is triggered (closest in terms of the type hierarchy, so the nearest subclass wins). For example, if the process() method throws CallNotPermittedException, fallBack(CallNotPermittedException e) is triggered and fallBack(Throwable throwable) is not.
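The "closest type wins" rule can be sketched by walking up the class hierarchy of the thrown exception until a declared fallback parameter type is found. The class `FallbackSelectionSketch`, its method `closestHandler`, and the stand-in exception `CallNotPermitted` are hypothetical and only illustrate the selection rule, not the library's actual lookup code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of choosing the fallback whose parameter type is closest to the thrown exception. */
class FallbackSelectionSketch {

    /** Stand-in for CallNotPermittedException so the sketch has no external dependencies. */
    static class CallNotPermitted extends RuntimeException { }

    /**
     * Walks from the thrown exception's class up through its superclasses and returns
     * the first class that a fallback declares as its parameter, or null if none matches.
     */
    static Class<?> closestHandler(Throwable thrown, Class<?>... handlerTypes) {
        List<Class<?>> declared = Arrays.asList(handlerTypes);
        for (Class<?> c = thrown.getClass(); c != null; c = c.getSuperclass()) {
            if (declared.contains(c)) {
                return c;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Two fallbacks are declared: fallBack(Throwable) and fallBack(CallNotPermitted)
        System.out.println(closestHandler(new CallNotPermitted(), Throwable.class, CallNotPermitted.class));
        System.out.println(closestHandler(new IllegalStateException(), Throwable.class, CallNotPermitted.class));
    }
}
```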
Then call the method directly:
public class CircuitBreakerServiceImpl {
    @Autowired
    private RemoteServiceConnector remoteServiceConnector;
    @Autowired
    private CircuitBreakerRegistry circuitBreakerRegistry;

    public List<User> circuitBreakerAOP() throws TimeoutException, InterruptedException {
        CircuitBreakerUtil
                .getCircuitBreakerStatus("Before execution begins:", circuitBreakerRegistry.circuitBreaker("backendA"));
        List<User> result = remoteServiceConnector.process();
        CircuitBreakerUtil
                .getCircuitBreakerStatus("After execution:", circuitBreakerRegistry.circuitBreaker("backendA"));
        return result;
    }
}
Testing
First, we define two exceptions. Exception A is in both the blacklist (recordExceptions) and the whitelist (ignoreExceptions), while exception B is only in the blacklist:
recordExceptions: # recorded exceptions
  - com.example.resilience4j.exceptions.BusinessBException
  - com.example.resilience4j.exceptions.BusinessAException
ignoreExceptions: # ignored exceptions
  - com.example.resilience4j.exceptions.BusinessAException
The protected back-end interface is then implemented as follows:
public class RemoteServiceImpl implements RemoteService {
    private static AtomicInteger count = new AtomicInteger(0);

    public List<User> process() {
        int num = count.getAndIncrement();
        log.info("The value of count = " + num);
        if (num % 4 == 1) {
            throw new BusinessAException("Exception A, does not need to be recorded.");
        }
        if (num % 4 == 2 || num % 4 == 3) {
            throw new BusinessBException("Exception B, needs to be recorded.");
        }
        log.info("Service up and running, get user list");
        // Simulate a normal query of the database
        return repository.findAll();
    }
}
Unit-test either the AOP method or the programmatic method of CircuitBreakerServiceImpl by calling it 10 times in a loop:
public class CircuitBreakerServiceImplTest {
    @Autowired
    private CircuitBreakerServiceImpl circuitService;

    @Test
    public void circuitBreakerTest() {
        for (int i = 0; i < 10; i++) {
            // circuitService.circuitBreakerAOP();
            circuitService.circuitBreakerNotAOP();
        }
    }
}
The results also show that whitelisted (ignored) exceptions are not written to the buffer, so they count as neither a success nor a failure. If a fallback method exists, it is called; if there is none, the exception is thrown, just like any other exception.
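The way the two lists interact in this test can be sketched as follows, with the ignore list checked before the record list. The class `ExceptionClassificationSketch` and its local exception types are hypothetical stand-ins; how unlisted exceptions are treated is deliberately left out of this sketch.

```java
import java.util.List;

/** Hypothetical sketch: ignored exceptions are skipped before the record list is consulted. */
class ExceptionClassificationSketch {
    enum Outcome { IGNORED, RECORDED_AS_FAILURE, NOT_LISTED }

    // Stand-ins for the two business exceptions used in the test
    static class BusinessAException extends RuntimeException { }
    static class BusinessBException extends RuntimeException { }

    // Mirrors the test configuration: A is in both lists, B only in recordExceptions
    static final List<Class<?>> RECORD = List.of(BusinessBException.class, BusinessAException.class);
    static final List<Class<?>> IGNORE = List.of(BusinessAException.class);

    static Outcome classify(Throwable thrown) {
        for (Class<?> type : IGNORE) {
            if (type.isInstance(thrown)) {
                return Outcome.IGNORED;             // never reaches the buffer
            }
        }
        for (Class<?> type : RECORD) {
            if (type.isInstance(thrown)) {
                return Outcome.RECORDED_AS_FAILURE; // written to the buffer as a failure
            }
        }
        return Outcome.NOT_LISTED;                  // outside the scope of this sketch
    }

    public static void main(String[] args) {
        System.out.println(classify(new BusinessAException()));
        System.out.println(classify(new BusinessBException()));
    }
}
```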
public class CircuitBreakerServiceImplTest {
    @Autowired
    private CircuitBreakerServiceImpl circuitService;

    @Test
    public void circuitBreakerThreadTest() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 15; i++) {
            pool.submit(
                    // circuitService::circuitBreakerAOP
                    circuitService::circuitBreakerNotAOP);
        }
        pool.shutdown();
        while (!pool.isTerminated());
        Thread.sleep(10000);
        log.info("Circuit breaker state has changed to half-open");
        pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 15; i++) {
            pool.submit(
                    // circuitService::circuitBreakerAOP
                    circuitService::circuitBreakerNotAOP);
        }
        pool.shutdown();
        while (!pool.isTerminated());
        for (int i = 0; i < 10; i++) {
        }
    }
}
resilience4j:
  circuitbreaker:
    configs:
      myDefault:
        automaticTransitionFromOpenToHalfOpenEnabled: true # whether to move from open to half-open automatically