【问题标题】:Wait until all Future.onComplete callbacks are executed等到所有 Future.onComplete 回调被执行
【发布时间】:2014-02-06 21:47:49
【问题描述】:

我正在使用来自 Scala 2.10.X 的 Future API。

这是我的用例:

object Class1 {

  def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

    val start = DateTime.now

    val result = f(i)

    result.onComplete{
      case _ => println("Started at " + start + ", ended at " + DateTime.now)
    }

    result
  }
}

我认为非常简单:我正在为我的未来添加一个 onComplete 回调。现在,我想知道是否有一种方法可以在 onComplete 完成执行时添加回调 - 在此示例中,知道日志记录何时完成。

假设我的result 实例注册了3 个onComplete,我能知道它们何时都已被执行吗?我不认为这是可能的,但谁知道:)

也许另一种方法是调用map 而不是onComplete 来返回Future 的新实例:

def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

  val start = DateTime.now

  f(i) map {
    case r => 
      println("Started at " + start + ", ended at " + DateTime.now)
      r
  }
}

但我不确定它是否会保持相同的行为。

编辑:澄清一下 - Future 只有 一个 实例,我在 相同 实例上调用 onComplete 3 次(嗯,在我的示例仅一次,但假设我调用它 N 次)并且我想知道由于相同的 Future 实例的完成,3 个回调何时执行完毕。

【问题讨论】:

  • 等一下,我可能回答错了。你是说你有 1 个 Future 并且你已经完成了 3 次 onComplete 还是 3 个 Future 每个都有一个 onComplete?因为如果是第二个,我的回答就是垃圾。
  • @wheaties 查看我的编辑:是的,只有一个 Future,我们在同一个实例上调用 onComplete N 次。

标签: scala future


【解决方案1】:

如果您不想使用其他方法(例如 CountDownLatch),那么您想使用andThen 来了解您的操作何时完成(成功与否,以及 Future 是否成功)。

scala> val f = Future(3)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@4b49ca35

scala> val g = f andThen { case Success(i) => println(i) } andThen { case _ => println("All done") }
3
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1939e13
All done

如果未来失败,则映射的函数不会被调用:

scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@7001619b

scala> val g = f andThen { case t => println(s"stage 1 $t") } andThen { case _ => println("All done") }
stage 1 Failure(java.util.concurrent.ExecutionException: Boxed Error)
All done
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@24e1e7e8

scala> val g = f map { case i => println(i) } andThen { case _ => println("All done") }
All done
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5d0f75d6

scala> val g = f map { case i => println(i) } map { case _ => println("All done") }
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5aabe81f

scala> g.value
res1: Option[scala.util.Try[Unit]] = Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))

同样,在链式处理程序中爆炸不会破坏后续操作:

scala> val g = f andThen { case t => null.hashCode } andThen { case _ => Thread.sleep(1000L); println("All done") }
java.lang.NullPointerException
    at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
    at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:431)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:430)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@3fb7bec8

scala> All done


scala> g.value
res1: Option[scala.util.Try[Int]] = Some(Success(3))

对于需要等待的不幸情况:

scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@859a977

scala> import java.util.concurrent.{ CountDownLatch => CDL }
import java.util.concurrent.{CountDownLatch=>CDL}

scala> val latch = new CDL(3)
latch: java.util.concurrent.CountDownLatch = java.util.concurrent.CountDownLatch@11683e9f[Count = 3]

scala> f onComplete { _ => println(1); latch.countDown() }
1

scala> f onComplete { _ => println(2); latch.countDown() }
2

scala> f onComplete { _ => println(3); latch.countDown() }
3

scala> f onComplete { _ => latch.await(); println("All done") }
All done

【讨论】:

  • 好电话!我完全忘记了 map 如果 Future 或任何回调在执行期间发生失败,将被忽略。
【解决方案2】:

有 1 个 Future 和 3 个 onComplete

我认为您将不得不采取将您的函数组合成单个 onComplete 调用的路线,否则您必须完全按照您所说的去做,使用 map

 val fut1 = myFut map func1 // yes, a Future[Unit]
 val fut2 = myFut map func2
 val fut3 = myFut map func3

按照下一部分了解它们何时完成。

有 3 种不同的期货

很可能知道所有三个Futures 何时完成。事实上,在 Scala 中 Future 编写!

 def threeFutures(one: Future[Int], two: Future[Int], three: Future[Int]) {
   val fourth = for {
     _ <- one
     _ <- two
     _ <- three
   } yield 0

   fourth onComplete {
     case _ => println("all done")
   }
 }

现在这是什么意思?这意味着fourthFuture,它不关心三个参数的输入,但是当它们都完成时,它会自行完成。这是预先包装好的,专为您准备。

(旁注:在示例中,我还假设您在范围内拥有所有隐式。)

【讨论】:

  • 感谢您的回答。我认为即使只有一个onComplete,问题仍然存在,我仍然无法找到一种方法来知道这个注册的回调何时执行完毕。我认为你是对的,map 会起作用,因为那时,当我的Future 完成时,我知道map 中的代码已经运行完毕。
  • @vptheron 由于我给出的原因,这并不完全正确。您不会看到失败的期货何时完成。我认为最好的解决方案是另一个同步原语,就像一个锁存器,所以当最后一个完成者完成时,你会开始你接下来想做的任何事情,而不是阻止它。
  • @som-snytt:这个解决方案工作得很好:pastebin
  • @senia 不会打印 1,2,3,对吧?所以这些不会像 onComplete 那样作为副作用回调运行。我认为这是用例,知道它们何时完成(即在运行之后)。
猜你喜欢
  • 1970-01-01
  • 2018-09-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-02-25
  • 2019-05-22
  • 1970-01-01
相关资源
最近更新 更多