【问题标题】:Custom File Format自定义文件格式
【发布时间】:2015-10-27 13:33:03
【问题描述】:

我是自定义文件格式的新手。我正在尝试通过
MapReduce 执行 WordCount 程序。我正在使用以下类
WordKey.class(以单词为键)
MyInputFormat.class
MyRecordReader.class
MyOutputFormat.class
MyRecordWriter.class
MyMapper.class
MyReducer.class
我已经以如下所示的方式实现了映射器 public class MyMapper extends Mapper&lt;<strong>WordKey</strong>,<strong>WordValue</strong>,Text,IntWritable&gt;{//rest of the business logic }


我的问题是
可以像我在上面给出的代码中编写的那样自定义映射器的输入键和输入值吗?或者其他东西可能是相同的解决方案?
请推荐
提前致谢 :)

以下是 Mapper 和 Reducer 代码
**Mapper**




  public class MyMapper extends Mapper<StudentKey, PassValue,PassValue,IntWritable> {
        public void map(StudentKey key,PassValue value,Context context)throws IOException,InterruptedException
        {


                context.write(new PassValue(value.getResStatus()),new IntWritable(1));

        }
    }


这是 reduce 方法的代码
 public class MyReducer extends
        Reducer<PassValue, IntWritable, Text, IntWritable> {
int sum=0;
public void reduce(PassValue key,Iterable<IntWritable>value,Context context)throws IOException,InterruptedException{
    for(IntWritable x:value)
    {
        sum+=x.get();
    }
    context.write(new Text(key.toString()), new IntWritable(sum));
}
}

RecordReader 的代码

public class MyRecordReader extends RecordReader<StudentKey, PassValue> {
    private StudentKey key;
    private PassValue value;
    private LineRecordReader reader=new LineRecordReader();
    @Override
    public void close() throws IOException {
        reader.close();
    }

    @Override
    public StudentKey getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public PassValue getCurrentValue() throws IOException, InterruptedException {
        // return the value
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return reader.getProgress();
    }

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        reader.initialize(is, tac);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean nextKeyValueStatus=reader.nextKeyValue();
        if(nextKeyValueStatus)
        {
            if(key==null)
            {
                key=new StudentKey();
            }if(value==null)
            {
                value=new PassValue();
            }
            Text line=reader.getCurrentValue();
            String tokens[]=line.toString().split(",");
            key.setName(new Text(tokens[0]));
            key.setRoll(tokens[1]); //string is acceptable here because the setRoll has been customised apart from the normal 
                                    // framework structure
            value.setResStatus(tokens[2]);
        }
        return nextKeyValueStatus;
    }

}

RecordWriter 代码

public class MyRecordWriter extends RecordWriter<PassValue, IntWritable> {
    private DataOutputStream out;
    public MyRecordWriter(DataOutputStream out)
    {
        super();
        this.out=out;
    }
    public MyRecordWriter(FSDataOutputStream fileOut) {
        this.out=fileOut;
    }
    @Override
    public void close(TaskAttemptContext tac) throws IOException,
            InterruptedException {
        out.close();

    }
List<IntWritable> x=new ArrayList<IntWritable>();
    @Override
    public void write(PassValue arg0, IntWritable arg1) throws IOException,
            InterruptedException {
        x.add(arg1);
        //write key First
        out.writeBytes(arg0.toString());
        for(IntWritable y:x)
        {
            out.writeBytes(",");
            out.writeBytes(String.valueOf(y.get()));
        }
        out.writeBytes("\r\n");
    }

}

OutputFormat 的代码

  public class MyOutputFormat extends FileOutputFormat<PassValue,IntWritable> {

        @Override
        public RecordWriter<PassValue, IntWritable> getRecordWriter(
                TaskAttemptContext tac) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            Path currPath=FileOutputFormat.getOutputPath(tac);
            Path fullPath=new Path(currPath,"result.txt");

            FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
            FSDataOutputStream fileOut=fs.create(fullPath, tac);
            return new MyRecordWriter(fileOut);
        }

    }

自定义键代码

public class StudentKey implements WritableComparable<StudentKey> {
    private Text name;
    private IntWritable roll;
    public StudentKey()
    {

    }

public StudentKey(Text name, IntWritable roll) {
    super();
    this.name = name;
    this.roll = roll;
}
public StudentKey(Text name)
{
    super();
    this.name=name;
}


/**
 * @return the name
 */
public Text getName() {
    return name;
}

/**
 * @param name the name to set
 */
public void setName(Text name) {
    this.name = name;
}

/**
 * @return the roll
 */
public IntWritable getRoll() {
    return roll;
}

/**
 * @param roll the roll to set
 */
public void setRoll(IntWritable roll) {
    this.roll = roll;
}
public void setRoll(String roll) {
    this.roll = new IntWritable(Integer.parseInt(roll));
}

@Override
public void readFields(DataInput arg0) throws IOException {
    // TODO Auto-generated method stub
    this.name.readFields(arg0);
    this.roll.readFields(arg0);
}

@Override
public void write(DataOutput arg0) throws IOException {
    // TODO Auto-generated method stub
    this.name.write(arg0);
    this.roll.write(arg0);  }

/* (non-Javadoc)
 * @see java.lang.Object#hashCode()
 */
@Override
public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((name == null) ? 0 : name.hashCode());
    result = prime * result + ((roll == null) ? 0 : roll.hashCode());
    return result;
}

