【问题标题】:Handling Exceptions for ThreadPoolExecutor处理 ThreadPoolExecutor 的异常
【发布时间】:2011-02-03 00:23:28
【问题描述】:

我有以下代码sn-p,它基本上扫描了需要执行的任务列表,然后将每个任务交给执行者执行。

JobExecutor 反过来创建另一个执行程序(用于执行数据库操作...读取和写入数据到队列)并完成任务。

JobExecutor 为提交的任务返回 Future<Boolean>。当其中一项任务失败时,我想优雅地中断所有线程并通过捕获所有异常来关闭执行程序。我需要做哪些更改?

public class DataMovingClass {
    private static final AtomicInteger uniqueId = new AtomicInteger(0);

  private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();   

  ThreadPoolExecutor threadPoolExecutor  = null ;

   private List<Source> sources = new ArrayList<Source>();

    private static class IDGenerator extends ThreadLocal<Integer> {
        @Override
        public Integer get() {
            return uniqueId.incrementAndGet();
        }
  }

  public void init(){

    // load sources list

  }

  public boolean execute() {

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("DataMigration-" + uniqueNumber.get());
                        return t;
                    }// End method
                }, new ThreadPoolExecutor.CallerRunsPolicy());

     List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();

     for (Source source : sources) {
                    result.add(threadPoolExecutor.submit(new JobExecutor(source)));
     }

     for (Future<Boolean> jobDone : result) {
                try {
                    if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                        // in case of successful DbWriterClass, we don't need to change
                        // it.
                        success = false;
                    }
                } catch (Exception ex) {
                    // handle exceptions
                }
            }

  }

  public class JobExecutor implements Callable<Boolean>  {

        private ThreadPoolExecutor threadPoolExecutor ;
        Source jobSource ;
        public SourceJobExecutor(Source source) {
            this.jobSource = source;
            threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("Job Executor-" + uniqueNumber.get());
                            return t;
                        }// End method
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public Boolean call() throws Exception {
            boolean status = true ; 
            System.out.println("Starting Job = " + jobSource.getName());
            try {

                        // do the specified task ; 


            }catch (InterruptedException intrEx) {
                logger.warn("InterruptedException", intrEx);
                status = false ;
            } catch(Exception e) {
                logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
                status = false ;
            }
           System.out.println("Ending Job = " + jobSource.getName());
            return status ;
        }
    }
}   

【问题讨论】:

    标签: java multithreading concurrency exception-handling threadpoolexecutor


    【解决方案1】:

    当您向执行程序提交任务时,它会返回一个FutureTask 实例。

    FutureTask.get() 会将任务抛出的任何异常重新抛出为ExecutorException

    因此,当您遍历 List&lt;Future&gt; 并在每个上调用 get 时,捕获 ExecutorException 并调用有序关闭。

    【讨论】:

    • 好的..您是否发现任何其他缺陷或我需要处理异常的地方?
    【解决方案2】:

    由于您正在向ThreadPoolExecutor 提交任务,因此异常被FutureTask 吞噬。

    看看这个code

    **Inside FutureTask$Sync**
    
    void innerRun() {
        if (!compareAndSetState(READY, RUNNING))
            return;
    
      runner = Thread.currentThread();
        if (getState() == RUNNING) { // recheck after setting thread
            V result;
           try {
                result = callable.call();
            } catch (Throwable ex) {
               setException(ex);
                return;
            }
           set(result);
        } else {
            releaseShared(0); // cancel
        }
    

    }

    protected void setException(Throwable t) {
       sync.innerSetException(t);
    }
    

    从上面的代码中,很明显setException 方法捕获了Throwable。由于这个原因,如果您在ThreadPoolExecutor 上使用“submit()”方法,FutureTask 将吞噬所有异常

    根据javadocumentation,可以在ThreadPoolExecutor中扩展afterExecute()方法

    protected void afterExecute(Runnable r,
                                Throwable t) 
    

    文档中的示例代码:

    class ExtendedExecutor extends ThreadPoolExecutor {
       // ...
       protected void afterExecute(Runnable r, Throwable t) {
         super.afterExecute(r, t);
         if (t == null && r instanceof Future<?>) {
           try {
             Object result = ((Future<?>) r).get();
           } catch (CancellationException ce) {
               t = ce;
           } catch (ExecutionException ee) {
               t = ee.getCause();
           } catch (InterruptedException ie) {
               Thread.currentThread().interrupt(); // ignore/reset
           }
         }
         if (t != null)
           System.out.println(t);
       }
     }
    

    你可以通过三种方式捕捉Exceptions

    1. Future.get() 已接受答案中的建议
    2. 将整个 run()call() 方法包装在 try{}catch{}Exceptoion{} 块中
    3. 如上所示覆盖ThreadPoolExecutor方法的afterExecute

    要优雅地中断其他线程,请看下面的 SE 问题:

    How to stop next thread from running in a ScheduledThreadPoolExecutor

    How to forcefully shutdown java ExecutorService

    【讨论】:

    • 第三种方法afterExecute 适合我,谢谢
    • 这是否仅适用于ThreadPoolExecutor 而不适用于ExecutorService 的任何其他实现?
    • 如果您将此逻辑应用于ScheduledThreadPoolExecutor 并调用scheduleAtFixedRatescheduleWithFixedDelay,那么您将遇到麻烦,因为((Future&lt;?&gt;) r).get(); 将永远不会返回导致池中的线程耗尽。简单的修复可能是将条件更改为 if (t == null &amp;&amp; r instanceof Future&lt;?&gt; &amp;&amp; !r.getClass().getSimpleName().equals("ScheduledFutureTask")) { 。由于ScheduledFutureTask 是一个私有类,你不能用instanceof 查询。
    【解决方案3】:

    子类ThreadPoolExecutor 并覆盖其protected afterExecute (Runnable r, Throwable t) 方法。

    如果您通过java.util.concurrent.Executors 便利类(您不是)创建线程池,请查看其源代码以了解它是如何调用ThreadPoolExecutor

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-06-26
      • 1970-01-01
      • 2011-11-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多