【问题标题】:How to modify the avro key/value schema in a RDD map transformation如何在 RDD 映射转换中修改 avro 键/值模式
【发布时间】:2016-04-09 10:12:06
【问题描述】:

我正在尝试将一些 Hadoop Map Reduce 代码迁移到 Spark,当键或值的架构从输入更改为输出时,我对如何管理 map 和 reduce 转换存有疑问。

我有 avro 文件,其中包含我想以某种方式处理的 Indicator 记录。我已经有了这个有效的代码:

val myAvroJob = new Job()            
myAvroJob.setInputFormatClass(classOf[AvroKeyInputFormat[Indicator]])    
myAvroJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[Indicator]])       
myAvroJob.setOutputValueClass(classOf[NullWritable])

AvroJob.setInputValueSchema(myAvroJob, Schema.create(Schema.Type.NULL))
AvroJob.setInputKeySchema(myAvroJob,  Indicator.SCHEMA$)
AvroJob.setOutputKeySchema(myAvroJob, Indicator.SCHEMA$)    

val indicatorsRdd = sc.newAPIHadoopRDD(myAvroJob.getConfiguration,
  classOf[AvroKeyInputFormat[Indicator]],
  classOf[AvroKey[Indicator]],
  classOf[NullWritable]) 

val myRecordOnlyRdd = indicatorsRdd.map(x => (doSomethingWith(x._1), NullWritable.get)

val indicatorPairRDD = new PairRDDFunctions(myRecordOnlyRdd)
indicatorPairRDD.saveAsNewAPIHadoopDataset(myAvroJob.getConfiguration)

但此代码有效,因为输入和输出键的架构不会改变,始终是 Indicator。在 hadoop Map Reduce 中,您可以定义 map 或 reduce 函数并修改从输入到输出的模式。事实上,我有处理每一个Indicator 记录并生成一个新记录SoporteCartera 的地图函数。我怎样才能在火花中做到这一点?有可能来自同一个 RDD,还是我必须定义 2 个不同的 RDD 并以某种方式从一个传递到另一个?

感谢您的帮助。

【问题讨论】:

    标签: dictionary apache-spark schema rdd avro


    【解决方案1】:

    回答我自己的问题...问题是你不能改变RDD类型,你必须定义一个不同的RDD,所以我用上面的代码解决了:

    val myAvroJob = new Job()            
    myAvroJob.setInputFormatClass(classOf[AvroKeyInputFormat[SoporteCartera]])    
    myAvroJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[Indicator]])       
    myAvroJob.setOutputValueClass(classOf[NullWritable])
    
    AvroJob.setInputValueSchema(myAvroJob, Schema.create(Schema.Type.NULL))
    AvroJob.setInputKeySchema(myAvroJob,  SoporteCartera.SCHEMA$)
    AvroJob.setOutputKeySchema(myAvroJob, Indicator.SCHEMA$)
    
    val soporteCarteraRdd = sc.newAPIHadoopRDD(myAvroJob.getConfiguration,
      classOf[AvroKeyInputFormat[SoporteCartera]],
      classOf[AvroKey[SoporteCartera]],
      classOf[NullWritable])          
    
    val indicatorsRdd = soporteCarteraRdd.map(x => (fromSoporteCarteraToIndicator(x._1), NullWritable.get))
    
    val indicatorPairRDD = new PairRDDFunctions(indicatorsRdd)
    indicatorPairRDD.saveAsNewAPIHadoopDataset(myAvroJob.getConfiguration)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-17
      • 2017-12-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多