Posted: 2017-02-13 17:04:27
Question:
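I'm refactoring our code so that we can use the Cake pattern for DI.
I've run into a serialization problem that I'm struggling to understand.
When I call this function: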
def getAccounts(winZones: Broadcast[List[WindowsZones]]): RDD[AccountDetails] = {
  val accounts = getAccounts // call to db
  val result = accounts.map(row =>
    Some(AccountDetails(
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      row.getAs[String](""),
      DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones))))
    .map(m => m.get)
  result
}
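It works fine, but it's ugly. I'd like to refactor it so that the intermediate mapping from Row to AccountDetails lives in a private function, but doing so causes a serialization problem.
What I'd like: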
def getAccounts(winZones: Broadcast[List[WindowsZones]]): RDD[AccountDetails] = {
  val accounts = getAccounts
  val result = accounts
    .map(m => getAccountDetails(m, winZones))
    .filter(_.isDefined)
    .map(m => m.get)
  result
}
private def getAccountDetails(row: Row, winZones: Broadcast[List[WindowsZones]]): Option[AccountDetails] = {
  try {
    Some(AccountDetails(
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      row.getAs[String](""),
      DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones)))
  }
  catch {
    case e: Exception =>
      logger.error(s"Unable to set AccountDetails $e")
      None
  }
}
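A small sketch of the helper logic in isolation, assuming rows can be modelled as Map[String, String] instead of Spark's Row and using a hypothetical AccountSketch case class and AccountRows object (neither is from the original code). The try/catch becomes Try(...).toOption, and filter(_.isDefined).map(_.get) collapses into a single flatMap. RDD also has a flatMap, but this sketch runs on a plain List so it needs no Spark.

```scala
import java.util.UUID
import scala.util.Try

// Hypothetical, simplified stand-in for the question's AccountDetails.
final case class AccountSketch(id: UUID, name: String)

// Hypothetical helper object; rows are modelled as Map[String, String]
// rather than Spark's Row so the sketch runs without Spark.
object AccountRows {
  // Same idea as the try/catch in getAccountDetails: any parse failure becomes None.
  def toAccount(row: Map[String, String]): Option[AccountSketch] =
    Try(AccountSketch(UUID.fromString(row("id")), row("name"))).toOption
}

object FlattenDemo {
  val rows: List[Map[String, String]] = List(
    Map("id" -> "123e4567-e89b-12d3-a456-426614174000", "name" -> "a"),
    Map("id" -> "not-a-uuid", "name" -> "b") // UUID.fromString throws here
  )

  // The filter/map chain from the question ...
  val viaFilter: List[AccountSketch] =
    rows.map(AccountRows.toAccount).filter(_.isDefined).map(_.get)

  // ... and a single flatMap that keeps only the Some values.
  val viaFlatMap: List[AccountSketch] = rows.flatMap(r => AccountRows.toAccount(r))

  def main(args: Array[String]): Unit = {
    println(viaFilter == viaFlatMap) // true
    println(viaFlatMap.map(_.name))  // List(a)
  }
}
```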
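Any help is of course appreciated. The AccountDetails object is a case class, in case that's relevant. I'm also happy to hear any other suggestions for implementing Cake or DI with Spark. Thanks.
Edit to show the structure: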
trait serviceImpl extends anotherComponent { this: DBA =>
  def Accounts = new Accounts
  class Accounts extends AccountService {
    // the methods above are defined here.
  }
}
Edit to include the stack trace:
17/02/13 17:32:32 INFO CodeGenerator: Code generated in 271.36617 ms
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at FunnelServiceComponentImpl$FunnelAccounts.getAccounts(FunnelServiceComponentImpl.scala:24)
at Main$.delayedEndpoint$Main$1(Main.scala:26)
at Main$delayedInit$body.apply(Main.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at Main$.main(Main.scala:7)
at Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: FunnelServiceComponentImpl$FunnelAccounts
Serialization stack:
- object not serializable (class: FunnelServiceComponentImpl$FunnelAccounts, value: FunnelServiceComponentImpl$FunnelAccounts@16b7e04a)
- field (class: FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1, name: $outer, type: class FunnelServiceComponentImpl$FunnelAccounts)
- object (class FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 26 more
17/02/13 17:32:32 INFO SparkContext: Invoking stop() from shutdown hook
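The serialization stack points at the cause: the anonymous function passed to map (FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1) has an $outer field referring to FunnelAccounts, because calling the instance method getAccountDetails from inside the closure needs this. The first version only referenced the winZones parameter and DateUtils (presumably a standalone object), so it never captured the enclosing instance. Below is a minimal sketch that reproduces the effect without Spark, using plain Java serialization (the stack trace shows Spark's ClosureCleaner going through JavaSerializer). The names Account, AccountParsing, Accounts and ClosureSerializationDemo are hypothetical stand-ins, not the question's code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for the question's AccountDetails case class.
final case class Account(id: Int, name: String)

// Stateless parsing logic kept in a top-level object (hypothetical name).
object AccountParsing extends Serializable {
  def parse(row: Map[String, String]): Option[Account] =
    Try(Account(row("id").toInt, row("name"))).toOption

  // Function value built inside the object; at most it refers to
  // AccountParsing itself, which is Serializable.
  val parseFn: Map[String, String] => Option[Account] = row => parse(row)
}

// Mirrors FunnelAccounts: a plain class that is NOT Serializable.
class Accounts {
  private def parse(row: Map[String, String]): Option[Account] = AccountParsing.parse(row)

  // Calls an instance method, so the closure captures `this` (the $outer field).
  def capturingClosure: Map[String, String] => Option[Account] = row => parse(row)

  // Reuses the function value from the object; `this` is not captured.
  def nonCapturingClosure: Map[String, String] => Option[Account] = AccountParsing.parseFn
}

object ClosureSerializationDemo {
  // True if Java serialization of the object succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val accounts = new Accounts
    println(isSerializable(accounts.capturingClosure))    // false
    println(isSerializable(accounts.nonCapturingClosure)) // true
  }
}
```

Under these assumptions, the usual ways out are to move the row-to-AccountDetails helper into a standalone object (as above), or to make the enclosing class extend Serializable, in which case everything it holds must be serializable as well.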
Tags: scala apache-spark serialization dependency-injection