【问题标题】:Why is it not recommended to retrieve value from Scala's Future?为什么不建议从 Scala 的 Future 中检索值?
【发布时间】:2019-08-31 12:42:19
【问题描述】:

我最近开始研究 Scala,并发现了它的一个名为 Future 的特性。我发布了一个问题以寻求有关我的代码的帮助以及一些帮助。

在那次谈话中,有人告诉我不建议从 Future 中检索值。

我知道执行时它是一个并行过程,但如果不建议检索 Future 的值,我如何/何时访问它的结果?如果 Future 的目的是运行独立于主线程的线程/进程,为什么不建议访问它? Future 会自动将其输出分配给它的调用者吗?如果是这样,我们怎么知道何时访问它?

我编写了以下代码以返回带有 Map[String, String] 的 Future。

def getBounds(incLogIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
  var boundsMap = scala.collection.mutable.Map[String, String]()
  incLogIdMap.keys.foreach(table => if(!incLogIdMap(table).contains("INVALID")) {
    val minMax    = s"select max(cast(to_char(update_tms,'yyyyddmmhhmmss') as bigint)) maxTms, min(cast(to_char(update_tms,'yyyyddmmhhmmss') as bigint)) minTms from queue.${table} where key_ids in (${incLogIdMap(table)})"
    val boundsDF  = spark.read.format("jdbc").option("url", commonParams.getGpConUrl()).option("dbtable", s"(${minMax}) as ctids")
      .option("user", commonParams.getGpUserName()).option("password", commonParams.getGpPwd()).load()
    val maxTms = boundsDF.select("minTms").head.getLong(0).toString + "," + boundsDF.select("maxTms").head.getLong(0).toString
    boundsMap += (table -> maxTms)
  }
  )
  boundsMap
}

如果我必须使用 getBounds 方法返回的值,我可以通过以下方式访问它吗?

val tmsobj = new MinMaxVals(spark, commonParams)
  tmsobj.getBounds(incLogIds) onComplete ({ 
  case Success(Map) => val boundsMap = tmsobj.getBounds(incLogIds) 
  case Failure(value) => println("Future failed..") 
})

有人能解开我的疑惑吗?

【问题讨论】:

  • boundsMap 既是var 又是可变集合?这是不必要的多余。
  • @jwh,那么我该如何构建“成功”的案例。因为它说“Pattern type is in compatible with the expected type, found: mutable.map.type, required mutable.Map{String,String]”
  • 你错过了我评论的重点。请参阅我发布的答案。

标签: scala future


【解决方案1】:

正如其他人所指出的,等待Future 中检索一个值首先破坏了启动 Future 的全部意义。

但是onComplete() 不会导致您的其余代码等待,它只是附加额外的指令,作为Future 线程的一部分执行,而您的其余代码继续运行。

那么,您提出的访问getBounds() 结果的代码有什么问题?让我们来看看吧。

tmsobj.getBounds(incLogIds) onComplete { //launch Future, when it completes ...
  case Success(m) => //if Success then store the result Map in local variable "m"
    val boundsMap = tmsobj.getBounds(incLogIds) //launch a new and different Future
    //boundsMap is a local variable, it disappears after this code block

  case Failure(value) => //if Failure then store error in local variable "value"
    println("Future failed..") //send some info to STDOUT 
}//end of code block

您会注意到我将Success(Map) 更改为Success(m),因为Map 是一种类型(它是一个伴随对象)并且不能用于匹配您的Future 的结果。

总结:onComplete() 不会导致您的代码在 Future 上等待,这很好,但它有些限制,因为它返回 Unit,即它没有可以与之通信的返回值Future 的结果。

【讨论】:

    【解决方案2】:

    TLDR; Futures 不是用来管理共享状态的,但它们很适合编写异步代码。可以使用mapflatMap等多种操作组合Futures

    Future 表示的计算将使用给定的ExecutionContext(通常是隐式给出)执行,这通常在线程池上,因此您可以假设Future 计算发生在平行线。由于这种并发性,通常不建议更改从 Future 的主体内部共享的状态,例如:

     var i: Int = 0
     val f: Future[Unit] = Future { 
         // Some computation
         i = 42
     }
    

    因为您会冒着在另一个线程(可能是“主”线程)中访问/修改i 的风险。在这种并发访问的情况下,Futures 可能不是正确的并发模型,您可以想象使用监视器或消息传递来代替。

    另一种诱人但不鼓励的可能性是阻塞主线程直到结果可用:

    val f: Future[Init] = Future { 42 }
    val i: Int = Await.result(f)
    

    这样做不好的原因是您将完全阻塞主线程,从而首先消除并发执行的好处。如果你这样做太多,你也可能因为大量线程被阻塞和占用资源而陷入困境。

    那么,您如何知道何时访问结果?你没有,这实际上是你应该尽可能多地尝试编写Futures 的原因,并且只在你的应用程序的最边缘订阅他们的onComplete 方法。对于您的大多数方法来说,典型的做法是获取并返回Futures,并且只在非常特定的地方订阅它们。

    【讨论】:

      【解决方案3】:

      不建议等待使用Await.result 来等待Future,因为这会阻塞当前线程的执行,直到将来某个未知点,可能是永远。

      通过将处理函数传递给map 上的map 之类的调用,处理Future 的值 是完全可以的。这将在未来完成时调用您的函数。 map的结果是另一个Future,可以依次使用maponComplete或其他方法处理。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-08-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-06-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多