【问题标题】:Java lambdas, stateless lambdas and parallel executionJava lambda、无状态 lambda 和并行执行
【发布时间】:2014-05-01 15:04:19
【问题描述】:

在尝试学习 Java lambdas 时,我遇到了一篇文章(如下所列),其中在有关流 API 限制的部分下,他指出:“有状态的 lambdas 通常在顺序执行时不是问题,但是当流执行是并行的,它会中断”。然后,他给出了这段代码作为执行顺序问题的示例:

List<String> ss = ...;
List<String> result = ...;

Stream<String> stream = ss.stream();

stream.map(s -> {
    synchronized (result) {
      if (result.size() < 10) {
        result.add(s);
      }
    }
})
.forEach(e -> { });

我可以看到如果它被并行化,这将是如何不确定的,但我看不到你将如何使用无状态 lambda 来解决这个问题 - 是否存在将东西添加到一个固有的不确定性?以并行方式列出。一个戴帽子的六岁孩子可以理解的例子,也许是 C#,将不胜感激。

原文链接http://blog.hartveld.com/2013/03/jdk-8-33-stream-api.html

【问题讨论】:

  • 有什么问题?您想了解如何使用 Java 8 并行流为第一个然后项执行一些代码吗?或者你想了解,fork-join 算法是如何工作的,如何规避共享数据的问题?
  • 后者在无状态 lambda 的上下文中。

标签: java lambda parallel-processing java-8


【解决方案1】:

我知道你在暗示你的问题,我会尽力解释。

考虑一个由 8 个元素组成的输入列表:

[1, 2, 3, 4, 5, 6, 7, 8]

假设流会通过以下方式并行化它,实际上它们不会,并行化的确切过程很难理解。
但是现在,假设他们会将大小除以 2,直到剩下两个元素。

分支部分如下所示:

  1. 第一师:

    [1, 2, 3, 4]
    [5, 6, 7, 8]

  2. 二级:

    [1, 2]
    [3, 4]
    [5, 6]
    [7, 8]

现在我们有四个块(在我们的理论中)将由四个不同的线程处理,它们彼此不知道。
这确实可以通过在一些外部资源上进行同步来解决,但是你会失去并行化的好处,所以我们需要假设我们不同步,当我们不同步时,其他线程将看不到其他线程有什么完成,所以我们的结果将是垃圾。

现在到您询问无国籍的问题的一部分,然后如何正确地并行处理它?如何将按正确顺序并行处理的元素添加到列表中?

首先假设一个简单的映射函数,您使用 lambda i -&gt; i + 10 进行映射,然后在 foreach 中使用 System.out::println 打印它。

现在在第二次划分之后将发生以下情况:

[1, 2] -&gt; [11, 12] -&gt; { System.out.println(11); System.println(12); }
[3, 4] -&gt; [13, 14] -&gt; { System.out.println(13); System.println(14); }
[5, 6] -&gt; [15, 16] -&gt; { System.out.println(15); System.println(16); }
[7, 8] -&gt; [17, 18] -&gt; { System.out.println(17); System.println(18); }

除了由同一个线程处理的所有元素(内部状态,不依赖)按顺序处理之外,不能保证顺序。

如果你想按顺序处理它们,那么你需要使用forEachOrdered,这将确保所有线程都以正确的顺序运行,并且你不会因此而失去太多并行化的好处,因为它适用只到结束状态。

要了解如何将并行化的项目添加到列表中,请使用Collectors.toList() 查看此内容,它提供了以下方法:

  • 创建新列表。
  • 向列表中添加值。
  • 合并两个列表。

现在第二次分割后会发生以下情况:

对于每四个线程,它将执行以下操作(此处仅显示一个线程):

  1. 我们有[1, 2]
  2. 我们将其映射到[11, 12]
  3. 我们创建一个空的List&lt;Integer&gt;
  4. 我们将11 添加到列表中。
  5. 我们将12 添加到列表中。

现在所有线程都完成了这项工作,我们有四个包含两个元素的列表。

现在以下合并按指定顺序发生:

  1. [11, 12] ++ [13, 14] = [11, 12, 13, 14]
  2. [15, 16] ++ [17, 18] = [15, 16, 17, 18]
  3. 终于[11, 12, 13, 14] ++ [15, 16, 17, 18] = [11, 12, 13, 14, 15, 16, 17, 18]

因此结果列表是有序的,并且映射是并行完成的。现在您也应该能够看到为什么并行化需要更高的最小值,因为只有两个项目,否则创建新列表和合并会变得过于昂贵。

我希望你现在明白为什么流操作应该是无状态来获得并行化的全部好处。

【讨论】:

  • 谢谢。还在消化这个。我认为我的问题更多在于理解无状态 lambda 现在是什么:-(
  • @JohnBarça 最好记住的是,无状态 lambda 不会修改其他变量的状态。
  • 这类似于 map-reduce 吗?
【解决方案2】:

减少这个问题,看起来它只是在流中找到前 10 个元素。并分别对整个流进行 foreach 。 s.limit(10).collect(...);s.forEach(...);。地图调用实际上也没有返回任何东西,所以我怀疑它是否可以编译。

【讨论】:

    【解决方案3】:

    这是来自@skiwi 的一个很好的例子,让我看看我是否可以添加一点。

    并行计算中的术语“有序”通常意味着以与顺序过程相同的顺序返回结果。也就是调用sequential.method()或者parallel.method(),结果看起来是一样的。

    forEachOrdered() 的问题在于,框架无法为每个任务的结果创建唯一的对象,并在完成时对它们进行排序而不会停止。因此,它将流视为平衡树。该框架创建一个带有父/子关联的 ConcurrentHashMap。它首先执行左子节点,然后执行右子节点,然后父节点强制执行发生前的关系,其中处理应该是并发的。从有序的结果到有序的顺序处理。

    您需要做的是对结果进行排序,而不是按照遇到的顺序进行处理。为每个最终除法(这里我们使用@skiwi 的第二次除法)创建包含数组部分的对象,由计算填充的处理结果和每个对象的序列号。让线程同时处理对象。当所有线程都完成后,按序列号对对象进行排序并完成您的工作。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-16
      • 1970-01-01
      • 2022-01-24
      • 2016-06-07
      • 1970-01-01
      • 2021-02-10
      • 1970-01-01
      相关资源
      最近更新 更多