【问题标题】:Spark scala task not serializable for closureSpark scala任务不可序列化以关闭
【发布时间】:2016-09-14 02:24:47
【问题描述】:

我有一个 RDD 行,我想根据闭包进行过滤。最终,我想将闭包作为参数传递给正在执行过滤器的方法,但我已经对其进行了简化,并且可以通过类似这样的简单方法重现错误。

def fn(l: Long): Boolean = true
rdd.filter{ row => fn(row.getAs[Long]("field")) }

我尝试将 fn 放入一个 case 对象,一个扩展可序列化特征的对象,在方法调用过滤器的内部和外部定义 fn。我试图弄清楚我需要做什么而不会出现这些错误。我知道已经有很多关于堆栈溢出的问题,我一直在寻找合适的答案,但我找不到。

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.RDD.filter(RDD.scala:340)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
$line131.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
$line131.$read$$iwC$$iwC$$iwC.<init>(<console>:54)
$line131.$read$$iwC$$iwC.<init>(<console>:56)
$line131.$read$$iwC.<init>(<console>:58)
$line131.$read.<init>(<console>:60)
$line131.$read$.<init>(<console>:64)
$line131.$read$.<clinit>(<console>)
$line131.$eval$.<init>(<console>:7)
$line131.$eval$.<clinit>(<console>)
$line131.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:351)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)

更新:

一个更完整的例子。我正在使用 Toree 运行 Jupyter,并从我的单元格中的 jar 文件中执行代码。以下是我尝试过的三件事,但都失败了

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

class NotWorking1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) {
  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](longField)) }
}

object NotWorking1 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    def myFn(l: Long): Boolean = true
    new NotWorking1(sc, sqlContext, myFn)
  }
}

class NotWorking2(sc: SparkContext, sqlContext: SQLContext) {
  def myFn(l: Long): Boolean = true

  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
    rdd.filter{ row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking2 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking2(sc, sqlContext)
  }
}

class NotWorking3(sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
    def myFn(l: Long): Boolean = true
    rdd.filter{ row => myFn(row.getAs[Long](longField)) }
  }
}

object NotWorking3 {
  def apply(sc: SparkContext, sqlContext: SQLContext) = {
    new NotWorking3(sc, sqlContext)
  }
}

从 Jupyter 单元中,我导入适当的类并运行

val nw1 = NotWorking1(sc, sqlContext)
val nw2 = NotWorking2(sc, sqlContext)
val nw3 = NotWorking3(sc, sqlContext)
nw1.myFilterer(rdd, "field")
nw2.myFilterer(rdd, "field")
nw3.myFilterer(rdd, "field")

这三个都失败了。 NotWorking3 尤其令人惊讶。我可以做任何事情来隔离函数而不是尝试序列化整个对象(我相信这无论如何都会给我带来麻烦,因为我保留了对 spark 和 sql 上下文的引用)

【问题讨论】:

  • 你能展示你的完整代码吗?
  • 我在一个 jupyter 笔记本上运行这个,上面的两行在一个单元格中被打破了,我唯一没有定义的是 rdd,它是一个包含字段 "的 RDD[Row]字段”。
  • 我不太了解 Jupyter,但我的猜测是 fn 被包裹在另一个不可序列化的父对象中(通过 Jupyter)
  • 这是否意味着您认为这应该有效?我也在 Jupyter 之外尝试过,将 fn 的定义保留在方法中(而不是作为参数),我得到了同样的错误,即使我在 REPL 中运行它
  • Spark 很好地指出了哪个变量是不可序列化的。您可以发布完整的异常消息吗?

标签: scala serialization apache-spark rdd


【解决方案1】:

根据我的经验,如果您希望它们可序列化,最简单的方法是只使用函数而不是方法。换句话说,如果您希望将您的代码片段传送给执行器,请使用 val 定义它们,而不是 def。

在您的示例中,在 NotWorking3 类中,将 myFn 更改如下,它将起作用:

val myFn = (l: Long) => true

更新

对于 NotWorking1 和 2,除了使用 val 代替 def,您还需要扩展 Serializable trait 并使用 @SerialVersionUID 注释。这是您的示例的工作版本(此处和此处略有更改):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

@SerialVersionUID(100L)
class Working1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) extends Serializable{
  def myFilterer(rdd:RDD[Row]): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](0)) }
}

@SerialVersionUID(101L)
class Working2 (sc: SparkContext, sqlContext: SQLContext) extends Serializable{
  val myFn = (l: Long) => true

  def myFilterer(rdd:RDD[Row]): RDD[Row] = {
    rdd.filter{ row => myFn(row.getAs[Long](0)) }
  }
}

class Working3 (sc: SparkContext, sqlContext: SQLContext) {
  def myFilterer(rdd:RDD[Row]): RDD[Row] = {
    val myFn = (l: Long) => true
    rdd.filter{ row => myFn(row.getAs[Long](0)) }
  }
}

val myFnGlobal = (l: Long) => true
val r1 = sc.parallelize(List(1L,2L,3L,4L,5L,6L,7L)).map(x => Row(x))

val w1 = new Working1(sc, sqlContext, myFnGlobal)
val w2 = new Working2(sc, sqlContext)
val w3 = new Working3(sc, sqlContext)
w1.myFilterer(r1).collect
w2.myFilterer(r1).collect
w3.myFilterer(r1).collect

【讨论】:

  • 这适用于 NotWorking3,所以我现在可以使用它作为替代品,但它仍然会在 NotWorking1 和 2 中中断
  • 玛丽安娜-好的。我没有在你的例子上试过这个,但是对于 NotWorking1 和 2,除了使用 val 而不是 def,你可能还需要扩展 Serializable 特征并使用 @SerialVersionUID 注释。见safaribooksonline.com/library/view/scala-cookbook/9781449340292/…
【解决方案2】:

@JustinPihony 的答案是正确的:Jupyter 将动态创建一个类,其中包含您在其会话中键入的代码,然后代表您将其提交给 spark。您创建的 fn 需要包含该封闭类。

您可能需要jar 将您的自定义逻辑添加到用户定义的 jar 文件中,并将其包含在 jupyter 类路径中。添加到类路径的过程将取决于您使用的 jupyter 内核。

【讨论】:

    猜你喜欢
    • 2015-12-16
    • 1970-01-01
    • 2017-09-21
    • 2021-08-12
    • 2020-02-04
    • 2023-04-02
    • 2015-09-21
    • 1970-01-01
    • 2018-04-06
    相关资源
    最近更新 更多