【问题标题】:How to retrieve value from the output of a scala Future?如何从 scala Future 的输出中检索值?
【发布时间】:2019-04-08 19:31:20
【问题描述】:

我正在尝试查询表,将查询的值存储在 Scala 映射中并返回相同的映射。 为此,我想出了以下代码:

def getBounds(incLogIdMap:scala.collection.mutable.Map[String, String]): Future[scala.collection.mutable.Map[String, String]] = Future {
  var boundsMap = scala.collection.mutable.Map[String, String]()
  incLogIdMap.keys.foreach(table => if(!incLogIdMap(table).contains("INVALID")) {
    val minMax    = s"select max(cast(to_char(update_tms,'yyyyddmmhhmmss') as bigint)) maxTms, min(cast(to_char(update_tms,'yyyyddmmhhmmss') as bigint)) minTms from queue.${table} where key_ids in (${incLogIdMap(table)})"
    val boundsDF  = spark.read.format("jdbc").option("url", commonParams.getGpConUrl()).option("dbtable", s"(${minMax}) as ctids")
      .option("user", commonParams.getGpUserName()).option("password", commonParams.getGpPwd()).load()
    val maxTms = boundsDF.select("minTms").head.getLong(0).toString + "," + boundsDF.select("maxTms").head.getLong(0).toString
    boundsMap += (table -> maxTms)
  }
  )
  boundsMap
}

为了从方法getBounds接收值,我使用了方法onCompletion如下:

val tmsobj    = new MinMaxVals(spark, commonParams)
val boundsMap = tmsobj.getBounds(incLogIds)
boundsMap.onComplete({
  case Success(value) =>
  case Failure(value) =>
})

我以前在 Scala 中编写过代码,但我对 Scala 中的Futures 不熟悉。谁能告诉我如何将getBounds返回的值检索到val boundsMap

【问题讨论】:

  • 我看到您的回答,您提到“不建议从 Future 访问值,因为它违背了异步计算的目的”。在这种情况下,我如何将 Future 应用于方法并使用它返回的值?由于我对 Futures 缺乏了解,感到有些困惑。
  • 我已经编辑了我的答案。如果这能回答您的问题,请告诉我
  • @Chaitanya,阅读您的答案,但我对其他燃烧的疑问感到困惑 -> 如果不建议检索 Future 的值,我如何/何时访问 Future 的结果?如果 Future 的目的是运行独立于主线程的线程/进程,为什么不建议访问它? Future 会自动将其输出分配给它的调用者吗?

标签: scala apache-spark


【解决方案1】:

您可以使用等待(不是最好的方法)

val boundsMap = Await.result(tmsobj.getBounds(incLogIds),Duration.Inf)

或仅在需要时使用该值

val boundsMap = tmsobj.getBounds(incLogIds)
booundsMap.map(value => Smth_To_Do(value))

【讨论】:

    【解决方案2】:

    不建议从 Future 访问值,因为它违背了异步计算的目的。但是,在某些情况下,您可能正在处理遗留代码,或者在某些情况下从未来获取值是前进的方向。处理这种情况,有两种方法

    1. 使用 await 会阻塞线程
    Await.result(getBounds, 10 seconds)
    

    所以,这里 await 所做的是,它将等待 10 秒以等待 getBounds 未来完成。如果它在这段时间内完成,那么你就有了价值,否则你会在这里得到一个异常。这种方法最大的缺点就是阻塞了当前的执行线程。

    1. 使用回调方法onComplete,就像你使用的那样
    getBounds onComplete {
       case Success(someOption) => myMethod(someOption)
           case Failure(t) => println("Error)
         }
    

    所以 onComplete 所做的是注册一个回调函数,该函数将在未来完成时执行。这比等待更安全。 详情可参考Accessing value returned by scala futures

    我希望这能回答您的问题。

    【讨论】:

    • 这是调用函数的正确方法吗:getBounds -> val tmsobj = new MinMaxVals(spark, commonParams) tmsobj.getBounds(incLogIds) onComplete ({ case Success(Map) => val boundsMap = tmsobj.getBounds(incLogIds) case Failure(value) => println("Future failed..") })
    猜你喜欢
    • 2019-08-31
    • 2014-05-29
    • 1970-01-01
    • 2017-08-02
    • 1970-01-01
    • 1970-01-01
    • 2021-01-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多