【问题标题】:Why cannot `ExecutorService` consistently schedule threads?为什么 ExecutorService 不能一致地调度线程?
【发布时间】:2016-07-10 23:03:37
【问题描述】:

我正在尝试使用CyclicBarrier 重新实现我的并发代码,这对我来说是新的。我可以不用它,但我正在对我的其他解决方案进行试验,我遇到的问题是使用以下代码出现死锁情况:

//instance variables (fully initialised elsewhere).
private final ExecutorService exec = Executors.newFixedThreadPool(4);
private ArrayList<IListener> listeners = new ArrayList<IListener>();
private int[] playerIds;

private class WorldUpdater {
    final CyclicBarrier barrier1;
    final CyclicBarrier barrier2;
    volatile boolean anyChange;
    List<Callable<Void>> calls = new ArrayList<Callable<Void>>();

    class SyncedCallable implements Callable<Void> {
        final IListener listener;

        private SyncedCallable(IListener listener) {
            this.listener = listener;
        }

        @Override
        public Void call() throws Exception {
            listener.startUpdate();
            if (barrier1.await() == 0) {
                anyChange = processCommons();
            }
            barrier2.await();
            listener.endUpdate(anyChange);
            return null;
        }
    }

    public WorldUpdater(ArrayList<IListener> listeners, int[] playerIds) {
        barrier2 = new CyclicBarrier(listeners.size());
        barrier1 = new CyclicBarrier(listeners.size());
        for (int i : playerIds)
            calls.add(new SyncedCallable(listeners.get(i)));
    }

    void start(){
        try {
            exec.invokeAll(calls);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


void someMethodCalledEveryFrame() {
    //Calls some Fisher-something method that shuffles int[]
    shufflePIDs();  
    WorldUpdater updater = new WorldUpdater(listeners, playerIds);
    updater.start();
}

我使用 Android Studio (intelliJ) 中的调试器在此阶段暂停执行。我有多个线程显示我的 await 调用是我要执行的最后一个代码

->Unsafe.park

->LockSupport.park

->AbstractQueuedSynchronizer$ConditionObject.await

->CyclicBarrier.doWait

->CyclicBarrier.await

至少一个线程将拥有这个堆栈:

->Unsafe.park.

->LockSupport.park

->AbstractQueuedSynchronizer$ConditionObject.await

->LinkedBlockingQueue.take

->ThreadPoolExecutor.getTask

->ThreadPoolExecutor.runWorker

->ThreadPoolExecutor$Worker.run

->Thread.run

我注意到CyclicBarrier 在后面的这些杂散线程中没有任何作用。

processCommons is 调用exec.invokeAll(在 3 个侦听器上),我想这意味着我的线程用完了。但是很多时候这不会发生,所以请有人澄清为什么ExecutorService 不能始终如一地安排我的线程?他们有自己的堆栈和程序计数器,所以我认为这不是问题。我一次最多只能运行 4 个。有人帮我算算吗?

【问题讨论】:

    标签: java multithreading synchronization cyclicbarrier


    【解决方案1】:

    创建WorldUpdaterlisteners.size() 的值是多少?如果超过四个,那么您的线程将永远无法越过障碍。

    您的ExecutorService正好四个线程。不多也不少。 barrier1.await()barrier2.await() 的调用者在 listeners.size() 线程正在等待之前不会越过障碍。

    我的直觉是,池线程使用CyclicBarrier 是错误的。 CyclicBarrier 仅在您确切知道有多少线程将使用它时才有用。但是,当您使用线程池时,您通常知道池的大小。事实上,在现实世界(即商业)应用程序中,如果您使用的是线程池,它可能根本不是由您的代码创建的。它可能是在其他地方创建的,并作为注入的依赖项传递到您的代码中。

    【讨论】:

    • 我最初批准这个是因为您强调“正好四个”,“不多不少”,显得深刻。我现在意识到这是错误的:like 我尝试使用CyclicBarrier。它只是不适合这里,所以谢谢你的指点。
    【解决方案2】:

    我做了一个小实验并想出了:

            @Override
            public Void call() throws Exception {
                System.out.println("startUpdate, Thread:" + Thread.currentThread());
                listener.startUpdate();
                if (barrier1.await() == 0) {
                    System.out.println("processCommons, Thread:" + Thread.currentThread());
                    anyChange = processCommons();
                }
                barrier2.await();
                System.out.println("endUpdate, Thread:" + Thread.currentThread());
                listener.endUpdate(anyChange);
                return null;
            }
    

    当使用 3 个池和 3 个 listeners 时,我会一直挂在 processCommons 中,其中包含以下内容:

        List<Callable<Void>> calls = new ArrayList<Callable<Void>>();
        for (IListener listiner : listeners)
            calls.add(new CommonsCallable(listener));
        try {
            exec.invokeAll(calls);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    

    有 2 个线程在屏障处等待,第三个线程试图再创建 3 个。我在ExecutorService 中需要一个额外的线程,并且屏障处的 2 可以像我在问题中所问的那样被“回收”。我在这个阶段引用了 6 个线程,而 exec 只持有 4 个。这可以愉快地运行很多分钟。

    private final ExecutorService exec = Executors.newFixedThreadPool(8);
    

    应该更好,但事实并非如此。

    最后我在 intelliJ 中做了断点步进(感谢ideaC!)

    问题是

            if (barrier1.await() == 0) {
                anyChange = processCommons();
            }
            barrier2.await();
    

    在 2 个await 之间,您可能会得到几个尚未实际到达await 的挂起线程。对于 4 个听众中的 3 个听众,只需要一个听众就可以“计划外”(或其他),而barrier2 永远不会得到完整的补充。但是当我有 8 个池时呢?除了两个线程之外的所有线程都表现出相同的行为:

    ->Unsafe.park.

    ->LockSupport.park

    ->AbstractQueuedSynchronizer$ConditionObject.await

    ->LinkedBlockingQueue.take

    ->ThreadPoolExecutor.getTask

    ->ThreadPoolExecutor.runWorker

    ->ThreadPoolExecutor$Worker.run

    ->Thread.run

    这里会发生什么来禁用所有 5 个线程?我应该听从 James Large 的建议,避免在这个过于复杂的 CyclicBarrier 中使用撬棍。--更新--它 现在可以在没有 CyclicBarrier 的情况下运行一整夜。

    【讨论】:

      猜你喜欢
      • 2015-06-22
      • 2014-10-03
      • 2010-12-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-11-01
      • 2016-02-01
      • 2019-01-02
      相关资源
      最近更新 更多