【问题标题】:Spark Scala: difference in App execution vs line by line in REPLSpark Scala:应用程序执行与 REPL 中逐行执行的差异
【发布时间】:2018-03-02 17:18:52
【问题描述】:

我有一个简单的字数统计程序,打包为object

object MyApp {

  val path = "file:///home/sergey/spark/spark-2.2.0/README.md"

  val readMe = sc.textFile(path)

  val stop = List("to","the","a")

  val res = (readMe
    .flatMap(_.split("\\W+"))
    .filter(_.length > 0)
    .map(_.toLowerCase)
    .filter(!stop.contains(_))
    .map((_, 1))
    .reduceByKey(_ + _)
    .sortBy(-_._2)
    )

  println(res.take(3).mkString)
}

当我尝试执行它时,我得到:

scala> MyApp
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
  at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
  at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
  ... 51 elided
Caused by: java.io.NotSerializableException: MyApp$
Serialization stack:
    - object not serializable (class: MyApp$, value: MyApp$@7bd44868)
    - field (class: MyApp$$anonfun$5, name: $outer, type: class MyApp$)
    - object (class MyApp$$anonfun$5, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

(罪魁祸首是.filter(!stop.contains(_))线。

但是,当我逐行执行相同的代码时,它运行良好并产生了预期的结果。

非常感谢您回答 2 个问题:

  1. 逐行执行和单例执行之间有什么不同,以至于一个运行而另一个失败?

  2. 除了将 !stop.contains(_) 闭包和 stop 列表打包到另一个对象中之外,还有什么其他解决方案?

【问题讨论】:

  • 您是否尝试将所有代码都放在一个主目录中?或者也许将extends App 添加到您的对象定义中?
  • @philantrovert 我正在执行此代码as-isin spark-shell

标签: scala apache-spark serialization closures


【解决方案1】:

一般来说。你的程序有点奇怪

让我帮忙说明一下细节。希望对您有所帮助!

您说如果程序逐行执行,您会得到正确答案。从您的描述上下文来看,我猜这发生在 spark-shell 中,对吧?

需要注意的一点是,如果您在 spark 包中打开 spark-shell,则您处于 REPL 环境中,将会有 为您预先构建好的 spark 上下文对象,制作自己的 SparkContext 将不起作用

例如,

$ ./bin/spark-shell --master local[4]

现在您想要获得由程序文本表示的spark 应用程序。比如你的文件MyApp.scala,然后在RDD上进行并行操作,这里对于readMe,一个RDD[String],spark会中断action触发的job take到许多任务并将那里的任务转发给工人执行,

但是,现在请注意你的代码!,为了让你的操作来构造闭包(那些变量和方法必须对执行程序可见以执行其计算在 RDD 上),但从您的代码来看,您所有的闭包计算都在您的整个对象中。

但是,在单例对象MyApp中,SparkContext类型,例如sc,不能也不应该是可序列化的,因为它只会站在驱动节点上告诉spark如何访问集群,因此您的提交将失败。

我帮助修改了这段代码,你可以在你的机器上运行它。但是为了您的目的,您修改后的代码应该提交到 spark-submit 脚本。

import org.apache.spark.{SparkConf, SparkContext}

object MyApp {

  private var sc: SparkContext = _

  def init(): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sc = new SparkContext(sparkConf)
  }

  def mission(): Unit = {
    val path = "file:///home/sergey/spark/spark-2.2.0/README.md"

    val readMe = sc.textFile(path)

    val stop = List("to", "the", "a")

    val res = readMe
      .flatMap(_.split("\\W+"))
      .filter(_.length > 0)
      .map(_.toLowerCase)
      .filter(!stop.contains(_))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(-_._2)


    println(res.take(3).mkString)
  }

  def main(args: Array[String]): Unit = {
    init()
    mission()
  }
}

请参考page上的提交使用,迟早要使用。

【讨论】:

  • 感谢您关注我的问题! (1) 没错,代码在spark-shell 中执行 (2) 我没有构造任何SparkContext,我使用的是spark shell 提供的,所以没有冲突。 (3)我仍然认为问题不是因为SparkContext 不可序列化(我不明白你的答案中的这部分),而是因为闭包没有正确序列化。证明:如果我将闭包与stop 列表放在一起,或者完全删除它,代码将运行。无论如何,感谢您的宝贵时间!
  • 还有一条评论。问题不在于如何通过spark-submit 提交scala 应用程序,或者如何避免连续初始化多个SparkContexts。这是关于如何避免从错误日志中看到的序列化错误。
猜你喜欢
  • 1970-01-01
  • 2020-04-02
  • 2021-07-04
  • 2012-04-10
  • 1970-01-01
  • 1970-01-01
  • 2019-10-10
  • 1970-01-01
  • 2015-12-03
相关资源
最近更新 更多