An example
Simulate an exception being thrown while a task runs in a thread pool.
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
    int i = 1 / 0;
});
Running this code certainly triggers a division by zero (an ArithmeticException), yet no error message is printed. The reason is that a task passed to submit is wrapped in a Future: the exception thrown during execution is stored there and is rethrown, wrapped in an ExecutionException, only when the get method is called. For details, see juejin.cn/post/696172… So to find out whether a task threw an exception, we can call get inside a try-catch block and handle the exception there.
@Test
public void test() throws Exception {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    try {
        executorService.submit(() -> {
            int i = 1 / 0;
        }).get();
    } catch (ExecutionException e) {
        // get() rethrows the task's exception wrapped in an ExecutionException
        e.printStackTrace();
    }
}
This approach does handle the exception, but calling get blocks the calling thread until the task's run method completes, which defeats the purpose of running it asynchronously.
To deal with this, we can wrap the task before submitting it to the thread pool. If the task fails, control goes straight to the exception handling logic we prepared in advance. The task still runs asynchronously, and the thread pool no longer swallows our exception.
The general idea is as follows:
@Test
public void test() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    // The task that needs to be submitted to the thread pool
    Runnable task = () -> {
        int i = 1 / 0;
    };
    // Wrap the task so that exceptions are handled on the worker thread
    Runnable taskWrapper = () -> {
        try {
            task.run();
        } catch (Exception e) {
            e.printStackTrace();
            // TODO: exception handling
        }
    };
    executorService.submit(taskWrapper);
}
Console output:
java.lang.ArithmeticException: / by zero
at org.ywb.practise.difficulty.ExecutorMain.lambda$test$0(ExecutorMain.java:20)
at org.ywb.practise.difficulty.ExecutorMain.lambda$test$1(ExecutorMain.java:26)
This is the core logic, but if you write code like this everywhere, your supervisor may not be pleased :-(. To make it reusable, we can refactor it a little.
- Exception handler for Runnable tasks
@FunctionalInterface
public interface RunnableErrorHandler {
    /**
     * Handles an exception thrown by a Runnable.
     *
     * @param throwable the exception
     */
    void errorHandler(Throwable throwable);
}
- A class that wraps a thread pool
public class ExecutorServiceWrapper {
    private final ExecutorService threadPoolExecutor;
    private RunnableErrorHandler defaultRunnableErrHandler;

    public ExecutorServiceWrapper(ExecutorService threadPoolExecutor) {
        this.threadPoolExecutor = threadPoolExecutor;
    }

    public ExecutorServiceWrapper(ExecutorService threadPoolExecutor, RunnableErrorHandler defaultRunnableErrHandler) {
        this.threadPoolExecutor = threadPoolExecutor;
        this.defaultRunnableErrHandler = defaultRunnableErrHandler;
    }

    /**
     * No exception handler is passed in; the default exception handler is used.
     *
     * @param task the task to be performed
     * @return future<Void>
     */
    public Future<?> submit(Runnable task) {
        return threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                if (defaultRunnableErrHandler != null) {
                    defaultRunnableErrHandler.errorHandler(e);
                }
            }
        });
    }

    /**
     * Uses a custom exception handler.
     *
     * @param task the task to be performed
     * @param errorHandler the exception handler
     * @return future<Void>
     */
    public Future<?> submit(Runnable task, RunnableErrorHandler errorHandler) {
        return threadPoolExecutor.submit(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                errorHandler.errorHandler(e);
            }
        });
    }
}
Only the Runnable wrapper is provided here; a Callable version can be written along the same lines.
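As a rough illustration of what a Callable version might look like, here is a minimal sketch. The class name CallableWrapperSketch, the static submit helper, and the nested ErrorHandler interface are all hypothetical names introduced for this example (ErrorHandler simply mirrors RunnableErrorHandler so the snippet compiles on its own). One design choice assumed here: after the handler runs, the exception is rethrown so the returned Future still fails if someone later calls get.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallableWrapperSketch {

    /** Hypothetical: same shape as RunnableErrorHandler, repeated to keep the sketch self-contained. */
    @FunctionalInterface
    interface ErrorHandler {
        void errorHandler(Throwable throwable);
    }

    /**
     * Hypothetical Callable counterpart of ExecutorServiceWrapper.submit.
     * The handler runs on the worker thread; the exception is then rethrown
     * so that the returned Future still completes exceptionally.
     */
    static <T> Future<T> submit(ExecutorService pool, Callable<T> task, ErrorHandler handler) {
        return pool.submit(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                handler.errorHandler(e);
                throw e;
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> seen = new AtomicReference<>();

        // The task fails; the handler records the exception without anyone calling get()
        Future<Integer> future = submit(pool, () -> 1 / 0, seen::set);

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);

        System.out.println(seen.get());      // java.lang.ArithmeticException: / by zero
        System.out.println(future.isDone()); // true
    }
}
```

Whether to rethrow after handling is a judgment call: rethrowing keeps the Future honest for callers that do call get, while swallowing (as the Runnable wrapper above does) would require choosing some fallback return value.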
- Demo
- When the thread pool wrapper is constructed, a default exception handler that prints the stack trace is passed in
- The first task uses the default exception handler
- The second task uses a custom exception handler
@Test
public void test1() {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Wrap the original thread pool, passing in the default exception handler
    ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService, Throwable::printStackTrace);
    // Use the default exception handler
    executorServiceWrapper.submit(() -> {
        int i = 1 / 0;
    });
    // Pass in a custom exception handler
    executorServiceWrapper.submit(() -> {
        int i = 1 / 0;
    }, throwable -> {
        // Print the exception message
        System.err.println("customer---" + throwable.getMessage());
    });
}
Output: as you can see, when the two tasks fail, each is handled by its own exception handling mechanism.
java.lang.ArithmeticException: / by zero
at org.ywb.practise.difficulty.ExecutorMain.lambda$test1$2(ExecutorMain.java:43)
at org.ywb.practise.difficulty.ExecutorServiceWrapper.lambda$submit$0(ExecutorServiceWrapper.java:30)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
customer---/ by zero
Further reading: thread pool exception handling in other frameworks
If the business logic is relatively simple, the approach above is enough. If you need more than that, you can use the tools Guava provides, shown below.
Guava
- Add the dependency
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
- Usage demo
@Test
public void test() {
    // Wrap the thread pool
    ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    // Wrap the task
    ListenableFutureTask<Void> listenableFutureTask = ListenableFutureTask.create(() -> {
        int i = 1 / 0;
    }, null);
    // Add a callback to the task
    Futures.addCallback(listenableFutureTask, new FutureCallback<Void>() {
        @Override
        public void onSuccess(@Nullable Void result) {
            // Callback after success
            System.out.println("success");
        }

        @Override
        public void onFailure(Throwable t) {
            // Exception handling
            t.printStackTrace();
        }
    }, guavaExecutor);
    // Submit the task
    guavaExecutor.submit(listenableFutureTask);
}
Guava not only handles failures; through the onSuccess method it can also attach follow-up actions to the result of a successful task.
Netty
Netty's approach is excellent, but unfortunately we can't use it directly: it implements an event-based thread pool for use within its own framework. Netty is a great framework, and I hope you get a chance to study it. Its Future interface looks like this:
public interface Future<V> extends java.util.concurrent.Future<V> {
/**
* Returns {@code true} if and only if the I/O operation was completed
* successfully.
*/
boolean isSuccess();
/**
* returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
*/
boolean isCancellable();
/**
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();
/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Removes the first occurrence of the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Removes the first occurrence for each of the listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> syncUninterruptibly();
/**
* Waits for this future to be completed.
*
* @throws InterruptedException
* if the current thread was interrupted
*/
Future<V> await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
Future<V> awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* Return the result without blocking. If the future is not done yet this will return {@code null}.
*
* As it is possible that a {@code null} value is used to mark the future as successful you also need to check
* if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
*/
V getNow();
/**
* {@inheritDoc}
*
* If the cancellation was successful it will fail the future with a {@link CancellationException}.
*/
@Override
boolean cancel(boolean mayInterruptIfRunning);
}