【问题标题】:Error handling in list of scala futures - Apache SparkScala 期货列表中的错误处理 - Apache Spark
【发布时间】:2020-06-30 07:03:15
【问题描述】:

我在处理 Scala 期货列表中的异常时遇到问题。我在ruuner 方法中调用getQC_report(qcArgsThread,spark) 方法,该方法处理输入文件并保存在Hive 表中。代码如下

  import scala.util.{Failure, Success}
  import scala.concurrent._
  import scala.concurrent.duration._
  val spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()
  
  var argsList: List[Array[String]] = List[Array[String]]()
  for(ip_file <- INPUT_FILE.asScala.toList) {
    var qcArgs:Array[String] = null
       qcArgs = Array("input_file", ip_file,
        "hiveDB",hiveDB,
        "Outputhive_table",Outputhive_table)
    argsList = qcArgs :: argsList
  }
  var pool = 0

  def poolId = {
    pool = pool + 1
    pool
  }

  def runner(qcArgsThread: Array[String]) = Future {
    sc.setLocalProperty("spark.scheduler.pool", poolId.toString)
    getQC_report(qcArgsThread,spark)
    }
  val futures = argsList map(i => runner(i))

  futures foreach(f => Await.ready(f, Duration.Inf))

  futures.onComplete {
    case Success(x) => {
      println(s"\nresult = $x")
    }
    case Failure(e) => {
      System.err.println("Failure happened!")
      System.err.println(e.getMessage)
    }
  }

futures.onComplete 行出现错误。

错误 - 无法解析符号 onComplete。

请帮助我改进代码,因为我是使用 Scala Futures 的新手。谢谢!

【问题讨论】:

  • 好吧... 1) 请移除java.util.concurrent 标签。这无关紧要 2) 我强烈建议在学习 Spark 之前花几个月的时间使用 Scala。大多数直接使用 Spark 的人最终都会在这两种代码中编写悲惨且无法使用的代码 3)永远不要在 SPARK 中使用变量,它们会炸毁你的整个程序 4)你的程序不会编译,因为 List 没有一个名为onComplete 的方法,您将需要编写这些期货。一种选择是Future.sequence 5) 等待和OnComplete 是多余的。选择一个
  • 更多建议。 1) Scala 允许您使用space 而不是. 进行方法调用,但不要那样做。 2) Spark 是一个带有 Scala API 的平台,如果您的程序中没有使用任何 Spark 功能,那么为什么要使用 Spark? 3) Spark 应用程序应该使用 Spark 指南来实现。除非您对两者都有很好的了解,否则您通常不想将 Scala 期货与 Spark 混合使用。
  • 非常感谢您的 cmets,我将根据建议改进代码。我在 getQC_report 中使用 Spark 功能。在这种方法中,我正在处理大文件并生成数据分析报告。
  • 关于使用 var(可变变量)的更多建议。不仅在 Spark 中,在 Scala 中作为函数式编程范例,您应该尽量避免使用 var 并改用 val(不可变)。这是为了避免“副作用”,这将有助于防止应用程序中出现许多意想不到的问题。对我来说,唯一可以使用 var 的地方是当您仅在某个局部范围内对可变变量进行一些更改时。 (示例中的 var pool 有副作用,因为它在 def poolId 范围之外更改)这样可以保证方法等范围仍然没有任何副作用。

标签: scala apache-spark concurrency


【解决方案1】:

简短的回答是因为argsListList[Array[String]]

val futures = argsList map(i => runner(i))

类型为List[Future[WhateverGetQC_ReportReturns]]。它具体不是Future,所以没有onComplete 方法。

如果你想要一个Future 在所有期货都完成后完成,Future.sequence 会将List[Future[T]] 转换为Future[List[T]]

// replaces all code after val futures = argsList map ...
val allFutures = Future.sequence(futures)

val result: List[WhateverGetQC_ReportReturns] =
  try {
    Await.result(allFutures, Duration.Inf)
  } catch {
    case NonFatal(e) =>
      System.err.println("Failure happened!")
      System.err.println(e.getMessage)
  }

【讨论】:

    猜你喜欢
    • 2014-01-10
    • 1970-01-01
    • 2017-12-20
    • 2017-09-30
    • 1970-01-01
    • 1970-01-01
    • 2017-12-17
    • 2021-03-02
    • 2016-01-24
    相关资源
    最近更新 更多