【问题标题】:Spark issue with the class generated from avro schema从 avro 模式生成的类的 Spark 问题
【发布时间】:2015-10-08 23:59:16
【问题描述】:

我有一段用 spark 编写的代码,可以将数据从 HDFS 加载到从 avro idl 生成的 java 类中。在以这种方式创建的 RDD 上,我正在执行简单的操作,其结果取决于我是否在它之前缓存 RDD 即如果我在下面运行代码

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 200000

程序将打印 200000,另一方面执行下一个代码

val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 1

导致 1 打印到标准输出。

当我在读取缓存数据后检查字段的值时,似乎

我很确定所描述问题的根本原因是从 avro idl 生成的类的序列化问题,但我不知道如何解决它。我尝试使用 Kryo,注册生成的类(数据),为给定的类(SpecificRecordSerializer、SpecificRecordBinarySerializer 等)注册来自 chill_avro 的不同序列化程序,但这些想法都没有帮助我。

我该如何解决这个问题?

Link 到最小、完整和可验证的示例。

【问题讨论】:

  • 我遇到了同样的问题。你有没有进一步了解它?这是一个已知问题吗?如果没有,也许应该向 Spark 报告?
  • 要使其工作,您需要创建已加载数据的深层副本,即 loadFromHDFS[Data](path,...).map(Data.newBuilder(_).build()).cache() 将完成这项工作

标签: serialization apache-spark avro


【解决方案1】:

试试下面的代码 -

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()).cache()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-09-24
    • 2014-08-24
    • 1970-01-01
    • 1970-01-01
    • 2020-11-26
    • 1970-01-01
    • 2020-10-27
    相关资源
    最近更新 更多