【问题标题】:Scala Futures with multiple dependencies具有多个依赖项的 Scala 期货
【发布时间】:2018-06-25 19:42:50
【问题描述】:

我必须异步计算一组相互之间可以有多个依赖关系的特性(无循环)。例如

 class FeatureEncoderMock(val n:String, val deps: List[String] = List.empty) {
      def compute = {
          println(s"starting computation feature $n")
          Thread.sleep(r.nextInt(2500))
          println(s"end computation feature $n")
      }
  }

  val registry = Map(
        "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")),
        "factLogA" -> new FeatureEncoderMock("factLogA"),
        "factLogB" -> new FeatureEncoderMock("factLogB"),
        "feat1" -> new FeatureEncoderMock("feat1", List("factLogA", "factLogB")),
        "feat2" -> new FeatureEncoderMock("feat2", List("factLogA")),
        "feat3" -> new FeatureEncoderMock("feat3", List("feat1")),
        "feat4" -> new FeatureEncoderMock("feat4", List("feat3", "factLogB"))
  )

我想要实现的是在 feat4 上调用一个函数,该函数将触发所有相关特征的计算并处理它们之间的依赖关系。我试过这个

def run(): Unit = {
val requested = "feat4"

val allFeatures = getChainOfDependencies(requested)

val promises = allFeatures.zip(Seq.fill(allFeatures.size)(Promise[Unit])).toMap

def computeWithDependencies(f: String) = Future {
  println(s"computing $f")
  val encoder = registry(f)

  if(encoder.deps.isEmpty) {
    promises(f).success(registry(f).compute)
  }
  else {
    val depTasks = promises.filterKeys(encoder.deps.contains)

    val depTasksFuture = Future.sequence(depTasks.map(_._2.future))

    depTasksFuture.onSuccess({
      case _ =>
        println(s"all deps for $f has been computed")
        promises(f).success(registry(f).compute)
        println(s"done for $f")
    })
  }
 }

computeWithDependencies(requested)
}

但我不明白为什么执行顺序不符合预期。我不确定在承诺中满足未来的正确方法是什么。我很确定这段代码在那部分是错误的。

【问题讨论】:

    标签: scala asynchronous concurrency promise


    【解决方案1】:

    我认为你对承诺想得太多了; Future 组合可能就是您所需要的。像这样的:

    import scala.collection.mutable
    
    def computeWithDependencies(s: String, cache: mutable.Map[String, Future[Unit]] = mutable.Map.empty)
                               (implicit ec: ExecutionContext): Future[Unit] = {
      cache.get(s) match {
        case Some(f) => f
        case None => {
          val encoder = registry(s)
          val depsFutures = encoder.deps.map(d => computeWithDependencies(d, cache))
          val result = Future.sequence(depsFutures).flatMap(_ => Future { encoder.compute })
          cache += s -> result
          result
        }
      }
    }
    

    flatMap 的调用确保所有依赖期货在“当前”未来执行之前完成,即使结果(List[Unit])被忽略。如果依赖关系图中有一个“菱形”,则使用缓存的业务只是为了防止重新计算,但如果它没有或者如果您可以重新计算,则可以省略。无论如何,运行时:

    val futureResult = computeWithDependencies("feat4")
    Await.result(futureResult, 30 seconds)
    

    我看到了这个输出:

    starting computation feature factLogB
    starting computation feature factLogA
    end computation feature factLogB
    end computation feature factLogA
    starting computation feature feat1
    end computation feature feat1
    starting computation feature feat3
    end computation feature feat3
    starting computation feature feat4
    end computation feature feat4
    

    这对我来说似乎是正确的。

    【讨论】:

    • 这太完美了!非常感谢。是的,我的主要问题是每次我定义一个 Future 时,它​​都会开始计算它,所以我缺少的是组合。谢谢!
    • 还有一件事:如果我想同时运行多个特性的computeWithDependencies,你会如何修改代码?
    猜你喜欢
    • 2016-03-31
    • 2015-08-06
    • 1970-01-01
    • 2012-02-08
    • 1970-01-01
    • 1970-01-01
    • 2019-10-17
    • 1970-01-01
    • 2016-01-19
    相关资源
    最近更新 更多