【问题标题】:How to use ExecutorService to poll until a result arrives如何使用 ExecutorService 进行轮询,直到结果到达
【发布时间】:2017-03-08 04:26:41
【问题描述】:

我有一个场景,我必须轮询远程服务器检查任务是否已完成。完成后,我会进行不同的调用来检索结果。

我最初认为我应该使用SingleThreadScheduledExecutorscheduleWithFixedDelay 进行轮询:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId);
   }
}

但由于我只能向scheduleWithFixedDelay 提供无法返回任何内容的Runnable,所以我不知道future 何时会完成,如果有的话。打电话给future.get() 是什么意思?我在等什么结果?

当我第一次检测到远程任务已经完成时,我想执行一个不同的远程调用并将其结果设置为future 的值。我想我可以为此使用 CompletableFuture,我会将其转发到我的 poll 方法,该方法又会将其转发到最终完成它的 retrieveTask 方法:

CompletableFuture<Object> result = new CompletableFuture<Object>();
ScheduledFuture future = executor.scheduleWithFixedDelay(() -> poll(jobId, result), 0, 10, TimeUnit.SECONDS);

public void poll(String jobId, CompletableFuture<Object> result) {
   boolean jobDone = remoteServer.isJobDone(jobId);
   if (jobDone) {
       retrieveJobResult(jobId, result);
   }
}

public void retrieveJobResult(String jobId, CompletableFuture<Object> result) {
    Object remoteResult = remoteServer.getJobResult(jobId);
    result.complete(remoteResult);
}

但这有很多问题。一方面,CompletableFuture 似乎并不适合这种用途。相反,我认为我应该做CompletableFuture.supplyAsync(() -&gt; poll(jobId)),但是当我的CompletableFuture 被取消/完成时,我将如何正确关闭executor 并取消它返回的future?感觉轮询应该以完全不同的方式实现。

【问题讨论】:

  • 您也可以提交 Callables(即返回结果):docs.oracle.com/javase/7/docs/api/java/util/concurrent/…
  • @Thilo 仅用于一次性任务,不适用于 scheduleWithFixedDelay 或 scheduleAtFixedRate,因此轮询结束
  • @Thilo 我认为scheduleWithFixedDelay 不会收到Callable
  • Op,我认为你在做正确的事。 CompletableFuture 实际上是异步编程框架中的一个承诺。然而,你应该暴露的是一个无法完成的正常未来。并且您所有后续代码都应该订阅该未来。我看不出有什么问题。什么让你困惑?
  • @HuStmpHrrr 所有示例似乎都在做supplyAsync,而不是显式创建CompletableFuture。但更重要的是,在我的情况下,我需要在未来完成时关闭执行程序。我应该继承CompletableFuture 并覆盖completecompleteExceptionallycancel 来执行此操作吗?我应该担心取消我从执行人那里得到的ScheduledFuture吗?

标签: java concurrency future executorservice completable-future


【解决方案1】:

我认为 CompletableFutures 是一个很好的方法:

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

private void run() {
    final Object jobResult = pollForCompletion("jobId1")
            .thenApply(jobId -> remoteServer.getJobResult(jobId))
            .get();

}

private CompletableFuture<String> pollForCompletion(final String jobId) {
    CompletableFuture<String> completionFuture = new CompletableFuture<>();
    final ScheduledFuture<Void> checkFuture = executor.scheduleAtFixedRate(() -> {
        if (remoteServer.isJobDone(jobId)) {
            completionFuture.complete(jobId);
        }
    }, 0, 10, TimeUnit.SECONDS);
    completionFuture.whenComplete((result, thrown) -> {
        checkFuture.cancel(true);
    });
    return completionFuture;
}

【讨论】:

  • 谢谢!我终于意识到应该如何组合多个步骤。我一直都在想念这个。现在就去试试看。
  • 如果未来被取消,检查whenComplete 内部的正确方法是什么?我只能想出thrown instanceof CancelationException
  • 我不记得了; CancellationException 可能 包含在 ExecutionException 中。 (所以你需要检查thrown.getCause() instanceof CancellationException。)然而,在我的示例代码中,没有什么会导致pollForCompletion CompletableFuture 被取消。你能用你的新代码更新你的问题吗?
  • 我将我的新代码和我的新问题发布为a separate question,因为我想说你已经完全回答了。我接受你的回答,因为你是第一个发帖的。如果您仍然有兴趣帮助我,请检查新问题。
  • whenComplete 的使用似乎不正确,因为可完成的未来可能会抛出异常或超时等。因此handle 似乎是取消调度程序的更合适的方法
