【发布时间】:2020-06-30 07:03:15
【问题描述】:
我在处理 Scala 期货列表中的异常时遇到问题。我在ruuner 方法中调用getQC_report(qcArgsThread,spark) 方法,该方法处理输入文件并保存在Hive 表中。代码如下
import scala.util.{Failure, Success}
import scala.concurrent._
import scala.concurrent.duration._
val spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()
var argsList: List[Array[String]] = List[Array[String]]()
for(ip_file <- INPUT_FILE.asScala.toList) {
var qcArgs:Array[String] = null
qcArgs = Array("input_file", ip_file,
"hiveDB",hiveDB,
"Outputhive_table",Outputhive_table)
argsList = qcArgs :: argsList
}
var pool = 0
def poolId = {
pool = pool + 1
pool
}
def runner(qcArgsThread: Array[String]) = Future {
sc.setLocalProperty("spark.scheduler.pool", poolId.toString)
getQC_report(qcArgsThread,spark)
}
val futures = argsList map(i => runner(i))
futures foreach(f => Await.ready(f, Duration.Inf))
futures.onComplete {
case Success(x) => {
println(s"\nresult = $x")
}
case Failure(e) => {
System.err.println("Failure happened!")
System.err.println(e.getMessage)
}
}
futures.onComplete 行出现错误。
错误 - 无法解析符号 onComplete。
请帮助我改进代码,因为我是使用 Scala Futures 的新手。谢谢!
【问题讨论】:
-
好吧... 1) 请移除
java.util.concurrent标签。这无关紧要 2) 我强烈建议在学习 Spark 之前花几个月的时间使用 Scala。大多数直接使用 Spark 的人最终都会在这两种代码中编写悲惨且无法使用的代码 3)永远不要在 SPARK 中使用变量,它们会炸毁你的整个程序 4)你的程序不会编译,因为List没有一个名为onComplete的方法,您将需要编写这些期货。一种选择是Future.sequence5) 等待和OnComplete是多余的。选择一个 -
更多建议。 1) Scala 允许您使用
space而不是.进行方法调用,但不要那样做。 2) Spark 是一个带有 Scala API 的平台,如果您的程序中没有使用任何 Spark 功能,那么为什么要使用 Spark? 3) Spark 应用程序应该使用 Spark 指南来实现。除非您对两者都有很好的了解,否则您通常不想将 Scala 期货与 Spark 混合使用。 -
非常感谢您的 cmets,我将根据建议改进代码。我在
getQC_report中使用 Spark 功能。在这种方法中,我正在处理大文件并生成数据分析报告。 -
关于使用 var(可变变量)的更多建议。不仅在 Spark 中,在 Scala 中作为函数式编程范例,您应该尽量避免使用 var 并改用 val(不可变)。这是为了避免“副作用”,这将有助于防止应用程序中出现许多意想不到的问题。对我来说,唯一可以使用 var 的地方是当您仅在某个局部范围内对可变变量进行一些更改时。 (示例中的 var pool 有副作用,因为它在 def poolId 范围之外更改)这样可以保证方法等范围仍然没有任何副作用。
标签: scala apache-spark concurrency