So far in this series, we’ve looked at Resilience4j and its Retry, RateLimiter, and TimeLimiter modules. In this article, we will explore the Bulkhead module. We’ll see what it solves, when and how to use it, and look at some examples.
Code examples
Working code examples for this article are available on GitHub.
What is Resilience4j?
Please refer to the description in the previous article for a quick overview of how Resilience4j works in general.
What is fault isolation?
A few years ago, we had a production problem where one server stopped responding to a health check and the load balancer took the server out of the pool.
Just as we were starting to investigate this issue, there was a second alert — another server had stopped responding to the health check and was also taken out of the pool.
After a few minutes, every server stopped responding to health probes, and our service was completely down.
We used Redis to cache some data for several features supported by the application. As we later discovered, the Redis cluster had a problem at the same time and had stopped accepting new connections. We connected to Redis using the Jedis library, whose default behavior is to block the calling thread indefinitely until a connection is established.
Our service was hosted on Tomcat, whose default request-processing thread pool size is 200 threads. So every request that went through a code path that connected to Redis ended up blocking its thread indefinitely.
Within minutes, all 2,000 threads in the cluster were blocked indefinitely — there weren’t even idle threads to respond to the load balancer’s health check.
The service itself supported several features, not all of which required access to the Redis cache. But when this one area had a problem, it ended up affecting the entire service.
That's what fault isolation is all about: it prevents a problem in one area of the service from bringing down the entire service.
While what happened to our service was an extreme example, we can see how a slow upstream dependency can affect unrelated areas of the calling service.
If we had set a limit of 20 concurrent requests to Redis on each server instance, only those threads would have been affected when the Redis connectivity problem occurred. The remaining request-processing threads could have continued serving other requests.
The idea behind fault isolation is to set a limit on the number of concurrent calls we can make to remote services. We treat calls to different remote services as separate, isolated pools and set a limit on the number of calls that can be made simultaneously.
The term bulkhead itself comes from its use in ships, where the bottom of a ship is divided into sections separate from each other. If there is a crack and water starts flowing in, only that section will fill with water. This prevents the whole ship from sinking.
The Resilience4j bulkhead concept
The Resilience4j Bulkhead module works on the same principle as the other Resilience4j modules. We provide it with the code we want to execute as a functional construct (a lambda expression that makes a remote call, a Supplier that retrieves a value from a remote service, and so on), and the bulkhead decorates it with code that controls the number of concurrent calls.
Resilience4j provides two types of bulkheads – SemaphoreBulkhead and ThreadPoolBulkhead.
SemaphoreBulkhead internally uses java.util.concurrent.Semaphore to control the number of concurrent calls and executes our code on the current thread.
ThreadPoolBulkhead uses a thread from a thread pool to execute our code. It internally uses java.util.concurrent.ArrayBlockingQueue and java.util.concurrent.ThreadPoolExecutor to control the number of concurrent calls.
SemaphoreBulkhead
Let's look at the configurations associated with the semaphore bulkhead and what they mean.
maxConcurrentCalls determines the maximum number of concurrent calls we can make to the remote service. We can think of this value as the number of permits the semaphore is initialized with.
Any thread that tries to call the remote service beyond this limit can either get a BulkheadFullException immediately or wait for some time for a permit to be released by another thread. This is determined by the maxWaitDuration value.
When multiple threads are waiting for permits, the fairCallHandlingEnabled configuration determines whether the waiting threads acquire permits in first-in, first-out order.
Finally, the writableStackTraceEnabled configuration lets us reduce the amount of information in the stack trace of a BulkheadFullException. This is useful because, without it, our logs could fill up with many similar stack traces when the exception occurs repeatedly. Usually, when reading the logs, knowing that a BulkheadFullException occurred is enough.
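To make these semantics concrete, here is a minimal, hypothetical sketch of the mechanics using only java.util.concurrent.Semaphore. This is an illustration of the behavior described above, not the actual SemaphoreBulkhead implementation; the numbers are made up for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreBulkheadSketch {
    public static void main(String[] args) throws InterruptedException {
        int maxConcurrentCalls = 2;
        boolean fair = true; // analogous to fairCallHandlingEnabled
        Semaphore permits = new Semaphore(maxConcurrentCalls, fair);

        // Two calls acquire permits and "run" concurrently
        permits.acquire();
        permits.acquire();

        // A third caller waits up to a maxWaitDuration for a permit
        boolean acquired = permits.tryAcquire(100, TimeUnit.MILLISECONDS);
        System.out.println("third call permitted: " + acquired);

        // When one call finishes, it releases its permit...
        permits.release();
        // ...and a waiting caller can now proceed
        acquired = permits.tryAcquire(100, TimeUnit.MILLISECONDS);
        System.out.println("after release, call permitted: " + acquired);
    }
}
```

The failed tryAcquire corresponds to the case where the bulkhead throws a BulkheadFullException after maxWaitDuration elapses.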
ThreadPoolBulkhead
CoreThreadPoolSize, maxThreadPoolSize, keepAliveDuration, and queueCapacity are the main configurations associated with ThreadPoolBulkhead. These configurations are used internally by ThreadPoolBulkhead to construct a ThreadPoolExecutor.
The internal ThreadPoolExecutor executes incoming tasks using one of the available, free threads. If no thread is free to execute an incoming task, the task is queued to execute later when a thread becomes available. If the queueCapacity has been reached, the remote call is rejected with a BulkheadFullException.
ThreadPoolBulkhead also has a writableStackTraceEnabled configuration to control the amount of information in the stack trace of a BulkheadFullException.
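The rejection behavior can be sketched with a plain ThreadPoolExecutor backed by an ArrayBlockingQueue. Again, this is a hypothetical illustration of the mechanics, not the actual ThreadPoolBulkhead code; the pool and queue sizes are chosen just to trigger a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolBulkheadSketch {
    public static void main(String[] args) throws InterruptedException {
        // roughly: coreThreadPoolSize = 1, maxThreadPoolSize = 1, queueCapacity = 1
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 20, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));

        Runnable slowTask = () -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) { }
        };

        executor.execute(slowTask); // runs on the single pool thread
        executor.execute(slowTask); // waits in the queue
        try {
            executor.execute(slowTask); // no free thread, queue full -> rejected
        } catch (RejectedExecutionException e) {
            // ThreadPoolBulkhead surfaces this condition as a BulkheadFullException
            System.out.println("task rejected: bulkhead is full");
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```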
Using the Resilience4j bulkhead module
Let's look at how to use the various features available in the Bulkhead module.
We will use the same example as in the previous articles in this series. Suppose we are building a website for an airline that allows its customers to search and book flights. Our service talks to the remote service encapsulated by the FlightSearchService class.
SemaphoreBulkhead
When using semaphore based partitions, BulkheadRegistry, BulkheadConfig, and Bulkhead are the main abstractions we use.
The BulkheadRegistry is a factory for creating and managing Bulkhead objects.
BulkheadConfig encapsulates maxConcurrentCalls, maxWaitDuration, writableStackTraceEnabled and fairCallHandlingEnabled configuration. Each Bulkhead object is associated with a BulkheadConfig.
The first step is to create a BulkheadConfig:
BulkheadConfig config = BulkheadConfig.ofDefaults();
This creates a BulkheadConfig with default values for maxConcurrentCalls (25), maxWaitDuration (0s), writableStackTraceEnabled (true), and fairCallHandlingEnabled (true).
Suppose we want to limit the number of concurrent calls to 2 and are willing to wait 2 seconds for a thread to acquire a permit:
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(2)
.maxWaitDuration(Duration.ofSeconds(2))
.build();
Then we create a Bulkhead:
BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead("flightSearchService");
Now let's express our code to run a flight search as a Supplier and decorate it with the bulkhead:
Supplier<List<Flight>> flightsSupplier =
  () -> service.searchFlightsTakingOneSecond(request);
Supplier<List<Flight>> decoratedFlightsSupplier =
  Bulkhead.decorateSupplier(bulkhead, flightsSupplier);
Finally, let's make a few calls to the decorated operation to see how the bulkhead works. We can use CompletableFuture to simulate concurrent flight search requests from users:
for (int i=0; i<4; i++) {
CompletableFuture
.supplyAsync(decoratedFlightsSupplier)
.thenAccept(flights -> System.out.println("Received results"));
}
The timestamp and thread name in the output show that of the four concurrent requests, the first two pass immediately:
Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-3
Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-5
Flight search successful at 11:42:13 226
Flight search successful at 11:42:13 226
Received results
Received results
Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-9
Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-7
Flight search successful at 11:42:14 239
Flight search successful at 11:42:14 239
Received results
Received results
The third and fourth requests acquired permits only about a second later, after the earlier requests completed.
If a thread cannot acquire a permit within the 2s maxWaitDuration we specified, a BulkheadFullException is thrown:
Caused by: io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:49)
at io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead.acquirePermission(SemaphoreBulkhead.java:164)
at io.github.resilience4j.bulkhead.Bulkhead.lambda$decorateSupplier$5(Bulkhead.java:194)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
Other than the first line, the other lines in the stack trace don’t add much value. If BulkheadFullException occurs multiple times, these stack trace lines are repeated in our log file.
We can set the writableStackTraceEnabled configuration to false to reduce the amount of information in the stack trace:
BulkheadConfig config = BulkheadConfig.custom()
.maxConcurrentCalls(2)
.maxWaitDuration(Duration.ofSeconds(1))
.writableStackTraceEnabled(false)
.build();
Now, when a BulkheadFullException occurs, only a single line for it appears in the stack trace:
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5
io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
Flight search successful at 12:27:58 699
Flight search successful at 12:27:58 699
Received results
Received results
Similar to the other Resilience4j modules we have seen, Bulkhead also provides additional methods such as decorateCheckedSupplier(), decorateCompletionStage(), decorateRunnable(), decorateConsumer(), and so on, so we can provide our code in constructs other than a Supplier.
ThreadPoolBulkhead
When using the thread-pool-based bulkhead, ThreadPoolBulkheadRegistry, ThreadPoolBulkheadConfig, and ThreadPoolBulkhead are the main abstractions we work with.
ThreadPoolBulkheadRegistry is a factory for creating and managing ThreadPoolBulkhead objects.
ThreadPoolBulkheadConfig encapsulates the coreThreadPoolSize, maxThreadPoolSize, keepAliveDuration, and queueCapacity configurations. Each ThreadPoolBulkhead object is associated with a ThreadPoolBulkheadConfig.
The first step is to create a ThreadPoolBulkheadConfig:
ThreadPoolBulkheadConfig config =
ThreadPoolBulkheadConfig.ofDefaults();
This creates a ThreadPoolBulkheadConfig with default values for coreThreadPoolSize (number of available processors - 1), maxThreadPoolSize (number of available processors), keepAliveDuration (20ms), and queueCapacity (100).
Suppose we want to limit the number of concurrent calls to 2:
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(2)
.coreThreadPoolSize(1)
.queueCapacity(1)
.build();
Then we create a ThreadPoolBulkhead:
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);
ThreadPoolBulkhead bulkhead = registry.bulkhead("flightSearchService");
Now let's express our code to run a flight search as a Supplier and decorate it with the bulkhead:
Supplier<List<Flight>> flightsSupplier =
() -> service.searchFlightsTakingOneSecond(request);
Supplier<CompletionStage<List<Flight>>> decoratedFlightsSupplier =
ThreadPoolBulkhead.decorateSupplier(bulkhead, flightsSupplier);
While SemaphoreBulkhead.decorateSupplier() returned a Supplier<List<Flight>>, ThreadPoolBulkhead.decorateSupplier() returns a Supplier<CompletionStage<List<Flight>>>. This is because ThreadPoolBulkhead does not execute the code synchronously on the current thread.
Finally, let's make a few calls to the decorated operation to see how the bulkhead works:
for (int i = 0; i < 3; i++) {
  decoratedFlightsSupplier
    .get()
    .whenComplete((r, t) -> {
      if (r != null) {
        System.out.println("Received results");
      }
      if (t != null) {
        t.printStackTrace();
      }
    });
}
The timestamp and thread name in the output show that while the first two requests are executed immediately, the third request is queued and later executed by one of the freed threads:
Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-1
Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-2
Flight search successful at 16:15:00 136
Flight search successful at 16:15:00 135
Received results
Received results
Searching for flights; current time = 16:15:01 151; current thread = bulkhead-flightSearchService-2
Flight search successful at 16:15:01 151
Received results
If there are no free threads and no capacity in the queue, a BulkheadFullException is thrown:
Exception in thread "main" io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:64)
at io.github.resilience4j.bulkhead.internal.FixedThreadPoolBulkhead.submit(FixedThreadPoolBulkhead.java:157)
... other lines omitted ...
We can use the writableStackTraceEnabled configuration to reduce the amount of information in the stack trace:
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
.maxThreadPoolSize(2)
.coreThreadPoolSize(1)
.queueCapacity(1)
.writableStackTraceEnabled(false)
.build();
Now, when a BulkheadFullException occurs, only a single line for it appears in the stack trace:
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5
io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
Flight search successful at 12:27:58 699
Flight search successful at 12:27:58 699
Received results
Received results
Context propagation
Sometimes we store data in a ThreadLocal variable and read it in a different area of the code. We do this to avoid explicitly passing data as a parameter between method chains, especially if the value is not directly related to the core business logic we are implementing.
For example, we might want to record the current user ID or transaction ID or a request trace ID to each logging statement to make it easier to search the logs. Using ThreadLocal is a useful technique for such scenarios.
With ThreadPoolBulkhead, the data we store in ThreadLocal variables will not be available in other threads because our code is not executing on the current thread.
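A tiny stand-alone demo, independent of Resilience4j, shows the core issue: a value set on the submitting thread's ThreadLocal is simply absent on the pool thread. The names here are illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLocalLossDemo {
    static final ThreadLocal<String> TRACKING_ID = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        TRACKING_ID.set("abc-123");
        System.out.println("on main thread: " + TRACKING_ID.get());

        // The task runs on a pool thread, which has its own ThreadLocal slot
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() ->
            System.out.println("on pool thread: " + TRACKING_ID.get())).get();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```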
Let’s look at an example to understand this problem. First we define a RequestTrackingIdHolder class, a wrapper around ThreadLocal:
class RequestTrackingIdHolder {
  static ThreadLocal<String> threadLocal = new ThreadLocal<>();

  static String getRequestTrackingId() {
    return threadLocal.get();
  }

  static void setRequestTrackingId(String id) {
    if (threadLocal.get() != null) {
      threadLocal.set(null);
      threadLocal.remove();
    }
    threadLocal.set(id);
  }

  static void clear() {
    threadLocal.set(null);
    threadLocal.remove();
  }
}
The static methods make it easy to set and get the value stored on the ThreadLocal. Next, we set a request tracking ID before calling the bulkhead-decorated flight search operation:
for (int i=0; i<2; i++) {
String trackingId = UUID.randomUUID().toString();
System.out.println("Setting trackingId " + trackingId + " on parent, main thread before calling flight search");
RequestTrackingIdHolder.setRequestTrackingId(trackingId);
decoratedFlightsSupplier
.get()
.whenComplete((r,t) -> {
// other lines omitted
});
}
The sample output shows that this value is not available in the bulkhead-managed thread:
Setting trackingId 98ff99df-466a-47f7-88f7-5e31fc8fcb6b on parent, main thread before calling flight search
Setting trackingId 6b98d73c-a590-4a20-b19d-c85fea783caf on parent, main thread before calling flight search
Searching for flights; current time = 19:53:53 799; current thread = bulkhead-flightSearchService-1; Request Tracking Id = null
Flight search successful at 19:53:53 824
Received results
Searching for flights; current time = 19:53:54 836; current thread = bulkhead-flightSearchService-1; Request Tracking Id = null
Flight search successful at 19:53:54 836
Received results
To solve this problem, ThreadPoolBulkhead provides a ContextPropagator. ContextPropagator is an abstraction used to retrieve, copy, and clean values across thread boundaries. It defines an interface that contains methods to retrieve values from the current thread (retrieve()), copy them to a new thread of execution (copy()), and eventually clean them up on the thread of execution (clear()).
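The general pattern behind this interface can be sketched without Resilience4j: capture the value on the submitting thread, set it on the worker thread before the task runs, and clear it afterwards. The wrapper and names below are illustrative, not the library's implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ManualContextPropagation {
    static final ThreadLocal<String> TRACKING_ID = new ThreadLocal<>();

    // Wraps a task so the submitting thread's value travels with it
    static Runnable propagating(Runnable task) {
        String captured = TRACKING_ID.get();  // "retrieve" on the caller's thread
        return () -> {
            TRACKING_ID.set(captured);        // "copy" onto the worker thread
            try {
                task.run();
            } finally {
                TRACKING_ID.remove();         // "clear" on the worker thread
            }
        };
    }

    public static void main(String[] args) throws Exception {
        TRACKING_ID.set("abc-123");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(propagating(() ->
            System.out.println("on pool thread: " + TRACKING_ID.get()))).get();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

ContextPropagator packages exactly these three steps (retrieve(), copy(), clear()) so that ThreadPoolBulkhead can apply them around every task it runs.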
Let's implement a RequestTrackingIdPropagator:
class RequestTrackingIdPropagator implements ContextPropagator {
  @Override
  public Supplier<Optional> retrieve() {
    System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName());
    return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
  }

  @Override
  public Consumer<Optional> copy() {
    return optional -> {
      System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
      optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
    };
  }

  @Override
  public Consumer<Optional> clear() {
    return optional -> {
      System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
      optional.ifPresent(s -> RequestTrackingIdHolder.clear());
    };
  }
}
We provide the ContextPropagator to the ThreadPoolBulkhead by setting it on the ThreadPoolBulkheadConfig:
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
  .maxThreadPoolSize(2)
  .coreThreadPoolSize(1)
  .queueCapacity(1)
  .contextPropagator(new RequestTrackingIdPropagator())
  .build();
Now the sample output shows that the request tracking ID is available in the bulkhead-managed thread:
Setting trackingId 71d44cb8-dab6-4222-8945-e7fd023528ba on parent, main thread before calling flight search
Getting request tracking id from thread: main
Setting trackingId 5f9dd084-f2cb-4a20-804b-038828abc161 on parent, main thread before calling flight search
Getting request tracking id from thread: main
Setting request tracking id 71d44cb8-dab6-4222-8945-e7fd023528ba on thread: bulkhead-flightSearchService-1
Searching for flights; current time = 20:07:56 508; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 71d44cb8-dab6-4222-8945-e7fd023528ba
Flight search successful at 20:07:56 538
Clearing request tracking id on thread: bulkhead-flightSearchService-1
Received results
Setting request tracking id 5f9dd084-f2cb-4a20-804b-038828abc161 on thread: bulkhead-flightSearchService-1
Searching for flights; current time = 20:07:57 542; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 5f9dd084-f2cb-4a20-804b-038828abc161
Flight search successful at 20:07:57 542
Clearing request tracking id on thread: bulkhead-flightSearchService-1
Received results
Bulkhead events
Both Bulkhead and ThreadPoolBulkhead have an EventPublisher to generate the following types of events:
- BulkheadOnCallPermittedEvent,
- BulkheadOnCallRejectedEvent, and
- BulkheadOnCallFinishedEvent
We can listen for these events and record them, for example:
Bulkhead bulkhead = registry.bulkhead("flightSearchService");
bulkhead.getEventPublisher().onCallPermitted(e -> System.out.println(e.toString()));
bulkhead.getEventPublisher().onCallFinished(e -> System.out.println(e.toString()));
bulkhead.getEventPublisher().onCallRejected(e -> System.out.println(e.toString()));
The sample output shows what gets logged:
2020-08-26T12:27:39.790435: Bulkhead 'flightSearch' permitted a call.
... other lines omitted ...
2020-08-26T12:27:40.290987: Bulkhead 'flightSearch' rejected a call.
... other lines omitted ...
2020-08-26T12:27:41.094866: Bulkhead 'flightSearch' has finished a call.
Bulkhead metrics
SemaphoreBulkhead
Bulkhead exposes two metrics:
- the maximum number of available permissions (resilience4j.bulkhead.max.allowed.concurrent.calls), and
- the number of available permissions (resilience4j.bulkhead.available.concurrent.calls).
The maximum-available-permissions metric has the same value as the maxConcurrentCalls we configured on the BulkheadConfig.
First, we create BulkheadConfig, BulkheadRegistry, and Bulkhead as before. Then we create a MeterRegistry and bind BulkheadRegistry to it:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedBulkheadMetrics.ofBulkheadRegistry(registry)
.bindTo(meterRegistry);
After running the bulkhead-decorated operation a few times, we display the captured metrics:
Consumer<Meter> meterConsumer = meter -> {
  String desc = meter.getId().getDescription();
  String metricName = meter.getId().getName();
  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
    .filter(m -> m.getStatistic().name().equals("VALUE"))
    .findFirst()
    .map(m -> m.getValue())
    .orElse(0.0);
  System.out.println(desc + " - " + metricName + ": " + metricValue);
};
meterRegistry.forEachMeter(meterConsumer);
Here is some sample output:
The maximum number of available permissions - resilience4j.bulkhead.max.allowed.concurrent.calls: 8.0
The number of available permissions - resilience4j.bulkhead.available.concurrent.calls: 3.0
ThreadPoolBulkhead
ThreadPoolBulkhead exposes five metrics:
- the current length of the queue (resilience4j.bulkhead.queue.depth),
- the current thread pool size (resilience4j.bulkhead.thread.pool.size),
- the core and maximum capacities of the thread pool (resilience4j.bulkhead.core.thread.pool.size and resilience4j.bulkhead.max.thread.pool.size), and
- the queue capacity (resilience4j.bulkhead.queue.capacity).
First, we create the ThreadPoolBulkheadConfig, ThreadPoolBulkheadRegistry, and ThreadPoolBulkhead as before. Then we create a MeterRegistry and bind the ThreadPoolBulkheadRegistry to it:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedThreadPoolBulkheadMetrics.ofThreadPoolBulkheadRegistry(registry).bindTo(meterRegistry);
After running the bulkhead-decorated operation a few times, we display the captured metrics:
The queue capacity - resilience4j.bulkhead.queue.capacity: 5.0
The queue depth - resilience4j.bulkhead.queue.depth: 1.0
The thread pool size - resilience4j.bulkhead.thread.pool.size: 5.0
The maximum thread pool size - resilience4j.bulkhead.max.thread.pool.size: 5.0
The core thread pool size - resilience4j.bulkhead.core.thread.pool.size: 3.0
In practice, we regularly export the data to the monitoring system and analyze it on the dashboard.
Pitfalls and good practices when implementing bulkheads
Make the bulkhead a singleton
All invocations to a given remote service should go through the same Bulkhead instance. A Bulkhead must be a singleton for a given remote service.
If we don't enforce this, some areas of our codebase might bypass the bulkhead and call the remote service directly. To prevent this, the actual call to the remote service should live in a core, internal layer, and other areas should use the bulkhead decorator exposed by that internal layer.
How do we ensure that new developers understand this intent in the future? Check out Tom's article, which shows one way to solve this problem: organizing the package structure to make the intent clear. Additionally, it shows how to enforce the intent by encoding it in ArchUnit tests.
Combine with other Resilience4j modules
A bulkhead is often more effective when combined with one or more of the other Resilience4j modules, such as Retry and RateLimiter. For example, if a BulkheadFullException is thrown, we may want to retry after some delay.
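As a rough stdlib sketch of that combination (not Resilience4j's actual Retry API): retry a call a few times whenever the concurrency limit rejects it. The semaphore stands in for the bulkhead, and the release on the third attempt simulates another call finishing in the meantime.

```java
import java.util.concurrent.Semaphore;

public class RetryAroundBulkheadSketch {
    public static void main(String[] args) throws InterruptedException {
        Semaphore bulkhead = new Semaphore(1);
        bulkhead.acquire(); // another call holds the only permit

        int maxAttempts = 3;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt == 3) {
                bulkhead.release(); // the other call finishes before our third attempt
            }
            if (bulkhead.tryAcquire()) {
                System.out.println("call permitted on attempt " + attempt);
                break;
            }
            // in a real setup we would sleep for a backoff delay here
            System.out.println("bulkhead full, will retry (attempt " + attempt + ")");
        }
    }
}
```

With Resilience4j, the same shape is achieved by wrapping the bulkhead-decorated Supplier with a Retry configured to retry on BulkheadFullException.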
Conclusion
In this article, we learned how to use the Bulkhead module of Resilience4j to set limits on our concurrent calls to remote services. We learned why this is important and saw some practical examples of how to configure it.
A complete application demonstrating these ideas is available on GitHub.
Implementing Bulkhead with Resilience4j - Reflectoring