【问题标题】:Execute competing calculations in parallel and discard all but the first one that finishes并行执行竞争计算并丢弃除第一个完成的计算之外的所有计算
【发布时间】:2014-11-22 07:52:16
【问题描述】:

我写了一个基于随机性生成迷宫的函数。大多数时候,这个功能非常快。但是每隔一段时间,由于随机数的运气不好,需要几秒钟。

我想并行启动这个函数多次,让最快的函数“获胜”。

Scala 标准库(或 Java 标准库)是否为这项工作提供了合适的工具?

【问题讨论】:

  • 找出程序有时需要几秒钟并修复它的原因可能更容易(也更有效)...
  • 你可能想要ExecutorCompletionService,但我同意@immibis
  • @immibis 看来迷宫生成(没有死胡同)只是一个难题,我找到/能想到的每一个解决方案都需要大量的回溯。

标签: java multithreading scala concurrency java.util.concurrent


【解决方案1】:

7 年后,我偶然发现了 ExecutorService.invokeAny,这正是我想要的:

执行给定的任务,返回已成功完成的任务的结果(即,不抛出异常),如果有的话(在给定的超时时间过去之前)。

正常或异常返回时,取消未完成的任务。

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

实现草图:

class LabyrinthGenerator implements Callable<Labyrinth> {
    private static final int N = 16;
    private static final ExecutorService pool = Executors.newFixedThreadPool(N);

    public static Labyrinth generate() {
        while (true) {
            List<LabyrinthGenerator> generators = Stream.generate(LabyrinthGenerator::new)
                                                        .limit(N)
                                                        .collect(Collectors.toList());
            try {
                return pool.invokeAny(generators, 1, TimeUnit.SECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                // all attempts failed or timed out, try again
            }
        }
    }

    @Override
    public Labyrinth call() throws Exception {
        // ...
        if (Thread.currentThread().isInterrupted()) {
            // some other generator found a result, abort
        }
        // ...
    }
}

【讨论】:

  • 请注意,其他 3 个任务将继续运行,直到完成。
【解决方案2】:

你可以使用Future:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val futures = for (_ <- 1 to 4) yield Future { /* computation */ }
val resultFuture = Future.firstCompletedOf(futures)

如果你想阻止(我想你会这样做),你可以使用Await.result

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val result = Await.result(resultFuture, Duration.Inf)

【讨论】:

【解决方案3】:

带有CompletableFuture的java 8解决方案:

public class FirstDoneWithCompletableFutureEx {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        int jobs = 10;
        CompletableFuture<?>[] futures = new CompletableFuture[jobs];
        for (int i = 0; i < jobs; i++) {
            futures[i] = CompletableFuture.supplyAsync(() -> {
                //computation    
                return new Object();
            });
        }

        //first job done
        Object firstDone = CompletableFuture.anyOf(futures).get();
    }
}

带有CompletionService的java 5,6,7解决方案:

public class FirstDoneWithCompletionServiceEx {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        int jobs = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(jobs);
        CompletionService<Object> completionService = new ExecutorCompletionService<>(executorService);

        for (int i = 0; i < jobs; i++)
            completionService.submit(
                    new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
                            //computation
                            return new Object();
                        }
                    }
            );

        //get first job done
        Object firstDone = completionService.take().get();

        executorService.shutdownNow();
    }
}

【讨论】:

  • 这听起来像是将 CompletableFuture 与 acceptEither CompletionStage 结合使用的绝佳机会。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-05
  • 2019-02-16
相关资源
最近更新 更多