【问题标题】:Hadoop Map Reduce , How to combine first reducer output and first map input , as input for second mapper?Hadoop Map Reduce,如何组合第一个 reducer 输出和第一个 map 输入,作为第二个 mapper 的输入?
【发布时间】:2012-10-23 11:50:32
【问题描述】:

我需要使用 map reduce 来实现一个功能。

要求如下。

  1. 映射器的输入是一个包含两列 productId 和 Salescount 的文件
  2. 减速机输出,销售额总和

要求是我需要计算salescount / sum(salescount)。

为此,我计划使用嵌套映射减少。 但是对于第二个映射器,我需要使用第一个 reducers 输出和第一个 map 的输入。

我该如何实现。或者有什么替代方法?

问候 维努

【问题讨论】:

    标签: hadoop mapreduce


    【解决方案1】:

    您可以按照自己的方式使用ChainMapperChainReducer 来PIPE Mappers 和Reducers。请看here

    以下将类似于您需要实现的代码 sn-p

    JobConf mapBConf = new JobConf(false);
    
    JobConf reduceConf = new JobConf(false);
    
    ChainMapper.addMapper(conf, FirstMapper.class, FirstMapperInputKey.class, FirstMapperInputValue.class,
       FirstMapperOutputKey.class, FirstMapperOutputValue.class, false, mapBConf);
    
    ChainReducer.setReducer(conf, FirstReducer.class, FirstMapperOutputKey.class, FirstMapperOutputValue.class,
       FirstReducerOutputKey.class, FirstReducerOutputValue.class, true, reduceConf);
    
    ChainReducer.addMapper(conf, SecondMapper.class, FirstReducerOutputKey.class, FirstReducerOutputValue.class,
       SecondMapperOutputKey.class, SecondMapperOutputValue.class, false, null);
    
    ChainReducer.setReducer(conf, SecondReducer.class, SecondMapperOutputKey.class, SecondMapperOutputValue.class, SecondReducerOutputKey.class, SecondReducerOutputValue.class, true, reduceConf);
    

    或者如果您不想使用多个 Mapper 和 Reducer,您可以执行以下操作

    public static class ProductIndexerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
    
        private static Text productId = new Text();
        private static LongWritable salesCount = new LongWritable();
    
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            String[] values = value.toString().split("\t");
            productId.set(values[0]);           
            salesCount.set(Long.parseLong(values[1]));
            output.collect(productId, salesCount);
        }
    
    }
    
    public static class ProductIndexerReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
    
        private static LongWritable productWritable = new LongWritable();
    
        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {
            List<LongWritable> items = new ArrayList<LongWritable>(); 
            long total = 0;
            LongWritable item = null;
            while(values.hasNext()) {
                item = values.next();
                total += item.get();
                items.add(item);
            }
            Iterator<LongWritable> newValues = items.iterator();
            while(newValues.hasNext()) {
                productWritable.set(newValues.next().get()/total);
                output.collect(key, productWritable);
            }
        }
    
    }
    

    `

    【讨论】:

    • 正如@shazin 提到的那样,这种情况不需要工作链。检查Data-Intensive Text Processing with MapReduce 了解许多此类算法。这将有助于以 MapReduce 方式思考。
    • Vinu,你能写一个例子吗?假设您的输入文件为 1 ---> 23
      1 ---> 33
      1 ---> 22
      1 ---> 2
      2 ---> 3 2 ---> 4
      2 ---> 5
      你期望的输出是什么? 1 ---> (23+33+22+2)/(23+33+22+2+3+4+5)
      2 ---> (3+4+5)/(23+33 +22+2+3+4+5)
      这是你的问题吗?
    • 你在同一个ChainReducer 上做了两次setReducer。有用吗?
    【解决方案2】:

    有了这个用例,我相信我们不需要两个不同的映射器/mapreduce 作业来实现这一点。 (作为上述cmets给出的答案的扩展)

    假设您有一个非常大的输入文件,在 HDFS 中拆分为多个块。当您使用此文件作为输入触发 MapReduce 作业时,多个映射器(等于输入块的数量)将开始并行执行。

    在您的映射器实现中,从输入中读取每一行并将 productId 作为 key 并将 saleCount 作为 value 写入上下文。该数据被传递给 Reducer。

    我们知道,在 MR 作业中,所有具有相同 key 的数据都会传递给同一个 reducer。现在,在您的 reducer 实现中,您可以计算特定 productId 的所有 saleCounts 的总和。

    注意:我不确定您的分子中的值“salescount”。

    假设它是特定产品的出现次数,请使用计数器在计算 SUM(saleCount) 的同一 for 循环中添加并获取总销售额。所以,我们有

    totalCount -> 产品出现次数的计数 sumSaleCount -> 每个产品的 saleCount 值的总和。

    现在,您可以直接将上面的值相除:totalCount/sumSaleCount。

    希望这会有所帮助!如果您有不同的用例,请告诉我。

    【讨论】:

      猜你喜欢
      • 2021-08-19
      • 1970-01-01
      • 2016-03-13
      • 2013-08-13
      • 2021-08-17
      • 2022-06-12
      • 1970-01-01
      • 2021-05-04
      • 2011-05-31
      相关资源
      最近更新 更多