【发布时间】:2016-04-09 10:12:06
【问题描述】:
我正在尝试将一些 Hadoop Map Reduce 代码迁移到 Spark,当键或值的架构从输入更改为输出时,我对如何管理 map 和 reduce 转换存有疑问。
我有 avro 文件,其中包含我想以某种方式处理的 Indicator 记录。我已经有了这个有效的代码:
val myAvroJob = new Job()
myAvroJob.setInputFormatClass(classOf[AvroKeyInputFormat[Indicator]])
myAvroJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[Indicator]])
myAvroJob.setOutputValueClass(classOf[NullWritable])
AvroJob.setInputValueSchema(myAvroJob, Schema.create(Schema.Type.NULL))
AvroJob.setInputKeySchema(myAvroJob, Indicator.SCHEMA$)
AvroJob.setOutputKeySchema(myAvroJob, Indicator.SCHEMA$)
val indicatorsRdd = sc.newAPIHadoopRDD(myAvroJob.getConfiguration,
classOf[AvroKeyInputFormat[Indicator]],
classOf[AvroKey[Indicator]],
classOf[NullWritable])
val myRecordOnlyRdd = indicatorsRdd.map(x => (doSomethingWith(x._1), NullWritable.get)
val indicatorPairRDD = new PairRDDFunctions(myRecordOnlyRdd)
indicatorPairRDD.saveAsNewAPIHadoopDataset(myAvroJob.getConfiguration)
但此代码有效,因为输入和输出键的架构不会改变,始终是 Indicator。在 hadoop Map Reduce 中,您可以定义 map 或 reduce 函数并修改从输入到输出的模式。事实上,我有处理每一个Indicator 记录并生成一个新记录SoporteCartera 的地图函数。我怎样才能在火花中做到这一点?有可能来自同一个 RDD,还是我必须定义 2 个不同的 RDD 并以某种方式从一个传递到另一个?
感谢您的帮助。
【问题讨论】:
标签: dictionary apache-spark schema rdd avro