【问题标题】:Reactive Extensions: Is it possible to subscribe on Sum of the result of Buffer operation?反应式扩展:是否可以订阅缓冲区操作结果的总和?
【发布时间】:2016-02-09 00:08:08
【问题描述】:

我正在尝试获取有关聚合函数(即 Sum)结果的通知,该函数对无限序列的部分序列(最顶层,数据源序列永远不会完成)进行操作。 问题可以看这里:

var seq = Observable.Interval(TimeSpan.FromMilliseconds(20)).Buffer(10);
seq.Sum(l => l.Sum())
            .Subscribe(n =>
                s_log.DebugFormat("Got {0}", n));

Lambda l.Sum() 按预期调用(计算部分总和),但从未打印过“Got ...”行,因为从未调用过订阅者。我怀疑它与原始序列的“永无止境”字符有关。 有限序列:

 Observable.Range(1,100).Buffer(10);

按预期工作。 所以问题很简单:如何将无限序列的部分片段“标记”为“完整”,以便聚合函数单独处理它们(并将结果推送给订阅者)?

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    Scan是你的朋友:

    seq.Scan(0L, (l1, l2) => l1 + l2.Sum())
       .Subscribe(n => Console.WriteLine("Got {0}", n));
    

    【讨论】:

    • 你是对的,但为什么呢?我计划将 RX 扩展与我们的应用服务器集成,因此应用服务器的客户可以将 RX 扩展与我们的代码混合使用。我无法解释为什么 .Scan 会导致调用通知代码,而 .Sum 不会(他们的代码将根据公寓规则通过我们的线程调度程序调用,如果他们没有收到通知,他们会至少在最初责怪我们)。过去我无缝集成了 Task/await+async,它已经 3 年没有遇到任何与可用性相关的问题,这些东西非常直观...... RX 看起来像是一个很大的承诺
    • 我无法真正回答为什么。我可以说MinMaxSumAverageCountAggregate 都以相同的方式工作。我最好的猜测是它们类似于 LINQ to Objects,其中.Sum(和朋友)只有在可枚举完成后才能完成。
    • 问题是Aggregate 和Scan 理论上是一样的。 RX 通常旨在简化数据流的处理,对吗?因此,我希望清楚地指示每个方法在应用于无限流时的行为(例如为每个类和方法声明线程安全属性)。有许多微妙的不同事物,例如聚合和扫描、缓冲区和窗口等 - 并且没有容易发现的信息来源有什么区别以及何时应该使用哪个变体。无论如何,谢谢你的帮助。
    • AggregateScan 在理论上是相似的,但并不相同。当您在 IEnumerable<int> 实例上调用 Sum 时,您会得到一个标量结果还是 IEnumerable 中的一系列值?答案是你只能得到一个标量值。 IObservable.Sum() 也只返回一个值,但它必须在 future/async 中,因此返回类型是IObservable<int>。它可能是Task<int>,但是你正在混合并发范例。相比之下,Scan 将在计算时产生所有中间值;非常适合运行总计。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-04
    相关资源
    最近更新 更多