【问题标题】:Interrupt parallel Stream execution中断并行流执行
【发布时间】:2014-06-16 12:18:25
【问题描述】:

考虑这段代码:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

tasks 是应并行执行的 Runnable 列表。 当我们启动这个线程并开始执行时,根据一些计算,我们需要中断(取消)所有这些任务。

中断线程只会停止其中一个执行。我们如何处理他人?或者也许不应该那样使用 Streams?或者您知道更好的解决方案?

【问题讨论】:

  • 我不明白。 ExecutorService.invokeAll 有一个明确的方法可以取消创建的任务。你为什么坚持使用不打算提供这种功能的东西?在tasks.parallelStream().forEach(Runnable::run) 中没有什么比在executor.invokeAll(tasks) 中更具可读性了
  • 因为运行它的代码,有时需要通过使用 tasks.stream() 而不是并行流来执行同步任务,我现在唯一使用流是它们更具可读性和简单性理解代码审查的人......但可能你是对的,这些不打算以我使用它的方式使用。
  • 您可以使用单线程ExecutorService,甚至可以实现就地运行ExecutorService,这在继承AbstractExecutorService 时非常容易。这样您就可以免费获得取消支持的顺序执行...

标签: multithreading concurrency java-8 java-stream


【解决方案1】:

您可以使用ForkJoinPool 来中断线程:

@Test
public void testInterruptParallelStream() throws Exception {
    final AtomicReference<InterruptedException> exc = new AtomicReference<>();

    final ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // use the pool with a parallel stream to execute some tasks
    forkJoinPool.submit(() -> {
        Stream.generate(Object::new).parallel().forEach(obj -> {
            synchronized (obj) {
                try {
                    // task that is blocking
                    obj.wait();
                } catch (final InterruptedException e) {
                    exc.set(e);
                }
            }
        });
    });

    // wait until the stream got started
    Threads.sleep(500);
    // now we want to interrupt the task execution
    forkJoinPool.shutdownNow();
    // wait for the interrupt to occur
    Threads.sleep(500);
    // check that we really got an interruption in the parallel stream threads
    assertTrue(exc.get() instanceof InterruptedException);
}

工作线程确实会被中断,从而终止阻塞操作。您也可以在Consumer 内拨打shutdown()

请注意,这些睡眠可能不会针对适当的单元测试进行调整,您可能有更好的想法,只是在必要时等待。但这足以表明它正在工作。

【讨论】:

  • 我检查了这个解决方案是否有效,但每次都创建和停止新的线程池似乎是个坏主意,对吧?有更好的解决方案吗?我曾尝试像这样使用 Future#cancel Future> task = forkJoinPool.submit(() -> { // 此处的并行流 });任务.取消(真);但这由于某种原因不起作用
  • @VitalyTsvetkoff 如果您打算中断并行执行任务的特定执行,那么您不希望该进程与任何其他线程或池耦合,因此创建自己的ForkJoinPool 是确保您没有副作用的最佳方法。如果执行所需的时间不足以证明创建您自己的池的合理性,那么对于应该提高性能的并行流来说,这可能不是一个好的用例
【解决方案2】:

您实际上并没有在您正在创建的线程上运行 Runnables。您正在运行一个将提交到池的线程,因此:

Thread thread = new Thread(() -&gt; tasks.parallelStream().forEach(Runnable::run));

在这个例子中,你在做的事情较少

List<Runnable> tasks = ...;
Thread thread = new Thread(new Runnable(){
    public void run(){
       for(Runnable r : tasks){
          ForkJoinPool.commonPool().submit(r);
       }
    }
});

这是因为您在处理并行执行时使用了一个parallelStream,它委托给一个公共池。

据我所知,您无法使用parallelStream 获取正在执行您的任务的线程的句柄,因此可能不走运。你总是可以做一些棘手的事情来获得线索,但这可能不是最好的主意。

【讨论】:

    【解决方案3】:

    以下内容应该适合您:

    AtomicBoolean shouldCancel = new AtomicBoolean();
    ...
    tasks.parallelStream().allMatch(task->{
        task.run();
        return !shouldCancel.get();
    });
    

    allMatch 方法的文档特别指出它“如果不需要确定结果,它可能不会评估所有元素的谓词”。因此,如果您想取消时谓词不匹配,那么它就不需要再评估了。另外,您可以查看返回结果,看看循环是否被取消。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-10-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-15
      • 1970-01-01
      相关资源
      最近更新 更多