【问题标题】:Program doesn't stop running even though executor was shutdown and futures cancelled即使执行程序已关闭并且期货已取消,程序也不会停止运行
【发布时间】:2020-05-20 00:24:35
【问题描述】:

我编写了一个小代码,用于练习 Executors 和 Threads。它由以下内容组成:

  1. 使用无限队列创建大小为 3 的固定线程池。
  2. 将 3 个无限循环的任务 (while(true)) 提交到池中(然后 所有线程都被占用)
  3. 提交第 4 个任务,该任务将在队列中等待。
  4. executor.shutdown() 并执行 println 以查看我的活动任务和任务计数如何。
  5. 将标志设置为 false 以停止无限 while 然后执行 println 看看我是如何进行活动任务和任务计数的
  6. mayInterruptIfRunning=true 取消所有期货,然后 做一个 println 看看我是如何制作活动任务和任务计数的 有

这是代码:

public class Main {


private static ThreadPoolExecutor fixexThreadPool;

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    final Boolean[] flag = new Boolean[1];
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    for (int i = 0; i < 3; i++) {
        futures.add(fixexThreadPool.submit(() -> {
            int a = 1;
            while (flag[0]) {
                a++;
            }
            System.out.println("Finishing thread execution.");
        }));
    }
    System.out.println("Done submiting 3 threads.");
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(3000L);
    System.out.println("Submitting a 4th thread.");

    futures.add(fixexThreadPool.submit(() -> {
        int a = 1;
        while (flag[0]) {
            a++;
        }
        System.out.println("Finishing thread execution");
    }));

    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));

    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> f.cancel(true));
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
}

}

这是执行的输出:

  • 创建大小为 3 的固定线程池和无限队列。
  • 提交 3 个主题
  • 已完成提交 3 个线程。
  • 活动计数:3 |已完成任务计数:0 |任务数:3
  • 提交第 4 个线程。
  • 活动计数:3 |已完成任务计数:0 |任务数:4
  • 执行程序关闭
  • 活动计数:3 |已完成任务计数:0 |任务数:4
  • 将标志设置为 false。
  • 活动计数:3 |已完成任务计数:0 |任务数:4
  • 取消所有期货。
  • 活动计数:3 |已完成任务计数:0 |任务数:4

有几件事我不明白。

  1. 为什么关闭executor后,还有活跃的线程?
  2. 为什么在将标志更改为 false 以中断无限循环后,无限 while 不会中断?
  3. 为什么在取消每一个future之后,还有活跃的线程?
  4. 无论是否将标志更改为 false、关闭执行程序甚至取消所有期货,我的程序都不会停止运行。这是为什么呢?

提前致谢!!

【问题讨论】:

  • 尝试阅读 shutdownshutdownNow 的 javadocs 并了解它们的区别。此外,由于线程正在疯狂循环,它们将消耗最大 cpu。其他指令可能没有机会执行。
  • 嗨@ScaryWombat,我已经改为shutdownNow(),程序继续运行。唯一改变的是第4个排队的任务被删除了。

标签: java java.util.concurrent java-threads


【解决方案1】:

这个问题可以通过使用volatile 关键字来解决。 This thread 提供了大量答案,详细解释了什么是 volatile,并且有大量教程/资源/博客可以提供更多详细信息。另一个关于 volatile here 的超级详细的帖子。

老实说,互联网上有很多人可以比我更好更准确地解释它,但简而言之 - volatile 是一个 Java 修饰符,当您拥有一个由多个共享的资源时应该使用线程。它告诉 JVM 确保每个线程缓存的值与主内存中的值同步。

很明显,JVM 在某个地方发生了故障,线程持有的值与实际值不太匹配。对你的实现做一个小改动就可以解决这个问题:

使标志成为类实例变量

private volatile static Boolean[] flag = new Boolean[1];

请参阅 here 了解我这样做的原因。

所以为了稍微大一点:

private static ThreadPoolExecutor fixexThreadPool;
private volatile static Boolean[] flag = new Boolean[1];

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    ...

代码现在愉快地停止了,没有任何问题,希望这会有所帮助:)

(附带说明,好奇你为什么使用 Boolean[] 而不仅仅是布尔值?我保持原样是为了我的答案保持一致,但它显然也适用于 flag 只是一个布尔值而不是一个数组)

-- 编辑--

回答您最近的评论 - 我很害怕我的理解几乎停止了我已经写的内容,但我可以提供我的想法。似乎可以在ThreadPoolExecutor 的文档中找到当您调用fixexThreadPool.shutdown(); 时您的应用程序没有“退出”的原因。即——

公共无效关机()

启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。

while 循环已经提交,所以它会愉快地继续执行。

我对此进行了一些探索,看看发生了什么。

首先我不喜欢那么长的状态日志行,所以我为它创建了一个单独的方法!我还注意到 ThreadPoolExecutor 和 Future 中的一些有趣的布尔状态,因此决定也记录它们:

private static void Log() {
     System.out.println(String.format("\nActive count: %s | Completed task count: %s | task count: %s", 
             fixexThreadPool.getActiveCount(), 
             fixexThreadPool.getCompletedTaskCount(), 
             fixexThreadPool.getTaskCount()));
     System.out.println(String.format("isShutdown : %s | isTerminated : %s | isTerminating : %s ", 
             fixexThreadPool.isShutdown(), 
             fixexThreadPool.isTerminated(), 
             fixexThreadPool.isTerminating())); 
     System.out.println(String.format("Futures size = %s", futures.size()));
     futures.forEach(f -> System.out.println(String.format("Future isCancelled : %s | isDone : %s", f.isCancelled(), f.isDone())));
     System.out.println("");
}

