【发布时间】:2016-09-14 02:24:47
【问题描述】:
我有一个 RDD 行,我想根据闭包进行过滤。最终,我想将闭包作为参数传递给正在执行过滤器的方法,但我已经对其进行了简化,并且可以通过类似这样的简单方法重现错误。
def fn(l: Long): Boolean = true
rdd.filter{ row => fn(row.getAs[Long]("field")) }
我尝试将 fn 放入一个 case 对象,一个扩展可序列化特征的对象,在方法调用过滤器的内部和外部定义 fn。我试图弄清楚我需要做什么而不会出现这些错误。我知道已经有很多关于堆栈溢出的问题,我一直在寻找合适的答案,但我找不到。
Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
org.apache.spark.SparkContext.clean(SparkContext.scala:2058)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
org.apache.spark.rdd.RDD.filter(RDD.scala:340)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
$line131.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
$line131.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
$line131.$read$$iwC$$iwC$$iwC.<init>(<console>:54)
$line131.$read$$iwC$$iwC.<init>(<console>:56)
$line131.$read$$iwC.<init>(<console>:58)
$line131.$read.<init>(<console>:60)
$line131.$read$.<init>(<console>:64)
$line131.$read$.<clinit>(<console>)
$line131.$eval$.<init>(<console>:7)
$line131.$eval$.<clinit>(<console>)
$line131.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:351)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:350)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)
更新:
一个更完整的例子。我正在使用 Toree 运行 Jupyter,并从我的单元格中的 jar 文件中执行代码。以下是我尝试过的三件事,但都失败了
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
class NotWorking1(sc: SparkContext, sqlContext: SQLContext, fn: Long=>Boolean) {
def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = rdd.filter{ row => fn(row.getAs[Long](longField)) }
}
object NotWorking1 {
def apply(sc: SparkContext, sqlContext: SQLContext) = {
def myFn(l: Long): Boolean = true
new NotWorking1(sc, sqlContext, myFn)
}
}
class NotWorking2(sc: SparkContext, sqlContext: SQLContext) {
def myFn(l: Long): Boolean = true
def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
rdd.filter{ row => myFn(row.getAs[Long](longField)) }
}
}
object NotWorking2 {
def apply(sc: SparkContext, sqlContext: SQLContext) = {
new NotWorking2(sc, sqlContext)
}
}
class NotWorking3(sc: SparkContext, sqlContext: SQLContext) {
def myFilterer(rdd:RDD[Row], longField: String): RDD[Row] = {
def myFn(l: Long): Boolean = true
rdd.filter{ row => myFn(row.getAs[Long](longField)) }
}
}
object NotWorking3 {
def apply(sc: SparkContext, sqlContext: SQLContext) = {
new NotWorking3(sc, sqlContext)
}
}
从 Jupyter 单元中,我导入适当的类并运行
val nw1 = NotWorking1(sc, sqlContext)
val nw2 = NotWorking2(sc, sqlContext)
val nw3 = NotWorking3(sc, sqlContext)
nw1.myFilterer(rdd, "field")
nw2.myFilterer(rdd, "field")
nw3.myFilterer(rdd, "field")
这三个都失败了。 NotWorking3 尤其令人惊讶。我可以做任何事情来隔离函数而不是尝试序列化整个对象(我相信这无论如何都会给我带来麻烦,因为我保留了对 spark 和 sql 上下文的引用)
【问题讨论】:
-
你能展示你的完整代码吗?
-
我在一个 jupyter 笔记本上运行这个,上面的两行在一个单元格中被打破了,我唯一没有定义的是 rdd,它是一个包含字段 "的 RDD[Row]字段”。
-
我不太了解 Jupyter,但我的猜测是 fn 被包裹在另一个不可序列化的父对象中(通过 Jupyter)
-
这是否意味着您认为这应该有效?我也在 Jupyter 之外尝试过,将 fn 的定义保留在方法中(而不是作为参数),我得到了同样的错误,即使我在 REPL 中运行它
-
Spark 很好地指出了哪个变量是不可序列化的。您可以发布完整的异常消息吗?
标签: scala serialization apache-spark rdd