【发布时间】:2016-09-15 00:52:17
【问题描述】:
我注意到,当我在 DataFrame 上使用 Window 函数后,如果我用函数调用 map(),Spark 会返回“Task not serializable”异常 这是我的代码:
val hc:org.apache.spark.sql.hive.HiveContext =
new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def f() : String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column =
lag($"name",1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1","S1"), P("N2","S2"), P("N2","S2"))
val data_frame: org.apache.spark.sql.DataFrame =
hc.createDataFrame(sc.parallelize(lista))
df.withColumn("lag_result", lag_result).map(x => f)
// This works
// df.withColumn("lag_result", lag_result).map{ case x =>
// def f():String = "test";f}.collect
这是堆栈跟踪:
org.apache.spark.SparkException:任务不可序列化 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323) 在... 以及更多原因:java.io.NotSerializableException: org.apache.spark.sql.Column 序列化栈:
- 对象不可序列化(类:org.apache.spark.sql.Column,值:'lag(name,1,null) windowspecdefinition(surname,UnspecifiedFrame))
- 字段(类:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, 名称:lag_result,类型:类 org.apache.spark.sql.Column) ...和 更多
【问题讨论】:
-
能否在您的 vals 中添加类型注释
-
我已经编辑了代码! ;)
-
lag() 案例类的定义在哪里?
-
滞后是 Spark 的解析函数
标签: scala apache-spark serialization apache-spark-sql window-functions