【问题标题】:Mapreduce chaining values into a list for each keyMapreduce 将值链接到每个键的列表中
【发布时间】:2016-12-07 21:15:26
【问题描述】:

我有一个在 mapreduce 中做的小项目,因为我是新手,所以我遇到了很多困难,所以希望能得到帮助。 在这个项目中,我有一个包含站点和标签的文件(每个站点有 10 个标签),我想通过共享标签为每个站点查找类似站点。 所以例如 3 个站点,这是我的数据集

site1   tag1
site1   tag2
site1   tag3
site1   tag4
site1   tag5
site2   tag1
site2   tag2
site2   tag3
site2   tag11
site2   tag12
site3   tag1
site3   tag11
site3   tag13
site3   tag14
site3   tag15

(对于这个例子,我只为每个站点制作了 5 个)。 我想做的是做一个mapreduce,关键是标签和价值网站。 我希望每个标签都获得具有此标签的网站列表(或数组或其他) 所以在这个例子中:

tag1: site1, site2, site3
tag2: site1,site2
tag3: site1, site2
tag4: site1 

等等 然后遍历列表并为每个常见的对在它旁边给出一个 1 的值,所以看起来像这样

tag1: site1_site2 1, site1_site3 1, site2_site3 1
tag2: site1_site2 1

等等 然后链接另一个 mapreduce 作业以对每对的值求和 我为它写了这段代码

public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{

    private Text site = new Text();
    private Text tag = new Text();
    public void map(Object key, Text value, Context context) 
                       throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
        while (itr.hasMoreTokens()) {
            site.set(itr.nextToken());
            tag.set(itr.nextToken());
            context.write(tag, site);
        }
    }
}

public static class tagCount extends Reducer<Text,IntWritable,Text,Text> {

    public void reduce(Text key, Iterable<Text> values, Context context) 
                             throws IOException, InterruptedException {
        String res = "";
        while (values.iterator().hasNext()) {
            res = res + "," + values.iterator().next();
        }
        Text result = new Text(res);
        context.write(key, result);
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "tag count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(tagCount.class);
    job.setReducerClass(tagCount.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

我的第一个问题是如何将 reducer 中的值链接在一起? 因为现在我只是得到一个列表

tag1 site1
tag1 site2 

等等 我试图设置一个字符串,当我遍历值以将下一个标记添加到字符串时,它不起作用

非常感谢您的帮助

【问题讨论】:

  • 为了让我的问题更具体,我想知道如何迭代 reducer 内某个键的所有值。
  • 好吧我实际上发现我的代码甚至没有去reducer函数只有map函数:(
  • 删除job.setCombinerClass(tagCount.class);。你不想要一个组合器。
  • 我删除了它,但我认为没有改变,由于某种原因它仍然没有进入减速器功能。 (试图从中打印东西没有打印出来)并且调试不会进入它
  • 找到了它没有进入reducer的原因我使用了错误的exten忘记将IntWritable转换为Text :)

标签: hadoop mapreduce


【解决方案1】:

这是你的reducer的重写,让你开始:

public static class TagCount extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text out = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context) 
                         throws IOException, InterruptedException {

        List<String> sites = new ArrayList<String>();
        for (Text t : values) {
            sites.add(t.toString());
        }

        for (int i=0; i<sites.size()-1; i++) {
            for (int j=i+1; j<sites.size(); j++) {
                out.set(sites.get(i) + "_" + sites.get(j))
                context.write(out, one);
            }
        }
    }
}

总结:

  • 您需要构建值的内部集合。在这种情况下,我使用了一个字符串列表,这是最安全的方法,直到您对 Hadoop 如何重用对象感到满意为止。
  • 此代码假定sites 不会很大,因此改进之处在于添加一些对其大小的检查,因为我们将它放在内存中,并且以下 context.write 将扩展数据。
  • 然后您遍历站点并生成排列,将每一个都写出来。
  • 使用SequenceFileOutputFormat写出数据,然后你的后续工作可以使用SequenceFileInputFormat,进入映射器的类型将是TextIntWritable

【讨论】:

  • 您好,感谢您的帮助,它确实推动了我前进我认为 For 无法正常工作或覆盖自身
  • 确保你有job.setOutputValueClass(IntWritable.class);
  • 首先我真的想说谢谢你抽出时间来帮助我非常感激。
  • 很想发表评论,所以把它放在答案@Binary
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-22
  • 2012-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多