【发布时间】:2015-10-08 23:59:16
【问题描述】:
我有一段用 spark 编写的代码,可以将数据从 HDFS 加载到从 avro idl 生成的 java 类中。在以这种方式创建的 RDD 上,我正在执行简单的操作,其结果取决于我是否在它之前缓存 RDD 即如果我在下面运行代码
val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 200000
程序将打印 200000,另一方面执行下一个代码
val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 1
导致 1 打印到标准输出。
当我在读取缓存数据后检查字段的值时,似乎
我很确定所描述问题的根本原因是从 avro idl 生成的类的序列化问题,但我不知道如何解决它。我尝试使用 Kryo,注册生成的类(数据),为给定的类(SpecificRecordSerializer、SpecificRecordBinarySerializer 等)注册来自 chill_avro 的不同序列化程序,但这些想法都没有帮助我。
我该如何解决这个问题?
Link 到最小、完整和可验证的示例。
【问题讨论】:
-
我遇到了同样的问题。你有没有进一步了解它?这是一个已知问题吗?如果没有,也许应该向 Spark 报告?
-
要使其工作,您需要创建已加载数据的深层副本,即
loadFromHDFS[Data](path,...).map(Data.newBuilder(_).build()).cache()将完成这项工作
标签: serialization apache-spark avro