【问题标题】:Hadoop 0.20.205.0 WritableComparator doesn't respect Configurable keysHadoop 0.20.205.0 WritableComparator 不尊重可配置键
【发布时间】:2011-12-21 01:22:14
【问题描述】:

我一直在尝试运行一个 hadoop 0.20.205.0 MapReduce 作业(单线程,本地),它表现出各种奇怪和意外的行为。我终于明白为什么了。在我看来,这就像 hadoop 中的一个错误,但也许有一些我不明白的地方。有人可以给我一些建议吗? 我的 setMapOutputKeyClass 类实现了 Configurable。除非首先调用 setConf,否则 readFields 方法将无法正确读取(我相信这是 Configurable 接口的重点) 但是查看 WritableComparator 的代码,我发现当框架对它们进行排序时,它会实例化其内部关键对象:

70      key1 = newKey();
71      key2 = newKey();

而 newKey() 使用 null 配置来构造键:

83  public WritableComparable newKey() {
84    return ReflectionUtils.newInstance(keyClass, null);
85  }

确实,当我在调试器中运行时,我发现在

91      key1.readFields(buffer);

key1 中的conf 为空,所以没有调用setConf。

这是 hadoop 中的错误还是我应该使用 Configurable 以外的东西来配置密钥? 如果这是一个错误,有人知道任何解决方法吗?

编辑:这是一个因这个原因而失败的工作的简短(有点做作)示例:

// example/WrapperKey.java

package example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * This class wraps a WritableComparable class to add one extra possible value
 * (namely null) to the range of values available for that class.
 */
public class WrapperKey<T extends WritableComparable> implements
        WritableComparable<WrapperKey<T>>, Configurable {
    private T myInstance;
    private boolean isNull;
    private Configuration conf;

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        Class<T> heldClass = (Class<T>) conf.getClass("example.held.class",
                null, WritableComparable.class);
        myInstance = ReflectionUtils.newInstance(heldClass, conf);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(isNull);
        if (!isNull)
            myInstance.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        isNull = in.readBoolean();
        if (!isNull)
            myInstance.readFields(in);
    }

    @Override
    public int compareTo(WrapperKey<T> o) {
        if (isNull) {
            if (o.isNull)
                return 0;
            else
                return -1;
        } else if (o.isNull)
            return 1;
        else
            return myInstance.compareTo(o.myInstance);
    }

    public void clear() {
        isNull = true;
    }

    public T get() {
        return myInstance;
    }

    /**
     * Should sort the KV pairs (5,0), (3,0), and (null,0) to [(null,0), (3,0), (5,0)], but instead fails
     * with a NullPointerException because WritableComparator's internal keys
     * are not properly configured
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.setClass("example.held.class", ByteWritable.class,
                WritableComparable.class);
        Path p = new Path("input");
        Path startFile = new Path(p, "inputFile");
        SequenceFile.Writer writer = new SequenceFile.Writer(
                p.getFileSystem(conf), conf, startFile, WrapperKey.class,
                ByteWritable.class);
        WrapperKey<ByteWritable> key = new WrapperKey<ByteWritable>();
        key.setConf(conf);
        ByteWritable value = new ByteWritable((byte) 0);
        key.get().set((byte) 5);
        writer.append(key, value);
        key.get().set((byte) 3);
        writer.append(key, value);
        key.clear();
        writer.append(key, value);
        writer.close();

        Job j = new Job(conf, "Example job");
        j.setInputFormatClass(SequenceFileInputFormat.class);
        j.setOutputKeyClass(WrapperKey.class);
        j.setOutputValueClass(ByteWritable.class);
        j.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(j, p);
        FileOutputFormat.setOutputPath(j, new Path("output"));
        boolean completed = j.waitForCompletion(true);
        if (completed) {
            System.out
                    .println("Successfully sorted byte-pairs by key (putting all null pairs first)");
        } else {
            throw new RuntimeException("Failed to sort");
        }
    }
}

【问题讨论】:

  • 为什么要实现Configurable?
  • 键是游戏的棋盘状态。我需要指定正在解决的游戏的宽度和高度。然后读取的字节数是宽度*高度(板上每个单元格一个)。我意识到我可以只传递宽度和高度,但这不是一个通用的解决方案。例如,假设我的键实际上具有通用类型,并且它们可能包含的实例的类取决于某些配置参数。那么就没有办法有效地读入和解析每次调用 readFields 的类名。我应该期望每个实例只需要知道一次
  • 键需要是 WritableComparable 以便它们可以写入 HDFS 并由 Hadoop 排序以输入到 reduce 阶段。 Hadoop 在工作时将使用 WritableComparable 方法。它将创建它们的新实例,但没有理由查看它们是否可配置并调用 setConf()。 Configurable 适用于作业配置类,而不适用于您在代码中使用的任何任意类。
  • 嘿,你能告诉我你是怎么解决的吗?我会很感激的。

标签: null hadoop key configurable writable


【解决方案1】:

WrapperKey 正在实现 Configurable 并实现 setConf。仅仅实现一个接口并不意味着其他一些类会调用它。 Hadoop 框架可能没有在键上调用 setConf 方法。

我不认为这是一个错误。我见过的所有类型都只实现了 WritableComparable 而不是 Configurable。不确定是否有解决方法,您可能必须在键中定义具体类型。

【讨论】:

  • 我的印象是 Configurable 是 hadoop 让您将配置选项传递给键的方式,就像在构造函数中一样(因为它们是通过反射实例化的)。这就是 ReflectionUtils.newInstance 将配置作为参数的原因
  • 你能指出我使用 RefectionUtils.newInstance() 实例化键的代码吗?
  • 它在 ReflectionUtils.newInstance(Class, Configuration) 中
猜你喜欢
  • 1970-01-01
  • 2019-03-10
  • 1970-01-01
  • 1970-01-01
  • 2013-06-05
  • 2018-05-28
  • 2013-10-01
  • 1970-01-01
  • 2017-12-10
相关资源
最近更新 更多