将其放入您的代码中,我们得到:

    Log();
    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    Log();
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    Log();
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> System.out.println(String.format("Future cancelled - %s", f.cancel(true))));
    Log();

我还想为应用程序添加一个快速的心跳,不时打印一个,这样我就可以看到幕后运行/未运行的内容:

private static void Heartbeat(int a) {
    int amod = a % 1000000000;
    if(amod == 0) {
        System.out.println(a);
    }
}

使用中:

while (flag[0]) {
    a++;
    Heartbeat(a);
}

然后我进行了几次测试,尝试了一些不同的东西,以不同的组合:

  • 注释掉fixexThreadPool.shutdown();
  • 注释掉futures.forEach(f -&gt; f.cancel(true));
  • 注释掉flag[0] = false;
  • 尝试使用/不使用新的volatile 修复。

为了让已经很长的答案更短一点,请随意探索这些组合,但我发现

  • 没有 volatile 关键字,线程卡在terminating 状态。我只能假设shutdown() 不会停止 由于这些任务已经提交,所以从做他们所做的事情开始循环,所以没有一个线程认为 flags[0] == false,他们将继续递增 a。

    据我所知,这展示了文档中概述的行为。所以 shutdown 不会停止你的 while 循环,它只是停止任何新的未来被提交到线程池(并被添加到阻塞队列中),然后耐心地等待 while 循环完成(这显然没有 volatile 标志上的修饰符,他们永远不会)。

  • 使用 volatile,但注释掉关闭,显然,每个任务 终止(控制台记录 4“正在完成线程执行。”)但 线程池保持活动状态,程序本身不会终止。池耐心地等待新任务,显然它永远不会得到。
  • “future.cancel”逻辑目前有点超出我的理解。我运行了一个调用 future.cancel 的测试并进行了一些更深入的日志记录/调查,但并没有真正的具体答案。

    我的运行理论是,future 已经被添加到线程池中的阻塞队列中,因此调用 cancel 不会影响线程池的执行,因此有效地调用 future.cancel 本身绝对可以无需修复exThreadPool。

希望这能提供充足的思考:)

【讨论】:

  • 我使用 Boolean[] 因为 AFAIK 在使用来自 lambda 的局部变量时,lambda 假定它们是有效的 final,然后如果我直接使用 boolean,我将使用 boolean 的副本,如果我改变来自外部的值,lambda 不会注意到变化。那是对的吗。 ?
  • 感谢@Ben 的明确回答。我已将标志更改为 volatile,现在线程正在完成:)。但是,我仍然不明白为什么当我说 executor.shutdown() 和取消所有期货时程序没有完成......
  • 我猜如果在野外循环中有一个小睡眠,那么其他一些线程可能有机会执行。
  • 感谢@Ben 的解释。正如你所说,有些事情还不清楚。
【解决方案2】:

为什么,关闭executor后,还有活跃的线程?

通过调用shutdown,我们请求normal shutdown。线程池停止接受新任务,但等待已提交的任务完成 - 运行或排队。 因此,shutdown 方法不会尝试停止线程。 从概念上讲,ExecutorService 实现将任务提交与任务执行分开。换句话说,我们拥有任务而不是线程。

为什么,为了打破无限,把flag改成false之后 循环,无限while不会中断?

我们在这里使用cancellation flag。但是根据 Java 内存模型,要让多个线程查看对共享数据所做的更改 - flag - 我们必须使用同步或使用 volatile 关键字。因此,虽然同步提供了互斥和内存可见性, volatile 关键字只是提供内存可见性。由于这里只有主线程修改了flag变量,所以将flag定义为静态易失变量会使其按预期工作。

public class Main {


    private static ThreadPoolExecutor fixexThreadPool;
    private static volatile Boolean[] flag = new Boolean[1];
    // the main method
}

为什么,取消每一个future后,都有活跃的线程?

ThreadPoolExecutor 默认使用FutureTask 类作为Future 实现。而FutureTask 通过中断工作线程来取消。所以可以期待thread interruption 停止任务,甚至终止线程。但是线程中断是一种协作机制。首先,任务必须响应中断。它必须使用Thread.isInterrupted 检查中断标志并在可能的情况下退出。其次,线程所有者,在我们的例子中是线程池,必须检查中断状态并采取相应的行动。在此示例中,任务不响应中断。它使用一个标志来取消它的操作。所以让我们继续讨论线程所有者。 ThreadPoolExecutor不作用于中断状态,所以不会终止线程。

无论是否将标志更改为 false,关闭执行程序甚至 取消所有期货,我的程序不会停止运行。这是为什么呢?

如上所述,当前任务使用取消标志的方法。所以使用 volatile 关键字必须解决问题。然后任务将按预期停止。另一方面,取消不会对当前任务产生任何影响。因为它没有检查中断状态。我们可以让它像这样响应:

while (flag[0] && !Thread.currentThread().isInterrupted())

这样,任务也支持线程中断的取消。

【讨论】:

  • 感谢您的回答。超级清楚。你知道有什么书或课程可以让我学习这些关于 Java 线程和执行器的深入和“底层”概念吗?
  • 我很高兴它有帮助。 Brian Goetz 和他的朋友们在“Java 并发实践”中详细介绍了这些主题。
猜你喜欢
  • 2023-01-26
  • 1970-01-01
  • 1970-01-01
  • 2012-10-21
  • 2012-09-12
  • 1970-01-01
  • 1970-01-01
  • 2016-08-28
  • 1970-01-01
相关资源
最近更新 更多