【问题标题】:How to combine aggregations from grouping如何组合来自分组的聚合
【发布时间】:2014-03-27 16:18:27
【问题描述】:

我有一个尾部实现,它将文本文件中的新行推送到Subject。该文件有以下数据:

source1, 1
source2, 3
source1, 2
source1, 1
source3, 10

我正在尝试按来源创建最新聚合的视图,比如说运行总和。

第一行之后:

source1, 1

第二行之后:

source1, 1
source2, 3

第三行之后:

source1, 3
source2, 3

第四行之后:

source1, 4
source2, 3

最后一行之后:

第四行之后:

source1, 4
source2, 3
source3, 10

这是我目前所得到的(在 LinqPad 中):

var source = Observable.Generate<int,Measurment>(0,
    current => current <= 10,
    current => current + 1,
    current => current % 3 == 0
        ? new Measurment { Source = "Source1", Value = current }
        : current != 10
            ? new Measurment { Source = "Source2", Value = current }
            : new Measurment { Source = "Source3", Value = current }
);
var grouped = source
.GroupBy(m => m.Source)
.Select(g =>g.Scan((acc,current) =>
    new Measurment { Source = acc.Source, Value = acc.Value + current.Value }));

grouped.Dump();

}
struct Measurment
{
    public string Source;
    public int Value;

我收到IObservable&lt;IObservable&lt;Measurment&gt;&gt;。内部 IObservable 具有正确聚合的值。现在我需要合并这些流并推出一个关于任何 Observables 变化的测量列表,有什么建议吗?

【问题讨论】:

    标签: system.reactive


    【解决方案1】:

    this answerthis one 类似,CombineLatestRxx 重载将IObservable&lt;IObservable&lt;T&gt;&gt; 转换为IObservable&lt;IList&lt;T&gt;&gt;,这应该可以满足您的需求。我认为这个函数真的应该在主 Rx 库中,因为它非常有用。

    【讨论】:

    • 太棒了,正是我需要的。我查看了 core 包中的 CombineLatest,但没有一个超载占用 IObservable&lt;IObservable&lt;Measurment&gt;&gt;。我找不到 Rx 2.x 的 NuGet 包,所以我只是复制了这个扩展方法。是否有 Rx 2.xxx 的 Rxx 版本?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-02-18
    • 1970-01-01
    • 2012-09-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多