深度解析Java线程池的异常处理

在逛同性交友网站GitHub的时候看到一个解析线程池异常处理的Issue,正好是曾经自己遇到过的问题。在此记录下来,并将其拓展到其他类型的线程池。

本文因篇幅省略了诸多AQS相关知识,可以查看博客中另一篇博文 一行一行源码分析清楚AQS 以保证清楚理解本文。

1、ThreadPoolExecutor

此部分来源于 GitHub aCoder2013

这一线程池由来已久,是抽象类 AbstractExecutorService 继承类,通过调用不同构造函数实现诸如 newFixedThreadPool 、newCachedThreadPool 线程池功能。

问题:

考虑下面这段代码,有什么区别呢?你可以猜猜会不会有异常打出呢?如果打出来的话是在哪里?:

        ExecutorService threadPool = Executors.newFixedThreadPool(1);
        threadPool.submit(() -> {
            Object obj = null;
            System.out.println(obj.toString());
        });
        threadPool.execute(() -> {
            Object obj = null;
            System.out.println(obj.toString());
        });

你会发现单就执行这两句的话,结果只会打印一处异常信息,来源于 execute() 中的 obj.toString() 。

源码解析:

分析下面源码,发现其实重载的 submit() 方法将 Runnable、Callable 都封装成继承 Future 的 RunnableFuture 的实现类 FutureTask 对象(有点忽悠 XD)

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // volatile修饰,保证多线程下的可见性,可以看看Java内存模型
    }
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

接下来就会实际提交到队列中交给线程池调度处理:

 /**
    * 这里经过多次检查验证
    * 意图都是将通过addWorker()或是workQueue.offer()
    * 将Runnable传给继承AQS的Worker内部类进行封装
    * 了解AQS的同学一看便能融会贯通
    */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

那么接下来看看线程池核心的流程:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
          /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //getTask()方法会尝试从队列中抓取数据
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //可覆写此方法打日志埋点之类的
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //简单明了,直接调用run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//下文将介绍可以重写此方法捕获异常
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

submit() 的方式

之前我们知道最终传递过去的是FutureTask,也就是说会调用这里的 Future 的 run方法,我们看看实现:

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {//捕获异常不抛出
                    result = null;
                    ran = false;
                    //。。。
                    setException(ex);//调用此方法设置异常状态
                }
                if (ran)
                    set(result);
            }
        } finally {
          //省略
    }

      protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t; //异常赋给了这个变量
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 将状态CAS为EXCEPTIONAL
            finishCompletion();
        }
    }
    public V get() throws InterruptedException, ExecutionException {//Future.get()获取返回值
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);//最终调用report返回outcome变量
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);//若状态异常则抛出outcome异常
    }

如上面分析到的,这样的话调用 FutureTask.run() 并不会直接抛出异常,所以在 ThreadPool.execute() 中捕获不到异常。但我们可以通过调用 get() 方法来捕捉异常。

submit() 解决方式

1、基本方式 try/catch,直接调用get()

2、重写 protected afterExecute(Runnable r,Throwable t ) { } 方法

