【问题标题】:Java Executor get first result that matches conditionJava Executor 获得第一个匹配条件的结果
【发布时间】:2021-01-10 18:08:52
【问题描述】:

当 Callable 返回的结果与条件匹配时,有没有办法停止 Java ExecuterService?

我有以下算法(代码 A):

MyObject functionA(SomeObject someObject) {
    for(int i = 0; i < 100; ++i) {
        MyObject result = someFunction(i, someObject);
        if (result != null) {
            return result;
        }
    }
    return null;
}

为了加快速度,我将代码更改为(代码 B):

MyObject functionB(SomeObject someObject) {
    final ExecutorService executorService = Executors.newCachedThreadPool();
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        Callable<MyObject> newCallable = () -> {
            return someFunction(finalI, someObject);
        };
        callableList.add(newCallable);
    }
    List<Future<MyObject>> futures = executorService.invokeAll(callableList);
    for(int i = 0; i < futures.size(); ++i) {
        Future<MyObject> future = futures.get(i);
        if(future.get() != null) {
            return future.get();
        }
    }
    return null;
}

平均而言,函数 B 的运行速度是函数 A 的 5 倍(在 8 个内核上)。

这很好,但并不完美。在 functionA 中, someFunction() 在找到结果 != null 之前平均被调用了大约 20 次。在 functionB 中 someFunction() 总是被调用 100 次。

有没有办法

a) 停止 executorService,当第一个线程以结果 != null 结束时

或更好

b) 停止 executorService,当一个线程完成并且一个线程找到结果 != null 并且所有线程的 finalI 值低于找到结果 != null 的线程,都以 reuslt == null 完成

谢谢

------- 编辑 ---------

感谢 Alex Crazy 和 Saxon 的回答,我能够回答我的问题 A。

  1. 结果不符合我的条件的所有 Callables 都应该抛出异常
  2. 我应该使用 invokeAny 而不是 invokeAll
  3. 我应该使用 newFixedThreadPool 而不是 newCachedThreadPool

FixedThreadPool 的使用很重要,因为 CachedThreadPool 并行运行所有 100 个任务。 FixedThreadPool 仅运行您定义的并行任务。新任务只有在旧任务完成后才会开始。

如果您不检查您的可调用对象是否被中断(因为 invokeAny 找到了结果),那么所有可调用对象都将运行直到它们完成。 因此,您可以使用 invokeAny 更快地获得结果,但处理器的工作时间与使用 invokeAll 时一样长。

所以我的新函数如下所示:

MyObject functionC(SomeObject someObject) {
    final ExecutorService executorService = Executors.newFixedThreadPool(8);
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        Callable<MyObject> newCallable = () -> {
            MyObject result = someFunction(finalI, someObject);
            if(result == null) {
                throw new Exception();
            }
            return result;
        };
        callableList.add(newCallable);
    }
    try {
        MyObject result = executorService.invokeAny(callableList);
        return result;
    } catch (InterruptedException | ExecutionException e) {
        return null;
    }
}

functionC 解决了我的问题 A,但它并不总是返回由 someFunction() 创建的 MyObject 对象,该对象具有 finalI 的最低输入值。 例如。 someFunction(3, someObject) 和 someFunction(6, someObject) 将返回一个不为空的 MyObject 对象。
在某些运行中,将返回 someFunction(3, someObject) 的结果,而在其他运行中,将返回 someFunction(6, someObject) 的结果。 它不是确定性的。

所以我写了新的类 ResultCallable

public abstract class ResultCallable<E>  implements Callable<E> {
    E result = null;
}

和功能D

MyObject functionD(SomeObject someObject) {
    final ExecutorService executorService = Executors.newFixedThreadPool(8);
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        ResultCallable<MyObject> newCallable = () -> {
            this.result = someFunction(finalI, someObject);
            if(this.result == null) {
                throw new Exception();
            }
            return this.result;
        };
        callableList.add(newCallable);
    }
    try {
        MyObject result = executorService.invokeAny(callableList);
        executorService.shutdownNow();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        for(int i = 0; i < callableList.size(); ++i) {
            if(callableList.get(i).result != null) {
                return callableList.get(i).result;
            }
        }
    } catch (InterruptedException | ExecutionException e) {
        return null;
    }
    return null;
}

