【Question title】: hadoop 2.6.0 and avro
【Posted】: 2015-11-09 16:59:23
【Question】:

I'm trying to run a MapReduce job with Avro, but even after trying every option people have suggested online, I still can't get past this error:

15/11/09 21:54:48 WARN mapred.LocalJobRunner: job_local1421922570_0001 java.lang.Exception: java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;

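Here is the code (from Hadoop: The Definitive Guide):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroGenericMapTemperature extends Configured implements Tool {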

private static final Schema SCHEMA = new Schema.Parser().parse(
    "{" +
    " \"type\" : \"record\"," +
    " \"name\" : \"WeatherRecord\"," +
    " \"doc\" : \"A weather reading\"," +
    " \"fields\": [" +
    "    {\"name\": \"year\", \"type\": \"int\" }," +
    "    {\"name\": \"temperature\", \"type\": \"int\" }," +
    "    {\"name\": \"stationId\", \"type\": \"string\" }" +
    "  ]" +
    "}"
);

public static class MaxTemperatureMapper extends 
    Mapper<LongWritable,  Text, 
    AvroKey<Integer>,AvroValue<GenericRecord> > {
    // NcdcRecordParser is the NCDC line parser from the book's example code (not shown here)
    private NcdcRecordParser parser = new NcdcRecordParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    protected void map(
        LongWritable key,
        Text value,
        Mapper<LongWritable, Text, AvroKey<Integer>, 
        AvroValue<GenericRecord>>.Context context)
        throws IOException, InterruptedException {
        parser.parse(value.toString());

        if( parser.isValidTemperature() ) {
            record.put("year", parser.getYearInt());
            record.put("temperature", parser.getAirTemperature());
            record.put("stationId", parser.getStationId());
            context.write(new AvroKey<Integer>(parser.getYearInt()), 
            new AvroValue<GenericRecord>(record));
        }
    }
}

public static class MaxTemperatureReducer extends 
Reducer<AvroKey<Integer>, AvroKey<GenericRecord>, 
AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(
        AvroKey<Integer> key,
        Iterable<AvroKey<GenericRecord>> values,
        Reducer<AvroKey<Integer>, AvroKey<GenericRecord>, 
        AvroKey<GenericRecord>, NullWritable>.Context context)
        throws IOException, InterruptedException {
        GenericRecord max = null;

        for ( AvroKey<GenericRecord> value : values) {
            GenericRecord record = value.datum();
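            // field name must match the schema ("temperature"); a misspelled
            // name makes get() return null, and unboxing null throws an NPE
            if ( max == null ||
                 (Integer) record.get("temperature") >
                 (Integer) max.get("temperature") ) {
                 // copy the record: the framework may reuse the datum object between iterations
                 max = newWeatherRecord(record);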
            }
        }
        context.write(new AvroKey<GenericRecord>(max), 
        NullWritable.get());
    }

    private GenericRecord newWeatherRecord(GenericRecord value) {
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("year", value.get("year"));
        record.put("temperature", value.get("temperature"));
        record.put("stationId", value.get("stationId"));
        return record;
    }
}

public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "Avro mapreduce");
    job.setJarByClass(getClass());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // map output: int key (year) and WeatherRecord value; job output: WeatherRecord key
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setMapOutputValueSchema(job, SCHEMA);
    AvroJob.setOutputKeySchema(job, SCHEMA);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    // report failure to the caller instead of always returning 0
    return job.waitForCompletion(true) ? 0 : 1;
}

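public static void main(String[] args) throws Exception {
    int exitCode =
        ToolRunner.run(new AvroGenericMapTemperature(), args);
    // propagate the job's status as the process exit code
    System.exit(exitCode);
}

}

I have explicitly added all the Avro jars (version 1.7.5 of avro, avro-mapred, avro-tools, etc.) to HADOOP_CLASSPATH, and I also pass -D mapreduce.job.user.classpath.first=true when running the job, but I keep getting the same error. I know the default Avro version shipped with Hadoop 2.6.0 is 1.7.4; I even tried that version of Avro, with no luck. Any help would be greatly appreciated.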
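A possible way to narrow this down: a NoSuchMethodError at runtime means the org.apache.avro.generic.GenericData class actually loaded by the JVM has no createDatumWriter(Schema) method, which suggests that an older Avro jar than the one avro-mapred expects is being picked up first. Since the log shows LocalJobRunner, the tasks run inside the client JVM, so it is the client's classpath that matters. The sketch below uses only standard Java reflection to print which jar GenericData came from and whether it has that method. The class ClasspathCheck and its helpers are hypothetical, not part of Hadoop or Avro, and hasMethod only accepts reference-type parameter names because it resolves them with Class.forName.

```java
import java.security.CodeSource;

/**
 * Hypothetical diagnostic: reports which jar a class was loaded from and whether it
 * has a given public method. Run it with the same classpath as the failing job.
 */
public class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, or a note if unknown/missing. */
    static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // classes from the bootstrap loader (e.g. java.lang.String) have no code source
            if (src == null || src.getLocation() == null) {
                return "<bootstrap or unknown>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    /** True if the class is loadable and has a public method with these (reference) parameter types. */
    static boolean hasMethod(String className, String methodName, String... paramClassNames) {
        try {
            Class<?> cls = Class.forName(className);
            Class<?>[] params = new Class<?>[paramClassNames.length];
            for (int i = 0; i < params.length; i++) {
                params[i] = Class.forName(paramClassNames[i]);
            }
            cls.getMethod(methodName, params); // throws NoSuchMethodException if absent
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cls = "org.apache.avro.generic.GenericData";
        System.out.println(cls + " loaded from: " + locationOf(cls));
        System.out.println("has createDatumWriter(Schema): "
            + hasMethod(cls, "createDatumWriter", "org.apache.avro.Schema"));
    }
}
```

If the printed location turns out to be a jar other than the one you added to HADOOP_CLASSPATH (for example, one from Hadoop's own lib directory), the problem is likely the classpath ordering rather than the Avro version you chose.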

【Question discussion】:

    Tags: hadoop avro


    【Solution 1】:

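    The generic parameters of Reducer are org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.

    The first and second are the input key and input value types, respectively. So you can try changing

    public static class MaxTemperatureReducer extends 
    Reducer<AvroKey<Integer>, AvroKey<GenericRecord>, 
    AvroKey<GenericRecord>, NullWritable> {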

    @Override
    protected void reduce(
        AvroKey<Integer> key,
        Iterable<AvroKey<GenericRecord>> values,
        Reducer<AvroKey<Integer>, AvroKey<GenericRecord>, 
        AvroKey<GenericRecord>, NullWritable>.Context context)
        throws IOException, InterruptedException {
    

    to

    public static class MaxTemperatureReducer extends 
    Reducer<AvroKey<Integer>, AvroValue<GenericRecord>, 
    AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(
        AvroKey<Integer> key,
        Iterable<AvroValue<GenericRecord>> values,
        Reducer<AvroKey<Integer>, AvroValue<GenericRecord>, 
        AvroKey<GenericRecord>, NullWritable>.Context context)
        throws IOException, InterruptedException {
    

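    This is because in the mapper you write an AvroKey as the key and an AvroValue as the value, so the reducer's input value type must be AvroValue<GenericRecord>. That includes the values parameter of reduce() and the loop variable in its body.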
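    Why the mismatch is easy to miss: Job.setReducerClass accepts any Reducer subclass, and generic type arguments are erased at runtime, so nothing checks that the reducer's VALUEIN matches the mapper's VALUEOUT. The mismatch would presumably surface only as a ClassCastException when reduce() iterates over its values and the compiler-inserted cast to AvroKey meets an AvroValue. The plain-Java sketch below reproduces that effect without Hadoop. KeyWrapper and ValueWrapper are hypothetical stand-ins for AvroKey and AvroValue (mirroring how both extend AvroWrapper), and the raw List plays the part of the framework handing over deserialized values.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration (no Hadoop/Avro needed): a "reducer" typed for one wrapper
 * class compiles against values of a sibling wrapper class, then fails at runtime.
 */
public class ErasureDemo {

    static class Wrapper<T> {
        private final T datum;
        Wrapper(T datum) { this.datum = datum; }
        T datum() { return datum; }
    }
    // stand-ins for AvroKey / AvroValue: siblings, neither is a subtype of the other
    static final class KeyWrapper<T> extends Wrapper<T> { KeyWrapper(T d) { super(d); } }
    static final class ValueWrapper<T> extends Wrapper<T> { ValueWrapper(T d) { super(d); } }

    /** "Reducer" that declares its values as KeyWrapper, like the original MaxTemperatureReducer. */
    static int maxOfKeyWrapped(Iterable<KeyWrapper<Integer>> values) {
        int max = Integer.MIN_VALUE;
        for (KeyWrapper<Integer> v : values) { // compiler inserts a cast to KeyWrapper here
            max = Math.max(max, v.datum());
        }
        return max;
    }

    /** Matching types: the "mapper" emits KeyWrapper values, so the loop works. */
    static int runMatched() {
        List<KeyWrapper<Integer>> emitted = new ArrayList<>();
        emitted.add(new KeyWrapper<>(17));
        emitted.add(new KeyWrapper<>(42));
        return maxOfKeyWrapped(emitted);
    }

    /** Mismatched types: the raw list hides the element type, as erasure does in the framework. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static int runWithMismatch() {
        List emitted = new ArrayList();      // raw list: no generic type information
        emitted.add(new ValueWrapper<>(17)); // "mapper" emits ValueWrapper values
        emitted.add(new ValueWrapper<>(42));
        return maxOfKeyWrapped(emitted);     // compiles (unchecked), throws when iterated
    }

    public static void main(String[] args) {
        System.out.println("matched: " + runMatched());
        try {
            runWithMismatch();
        } catch (ClassCastException e) {
            System.out.println("mismatched: ClassCastException");
        }
    }
}
```

    Declaring the reducer's VALUEIN as the same wrapper type the mapper emits (AvroValue in the question) is what removes the bad cast.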