想想如果我明明一开始调用的是 submit(Runnable r) ,为了捕捉异常还需刻意调用 get() 未免有点麻烦。Doug Lea 大佬已经在 JDK 文档中教我们可以重写 ThreadPoolExecutor 中的 afterExecute() 方法来实现异常捕获:

    protected void afterExecute(Runnable r, Throwable t) {
          super.afterExecute(r, t);
          if (t == null && r instanceof Future<?>) {
            try {
              //如上文所说,主动调用get() 将异常抛出
              Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {//异常捕获
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
          }
          if (t != null){
            //异常处理
            t.printStackTrace();
          }
        }

execute() 方式:

如代码SrcAnalyse2,此方法不同于submit() 会进行封装成Future ,其传递过去的就直接是Runnable,因此就会直接抛出:

    try {
        task.run();//在此情况直接为Runnable.run()
    } catch (RuntimeException x) {//异常被直接捕获
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }

那么这里的异常到底会抛出到哪里呢, 我们看看JVM具体是怎么处理的:

if (!destroy_vm || JDK_Version::is_jdk12x_version()) {
    // JSR-166: change call from from ThreadGroup.uncaughtException to
    // java.lang.Thread.dispatchUncaughtException
    if (uncaught_exception.not_null()) {
      //如果有未捕获的异常
      Handle group(this, java_lang_Thread::threadGroup(threadObj()));
      {
        KlassHandle recvrKlass(THREAD, threadObj->klass());
        CallInfo callinfo;
        KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass());
        /*  
            这里类似一个方法表,实际就会去调用Thread#dispatchUncaughtException方法
            template(dispatchUncaughtException_name,            "dispatchUncaughtException")                
        */
        LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass,
                                           vmSymbols::dispatchUncaughtException_name(),
                                           vmSymbols::throwable_void_signature(),
                                           KlassHandle(), false, false, THREAD);
        CLEAR_PENDING_EXCEPTION;
        methodHandle method = callinfo.selected_method();
        if (method.not_null()) {
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  threadObj, thread_klass,
                                  vmSymbols::dispatchUncaughtException_name(),
                                  vmSymbols::throwable_void_signature(),
                                  uncaught_exception,
                                  THREAD);
        } else {
          KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  group, thread_group,
                                  vmSymbols::uncaughtException_name(),
                                  vmSymbols::thread_throwable_void_signature(),
                                  threadObj,           // Arg 1
                                  uncaught_exception,  // Arg 2
                                  THREAD);
        }
        if (HAS_PENDING_EXCEPTION) {
          ResourceMark rm(this);
          jio_fprintf(defaultStream::error_stream(),
                "\nException: %s thrown from the UncaughtExceptionHandler"
                " in thread \"%s\"\n",
                pending_exception()->klass()->external_name(),
                get_thread_name());
          CLEAR_PENDING_EXCEPTION;
        }
      }
    }

可以看到这里最终会去调用Thread#dispatchUncaughtException方法:

    private void dispatchUncaughtException(Throwable e) {
        //默认会调用ThreadGroup的实现
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }
​

    public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                //可以看到会打到System.err里面
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

execute() 解决方式

1、基本方式,直接try/catch

2、线程重写 setUncaughtExceptionHandler() 方法

       Thread t = new Thread();
       t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

           public void uncaughtException(Thread t, Throwable e) {
              LOGGER.error(t + " throws exception: " + e);
           }
        });
        //如果是线程池的模式:
        ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(
                (t1, e) -> LOGGER.error(t1 + " throws exception: " + e));
            return t;
        });

2、ForkJoinPool

ForkJoinPool也是继承AbstractExecutorService的线程池,实现了 Fork/Join 及 work-stealing 以提升多核计算效率(详细的 ForkJoin 设计原理可参考三石 道并发编程网 )。
因各版本 JDK 源码略有差异,此处仅为 JDK8 源码分析,且不具体分析线程池运行机制。

JDK源码中给了我们这样一个任务提交总结表

任务执行需求 在非Fork/Join调用 在Fork/Join计算中调用
异步执行无返回值 execute(ForkJoinTask) ForkJoinTask.fork()
等待获取返回值 invoke(ForkJoinTask) ForkJoinTask.invoke()
异步执行并获取Future submit(ForkJoinTask) ForkJoinTask.fork()
  1. 非Fork/Join 和在 Fork/Join 中是指调用提交时所在的位置,前者为外部(main)通过 ForkJoinPool 本身提交 ForkJoinTask ,后者为 ForkJoinTask 执行中再将子任务提交到 ForkJoinPool 中。
  2. ForkJoinTask fork 方法返回this,而 ForkJoinTask 本身实现 Future ,则第三种中可使用 submit(ForkJoinTask).get()ForkJoinTask.fork().get()获取返回值,将 get 方法换成 join 也同样可行,区别只在于异常处理方式。
  3. 调用 execute(Runnable)submit(Runnable)submit(Callable)将通过 ForkJoinTask 的子类进行适配。
  4. 此外还有 invokeAll(Collection <Callable>) 等方法,类似此处就不讨论。