如果someFunction(3, someObject)someFunction(6, someObject) 将返回一个不为空的 MyObject 对象,现在主线程在第一个有效结果后等待所有其他启动的任务完成(使用 newFixedThreadPool(8) 最多有 7 个其他正在运行的任务)。 因为 executorService 按照它们在 callableList 中的顺序运行可调用对象,所以所有 finalI 值低于返回第一个有效结果的可调用对象的可调用对象都已启动,并将在第 19 行之前完成。

这很好,但并不完美。
假设finalI = 6 的任务首先完成,而finalI 从0 到5 和7 的任务仍在运行。 然后因为 executorService.shutdownNow();没有其他任务将启动,主线程将等待所有正在运行的任务完成。
但是我只需要等待finalI在0到3之间的任务完成即可。任务 3 因为它也有一个结果 != null 和任务 0、1 和 2 来证明任务 3 下面没有结果。

我可以将 executorService.awaitTermination 和 for 循环替换为以下内容吗?

for(int i = 0; i < callableList.size(); ++i) {
    awaitTermination(callableList.get(i));
    if(callableList.get(i).result != null) {
        return callableList.get(i).result;
    }
}

awaitTermination(callableList.get(i)); 是伪代码。我找不到要等到可调用对象执行的函数。

----------- 编辑 2 ---------- --

我自己解决了。

我将 final CountDownLatch countDownLatch = new CountDownLatch(1); 添加到我的类 ResultCallable。
this.result = someFunction(finalI, someObject); 之后我添加了this.countDownLatch.countDown();awaitTermination(callableList.get(i));callableList.get(i).countDownLatch.await(); 取代

如果你有更好的想法,请告诉我。

【问题讨论】:

    标签: java multithreading parallel-processing executorservice


    【解决方案1】:

    为什么不使用invokeAny 而不是invokeAll。当结果不好时抛出异常而不是返回 null 就足够了,并在 ExecutorService 调用时返回 MyObject 而不是 List

    【讨论】:

    • 好主意。我一直认为invokeAny只给出了最快任务的结果,但它给出了最快的SUCCESSFUL任务的结果。所以我可以在可调用内部返回之前进行条件检查,如果条件不满足则抛出异常。
    【解决方案2】:

    解决问题的正确方法是:

    1. 在某个地方创建其他变量,例如 private boolean isFinished,它将在所有线程之间共享,并检查此变量是否会周期性地更改其值

    2. 如果可能的话,你必须让你的 Callable 看起来像这样:

       public class SomeRunnableFunction implements Callable {
      
            private volatile boolean running = true;
      
            public void terminate() {
                running = false;
            }
      
            @Override
            public Object call() throws Exception {
                 while (running) {
                      // your custom logic
                      return new Object();
                 }
                 return null;
            }
      }
      
    3. 所以一个线程完成后将private boolean isFinished更改为true,然后您必须通过调用someRunnableFunction.terminate()取消所有已经启动的任务

    4. 你也可以使用feature.cancel() insted of someRunnableFunction.terminate() 然后你的callable看起来像:

       public class SomeRunnableFunction implements Callable {
      
            @Override
            public Object call() throws Exception {
                 while (true) {
                      // Check regularly if the thread has been
                      // interrupted and if so throws an exception to stop
                      // the task immediately 
                      if (Thread.currentThread().isInterrupted()) {
                           throw new InterruptedException("Thread interrupted");
                      }
                      // your custom logic
                      return new Object();
                 }
                 return null;
            }
      }
      

    【讨论】:

    • 顺便说一下,@Saxon 提到的 invokeAny() 的工作方式类似,您真的不需要编写额外的同步代码。我觉得他的回答是对的
    猜你喜欢
    • 1970-01-01
    • 2018-08-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-05-21
    • 1970-01-01
    • 2020-07-15
    相关资源
    最近更新 更多