【问题标题】:Order guarantees using streams and reducing chain of consumers使用流和减少消费者链的订单保证
【发布时间】:2020-05-10 01:03:29
【问题描述】:

因此,在当前场景中,我们有一组 API,如下所列:

Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();

在这些之上,我们的一个调度程序执行任务,例如

private void performAllTasks(T data) {
    start().andThen(performDailyAggregates())
            .andThen(performLastNDaysAggregates())
            .andThen(repopulateScores())
            .andThen(updateDataStore())
            .accept(data);
}

在查看此内容时,我想到了一个更灵活的实现1来执行如下所示的任务:

// NOOP in the context further stands for  'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}

现在让我印象深刻的一点是,Javadoc 明确指出

accumulator - 一个关联的、非干扰的、无状态的函数 结合两个值

接下来我在想How to ensure order of processing in java8 streams?订购(处理顺序与遭遇顺序相同)!

好的,从List 生成的流将是有序,除非流在reduce 之前生成parallel,否则以下实现将起作用。 2

private void performAllTasks(List<Consumer<T>> consumerList, T data) {
    consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}

问。这个假设 2 成立吗?是否可以保证始终按照原始代码的顺序执行消费者?

问。是否有可能以某种方式将 1 暴露给被调用者以执行任务?

【问题讨论】:

  • 假设“2”不应包含“除非流是并行的”这样的短语,因为解决方案只能是正确的或不正确的。取决于顺序执行是不正确的(我不是说这是不正确的,答案更复杂)。但是,“也暴露 1”是什么意思?
  • @Holger 我的意思是,我可以在我的服务中创建实现 1 并让使用它的其他服务传递Stream&lt;Consumer&lt;T&gt;&gt; consumerStream,这会带来复杂性UNORDERED 流也是如此。但是,我仍然可以更改流以遵循遭遇顺序吗? (我希望我能解释得更好)
  • 我想你在这里要问的是:我的方法可以接受Stream&lt;Consumer&lt;T&gt;&gt;吗?即使有人通过parallel流或unordered()明确调用它,我仍然会能够按照收到消费者的确切顺序执行消费者。
  • @Eugene 事实上。感谢您改进措辞。
  • 流无序时,没有遇到顺序,也没有办法构造一个。假设您在顺序评估中表现出的任何顺序都具有特殊含义,这从根本上是有缺陷的。然而,对于有序流,Andreas 已经指出它会起作用。但是,这种方法还有其他实际问题。我会收集一个答案。

标签: java java-8 java-stream reduce consumer


【解决方案1】:

由于Andreas pointed outConsumer::andThen 是一个关联函数,虽然生成的消费者可能具有不同的内部结构,但它仍然是等价的。

但是让我们调试一下

public static void main(String[] args) {
    performAllTasks(IntStream.range(0, 10)
        .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
    private final Consumer<Object> first, second;
    private final boolean leaf;
    DebuggableConsumer(String name) {
        this(x -> System.out.println(name), x -> {}, true);
    }
    DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
        first = a; second = b;
        leaf = l;
    }
    public void accept(Object t) {
        first.accept(t);
        second.accept(t);
    }
    @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
        return new DebuggableConsumer(this, after, false);
    }
    public @Override String toString() {
        if(leaf) return first.toString();
        return toString(new StringBuilder(200), 0, 0).toString();
    }
    private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
        int myHandle = sb.length()-2;
        sb.append(leaf? first: "combined").append('\n');
        if(!leaf) {
            int nPreS=sb.length();
            ((DebuggableConsumer)first).toString(
                sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
            nPreS=sb.length();
            sb.append(sb, preS, preEnd);
            int lastItemHandle=sb.length();
            ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
            sb.setCharAt(lastItemHandle, '\u2514');
        }
        if(myHandle>0) {
            sb.setCharAt(myHandle, '\u251c');
            sb.setCharAt(myHandle+1, '\u2500');
        }
        return sb;
    }
}

将打印

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a

而将缩减代码更改为

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}

在我的机器上打印

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│   ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│   └─combined
│     ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│     └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
  ├─combined
  │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
  │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
  └─combined
    ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
    └─combined
      ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
      └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d

说明了安德烈亚斯回答的重点,但也强调了一个完全不同的问题。您可以通过使用来最大化它,例如示例代码中的IntStream.range(0, 100)