//ForkJoinPool
public <T> T invoke(ForkJoinTask<T> task) {} //提交并等待返回值
public void execute(ForkJoinTask<?> task) {} //提交,异步执行
public void execute(Runnable task) {}
//提交并异步执行,返回ForkJoinTask
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}
public <T> ForkJoinTask<T> submit(Callable<T> task) {}
public <T> ForkJoinTask<T> submit(Runnable task, T result) {}
public ForkJoinTask<?> submit(Runnable task) {}
//ForkJoinTask
public final ForkJoinTask<V> fork() {} // 提交并异步执行
public final V invoke() {} //提交并等待返回值

跟踪捕获

ForkJoinPool 中三种类型的提交方法都将调用 externalPush(ForkJoinTask)externalSubmit(ForkJoinTask)将参数封装成 ForkJoinTask 并 Push 到 已存在或初始化的 WorkQueue 中

我们绕过具体的任务调度、状态转换直接从执行 ForkJoinTask 的 ForkJoinWokerThread 来跟踪执行异常的处理 (每个这样的线程对象都在 ForkJoinPool 中有一对应的 WorkQueue)

线程run方法: ForkJoinWorkerThread.run()

    public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart(); // 空方法
            // 调用 ForkJoinPool 中方法开始执行
            pool.runWorker(workQueue); 
             } catch (Throwable ex) {  // 此处捕获抛出的异常,最后将介绍
             exception = ex;
        } finally {
             try {
                    onTermination(exception); // 自定义收尾函数
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception); // 自定义异常处理,最后介绍
                }
        }
    }
}

线程执行循环: ForkJoinPool.runWorker(WorkQueue)

final void runWorker(WorkQueue w) {
    w.growArray();                   // 给 WorkQueue 分配任务数组
    int seed = w.hint;               // 初始化随机数
    int r = (seed == 0) ? 1 : seed;  
    for (ForkJoinTask<?> t;;) {
        if ((t = scan(w, r)) != null) // 实现 work-stealing 
            w.runTask(t); // 调用 WorkQueue 方法执行任务 t
            // 等待任务进入下一循环,若返回false,则退出循环线程进入结束
        else if (!awaitWork(w, r)) 
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // 可先忽略
    }
}

调用 WorkQueue 执行: WorkQueue.runTask(ForkJoinTask)

final void runTask(ForkJoinTask<?> task) {
        if (task != null) {
            scanState &= ~SCANNING;
            // 调用 ForkJoinTask.doExec() 直接执行任务,此处未捕获或抛出
            (currentSteal = task).doExec();  
            U.putOrderedObject(this, QCURRENTSTEAL, null); // 
            execLocalTasks(); // 继续执行 WorkQueue 中任务数组中任务
            ForkJoinWorkerThread thread = owner;
            if (++nsteals < 0)   
                transferStealCount(pool);
            scanState |= SCANNING;
            if (thread != null)
                thread.afterTopLevelExec();
        }
    }

最终调用任务执行方法: ForkJoinTask.doExec()

final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) { // 任务状态判断,volatile保证可见性
        try {
            completed = exec(); // exec() 为抽象方法以自定义任务执行体
        } catch (Throwable rex) { // 在此捕获所有任务内容抛出的异常
            // 修改状态,并记录异常
            return setExceptionalCompletion(rex); 
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

修改任务状态并记录异常: setExceptionalCompletion(Throwable)

private int setExceptionalCompletion(Throwable ex) {
    // 使用 Static+弱引用类型+类HashTable结构 变量记录全局异常
    // 并设置状态为 EXCEPTIONAL ,下文详解
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
    // 自定义异常传播(权限只在此并发包内)
        internalPropagateException(ex);
    return s;
}

抛出异常

ExceptionNode[ ] 实现类似 HashTable 以记录异常及异常的 ForkJoinTask。为什么使用弱引用呢?可以想象当分出的诸多子任务中一个子任务异常,势必造成之上的父任务异常,以此循环向上将造成大量异常对象,全部存储将有损效率。所以采用弱应用,当子任务的异常在父任务执行体中因执行 get、join、invoke 而抛出并被我们上面分析到的过程捕获,异常将被记录为父任务的异常并存储,此时子任务异常已没有意义子任务也无引用,使用弱引用可借用GC帮助清除

static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
    final Throwable ex;
    ExceptionNode next;  // 虽然设计成数组链表结构,可源码中并没以此存储
    final long thrower;  // 用id代替引用,避免引用
    final int hashCode;  // 在弱引用消失前记录ForkJoinTask 的 HashCode
    ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
        super(task, exceptionTableRefQueue);
        this.ex = ex;
        this.next = next;
        this.thrower = Thread.currentThread().getId();
        this.hashCode = System.identityHashCode(task);
    }
}

