【Question title】: Hadoop Map-Reduce: RecordReader
【Posted】: 2012-09-14 05:52:41
【Question】:

I am trying to solve the following RecordReader problem. Sample input file:

1,1
2,2
3,3
4,4
5,5
6,6
7,7
.......
.......

I want my RecordReader to return

key | Value 
0   |1,1:2,2:3,3:4,4:5,5
4   |2,2:3,3:......6,6
6   |3,3:4,4......6,6,7,7

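(The first value holds the first five lines, the second value holds five lines starting from line 2, the third value holds five lines starting from line 3, and so on.)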

public class MyRecordReader extends RecordReader<LongWritable, Text> {

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {

    while (pos < end) {
      key.set(pos);
      // five line logic 
      Text nextLine=new Text();



      int newSize = in.readLine(value, maxLineLength,
                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                         maxLineLength));
      fileSeek+=newSize;

      for(int n=0;n<4;n++)
      {
          fileSeek+=in.readLine(nextLine, maxLineLength,
                  Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                           maxLineLength));
          value.append(":".getBytes(), 0,1);
          value.append(nextLine.getBytes(), 0, nextLine.getLength());
      }
      if (newSize == 0) {

        return false;

      }
      pos += newSize;
      if (newSize < maxLineLength) {

        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
}

}

But this returns the values as

key | Value 
0   |1,1:2,2:3,3:4,4:5,5
4   |6,6:7,7.......10,10
6   |11,11:12,12:......14,14

Can someone help me fix this code, or suggest new code for the RecordReader? Requirement of the problem (may help you understand the use case). Thanks.
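
The output above is consistent with how the loop reads its input: every call to `in.readLine` advances the underlying stream, so one `nextKeyValue()` call consumes five lines, and the next call starts at line 6 rather than line 2. A minimal sketch of that behavior without Hadoop, where an `Iterator<String>` stands in for the forward-only stream (`ConsumingReadDemo` and `readRecords` are hypothetical names, not part of the question's code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ConsumingReadDemo {
    // Mimics the loop in the question: one line plus up to four more per record,
    // all pulled from the same forward-only source.
    static List<String> readRecords(Iterator<String> in) {
        List<String> records = new ArrayList<>();
        while (in.hasNext()) {
            StringBuilder value = new StringBuilder(in.next());
            for (int n = 0; n < 4 && in.hasNext(); n++) {
                value.append(':').append(in.next());
            }
            records.add(value.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1,1", "2,2", "3,3", "4,4", "5,5", "6,6", "7,7", "8,8", "9,9", "10,10");
        // The two records printed are disjoint: lines 1-5, then lines 6-10.
        for (String record : readRecords(lines.iterator())) {
            System.out.println(record);
        }
    }
}
```

To get overlapping windows, the reader has to either remember the last four lines it already read or seek back in the file before building the next record.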

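【Comments】:

  • Please format the question properly and show the output correctly; then we may be able to answer...
  • @nyarlathotep: Sorry about the bad formatting. I have tried to improve it; I hope you can still help me with an answer.

Tags: java hadoop mapreduce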


【Answer 1】:

I think I understand the question... here is what I would do: wrap another RecordReader and buffer its keys/values in a local queue.

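import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordReader extends RecordReader<LongWritable, Text> {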
    private static final int BUFFER_SIZE = 5;
    private static final String DELIMITER = ":";

    private Queue<String> valueBuffer = new LinkedList<String>();
    private Queue<Long> keyBuffer = new LinkedList<Long>();
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private RecordReader<LongWritable, Text> rr;
    public MyRecordReader(RecordReader<LongWritable, Text> rr) {
        this.rr = rr;
    }

    @Override
    public void close() throws IOException {
        rr.close();
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        rr.initialize(arg0, arg1);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // First call: fill the window with BUFFER_SIZE records from the wrapped reader.
        if (valueBuffer.isEmpty()) {
            while (valueBuffer.size() < BUFFER_SIZE) {
                if (rr.nextKeyValue()) {
                    keyBuffer.add(rr.getCurrentKey().get());
                    valueBuffer.add(rr.getCurrentValue().toString());
                } else {
                    return false;
                }
            }
        } else {
            // Later calls: slide the window forward by one record.
            if (rr.nextKeyValue()) {
                keyBuffer.add(rr.getCurrentKey().get());
                valueBuffer.add(rr.getCurrentValue().toString());
                keyBuffer.remove();
                valueBuffer.remove();
            } else {
                return false;
            }
        }
        // The emitted key is the key of the oldest buffered line, i.e. the first line of the window.
        key.set(keyBuffer.peek());
        value.set(getValue());
        return true;
    }

    private String getValue() {
        StringBuilder sb = new StringBuilder();
        Iterator<String> iter = valueBuffer.iterator();
        while (iter.hasNext()) {
            sb.append(iter.next());
            if (iter.hasNext()) sb.append(DELIMITER);
        }
        return sb.toString();
    }

}

Then, for example, you can have a custom InputFormat that extends TextInputFormat and overrides the createRecordReader method to call super.createRecordReader and return the result wrapped in a MyRecordReader, like this:

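import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class MyTextInputFormat extends TextInputFormat {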
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit arg0, TaskAttemptContext arg1) {
        return new MyRecordReader(super.createRecordReader(arg0, arg1));
    }
}
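
The queue logic in nextKeyValue can be tried out without a cluster. The following is a minimal sketch of the same sliding-window idea in plain Java, assuming an `Iterator<String>` as a stand-in for the wrapped RecordReader; `SlidingWindowDemo` and `windows` are hypothetical names used only for illustration, and keys are left out for brevity:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class SlidingWindowDemo {
    // Joins every run of `size` consecutive lines with `delimiter`.
    // Emits nothing if the source has fewer than `size` lines.
    static List<String> windows(Iterator<String> lines, int size, String delimiter) {
        List<String> out = new ArrayList<>();
        Deque<String> buffer = new ArrayDeque<>();
        while (lines.hasNext()) {
            buffer.addLast(lines.next());
            if (buffer.size() > size) {
                buffer.removeFirst(); // drop the oldest line so the window slides by one
            }
            if (buffer.size() == size) {
                out.add(String.join(delimiter, buffer));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1,1", "2,2", "3,3", "4,4", "5,5", "6,6", "7,7");
        for (String window : windows(lines.iterator(), 5, ":")) {
            System.out.println(window);
        }
    }
}
```

For the seven sample lines this yields three windows, starting at lines 1, 2 and 3. One caveat that likely applies to the wrapper above: each reader only sees the records of its own input split, so windows that would span two splits are not produced. If that matters, one option is to override isSplitable in the InputFormat to return false so that each file is read by a single reader.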

【Comments】:

  • Sorry, I had not tested the code before posting it. I have edited it; try it now.