【问题标题】:How to do a parallel unique word count with Java 8 streams and lambdas?如何使用 Java 8 流和 lambda 进行并行唯一字数统计?
【发布时间】:2016-04-23 14:22:49
【问题描述】:

使用 Java 8 流和 lambda 进行并行唯一字数统计的最佳方法是什么?

我想出了几个,但我不相信它们是最佳的。 我知道 Hadoop 上的 map reduce 解决方案,想知道它们是否提供相同的并行性。

// Map Reduce Word Count 

Map<String, Integer> wordCount = Stream.of("dog","cat","dog","dog","cow","house","house").parallel().collect( Collectors.groupingBy(e->e,Collectors.summingInt(e -> 1)));
System.out.println("number of dogs = " + wordCount.get("dog"));

Map<Object, Object> wordCount2 = Stream.of("dog","cat","dog","dog","cow","house","house").parallel().collect(Collectors.toConcurrentMap(keyWord->keyWord, keyWord->1, (oldVal,newVal)->(int)oldVal+(int)newVal));
System.out.println("number of dogs = " + wordCount2.get("dog"));

假设实际列表会更长,可能来自文件或生成的流,并且我想知道所有单词的计数,而不仅仅是 dog。

【问题讨论】:

  • 第一个看起来不错。注意:短列表通常比单线程更好,并且在您的示例中使用过滤器/计数会更快。
  • 编辑了问题以澄清在长列表中查找所有字数。

标签: java lambda mapreduce word-count java-stream


【解决方案1】:

看看Collectors.groupingBy的javadocs

@implNote 返回的 Collector 不是并发的。对于并行流 管道,组合器功能通过合并一个键来操作 映射到另一个,这可能是一项昂贵的操作。如果保存 元素呈现给下游的顺序 不需要收集器,使用 groupingByConcurrent(Function, 供应商、收集器)可能会提供更好的并行性能。

现在,看看Collectors.groupingByConcurrent,您会发现这或多或少等同于您的第二种方法

返回实现级联“分组依据”的并发收集器 对类型 T 的输入元素进行操作,根据 分类函数,然后执行归约操作 使用指定的与给定键关联的值 下游收集器。 Collector 产生的 ConcurrentMap 是 使用提供的工厂函数创建。

【讨论】:

  • 好的,谢谢。我很惊讶在 Hadoop 的示例中没有关于如何解决这个著名问题的标准示例
  • 仅仅阅读感兴趣的方法之外的javadocs通常是有帮助的,通常有可能有用的相关方法。
【解决方案2】:

groupingByConcurrenttoConcurrentMap 相比,groupingBytoMap 在大型数据集上的运行速度可能较慢。检查groupingByConcurrenttoConcurrentMap 是否更快的最佳方法是在您自己的数据集上自行对它们进行基准测试。我认为结果几乎相同。

但是请注意,如果您使用该文件作为源,您可能会因为在 Java 8 中使用并行性而获得较少的加速,Files.lines()BufferedReader.lines() 正在按顺序读取文件,并行性是通过将行块预缓冲到数组中来实现的并产生新的任务。这并不总是有效的,所以瓶颈可能在这个过程中。在 JDK 9 中,Files.lines() 进行了优化(针对小于 2Gb 长的常规文件),因此您可以在那里获得更好的性能。

至于生成的源,这取决于您如何生成它们。如果您为源提供良好的拆分策略会更好。如果您使用Stream.iterateSpliterators.spliterator(iterator, ...) 或扩展AbstractSpliterator 类,则默认拆分策略将相同:将一些元素预缓冲到数组中以生成子任务。

【讨论】:

    【解决方案3】:

    解释李的代码:

    public static Map<String, Integer> wordCount(Stream<String> stream) {
        return stream
           .flatMap(s -> Stream.of(s.split("\\s+")))
           .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); 
    }
    

    s -> s: 键映射器

    s -> 1: 值映射器

    Integer::sum: 合并函数

    【讨论】:

      【解决方案4】:
      public static Map<String, Integer> wordCount(Stream<String> stream) {
          return stream
             .flatMap(s -> Stream.of(s.split("\\s+")))
             .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum)); 
      }
      

      【讨论】:

        猜你喜欢
        • 2017-08-03
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-02-28
        • 2018-06-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多