并行评估的结果实际上比顺序评估要好,因为顺序评估会创建不平衡的树。当接受任意的消费者流时,这可能是一个实际的性能问题,甚至在尝试评估结果消费者时会导致StackOverflowError

对于任何数量不小的消费者,您实际上需要一个平衡的消费者树,但为此使用并行流不是正确的解决方案,因为 a)Consumer::andThen 是一种廉价的操作,并行评估没有真正的好处,并且 b ) 平衡将取决于不相关的属性,例如流源的性质和 CPU 内核的数量,它们决定了减少何时回落到顺序算法。

当然,最简单的解决方案是

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
    consumers.forEachOrdered(c -> c.accept(data));
}

但是当你想构造一个复合 Consumer 以便重复使用时,你可以使用

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
    List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
    if(consumerList.isEmpty()) return t -> {};
    if(consumerList.size() == 1) return consumerList.get(0);
    if(consumerList.size() < ITERATION_THRESHOLD)
        return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
    return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
    if(end-start>2) {
        int mid=(start+end)>>>1;
        return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
    }
    T t = l.get(start++);
    if(start<end) t = f.apply(t, l.get(start));
    assert start==end || start+1==end;
    return t;
}

当消费者数量超过阈值时,代码将仅使用循环提供单个Consumer。对于大量消费者而言,这是最简单、最有效的解决方案,事实上,您可以针对较小的消费者放弃所有其他方法,但仍能获得合理的性能……

请注意,这仍然不会妨碍消费者流的并行处理,如果他们的构造真的从中受益的话。

【讨论】:

  • 我不明白你怎么能这么快发布如此全面的答案,人们怎么能这么快地投票(我假设理解它?)。
  • @Eugene,Naman 好吧,当我发表最后一条评论时,我已经在写答案了。当然,这有助于以前遇到过这样的问题。比如说AWTEventMulticaster,即使在 Java 1.1 中,也有太多的听众允许遇到这样的问题。然后,当 Java 8 是新版本时,我偶然发现了它,并且我使用归约来构建评估树,所以我已经有了平衡归约代码的变体,只需要针对这个答案进行调整和测试。
  • @Naman 我的假设是“他”根本不是人类,而是某种混合人工智能。答案很有趣(除了笑话),从那个平衡拆分的DebuggableConsumer 中的递归toString 开始。我承认 balanceSplit 看起来对 ArrayList Spliterator 非常熟悉......但是是的,这非常好。
  • @Holger 在combineAllTasks 中未使用的data 上创建了一个minor edit
  • @Eugene 我一直有同样的想法;)。我需要一段时间才能完全理解答案的最后一部分。一如既往的出色回答。
【解决方案2】:

即使 Stream&lt;Consumer&lt;T&gt;&gt; 是并行的,生成的复合 Consumer 也会按顺序执行各个消费者,假设:

  • Streamordered
    来自List 的流是有序的,即使启用了并行也是如此。

  • 传递给reduce()accumulatorassociative
    Consumer::andThen 是关联的。

假设您有一个包含 4 个消费者 [A, B, C, D] 的列表。通常,如果没有并行,会发生以下情况:

x = A.andThen(B);
x = x.andThen(C);
compound = x.andThen(D);

这样调用compound.apply() 会依次调用ABC,然后是D

如果启用并行,流框架可能会将其拆分为由 2 个线程处理,[A, B] 由线程 1 处理,[C, D] 由线程 2 处理。

这意味着将发生以下情况:

x = A.andThen(B);
y = C.andThen(D);
compound = x.andThen(y);

结果是先应用x,即先应用A,再应用B,再应用y,即应用C,再应用D

因此,尽管复合消费者是像 [[A, B], [C, D]] 而不是左关联 [[[A, B], C], D] 那样构建的,但 4 个消费者按顺序执行的,这一切都是因为 Consumer::andThen关联。

【讨论】:

  • 非常感谢您的回答。我可能知道顺序和关联部分,但是,通过线程处理示例进行验证,便于进一步理解。只是一点,无论如何你一定已经阅读了 Holger 的答案,我不能同时标记两个答案。再次非常感谢。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-08-29
  • 1970-01-01
  • 1970-01-01
  • 2018-04-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多