【问题标题】:How to pause/resume all threads in an ExecutorService in Java?如何在 Java 中暂停/恢复 ExecutorService 中的所有线程?
【发布时间】:2012-04-02 16:07:09
【问题描述】:

我向 Java 中的 executorservice 提交了一堆作业,我想暂时暂停所有这些作业。最好的方法是什么?我该如何恢复?还是我这样做完全错了?我是否应该遵循其他模式来实现我想要实现的目标(即暂停/恢复执行服务的能力)?

【问题讨论】:

  • 您的意思是阻止新作业运行,还是暂停已经运行的作业?
  • 暂停已经运行的作业。暂停/恢复可能会在shutdown 之后调用
  • 在这种情况下,您如何开始工作几乎无关紧要。您需要为暂停编写代码 - 例如,每个任务可能想要定期检查“我应该暂停”标志。当然,它仍然不会是即时的。
  • 嗯,我想我可以创建自己特殊的Runnables,它可以理解全局暂停/恢复标志。我希望有一些更清洁的方法可以使用我拥有的Futures 列表或通过ExecutorService 本身

标签: java multithreading concurrency parallel-processing executorservice


【解决方案1】:

问题在于 Runnable/Callable 本身需要检查何时暂停/恢复。话虽如此,有很多方法可以做到这一点,这取决于您对如何最好地做到这一点的要求。无论您需要什么解决方案,都可以使等待可中断,以便线程可以干净地关闭。

