【问题标题】:MultipleOutputs in hadoophadoop中的多个输出
【发布时间】:2014-03-06 08:07:50
【问题描述】:

我在reduce 阶段的reduce 程序中使用MultipleOutputs。我正在处理的数据集约为 270 mb,我在我的伪分布式单节点上运行它。我为我的地图输出值使用了自定义可写。键是数据集中存在的国家。

public class reduce_class extends Reducer<Text, name, NullWritable, Text> {
    public void reduce(Text key,Iterable<name> values,Context context) throws IOException, InterruptedException{
        MultipleOutputs<NullWritable,Text> m = new MultipleOutputs<NullWritable,Text>(context);
        long pat;
        String n;
        NullWritable out = NullWritable.get();
        TreeMap<Long,ArrayList<String>> map = new TreeMap<Long,ArrayList<String>>();
        for(name nn : values){
            pat = nn.patent_No.get();
            if(map.containsKey(pat))
                map.get(pat).add(nn.getName().toString());
            else{
                map.put(pat,(new ArrayList<String>()));
                map.get(pat).add(nn.getName().toString());}
    }
        for(Map.Entry entry : map.entrySet()){
            n = entry.getKey().toString();
            m.write(out, new Text("--------------------------"), key.toString());
            m.write(out, new Text(n), key.toString());
            ArrayList<String> names = (ArrayList)entry.getValue();
            Iterator i = names.iterator();
            while(i.hasNext()){
                n = (String)i.next();
                m.write(out, new Text(n), key.toString());
        }
            m.write(out, new Text("--------------------------"), key.toString());           
    }
        m.close();
}

}

以上是我的reduce逻辑

问题

1) 上述代码适用于小型数据集,但由于堆空间为 270 mb 数据集而失败。

2) 使用国家作为键在单个可迭代集合中传递相当大的值。我试图解决这个问题,但 MutlipleOutputs 为给定的一组键创建唯一文件。重点是我无法附加之前运行reduce创建的现有文件并引发错误。因此对于特定的键,我必须创建新文件。有没有办法解决这个问题? .解决上述错误导致我将键定义为国家/地区名称(我的最终排序数据)但抛出 java 堆错误。

示例输入

3858241,"Durand","Philip","E.","","","Hudson","MA","US","",1 3858241,"诺里斯","朗尼","H.","","","米尔福德","MA","US","",2 3858242,"Gooding","Elwyn","R.","","120 Darwin Rd.","Pinckney","MI","US","48169",1 3858243,"皮埃隆","克劳德","雷蒙德","","","Epinal","","FR","",1 3858243,"Jenny","Jean","Paul","","","Decines","","FR","",2 3858243,"Zuccaro","罗伯特","","","","Epinal","","FR","",3 3858244,"Mann","Richard","L.","","P.O. Box 69","Woodstock","CT","US","06281",1

小型数据集的示例输出

示例目录结构...

CA-r-00000

FR-r-00000

魁北克-r-00000

TX-r-00000

US-r-00000

*个别内容*


3858241 菲利普·E·杜兰德

朗尼·H·诺里斯


3858242

艾尔文·古丁


3858244

理查德·曼恩


【问题讨论】:

  • 堆空间可能是由于 Treemap 中存储了大量数据
  • 是的,可以。导致特定国家/地区的数据很容易超过 100 mb。如果使用多个输出可以解决问题,我可以将数据写入之前已经创建的预先存在的文件。可以吗?
  • 基本上你可以通过再次创建相同的文件来更新文件(删除然后创建)。无法更新到同一文件。
  • 请详细说明。如果我删除文件,已经写入的数据会丢失吗?从给定的文件中,我想做的基本上是在国家/地区明智地提取数据,并在特定国家/地区的每个文件中,将属于同一索引(专利号)的所有名称分组
  • 我可以通过增加 mapred-site.xml 中的堆空间来解决。我要问的是我们是否能够使用 MultipleOutputs 更新之前已经创建的文件中的数据?

标签: java hadoop mapreduce multipleoutputs


【解决方案1】:

我知道我在这里回答了一个非常古老的问题,但无论如何让我在这里提出一些想法。您似乎正在您的 reducer 中创建一个 TreeMap,其中包含您在一次 reduce 调用中获得的所有记录。在 Mapreduce 中,您无法将所有记录保存在内存中,因为它永远不会扩展。您正在制作一张patent_no 以及与该patent_no 关联的所有names 的地图。您想要的只是根据patent_no 分离出记录,那么为什么不利用mapreduce 框架的排序。

您应该在可写密钥本身中包含patent_noname 以及country

  • 仅基于countryPartitioner 写入分区。
  • 排序应在countrypatent_noname
  • 你应该写你的Grouping comparatorcountrypatent_no上分组。

因此,所有具有相同country 的记录将进入同一个reducer 并按patent_noname 排序。并且在同一个 reducer 中,不同的专利号会去不同的 reduce 调用。现在您只需要简单地将其写入 MultipleOutputs。因此,您摆脱了内存中的任何 TreeMap。

我建议你应该注意的几点是:

  • 不要每次都在reduce方法中创建new MultipleOutputs,而是应该写一个setup()方法,在setup()方法中只创建一个。
  • 不要每次都创建new Text(),而是在setup方法中创建一个,并通过set("string")方法Text重用同一个实例。你可以说那有什么意义,Java 的 GC 无论如何都会垃圾收集它。但您应始终尝试使用尽可能低的内存,以减少调用 Java 垃圾收集的频率。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-01
    相关资源
    最近更新 更多