【问题标题】:PIG UDF handle multi-lined tuple split into different mapperPIG UDF 处理多行元组拆分成不同的映射器
【发布时间】:2012-12-02 00:13:11
【问题描述】:

我有一个文件,其中每个元组跨越多行,例如:

START
name: Jim
phone: 2128789283
address: 56 2nd street, New York, USA
END
START
name: Tom
phone: 6308789283
address: 56 5th street, Chicago, 13611, USA
END
.
.
.

所以上面是我文件中的 2 个元组。我编写了我的 UDF,它定义了一个 getNext() 函数,它检查它是否是 START 然后我将初始化我的元组;如果是 END 那么我将返回元组(来自字符串缓冲区);否则我只会将字符串添加到字符串缓冲区。

它适用于文件大小小于 64 MB 的 HDFS 块大小(在 Amazon EMR 上),而对于大于此大小的大小它将失败。我试着用谷歌搜索,找到这个blog post。 Raja 的解释很容易理解,他提供了一个示例代码。但是代码正在实现RecordReader 部分,而不是getNext() 用于猪LoadFunc。只是想知道是否有人有处理多行猪元组拆分问题的经验?我应该继续在 Pig 中实施RecordReader 吗?如果有,怎么做?

谢谢。

【问题讨论】:

  • 预处理文件以将记录的所有信息放在一行中会更简单 - 一行 awk
  • 如果我自己进行预处理,我无法控制输入文件,那么开销将超过 Hadoop 提供的好处。除了我在 Hadoop 中使用预处理。现在,我仍在寻找跨映射器边界处理元组的解决方案。

标签: hadoop amazon-web-services mapreduce user-defined-functions apache-pig


【解决方案1】:

您可以像Guy 提到的那样预处理您的输入,也可以应用here 所述的其他技巧。

我认为最简洁的解决方案是实现一个自定义的InputFormat(连同它的 RecordReader),它创建一个记录/START-END。 Pig 的LoadFunc 位于Hadoop InputFormat 的顶部,因此您可以定义LoadFunc 将使用的InputFormat。
自定义 LoadFunc 的原始骨架实现如下所示:

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class CustomLoader extends LoadFunc {

    private RecordReader reader;
    private TupleFactory tupleFactory;

    public CustomLoader() {
        tupleFactory = TupleFactory.getInstance();
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new MyInputFormat(); //custom InputFormat
    }

    @Override
    public Tuple getNext() {
        Tuple result = null;
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            //value can be a custom Writable containing your name/value 
            //field pairs for a given record
            Object value = reader.getCurrentValue();
            result = tupleFactory.newTuple();
            // ...
            //append fields to tuple
        }
        catch (Exception e) {
            // ...
        }
        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit pigSplit) 
      throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

LoadFunc 初始化InputFormat 及其RecordReader 后,它会定位您数据的输入位置并开始从recordReader 获取记录,创建结果元组(getNext()) 直到输入被完全读取。

关于自定义 InputFormat 的一些说明:

我将创建一个自定义 InputFormat,其中 RecordReader 是 org.apache.hadoop.mapreduce.lib.input.LineRecordReader: 大多数方法都会 保持不变,除了initialize():它将调用自定义 LineReader (基于org.apache.hadoop.util.LineReader)。 InputFormat 的键是行偏移(长),值是 custom 可写。这会将记录的字段(即 START-END 之间的数据)保存为键值对列表。每次调用 RecordReader 的 nextKeyValue() 时,LineReader 都会将记录写入自定义 Writable。整件事的要点是你如何 实施 LineReader.readLine()

另一种可能更简单的方法是更改​​ TextInputFormat 的分隔符(可在 Hadoop 0.23 中配置,请参阅 textinputformat.record.delimiter) 适合您的数据结构的一种(如果可能的话)。在这种情况下,您最终会将数据保存在 Text 中,您需要从中拆分和提取 KV 对并放入元组中。

【讨论】:

  • 只是想知道您是否可以提供自定义 InpurFormat 和 RecordReader 的框架实现?特别是如何处理跨越映射器边界的元组?谢谢。
  • MyInputFormat 似乎没有被使用。我将打印语句放在 MyInputFormat 一侧,它没有被打印出来。
【解决方案2】:

如果可以将 start 作为分隔符,则下面的代码可能在没有 UDF 的情况下也可以工作

SET textinputformat.record.delimiter 'START';
a  =  load  '<input path>' as  (data:chararray);
dump a;

输出如下:

    (
    name: Jim
    enter code here`phone: 2128789283
    address: 56 2nd street, New York, USA
    END
    )

    (
    name: Tom
    phone: 6308789283
    address: 56 5th street, Chicago, 13611, USA
    END
    )

现在两者都被分成两个元组。

【讨论】:

    猜你喜欢
    • 2016-11-15
    • 1970-01-01
    • 2018-08-27
    • 2014-11-20
    • 1970-01-01
    • 2016-10-05
    • 1970-01-01
    • 2018-02-27
    • 1970-01-01
    相关资源
    最近更新 更多