【问题标题】:Spark not serializable issueSpark不可序列化问题
【发布时间】:2017-02-13 17:04:27
【问题描述】:

我正在重构我们的代码,以便我们可以将 CAKE 模式用于 DI。

我偶然发现了一个难以理解的序列化问题。

当我调用这个函数时:

def getAccounts(winZones: Broadcast[List[WindowsZones]]): RDD[AccountDetails] = {
  val accounts = getAccounts //call to db

  val result = accounts.map(row =>
    Some(AccountDetails(UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      UUID.fromString(row.getAs[String]("")),
      row.getAs[String](""),
      row.getAs[String](""),
      DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones))))
    .map(m=>m.get)
  result
}

它工作得很好,但这很难看,我想重构它,以便从 row 到 AccountDetails 的中间映射放在私有函数中 - 但是这样做会导致序列化问题。

我想要:

def getAccounts(winZones: Broadcast[List[WindowsZones]]): RDD[AccountDetails] = {
  val accounts = getAccounts 

  val result = accounts
    .map(m => getAccountDetails(m, winZones))
    .filter(_.isDefined)
    .map(m => m.get)
  result
}

private def getAccountDetails(row: Row, winZones: Broadcast[List[WindowsZones]]): Option[AccountDetails] = {
      try {
        Some(AccountDetails(UUID.fromString(""),
          row.getAs[String](""),
          UUID.fromString(row.getAs[String]("")),
          row.getAs[String](""),
          row.getAs[String](""),
          DateUtils.getIanaZoneFromWinZone(row.getAs[String]("timeZone"), winZones)))
      }
      catch {
        case e: Exception =>
          logger.error(s"Unable to set AccountDetails $e")
          None
      }
    }

当然,感谢任何帮助,AccountDetails obj 是一个案例类,应该是相关的。也很高兴接受任何其他关于使用 spark 实现 cake 或 DI 的建议。谢谢。

编辑以显示结构:

trait serviceImpl extends anotherComponent {this: DBA =>
  def Accounts = new Accounts
  class Accounts extends AccountService {
    //the methods above are defined here.
  }

编辑以包含堆栈跟踪:

    17/02/13 17:32:32 INFO CodeGenerator: Code generated in 271.36617 ms
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
    at FunnelServiceComponentImpl$FunnelAccounts.getAccounts(FunnelServiceComponentImpl.scala:24)
    at Main$.delayedEndpoint$Main$1(Main.scala:26)
    at Main$delayedInit$body.apply(Main.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at Main$.main(Main.scala:7)
    at Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: FunnelServiceComponentImpl$FunnelAccounts
Serialization stack:
    - object not serializable (class: FunnelServiceComponentImpl$FunnelAccounts, value: FunnelServiceComponentImpl$FunnelAccounts@16b7e04a)
    - field (class: FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1, name: $outer, type: class FunnelServiceComponentImpl$FunnelAccounts)
    - object (class FunnelServiceComponentImpl$FunnelAccounts$$anonfun$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 26 more
17/02/13 17:32:32 INFO SparkContext: Invoking stop() from shutdown hook

【问题讨论】:

    标签: scala apache-spark serialization dependency-injection


    【解决方案1】:

    你在哪里定义函数?

    假设您在 X 类中定义它们。如果该类不可序列化,这将导致您的问题。

    要解决这个问题,您可以将其设为对象或将类设为可序列化。

    【讨论】:

    • 这两个函数都在同一个类中——我已经使类扩展为可串行化,但它根本没有帮助。把它变成一个对象会让它恢复到以前的样子,但是我失去了整合 DI 的能力。如果这有助于解释我的困境,我将遵循蛋糕方法 - 我已经用我的班级示例结构编辑了这篇文章。
    • 你有序列化栈吗?应该是 NotSerializableException 堆栈跟踪的一部分。
    • @JoeC - 嗨,乔,我已经编辑了问题以包含堆栈跟踪 - 我尝试使用 Serialise 扩展受影响的类,然后将它们全部扩展 - 恐怕没有效果。跨度>
    【解决方案2】:

    因为 getAccountDetails 在您的班级中,Spark 会想要序列化您的整个 FunnelAccounts 对象。毕竟,您需要一个实例才能使用此方法。但是,FunnelAccounts 不可序列化。因此它不能发送给工人。

    在您的情况下,您应该将 getAccountDetails 移动到 FunnelAccounts 对象中,这样您就不需要实例 FunnelAccounts 来运行它。

    【讨论】:

    • 嗨,乔,谢谢。你当然是对的,我将函数提取到一个对象中,该对象驻留在同一个文件 jsut 中进行测试,它工作正常。我现在担心的是我离我试图遵循的架构模式太远了:如果我保持原样,那么我认为我将拥有比必要更多的“层”。通过创建一个对象,我正在减少类的效用。虽然我认为它会变成一个纯粹的“getObjectTypeFromSDb”,然后将对象转到“retrieveObjectValuesFromRow”,然后再使用它——感觉有点太细了。
    • 对不起 - 字符用完了 - 想问你是否有任何使用 Spark 项目实现 DI 模式的经验 - 没有 UI 可言,所以没有视图。在存储值之前只是 DB 和操作
    猜你喜欢
    • 1970-01-01
    • 2018-07-05
    • 2016-10-09
    • 2017-02-04
    • 2018-04-08
    • 2018-04-06
    • 2021-03-16
    • 2019-06-25
    • 1970-01-01
    相关资源
    最近更新 更多