最近项目中一些异步执行的逻辑没有运行异常却没有打出日志 给定位问题带来麻烦??
问题分析
接下来我们来看一下java中的线程池是如何运行我们提交的任务的,详细流程比较复杂,这里我们不关注,我们只关注任务执行的部分。java中的线程池用的是ThreadPoolExecutor,真正执行代码的部分是runWorker方法:final void runWorker(Worker w)
//省略无关部分
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //执行程序逻辑
} catch (RuntimeException x) { //捕获RuntimeException
thrown = x; throw x;
} catch (Error x) { //捕获Error
thrown = x; throw x;
} catch (Throwable x) { //捕获Throwable
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //运行完成,进行后续处理
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
//省略无关部分
可以看到,程序会捕获包括Error在内的所有异常,并且在程序最后,将出现过的异常和当前任务传递给afterExecute方法。
而ThreadPoolExecutor中的afterExecute方法是没有任何实现的:
protected void afterExecute(Runnable r, Throwable t) { }
也就是说,默认情况下,线程池会捕获任务抛出的所有异常,但是不做任何处理。
存在问题
想象下ThreadPoolExecutor这种处理方式会有什么问题?
这样做能够保证我们提交的任务抛出了异常不会影响其他任务的执行,同时也不会对用来执行该任务的线程产生任何影响。
问题就在afterExecute
方法上,这个方法没有做任何处理,所以如果我们的任务抛出了异常,我们也无法立刻感知到。即使感知到了,也无法查看异常信息。
所以,作为一名好的开发者,是不应该允许这种情况出现的。
如何避免这种问题
思路很简单。
1. 在提交的任务中将异常捕获并处理,不抛给线程池。
2. 异常抛给线程池,但是我们要及时处理抛出的异常。
第一种思路很简单,就是我们提交任务的时候,将所有可能的异常都Catch住,并且自己处理,任务的大致代码如下:
@Override
public void run() {
try {
//处理所有的业务逻辑
} catch (Throwable e) {
//打印日志等
} finally {
//其他处理
}
}
说白了就是把业务逻辑都trycatch起来。
但是这种思路的缺点就是:1)所有的不同任务类型都要trycatch,增加了代码量。2)不存在checkedexception的地方也需要都trycatch起来,代码丑陋。
第二种思路就可以避免上面的两个问题。
第二种思路又有以下几种实现方式:
1. 自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)
方法。
2. 实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);
方法,并将该handler传递给线程池的ThreadFactory
3. 采用Future模式,将返回结果以及异常放到Future中,在Future中处理
4. 继承ThreadGroup,覆盖其uncaughtException方法。(与第二种方式类似,因为ThreadGroup类本身就实现了Thread.UncaughtExceptionHandler接口)
下面是以上几种方式的代码
方式1
自定义线程池:
final class PoolService {
// The values have been hard-coded for brevity
ExecutorService pool = new CustomThreadPoolExecutor(
10, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
// ...
}
class CustomThreadPoolExecutor extends ThreadPoolExecutor {
// ... Constructor ...
public CustomThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
// Exception occurred, forward to handler
}
// ... Perform task-specific cleanup actions
}
@Override
public void terminated() {
super.terminated();
// ... Perform final clean-up actions
}
}
方式2
实现Thread.UncaughtExceptionHandler接口,实现void uncaughtException(Thread t, Throwable e);
方法,并将该handler传递给线程池的ThreadFactory
final class PoolService {
private static final ThreadFactory factory =
new ExceptionThreadFactory(new MyExceptionHandler());
private static final ExecutorService pool =
Executors.newFixedThreadPool(10, factory);
public void doSomething() {
pool.execute(new Task()); // Task is a runnable class
}
public static class ExceptionThreadFactory implements ThreadFactory {
private static final ThreadFactory defaultFactory =
Executors.defaultThreadFactory();
private final Thread.UncaughtExceptionHandler handler;
public ExceptionThreadFactory(
Thread.UncaughtExceptionHandler handler)
{
this.handler = handler;
}
@Override public Thread newThread(Runnable run) {
Thread thread = defaultFactory.newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
}
}
public static class MyExceptionHandler extends ExceptionReporter
implements Thread.UncaughtExceptionHandler {
// ...
@Override public void uncaughtException(Thread thread, Throwable t) {
// Recovery or logging code
}
}
}
方式3
继承ThreadGroup,覆盖其uncaughtException方法
public class ThreadGroupExample {
public static class MyThreadGroup extends ThreadGroup {
public MyThreadGroup(String s) {
super(s);
}
public void uncaughtException(Thread thread, Throwable throwable) {
System.out.println("Thread " + thread.getName()
+ " died, exception was: ");
throwable.printStackTrace();
}
}
public static ThreadGroup workerThreads =
new MyThreadGroup("Worker Threads");
public static class WorkerThread extends Thread {
public WorkerThread(String s) {
super(workerThreads, s);
}
public void run() {
throw new RuntimeException();
}
}
public static void main(String[] args) {
Thread t = new WorkerThread("Worker Thread");
t.start();
}
}
确实这种方式与上面通过ThreadFactory来指定UncaughtExceptionHandler是一样的,只是代码逻辑不同,但原理上都是一样的,即给线程池中的每个线程都指定一个UncaughtExceptionHandler。
** 注意:上面三种方式针对的都是通过execute(xx)的方式提交任务,如果你提交任务用的是submit()方法,那么上面的三种方式都将不起作用,而应该使用下面的方式 **
方式4
如果提交任务的时候使用的方法是submit,那么该方法将返回一个Future对象,所有的异常以及处理结果都可以通过future对象获取。
采用Future模式,将返回结果以及异常放到Future中,在Future中处理
final class PoolService {
private final ExecutorService pool = Executors.newFixedThreadPool(10);
public void doSomething() {
Future<?> future = pool.submit(new Task());
// ...
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Reset interrupted status
} catch (ExecutionException e) {
Throwable exception = e.getCause();
// Forward to exception reporter
}
}
}
总结
- java线程池会捕获任务抛出的异常和错误,但不做任何处理
- 好的程序设计应该考虑到对于类异常的处理
- 处理线程池中的异常有两种思路:
1)提交到线程池中的任务自己捕获异常并处理,不抛给线程池
2)由线程池统一处理 - 对于execute方法提交的线程,有两种处理方式
1)自定义线程池并实现afterExecute方法
2)给线程池中的每个线程指定一个UncaughtExceptionHandler,由handler来统一处理异常。 - 对于submit方法提交的任务,异常处理是通过返回的Future对象进行的。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/15094.html