【问题标题】:How can I read this avro file using spark & scala?如何使用 spark 和 scala 读取这个 avro 文件?
【发布时间】:2015-02-07 23:40:29
【问题描述】:

我见过各种 spark 和 avro 问题(包括 How can I load Avros in Spark using the schema on-board the Avro file(s)?),但以下 avro 文件没有一个适合我的解决方案:

http://www.4shared.com/file/SxnYcdgJce/sample.html

当我尝试使用上述解决方案读取 avro 文件时,我收到关于它不可序列化的错误(spark java.io.NotSerializableException: org.apache.avro.mapred.AvroWrapper)。

如何设置 spark 1.1.0(使用 scala)来读取这个示例 avro 文件?

-- 更新--

我已将其移至邮件列表:http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-read-this-avro-file-using-spark-amp-scala-td19400.html

【问题讨论】:

  • 你是直接收集的吗?您应该将它们映射到一些 Serializable 类,因为它们不能被序列化。

标签: scala apache-spark


【解决方案1】:

我在尝试读取 Avro 文件时遇到了同样的问题。原因是 AvroWrapper 没有实现java.io.Serializable 接口。

解决方案是使用org.apache.spark.serializer.KryoSerializer

import org.apache.spark.SparkConf

val cfg = new SparkConf().setAppName("MySparkJob")
cfg.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
cfg.set("spark.kryo.registrator", "com.stackoverflow.Registrator")

但这还不够,因为我在 Avro 文件中的类也没有实现 Serializable

因此,我添加了自己的注册器,扩展了KryoRegistrator,并包含了chill-avro 库。

class Registrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClassInAvroFile], AvroSerializer.SpecificRecordBinarySerializer[MyClassInAvroFile])
    kryo.register(classOf[AnotherClassInAvroFile], AvroSerializer.SpecificRecordBinarySerializer[AnotherClassInAvroFile])
  }
}

然后我能够读取这样的文件:

ctx.hadoopFile("/path/to/the/avro/file.avro",
  classOf[AvroInputFormat[MyClassInAvroFile]],
  classOf[AvroWrapper[MyClassInAvroFile]],
  classOf[NullWritable]
).map(_._1.datum())

【讨论】:

  • 感谢您的分享!为什么我总是得到Exception in thread "main" java.lang.NoClassDefFoundError: com/twitter/chill/KryoInstantiator
  • 您是否添加了依赖项并将其包含在最终工件中?
  • 够有趣的。我试过包com.twitter" % "chill-java" % "0.7.0",但它不起作用。只有0.7.1 版本有效。无论如何,您的解决方案是唯一有效的解决方案。谢谢!
【解决方案2】:

将序列化程序编辑为 kryo 应该可以解决问题。

一种方法是注释掉 /etc/spark/conf/spark-defaults.conf 中的行:

spark.serializer org.apache.spark.serializer.KryoSerializer

【讨论】:

    【解决方案3】:

    我的解决方案是使用我的问题中链接到的 spark 1.2 和 sparkSQL:

    val person = sqlContext.avroFile("/tmp/person.avro")
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-03
      • 2021-12-07
      • 1970-01-01
      相关资源
      最近更新 更多