/* (non-Javadoc)
 * @see java.lang.Object#equals(java.lang.Object)
 */
@Override
public boolean equals(Object obj) {
    if (this == obj)
        return true;
    if (obj == null)
        return false;
    if (getClass() != obj.getClass())
        return false;
    StudentKey other = (StudentKey) obj;
    if (name == null) {
        if (other.name != null)
            return false;
    } else if (!name.equals(other.name))
        return false;
    if (roll == null) {
        if (other.roll != null)
            return false;
    } else if (!roll.equals(other.roll))
        return false;
    return true;
}

@Override
public int compareTo(StudentKey arg0) {
    int comp=this.name.compareTo(arg0.name);
    if(comp!=0)
    {
        return comp;
    }
    else return this.roll.compareTo(arg0.roll);
}

}

自定义值类

public class PassValue implements Writable {
    private Text resStatus;
    public PassValue()
    {
        resStatus=new Text();
    }
    public PassValue(Text resStatus)
    {
        super();
        this.resStatus=resStatus;
    }
    public PassValue(PassValue status)
    {
        super();
        this.resStatus=status.resStatus;
    }
    /**
     * @return the resStatus
     */
    public Text getResStatus() {
        return resStatus;
    }
    /**
     * @param resStatus the resStatus to set
     */
    public void setResStatus(Text resStatus) {
        this.resStatus = resStatus;
    }
    //String implementation
    public void setResStatus(String resStatus) {
        this.resStatus = new Text(resStatus);
    }
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.resStatus.readFields(arg0);
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        this.resStatus.write(arg0);
    }
    /* (non-Javadoc)
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((resStatus == null) ? 0 : resStatus.hashCode());
        return result;
    }
    /* (non-Javadoc)
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PassValue other = (PassValue) obj;
        if (resStatus == null) {
            if (other.resStatus != null)
                return false;
        } else if (!resStatus.equals(other.resStatus))
            return false;
        return true;
    }

}

驱动程序代码

public class DriverCode extends Configured implements Tool {
    static Configuration cf;
    @Override
    public int run(String[] arg0) throws IOException,InterruptedException,ClassNotFoundException {
        cf=new Configuration();
        Job j=Job.getInstance(cf);
        j.setJarByClass(DriverCode.class);
        j.setMapperClass(MyMapper.class);
        j.setMapOutputKeyClass(PassValue.class);
        j.setMapOutputValueClass(IntWritable.class);
//      j.setCombinerClass(WCReduce.class);
        j.setReducerClass(MyReducer.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(Text.class);
        j.setInputFormatClass(MyInputFormat.class);
        j.setOutputFormatClass(MyOutputFormat.class);
        Path op=new Path(arg0[1]);
        FileInputFormat.addInputPath(j, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(j, op);
        op.getFileSystem(cf).delete(op, true);

        return j.waitForCompletion(true)?0:1;
    }

        public static void main(String args[])throws Exception{
            int res=ToolRunner.run(cf, new DriverCode(), args);
            System.exit(res);
        }
}

【问题讨论】:

  • 当你想到wordkey和wordvalue时,你能补充一下你的要求吗?这将有助于我们更好地指导您。
  • 实际上我正在练习自定义文件格式的概念,并且我正在使用一个文本文件,它的内容在一行中,没有换行符。通过使用供应商定义的 Box 类来计算单词出现的数量可能是一项简单的任务,但我发现自定义它很有趣,因为我在 RecordReader 部分中遇到了分隔符的问题。希望我很清楚我的要求。非常感谢您的提问:)
  • 好。当您计划自己的输入格式时,我假设您知道管理自己的拆分和记录阅读器。快乐编码

标签: hadoop mapreduce


【解决方案1】:

这是可能的,但您必须编写自己的自定义输入格式和记录阅读器类。最好的方法是让您的映射器将键读取为偏移量,将值读取为您的行,然后使用自定义可写类为映射器输出键和值进行转换。

【讨论】:

  • 是的,Vignesh,我同意您的回答,我希望您知道,我已尝试尽可能自定义 MR 代码中的所有内容。
  • 代码在映射器之前运行良好,但每当我使用自定义键作为减速器的输出键类时,我一直面临减速器的问题。
  • 我只是得到一个 NullPointerException
  • 如果你得到了正确的mapper输出,那么nullpointer异常似乎是代码有问题。发布您的减速器代码和映射器输出以及预期的减速器输出。
  • 到目前为止,我并不担心输出的格式,我只想让我的程序运行。我马上发布代码
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-08-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多