【问题标题】:How does Spark serialization work for case classes?Spark 序列化如何用于案例类?
【发布时间】:2018-10-09 16:26:33
【问题描述】:

我在 Spark 2.2 中遇到了一些奇怪的问题,以及它如何反序列化案例类。对于这些示例,假设这个案例类:

case class X(a:Int, b:Int) {
  println("in the constructor!!!")
}

如果我有以下映射操作,我会在执行程序日志中看到我的构造函数和“a”消息的值。

ds.map(x => {
  val x = X(1, 2)
  println(s"a=${x.a})
}

通过以下映射操作,我看不到我的构造函数消息,但我确实在执行程序日志中看到了“a”消息的值。构造函数消息在驱动程序日志中。

val x = X(1, 2)
ds.map(x => println(s"a=${x.a}"))

如果我使用广播变量,我会得到相同的行为。

val xBcast = sc.broadcast(X(1, 2))
ds.map(x => println(s"a=${xBcast.value.a}"))

知道发生了什么吗? Spark 是否根据需要对每个字段进行序列化?我本来希望整个对象被运送过来并反序列化。通过这种反序列化,我希望有一个构造函数调用。

当我查看 Products 的编码器代码时,它看起来像是从构造函数中获取了必要的字段。我想我假设它会使用这些编码器来处理这类事情。

我什至反编译了我的案例类的类文件,生成的构造函数看起来很合理。

【问题讨论】:

  • 为什么你在驱动程序中定义一个对象为val x,但又将ds中的元素引用为x?我很惊讶它完全有效,除非你的 ds 中的元素也有一个字段'a'。我会在ds.map(_ => println(s"a=...")) 或更清楚地ds.foreach(_ => println(s"a=..")) 上工作,因为无论如何你的输出类型都是 Unit 。最后...尝试使用val x vs def x vs lazy val x。我敢打赌'val'的构造函数在驱动程序中,因为'def'在执行程序中,并且不确定lazy val最终会在哪里......可能也是驱动程序,但这是一个有趣的练习。
  • 返回类型为 Unit 在我的示例中更加懒惰。假设我正在输出实际值。我专注于展示我如何使用该对象。它找到 x 是因为您可以在 scala 的内部范围中引用外部范围变量。加上 spark 知道如何解释这一点。如果您阅读广播变量,我所拥有的就是您应该做的。使用广播和不使用广播的唯一区别是 Spark 在不使用广播重新序列化方面会更聪明。
  • 你能提供一些细节吗?你如何测试这段代码?在什么模式下? ds的类型是什么,是如何创建的?
  • Ds 是一个数据集。它是使用“session.read.parquet(file).as[CaseClass]”模式创建的。我在集群上以纱线/集群模式测试它。

标签: scala apache-spark


【解决方案1】:

Spark 默认使用 Java 序列化(可用因为案例类扩展 Serializable),它不需要使用构造函数来反序列化。见this StackOverflow question for details on Java serialization/deserialization

请注意,这种对 Java 序列化的依赖可能会导致问题,因为内部序列化格式不是一成不变的,因此 JVM 版本差异可能会导致反序列化失败。

【讨论】:

  • 这听起来不对。即使使用 RDD API,Spark 也不一定使用 Java 序列化,使用 Dataset API 它根本不应该使用它进行数据序列化。
  • 使用 Java 序列化似乎是正确的。 scala @transient 注解,它与 Java 序列化一起使用,正确地使字段不被序列化。
猜你喜欢
  • 1970-01-01
  • 2015-08-10
  • 2020-04-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-03-29
  • 2019-09-23
  • 2016-01-15
相关资源
最近更新 更多