【讨论】:

    【解决方案2】:

    为了回答我自己的问题,我在ThreadPoolExecutoritself 的javadocs 中找到了PausableThreadPoolExecutor 的示例。这是我使用 Guava 的监视器的版本:

    import com.google.common.util.concurrent.Monitor;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    
    public class PausableExecutor extends ScheduledThreadPoolExecutor {
    
        private boolean isPaused;
    
        private final Monitor monitor = new Monitor();
        private final Monitor.Guard paused = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return isPaused;
            }
        };
    
        private final Monitor.Guard notPaused = new Monitor.Guard(monitor) {
            @Override
            public boolean isSatisfied() {
                return !isPaused;
            }
        };
    
        public PausableExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }
    
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            monitor.enterWhenUninterruptibly(notPaused);
            try {
                monitor.waitForUninterruptibly(notPaused);
            } finally {
                monitor.leave();
            }
        }
    
        public void pause() {
            monitor.enterIf(notPaused);
            try {
                isPaused = true;
            } finally {
                monitor.leave();
            }
        }
    
        public void resume() {
            monitor.enterIf(paused);
            try {
                isPaused = false;
            } finally {
                monitor.leave();
            }
        }
    }
    

    【讨论】:

    • 您的解决方案与 javadocs 中的示例存在一些重大差异... (1) 您使用了两个 Guards 而不是 javadocs 中的一个 Condition; (2)你在 if 之外使用了enterIf(这是完全错误的); (3)Monitorleave 使用signal 而不是signalAll(这是这里真正需要的);最后(4)如果您已经基于notPaused 输入了Monitor,为什么还要等待notPaused(离开它)?总而言之,我认为 Monitor 在这里不是一个好选择...
    • 1) 我发现 Guava 的 Monitor/Guard 抽象比 Condition 更清晰。这里只是个人偏好。 2)你的意思是外部尝试而不是外部如果?我使用了 Guava 文档中记录的 Guard 3) 为什么使用 signalAll?这个 Executor 只适用于它包含的线程,如果我们使用 signal 或 signalAll 对它们无关紧要 4) 如果您看到 Monitor 文档 - docs.guava-libraries.googlecode.com/git/javadoc/com/google/… - Google 自己建议使用来分隔监视器,即使一个是与另一个相反的布尔值。
    • 嗨,我使用了相同的 ThreadPoolExecutor 并向其添加了一个可运行线程。但我无法暂停和恢复 Runnable 线程。请您指导我如何实现。
    【解决方案3】:

    我对您接受的答案提出了一些批评,但它们并不是很有建设性......所以这是我的解决方案。我会使用像这样的一个类,然后在我想要暂停功能的任何地方/任何时候调用checkIn。在GitHub 上找到它!

    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Provides a mechanism to pause multiple threads.
     * If wish your thread to participate, then it must regularly check in with an instance of this object.
     * 
     * @author Corin Lawson <corin@phiware.com.au>
     */
    public class Continue {
        private boolean isPaused;
        private ReentrantLock pauseLock = new ReentrantLock();
        private Condition unpaused = pauseLock.newCondition();
    
        public void checkIn() throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.await();
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkInUntil(Date deadline) throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.awaitUntil(deadline);
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkIn(long nanosTimeout) throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.awaitNanos(nanosTimeout);
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkIn(long time, TimeUnit unit) throws InterruptedException {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.await(time, unit);
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public void checkInUninterruptibly() {
            if (isPaused) {
                pauseLock.lock();
                try {
                    while (isPaused)
                        unpaused.awaitUninterruptibly();
                } finally {
                    pauseLock.unlock();
                }
            }
        }
    
        public boolean isPaused() {
            return isPaused;
        }
    
        public void pause() {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
        }
    
        public void resume() {
            pauseLock.lock();
            try {
                if (isPaused) {
                    isPaused = false;
                    unpaused.signalAll();
                }
            } finally {
                pauseLock.unlock();
            }
        }
    }
    

    例如:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    
    public class PausableExecutor extends ScheduledThreadPoolExecutor {
        private Continue cont;
    
        public PausableExecutor(int corePoolSize, ThreadFactory threadFactory, Continue c) {
            super(corePoolSize, threadFactory);
            cont = c;
        }
    
        protected void beforeExecute(Thread t, Runnable r) {
            cont.checkIn();
            super.beforeExecute(t, r);
        }
    }
    

    这还有一个额外的好处,即您可以通过一次调用 Continuepause 来暂停多个线程。

    【讨论】:

    • 谢谢,我只是用你的例子来实现这个功能,但我有几个 cmets,beforeExecute 必须捕获 InterruptedException 才能编译。不清楚你不需要继承 ScheduledThreadPoolExecutor,你可以使用我正在使用的 ThreadPoolExecutor。 PausableExcecutor 只会暂停执行已提交但未启动的任务,要暂停已启动的任务,您需要在任务代码本身中调用 checkIn,我为此使用了 checkInInterruptably() 但不确定这是否是个好主意。
    • 感谢分享 - 我尝试过的许多方法中的第一个。
    • boolean isPaused 应该不稳定吗?还是 ReentrantLock 充当内存屏障?我正在考虑例如线程A调用pause()resume(),线程B调用checkIn(),线程C调用isPaused()
    • 您好,我已经尝试使用@pathiikrit 和 Corin 解决方案来使用线程池管理器实现 Runnable 线程的暂停和恢复。但这对我来说根本不起作用。
    • 这会阻止计划的作业执行,但不会阻止暂停的作业在队列中累积。因此,例如,如果您以每秒固定的速率安排了一些事情,然后暂停了 5 秒钟,那么当您取消暂停时,您的 runnable 将触发 5 次。
    【解决方案4】:

    我一直在寻找执行程序中的暂停/恢复功能,但具有等待任何当前正在处理的任务的额外能力。以下是此 SO 中其他出色实现的变体,并添加了等待功能。我正在用单线程在执行器上对其进行测试。所以基本用法是:

    executor.pause();
    executor.await(10000); // blocks till current tasks processing ends
    

    类代码:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class PausableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {      
      public boolean isPaused;
      private ReentrantLock pauseLock = new ReentrantLock();
      private Condition unpaused = pauseLock.newCondition();
      private Latch activeTasksLatch = new Latch();
    
      private class Latch {
        private final Object synchObj = new Object();
        private int count;
    
        public boolean awaitZero(long waitMS) throws InterruptedException {
          long startTime = System.currentTimeMillis();
          synchronized (synchObj) {
            while (count > 0) {
              if ( waitMS != 0) {
                synchObj.wait(waitMS);
                long curTime = System.currentTimeMillis();
                if ( (curTime - startTime) > waitMS ) {                
                  return count <= 0;
                }
              }
              else
                synchObj.wait();
            }
            return count <= 0; 
          }
        }
        public void countDown() {
          synchronized (synchObj) {
            if (--count <= 0) {
              // assert count >= 0;              
              synchObj.notifyAll();
            }
          }
        }
        public void countUp() {
          synchronized (synchObj) {
            count++;
          }
        }    
      }
    
      /**
       * Default constructor for a simple fixed threadpool
       */
      public PausableScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
      }
    
      /**
       * Executed before a task is assigned to a thread.
       */
      @Override
      protected void beforeExecute(Thread t, Runnable r) {
        pauseLock.lock();
        try {
          while (isPaused)
            unpaused.await();
        } catch (InterruptedException ie) {
          t.interrupt();
        } finally {
          pauseLock.unlock();
        }
    
        activeTasksLatch.countUp();
        super.beforeExecute(t, r);
      }
    
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        try {
          super.afterExecute(r, t);
        }
        finally {
          activeTasksLatch.countDown();
        }
      }
    
      /**
       * Pause the threadpool. Running tasks will continue running, but new tasks
       * will not start untill the threadpool is resumed.
       */
      public void pause() {
        pauseLock.lock();
        try {
          isPaused = true;
        } finally {
          pauseLock.unlock();
        }
      }
    
      /**
       * Wait for all active tasks to end.
       */ 
      public boolean await(long timeoutMS) {
        // assert isPaused;
        try {
          return activeTasksLatch.awaitZero(timeoutMS);
        } catch (InterruptedException e) {
          // log e, or rethrow maybe
        }
        return false;
      }
    
      /**
       * Resume the threadpool.
       */
      public void resume() {
        pauseLock.lock();
        try {
          isPaused = false;
          unpaused.signalAll();
        } finally {
          pauseLock.unlock();
        }
      }
    
    }
    

    【讨论】:

    • 这看起来不错。您或任何人是否对此进行过更彻底的测试?是否有任何修订或修复?我现在要使用它,因为它没有引入另一个库。
    • 它在相当大的应用程序中使用,到目前为止没有问题。如果这段代码有什么bug,我也愿意听
    • @marcinj 我正在尝试您的执行程序代码。适用于暂停和恢复。但是我注意到,当我在它暂停时调用 shutDownNow() 时,它会在实际关闭之前恢复并运行一些任务。有什么办法可以防止吗?
    • @ProgrAmmar 我试图用这段代码重现它:melpon.org/wandbox/permlink/XHa9NwmI7n1WAr3F,但我失败了 - 你能看看这是否是导致问题的原因吗?据我了解,应该将“test 4”“test 5”“test 6”写入控制台。这些是不应执行的任务的输出,但现在已写入。
    • @marcinj 我无法让您的链接正常工作。所以我在这里创建了自己的示例:pastebin.com/AY6r1zGD。我从您的代码中创建了一个 FixedThreadPoolExecutor。可以看到,运行的时候,在shutDownNow()之后调用了一些任务。
    【解决方案5】:

    我知道这已经过时了,但我尝试了所有这些答案,但没有一个适用于我试图用可暂停计时器做的事情;一旦恢复(一次全部),它们都会丢弃它按计划执行的所有数据。

    相反,我在 GitHub 上找到了这个 Timer 类* here。这对我来说非常有效。

    *这段代码不是我写的,随便找的。​​p>

    【讨论】:

      猜你喜欢
      • 2012-08-08
      • 1970-01-01
      • 2011-08-02
      • 2015-02-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-08-13
      • 2012-08-27
      相关资源
      最近更新 更多