【问题标题】:Read Avro with Spark in java在 java 中使用 Spark 阅读 Avro
【发布时间】:2016-05-02 04:28:21
【问题描述】:

有人可以分享在 spark 中使用 java 读取 avro 的示例吗? 找到了 scala 示例,但没有使用 java 的运气。 这是代码 sn-p,它是代码的一部分,但在使用方法 ctx.newAPIHadoopFile 时遇到了编译问题。

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Configuration hadoopConf = new Configuration();
JavaRDD<SampleAvro> lines = ctx.newAPIHadoopFile(path, AvroInputFormat.class, AvroKey.class, NullWritable.class, new Configuration());

问候

【问题讨论】:

  • 您能否分享更多有关您遇到的编译问题的信息?错误、堆栈跟踪等
  • 它给出的编译错误说预期的 java.lang.class 实际是 AvroInputFormat.class 并且对于除 path,hadoopConf 之外的其余参数相同。在我出错的地方有什么帮助吗?谢谢

标签: java apache-spark avro


【解决方案1】:

您可以使用 Databricks 的 spark-avro 连接器库
从 Spark SQL 读取或写入 Avro 数据的推荐方法是使用 Spark 的 DataFrame API。

连接器支持从 Spark SQL 读取和写入 Avro 数据:

import org.apache.spark.sql.*;

SQLContext sqlContext = new SQLContext(sc);

// Creates a DataFrame from a specified file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro")
    .load("src/test/resources/episodes.avro");

// Saves the subset of the Avro records read in
df.filter($"age > 5").write()
    .format("com.databricks.spark.avro")
    .save("/tmp/output");

请注意,此连接器具有适用于 Spark 1.2、1.3 和 1.4+ 的不同版本:

Spark 版本连接器
1.2         0.2.0      
1.3          1.0.0      
1.4+        2.0.1      

使用 Maven:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>{AVRO_CONNECTOR_VERSION}</version>
</dependency>

查看更多信息:Spark SQL Avro Library

【讨论】:

  • 知道如何通过 Java 中的 Hadoop InputFormat API 做到这一点吗?
  • @leet_Falcon,谢谢,我已经用 Avro 尝试过 Spark SQL,但到目前为止还没有运气。以下错误消息我在线程“main”java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.read()Lorg/apache/spark/sql/DataFrameReader 中得到了异常;在 org.opencb.hpg.bigdata.tools.sparkanalytics.SaprkSQLAvro.main(SaprkSQLAvro.java:19)。
  • @kre - 你使用 Spark 1.4+ 吗?
【解决方案2】:

在这里,假设 K 是你的 Key 而 V 是你的 value:

....

val job = new Job();

job.setInputFormatClass(AvroKeyValueInputFormat<K, V>.class);

FileInputFormat.addInputPaths(job, <inputPaths>);
AvroJob.setInputKeySchema(job, <keySchema>);
AvroJob.setInputValueSchema(job, <valueSchema>);

RDD<AvroKey<K>, AvroValue<V>> avroRDD = 
  sc.newAPIHadoopRDD(job.getConfiguration,
  AvroKeyValueInputFormat<<K>, <V>>,
  AvroKey<K>.class,
  AvroValue<V>.class);

【讨论】:

    猜你喜欢
    • 2015-10-31
    • 1970-01-01
    • 2015-12-22
    • 1970-01-01
    • 1970-01-01
    • 2018-11-24
    • 2017-04-18
    • 2018-01-03
    • 1970-01-01
    相关资源
    最近更新 更多