【解决方案2】:

在我看来,你比其他任何人都更担心一些风格问题。在java 8中,CompletableFuture有2个作用:一个是传统的future,它为任务执行和状态查询提供了一个异步源;另一个就是我们通常所说的promise。一个promise,如果你还不知道,可以被认为是future 的builder 和它的completion source。所以在这种情况下,直觉上需要一个承诺,这正是您在这里使用的情况。您担心的示例是向您介绍第一种用法的东西,而不是向您介绍的承诺方式。

接受这一点,你应该更容易开始处理你的实际问题。我认为 promise 应该有 2 个角色,一个是通知您的任务完成轮询,另一个是在完成时取消您的计划任务。这里应该是最终的解决方案:

public CompletableFuture<Object> pollTask(int jobId) {
    CompletableFuture<Object> fut = new CompletableFuture<>();
    ScheduledFuture<?> sfuture = executor.scheduleWithFixedDelay(() -> _poll(jobId, fut), 0, 10, TimeUnit.SECONDS);
    fut.thenAccept(ignore -> sfuture.cancel(false));
    return fut;
}

private void _poll(int jobId, CompletableFuture<Object> fut) {
    // whatever polls
    if (isDone) {
        fut.complete(yourResult);
    }
}

【讨论】:

  • 啊,我现在意识到应该如何将步骤与Futures 结合起来。到目前为止,我一直在逃避这件事。谢谢,这很有用。我现在去试试。
  • 这段代码仍然有问题,但它是来自这个代码的a different question。如果您仍然有兴趣帮助我,请查看新问题。再次感谢。
【解决方案3】:

我为此创建了一个通用实用程序,灵感来自this answer,使用Supplier&lt;Optional&lt;T&gt;&gt;,每个投票都可以返回Optional.empty(),直到值准备好。我还实现了timeout,以便在超过最大时间时抛出TimeoutException

用法:

ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Supplier<Optional<String>> supplier = () -> remoteServer.isJobDone(jobId) ? Optional.of(jobId) : Optional.empty();
CompletableFuture<String> future = ScheduledCompletableFuture.builder(String.class)
   .supplier(supplier)
   .executorService(scheduledExecutor)
   .timeUnit(TimeUnit.SECONDS)
   .initialDelay(5)
   .period(5)
   .timeout(60 * 5)
   .build();

ScheduledCompletableFuture.java

public class ScheduledCompletableFuture {
    public static class ScheduledCompletableFutureBuilder<T> {
        private Supplier<Optional<T>> supplier;
        private ScheduledExecutorService executorService;
        private Long initialDelay;
        private Long period;
        private Long timeout;
        private TimeUnit timeUnit;

        public ScheduledCompletableFutureBuilder() {
        }

        public ScheduledCompletableFutureBuilder<T> supplier(Supplier<Optional<T>> supplier) {
            this.supplier = supplier;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> executorService(ScheduledExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> initialDelay(long initialDelay) {
            this.initialDelay = initialDelay;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> period(long period) {
            this.period = period;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeout(long timeout) {
            this.timeout = timeout;
            return this;
        }

        public ScheduledCompletableFutureBuilder<T> timeUnit(TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
            return this;
        }

        public CompletableFuture<T> build() {
            // take a copy of instance variables so that the Builder can be re-used
            Supplier<Optional<T>> supplier = this.supplier;
            ScheduledExecutorService executorService = this.executorService;
            Long initialDelay = this.initialDelay;
            Long period = this.period;
            Long timeout = this.timeout;
            TimeUnit timeUnit = this.timeUnit;

            CompletableFuture<T> completableFuture = new CompletableFuture<>();
            long endMillis = System.currentTimeMillis() + timeUnit.toMillis(timeout);
            Runnable command = () -> {
                Optional<T> optional = supplier.get();
                if (optional.isPresent()) {
                    completableFuture.complete(optional.get());
                } else if (System.currentTimeMillis() > endMillis) {
                    String msg = String.format("Supplier did not return a value within %s %s", timeout, timeUnit);
                    completableFuture.completeExceptionally(new TimeoutException(msg));
                }
            };
            ScheduledFuture<?> scheduledFuture = executorService.scheduleAtFixedRate(command, initialDelay, period, timeUnit);
            return completableFuture.whenComplete((result, exception) -> scheduledFuture.cancel(true));
        }
    }

    public static <T> ScheduledCompletableFutureBuilder<T> builder(Class<T> type) {
        return new ScheduledCompletableFutureBuilder<>();
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-05-31
    • 2018-02-23
    • 1970-01-01
    • 2020-01-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多