【问题标题】:AvroMultipleInputs - Issue adding multiple pathsAvroMultipleInputs - 添加多个路径的问题
【发布时间】:2016-05-05 21:44:15
【问题描述】:

这是我用来添加多个具有不同映射器类的 Avro 输入路径的驱动程序代码 sn-p

AvroMultipleInputs.addInputPath(jobConf, new Path(args[0]), IncrementalDataMapper.class, incrSchema);
AvroMultipleInputs.addInputPath(jobConf, new Path(args[1]), BaseDataMapper.class, incrSchema);

AvroJob.setMapOutputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema));

AvroJob.setReducerClass(jobConf, DeltaCaptureReducer.class);
AvroJob.setInputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema));
AvroJob.setOutputSchema(jobConf, incrSchema);

当我运行这个驱动程序时,我得到了来自 AvroMultipleInputs 的 getInputSchemaMap(...) 方法的以下异常

java.lang.RuntimeException: org.apache.avro.SchemaParseException: 无法重新定义:com.sample.Test

现在我所做的是在独立程序中模拟 AvroMultipleInputs 的方法 getInputSchemaMap(...) 以产生相同的问题。

独立代码

失败的代码,

    Schema.Parser schemaParser = new Schema.Parser();
    String m1 = "path1;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }");
    String m2 = "path2;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }");
    String[] schemaMappings = (m1 + "," + m2).split(",");
    for (String schemaMapping : schemaMappings) {
        String[] split = schemaMapping.split(";");
        String schemaString = fromBase64(split[1]);
        System.out.println(schemaString);
        Schema inputSchema;
        try {
            inputSchema = schemaParser.parse(schemaString);
        } catch (SchemaParseException e) {
            throw new RuntimeException(e);
        }
    }

现在我通过为每个映射创建解析器来解决这个问题,如下所示。

for (String schemaMapping : schemaMappings) {
        String[] split = schemaMapping.split(";");
        String schemaString = fromBase64(split[1]);
        System.out.println(schemaString);
        Schema inputSchema;
        try {
            Schema.Parser schemaParser = new Schema.Parser();
            inputSchema = schemaParser.parse(schemaString);
        } catch (SchemaParseException e) {
                throw new RuntimeException(e);
        }
}

有人试过吗?有什么想法可以解决吗?

我还尝试将 AvroMultipleInputs 复制到我的项目中并更改代码以使用上述不同的解析器,但我得到以下异常

线程“main”中的异常 java.lang.NullPointerException 在 org.apache.hadoop.mapred.lib.MultipleInputs.getInputFormatMap(MultipleInputs.java:93) 在 org.apache.hadoop.mapred.lib.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:55) 在 org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:328) 在 org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320) 在 org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196) 在 org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)

【问题讨论】:

    标签: java hadoop mapreduce avro


    【解决方案1】:

    事实上,我必须自定义更多文件才能使其正常工作。我仍然不确定是否有其他影响(我不知道我不知道什么)

    AvroMultipleInputs.java
    DelegatingInputFormat.java

    DelegatingMapper.java
    MapCollector.java
    TaggedInputSplit.java

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-02-03
      • 1970-01-01
      • 2021-10-23
      • 2017-05-19
      • 1970-01-01
      • 1970-01-01
      • 2013-03-12
      • 2017-08-25
      相关资源
      最近更新 更多