【问题标题】:Monitor status of Object pool workers监控对象池工作人员的状态
【发布时间】:2013-02-27 17:00:09
【问题描述】:

我从另一个类中创建了这个线程,用于在完成时读取执行器的状态并在失败时取消其余任务。任务是可运行的

如果看到任何失败,整体状态必须为 1 或失败

final CompletionService completionService = new ExecutorCompletionService(getExecutorService());
final List<Future> futures = new ArrayList<Future>();

    FutureTask<Integer> tasks = new FutureTask<Integer>(new Callable<Integer>() {

        public Integer call() {

            int status = 0;
            boolean fail = false;

            try {
                for (int i = 0; i < 10; i++) {

                    MyRunnable resultObj = null;

                    try {
                        resultObj = (MyRunnable) completionService.take().get();
                    } catch (CancellationException e) {
                        // Skip it ..
                    }

                    if (!fail) {
                        status = resultObj.getStatus();

                        if (status == 1) {
                            fail = true;
                            for (Future future : futures) {
                                if (!future.isCancelled() && !future.isDone())
                                    future.cancel(true); // cancel pending tasks including running tasks 
                            }
                        }
                    }
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }

            return status;
        }

            });

上面的线程已启动 -

ExecutorService pool = Executors.newSingleThreadExecutor();
pool.submit(tasks);

在下面,对象是从池中借用的,这是一个阻塞调用,我将池大小设置为 3 因此,立即创建了 3 个 MyRunnable 工作人员。当每个工人完成后,他们就可以重复使用以服务于其余任务。

for (int i = 0 ; i < 10; i ++;) {

    MyRunnable myRunnable = null;
    myRunnable = (MyRunnable) this.getGenericObjectPool().borrowObject();

    set myRunnable ..

    futures.add(completionService.submit(myRunnable, myRunnable));

}

while (!tasks.isDone()) {

        try {
            Thread.sleep(Global.WaitTime());            
        } catch (InterruptedException iex) {            
        }

}

finalStatus = tasks.get();
pool.shutdown();

GenericObjectPool 配置为重用对象。我通过强制第一个线程失败并将其状态设置为 1 在 IDE 中模拟了一个测试。但是,问题是一旦它被释放,它就被 borrowObject() 重用,并且监控线程看到了状态已设置的更改对象作为由 GenricObjectPool 完成的新对象激活的一部分,返回 0。

所以,我无法从失败的线程中读取状态。 MyRunnable 不是 Callable ,所以我不得不使用 completionService.submit(obj,obj) 来欺骗 Runnable

如果将池大小设置为 10 或更大,则不会发生此问题,因为这样就不会重用任何对象,并且我将成功读取每个对象的状态,但这不是一个选项。

【问题讨论】:

  • 这里似乎缺少一些代码。你是如何强制第一个线程失败的?如果你先提交 watcher 任务,它会阻塞等待剩下的任务,这意味着你需要多个工作线程来执行。
  • 布尔 s= false; if(s) throw new Exception();使用 IDE,我为第一个运行的实例设置 s 为 true 以模拟失败。在打印最终状态之前,我还需要等待所有任务完成。问题是它们是可运行的,并且它们没有对调用者的引用,所以我什至无法从对象池的运行实例中在调用者中设置变量。
  • 而borrowObject() 是一个阻塞调用..
  • 不知道在这里做什么。插入更多日志记录以跟踪执行,也许这会有所帮助。还可以使用 Visualvm 中的线程监视器来检查线程或在 IDE 中挂起它们。

标签: java multithreading threadpool pool concurrent-programming


【解决方案1】:

我为 Runnable 创建了一个 CallableDecorator 来解决这个问题。现在,即使使用 GenericObjectPool,我也有正确的返回值。由于现在不依赖 Pool 对象来读取状态,即使重用对象也不会导致状态重置 -

因此,代码中的 2 处更改 - 更改

futures.add(completionService.submit(myRunnable, myRunnable));

futures.add(completionService.submit(new CallableDecorator(myRunnable)));

添加一个新类

public class CallableDecorator implements Callable {

       IRunnable r;

       public CallableDecorator(IRunnable r) {

           this.r = r;
       }

       public Integer call() {

           r.run();
           return r.statusCode();
       }
}

interface IRunnable extends Runnable {
     public Integer statusCode();
}

同样,resultObj 必须改为整数才能在监控线程中获取它的值。

【讨论】:

    猜你喜欢
    • 2012-07-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-29
    • 1970-01-01
    • 2011-08-29
    • 1970-01-01
    • 2014-12-06
    • 2015-06-22
    相关资源
    最近更新 更多