既然已经记录下异常,那么怎样会抛出这些异常呢?在 ForkJoinTask 中提供方法获取

private Throwable getThrowableException() {
    if ((status & DONE_MASK) != EXCEPTIONAL)
        return null;
    int h = System.identityHashCode(this);//获取HashCode
    ExceptionNode e;
    final ReentrantLock lock = exceptionTableLock;
    lock.lock();
    try {
        // 通过ReferenceQueue清除已无意义ExceptionNode
        expungeStaleExceptions();
        //exptionTable是static ExceptionNode[]存储全局异常
        ExceptionNode[] t = exceptionTable;
        e = t[h & (t.length - 1)];//以HashCode求数组下标
        while (e != null && e.get() != this)
            e = e.next;//得到对于ExceptionNode
    } finally {
        lock.unlock();
    }
    Throwable ex;
    if (e == null || (ex = e.ex) == null)//从ExceptionNode中取异常
        return null;
    // 如果出错线程不是当前线程,则在构造此异常以便在提供准确的异常堆栈轨迹
    if (e.thrower != Thread.currentThread().getId()) {
        Class<? extends Throwable> ec = ex.getClass();
        try {
        // ...........
        } catch (Exception ignore) {
        }
    }
    return ex;
}

上面已提到 ForkJoinTask 中调用到此方法获取异常的有 get、join、invoke、invokeAll
其中的 join、invoke、invokeAll 采用如下抛出 Unchecked 异常,而JDK源码示例中也推荐在任务体中使用这几种

static void rethrow(Throwable ex) {
    if (ex != null)
        ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
@SuppressWarnings("unchecked") static <T extends Throwable>
    void uncheckedThrow(Throwable t) throws T {
    throw (T)t; 
}

而 get 因实现自 Future 而将异常封装成统一的 ExecutionException

public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        throw new ExecutionException(ex);//封装成统一的Checked异常
    return getRawResult();
}

因此只要在执行体中进行合适 Fork/Join 任务分解,并在非 Fork/Join 中给 ForkJoinPool 提交后调用提交或返回的 ForkJoinTask 中 get、join、invoke 方法(即使无返回值也可以调用来抛出),再加以 try/catch 就能捕获到任务体执行过程中的异常。

拓展

要是我通过 ForkJoinPool.execute(Runnable) 提交的任务,本身就没有 ForkJoinTask 且此方法也没有返回 ForkJoinTask ,这种情况将如何捕获异常呢?

其实此方法将 Runnable 封装为 ForkJoinTask 的子类,此子类拓展了上面提到的 internalPropagateException(Throwable) 异常传播方法,并在其中抛出异常被在上文介绍的 ForkJoinWorkerThread.run()这个方法捕获,并调用 ForkJoinWorkerThread 中 onTermination(Throwable exception) 收尾函数,最终都将调用 ForkJoinPool.deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) 并在其中抛出。

try {
     onTermination(exception); // 可添加自定义处理,但不做捕获
     } catch (Throwable ex) {
         if (exception == null)
             exception = ex;
     } finally {
         pool.deregisterWorker(this, exception);// 最终都将抛出异常
     }

没人管了?肯定不会的。

第一部分讲到的 Thread.UncaughtExceptionHandler 在这里也同样起作用,给 ForkJoinPool 构造函数传入UncaughtExceptionHandler,则所有 ForkJoinWorkerThread 都将被设置成此以处理异常(设计得就是这么周到 XD)。当然你要不嫌麻烦选择自定义 ForkJoinWorkerThreadFactory 也是没问题的

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
觉得不错不妨打赏一笔