【问题标题】:How to read avro file in spark using newAPIHadoopFile?如何使用 newAPIHadoopFile 在 spark 中读取 avro 文件?
【发布时间】:2016-09-13 22:52:53
【问题描述】:

我正在尝试在 spark 作业中读取 na Avro 文件。
我的 spark 版本是 1.6.0 (spark-core_2.10-1.6.0-cdh5.7.1)。

这是我的java代码:

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("ReadAvro"));
JavaPairRDD <NullWritable, Text> lines = sc.newAPIHadoopFile(args[0],AvroKeyValueInputFormat.class,AvroKey.class,AvroValue.class,new Configuration());

但是我得到一个编译时异常:

方法newAPIHadoopFile(String, Class, Class, Class, JavaSparkContext 类型中的配置)不适用于 参数(字符串、类、类、 类,配置)

那么在Java中使用JavaSparkContext.newAPIHadoopFile()的正确方法是什么?

【问题讨论】:

    标签: java hadoop apache-spark


    【解决方案1】:
    public class Utils {
    
      public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) {
        JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration());
        return records.keys()
            .map(x -> (GenericRecord) x.datum())
            .mapToPair(pair -> new Tuple2<>((String) pair.get("key"), (T)pair.get("value")));
      }
    }
    

    将该实用程序用作:

    JavaPairRDD<String, YourAvroClassName> records = Utils.<YourAvroClassName>loadAvroFile(sc, inputDir);
    

    您可能还需要使用 KryoSerializer 并注册您的自定义 KryoRegistrator:

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.kryo.registrator", "com.test.avro.MyKryoRegistrator");
    
    public class MyKryoRegistrator implements KryoRegistrator {
    
      public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer {
        Class<T> type;
        public SpecificInstanceCollectionSerializer(Class<T> type) {
          this.type = type;
        }
    
        @Override
        protected Collection create(Kryo kryo, Input input, Class<Collection> type) {
          return kryo.newInstance(this.type);
        }
    
        @Override
        protected Collection createCopy(Kryo kryo, Collection original) {
          return kryo.newInstance(this.type);
        }
      }
    
    
      Logger logger = LoggerFactory.getLogger(this.getClass());
    
      @Override
      public void registerClasses(Kryo kryo) {
        // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
        // because Kryo is not able to serialize them properly, we use this serializer for them
        kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer<>(ArrayList.class));
        kryo.register(YourAvroClassName.class);
      }
    }
    

    希望这会有所帮助...

    【讨论】:

    • Utils 类中仍然存在相同的编译时异常。
    猜你喜欢
    • 1970-01-01
    • 2018-01-03
    • 2019-05-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多