【问题标题】:Aggregating observable data into multiple buckets将可观察数据聚合到多个存储桶中
【发布时间】:2017-03-13 19:12:39
【问题描述】:

我需要一个关于如何分发聚合更新的好主意...

假设我有一个 Id 的 IObservable 和一个产生永无止境的消息流的值(5-10,000/秒)。现在我想计算很多聚合(例如总和) 定期分发到其他系统 - 假设每个聚合每 10 秒一次。 聚合基于元组的 ID(字符串),但可能会落入多个聚合(聚合定义应包含哪些 id - 因此会重叠)。

会有几千个聚合定义,那么有人对如何解决这个问题有任何想法吗?

概念上:

public struct Update
{
    public string Id { get; }

    public int Value { get; }
}

public class Aggregate
{
    Dictionary<string, Update> latestValues = new Dictionary<string, Update>();

    public void AddUpdate(Update update)
    {
        latestValues[update.Id] = update;
    }

    public int CalculateSum()
    {
        return latestValues.Values.Select(v => v.Value).Sum();
    }
}

更新:

这个问题的目的是为了简化真正的问题——也许我没有做得那么好——很抱歉。 假设我有多个 IOT 设备产生温度并定期报告该温度(更新流)。然后不同的用户可以选择查看设备子集的聚合(例如平均值)值。因此,一位客户可能希望查看设备 1、2 和 3 的平均值,而另一位客户可能希望查看设备 2、3 和 4 的平均值等(聚合定义)

【问题讨论】:

  • 聚合组键是否总是等于Id,并且单个消息只能属于单个聚合组?
  • 我们可以得到一个 mcve 吗? stackoverflow.com/help/mcve
  • 你的意思是10.000 是一万还是十个小数点后三个零?
  • 抱歉,欧洲小数点不正确 - 我已将其更正为一万。一条消息可以属于多个聚合组,这使得它变得更加复杂
  • @supertopi,我认为这种情况下的聚合是 SumMinMax 等聚合操作,而不是 DDD 聚合。 (嗯,这就是我最初阅读它的方式)。所以我认为这很简单“如何使用 Rx 创建高流量读取模型?”。而且我想在出现已知的性能问题之前,答案就是每个聚合操作都订阅并更新其状态。

标签: .net aggregate system.reactive observable


【解决方案1】:

我认为您要问的是如何使用 Rx 创建实时读取模型*。

鉴于我可以从您的问题中猜到,我认为您希望能够通过每个更新消息更新一些当前状态。对于您的 CalculateSum 方法,您不能只对所有消息的 Value 属性求和,因为有些属性旨在更新/覆盖现有值。

因此,鉴于此假设,看起来GroupBy 将成为您的朋友。如果您首先将可观察的值序列拆分为子序列,则可以分而治之。

input.GroupBy(i=>i.Id)

如果我们只考虑属于同一 Id 的单个值流,那么每个值的总和应该是多少?

-1--1--2-

在这种简单的情况下,答案总是直接传递的值。即

input  -1--1--2-
result -1--1--2-

但是,当我们查看两个产生值的序列时,计算变得稍微困难​​了

inputA  -1-1-2--------
inputB  --1-2-2-3-5-2-
result  -122344-5-7-4-

在这里,我们需要查看序列中每个值的增量是什么,并将该增量推送到结果中。可以这样可视化

inputA  -1-1-2--------
 delta  -1-0-1--------

inputB  --1-2-2-3-5-2-
 delta  --1-1-0-1-2-(-3)-

result  -122344-5-7-4-

要创建这种增量投影,您可以编写类似

的内容
input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue }))
    .Select(acc => acc.Delta);

将这些放在一起,代码可能如下所示:

void Main()
{
    var testScheduler = new TestScheduler();
    var input = testScheduler.CreateColdObservable<Update>(
        ReactiveTest.OnNext(010, new Update("a", 1)),       //1
        ReactiveTest.OnNext(020, new Update("b", 1)),       //2
        ReactiveTest.OnNext(030, new Update("c", 3)),       //5
        ReactiveTest.OnNext(040, new Update("a", 1)),       //5
        ReactiveTest.OnNext(050, new Update("b", 2)),       //6
        ReactiveTest.OnNext(060, new Update("a", 2)),       //7
        ReactiveTest.OnNext(070, new Update("b", 2)),       //7
        ReactiveTest.OnNext(080, new Update("b", 3)),       //8
        ReactiveTest.OnNext(090, new Update("b", 5)),       //10
        ReactiveTest.OnNext(100, new Update("b", 2))        //7

    );

    var currentSum = input.GroupBy(i => i.Id)
        .SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
        .Select(acc => acc.Delta)
        .Scan((acc, cur) => acc + cur);

    var observer = testScheduler.CreateObserver<int>();
    var subscription = currentSum.Subscribe(observer);
    testScheduler.Start();
    subscription.Dispose();

    ReactiveAssert.AreElementsEqual(new[]
        {
            ReactiveTest.OnNext(010, 1),
            ReactiveTest.OnNext(020, 2),
            ReactiveTest.OnNext(030, 5),
            ReactiveTest.OnNext(040, 5),
            ReactiveTest.OnNext(050, 6),
            ReactiveTest.OnNext(060, 7),
            ReactiveTest.OnNext(070, 7),
            ReactiveTest.OnNext(080, 8),
            ReactiveTest.OnNext(090, 10),
            ReactiveTest.OnNext(100, 7)}
        ,
        observer.Messages);
}

// Define other methods and classes here
public struct Update
{
    public Update(string id, int value)
    {
        Id = id;
        Value = value;
    }
    public string Id { get; }

    public int Value { get; }
}

如果你想创建多个聚合,那么每个新聚合只是一个像上面一样的查询。您可以通过在分组后共享/发布序列来进行优化,但我会首先确保分析需要这样做。

*CQRS/ES 术语中的读取模型。

【讨论】:

  • 感谢您详尽的回答 - 作为奖励,它教会了我一些关于 TestScheduler 的额外知识。我会试试这个!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-10-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多