【问题标题】:Overriding RecordReader to read Paragraph at once instead of line覆盖 RecordReader 以立即读取段落而不是行
【发布时间】:2013-03-25 17:35:06
【问题描述】:

我正在重写 RecordReader 类的方法“next”和 TextInputFormat 类的“getRecordReader”,以便将整个段落发送到映射器,而不是逐行发送。 (我正在使用旧的 api,并且我的段落的定义是追加的,直到我的文本文件中出现一个空行。)
以下是我的代码:

public class NLinesInputFormat extends TextInputFormat  
{  
   @Override
   public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)throws IOException     {   
        reporter.setStatus(split.toString());  
        return new ParagraphRecordReader(conf, (FileSplit)split);
    }
}



public class ParagraphRecordReader implements RecordReader<LongWritable, Text> 
{
        private LineRecordReader lineRecord;
        private LongWritable lineKey;
        private Text lineValue;
        public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
            lineRecord = new LineRecordReader(conf, split);
            lineKey = lineRecord.createKey();
            lineValue = lineRecord.createValue();
        }

        @Override
        public void close() throws IOException {
            lineRecord.close();
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();

        }

        @Override
        public Text createValue() {
            return new Text("");

        }

        @Override
        public float getProgress() throws IOException {
            return lineRecord.getPos();

        }

        @Override
        public synchronized boolean next(LongWritable key, Text value) throws IOException {
            boolean appended, gotsomething;
            boolean retval;
            byte space[] = {' '};
            value.clear();
            gotsomething = false;
            do {
                appended = false;
                retval = lineRecord.next(lineKey, lineValue);
                if (retval) {
                    if (lineValue.toString().length() > 0) {
                        byte[] rawline = lineValue.getBytes();
                        int rawlinelen = lineValue.getLength();
                        value.append(rawline, 0, rawlinelen);
                        value.append(space, 0, 1);
                        appended = true;
                    }
                    gotsomething = true;
                }
            } while (appended);

            //System.out.println("ParagraphRecordReader::next() returns "+gotsomething+" after setting value to: ["+value.toString()+"]");
            return gotsomething;
        }

        @Override
        public long getPos() throws IOException {
            return lineRecord.getPos();
        }
    }  

问题:
1. 我没有找到任何关于如何做到这一点的具体指南,所以可能是我做错了什么,请评论任何建议?
2. 我能够正确编译它,但是当我运行我的工作时,我的映射器一直在运行,我无法弄清楚问题出在哪里?

【问题讨论】:

  • 你尝试过只输入一个段落吗?
  • 我认为你有一个错误;当你交叉拆分时,你会得到额外的段落。我认为您需要区分从 0 开始的拆分和每隔一个拆分。以 0 开头的第一行开始一个段落,但以行开头的拆分不应开始一个新段落。 (通常你会读过一个分割边界,所以如果你的分割有连续一个段落的行,它们就会被前一个分割发出)。我错过了什么吗?

标签: hadoop


【解决方案1】:

您的代码非常适合我。 我所做的唯一更改是将这些类作为内部类并将它们设为静态。

输入文件如下:

This is awesome.
WTF is this.

This is just a test.

映射器代码如下:

@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {

    System.out.println(key+" : "+value);
}

输出是:

0 : This is awesome. WTF is this. 
0 : This is just a test.

相信大家一定没有忘记设置输入格式,但以防万一,设置如下:

conf.setInputFormat(NLinesInputFormat.class);

【讨论】:

  • 感谢您回复 Amar!.. 我将这些类用作公共静态并设置了 Inputformat,但我没有尝试使用小段落,而是使用大文件对其进行测试。我会这样做,然后告诉你进展如何。
  • 嘿,谢谢伙计...我检查了短输入文件,它对于长文件工作正常这是一些格式问题我已经弄清楚了!
  • @Amar 我是 hadoop 的初学者,你能解释一下下一个方法里面发生了什么吗?你能解释一下实现的逻辑吗?我需要一点帮助。
  • next() 基本上是决定下一条记录是映射器的问题,默认实现会发出一行,而在这种情况下,我们需要将完整的段落传递给映射器作为单个记录,因此我们覆盖next()。现在,一个段落应该被定义为所有行的集合,直到存在连续的 2 个换行符 ('/n'),现在这是使用 LineRecordReader 实现的,我们继续累积所有行,直到我们得到一个空行。
猜你喜欢
  • 2018-03-31
  • 2020-10-17
  • 1970-01-01
  • 1970-01-01
  • 2020-08-30
  • 2023-02-24
  • 1970-01-01
相关资源
最近更新 更多