【问题标题】:How to get Sum of "last" items of N hot Observable<decimal> instances?如何获得 N 个热门 Observable<decimal> 实例的“最后”项的总和?
【发布时间】:2013-09-19 00:04:29
【问题描述】:

编辑:2013 年 9 月 15 日 - 我将我的情景描述为进一步分解为步骤,以帮助每个人更好地了解我的情况。也添加了整个应用程序的下载源。如果您想跳到原始问题,请向下滚动到最后一个标题。请让我知道这些问题。谢谢

总结

阿拉斯加州首府朱诺有一座 AST(阿拉斯加州骑兵)总部大楼,他们希望在其中显示一个大屏幕,其中显示并自动更新一个数字。该数字称为(犯罪商数指数)或 CQI

CQI 基本上是一个计算出来的数字,显示该州当前的犯罪情况......

如何计算?

运行屏幕的程序是一个 .NET WPF 应用程序,它通过 Hot IObservable 流不断接收 CrimeReport 对象。

按城市计算 CQI,然后对所有城市进行 Sum(),称为州 CQI 下面是计算State CQI的步骤

第 1 步 - 接收犯罪数据

每次报告犯罪时,CrimeReport 都会发送到 .NET 应用程序。它有以下组件

犯罪时间

城市 - 管辖的城市/县

SeverityLevel - 严重/非严重

EstimatedSolveTime - AST 确定解决犯罪所需的估计天数。

所以在这一步,我们订阅 IObservable 并创建 MainViewModel 的实例

IObservable<CrimeReport> reportSource = mainSource.Publish();
  MainVM = new MainViewModel(reportSource);
reportSource.Connect();

第 2 步 - 按城市分组并按城市计算

收到报告后,按城市分组

var cities = reportSource.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g);

CityDto 是一个 DTO 类,它获取当前城市的所有报告并计算城市的 CQI。

City的CQI计算公式如下

如果严重犯罪总数与非严重犯罪总数的比率小于1

然后

城市的 CQI = 比率 x 估计求解时间的最小值

其他

城市的 CQI = 比率 x 最大估计求解时间

这是 CityDto 的类定义

internal class CityDto 
{
 public string CityName { get; set; }
 public IObservable<decimal> CityCqi {get; set;}

 public CityDto(string cityName, IObservable<CrimeReport> cityReports)
 {
   CityName = cityName;
   // Get all serious and non serious crimes
   //
     var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
     .Scan(0, (p, _) => p++);
     var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
     .Scan(0, (p, _) => p++);

     // Get the ratio
     //
     var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes, 
                   (s, n) => n ==  0? s : s/n); // Avoding DivideByZero here

     // Get the minimum and maximum estimated solve time
     //
       var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime) 
                     .Scan(5000, (p, n) => n < p? n : p);
       var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
                     .Scan(0, (p, n) => n > p? n : p);

    //Time for the City's CQI
    // 
      CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
 }
}

现在我们有 City DTO 对象维护 City 的 CQI 值并通过 IObservable 公开该实时 CQI,阿拉斯加州首府希望 Sum() 汇总所有城市的 CQI 以将其显示为阿拉斯加的 CQI 并在屏幕和参与 CQI 计划的市/县任何地方报告的每项犯罪都应立即对州的 CQI 产生影响

第 3 步 - 汇总各州的城市数据

现在我们要计算在大屏幕上实时更新的整个 state 的 CQI,我们有 State 的 view model 叫做 MainViewModel

internal class MainViewModel
{
    public MainViewModel(IObservable<CrimeReport> mainReport)
    {
         /// Here is the snippet also mentioned in Step 2
         //
           var cities = mainReport.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g));

         ///// T h i s ///// Is //// Where //// I /// am /// Stuck
         //
          var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,

         /// Need to use latest of each observable in allCqi and sum them up
           //// How do I do it ?
     }
}

约束

  • 目前并非阿拉斯加的所有城市都参与了州的 CQI 计划,但是城市每天都在招收,所以我无法列出所有城市,无论是否注册都添加所有城市也不切实际。因此,IObservable 只维护那些不仅参与而且至少发送了一个 CrimeReport 对象的城市。

完整的源代码

点击Here即可下载源码

最初提出的问题

我有多个 hot observables 中的一个 hot observable...

IObservable<IObservable<decimal>>

我想要一个 observable,当订阅它时,它的观察者会随时了解内部所有 observable 中所有“最新”十进制数字的总和。

我怎样才能做到这一点?我尝试了 CombineLatest(...) 但无法正确处理。

谢谢

【问题讨论】:

    标签: c# system.reactive reactive-programming observable


    【解决方案1】:

    Rxx 库的重载为 CombineLatest(),它采用 IObservable&lt;IObservable&lt;T&gt;&gt;。如果你使用这个重载,那么解决方案很简单:

    var runningSum = allCqis
        .Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0
        .CombineLatest() // produces an IObservable<IList<decimal>>
        .Select(cqis => cqis.Sum()); // LINQ operator Sum(IEnumerable<decimal>)
    

    查看Rxx.CombineLatest 的源代码可能有助于了解问题是如何“在后台”解决的

    【讨论】:

      【解决方案2】:

      很多问题!也许是时候复习一下你的 Rx Skillz 了?我的网站IntroToRx.com 几乎涵盖了您最近提出的所有问题。对 Rx 有深入的了解,将使您能够比在论坛上提问更快地回答这些相当简单的问题。您应该可以在不到 3 天的时间内阅读这本书。

      无论如何....

      1 您想要一个连续的总和还是最后一个单一的总和?

      2 那么,您想要所有流的所有值的总和,还是每个流的总和?

      要获得序列的单个总和值,请使用 .Sum() 运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#MaxAndMin

      要获得运行总数,请使用 Scan 运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#Scan

      所以答案可能是这样的(未经测试):

      sources.Select(source=>source.Scan(0m, (acc, value)=>acc+=value)).Merge();
      

      【讨论】:

      • 我已经看过你的书了。我也在 Kindle 上订购了一份(我认为它的价格过低)。我的场景有点复杂。我将编辑我的问题以帮助大家更好地理解。
      • Rx(以及任何 IMO 计算机科学问题)的关键是将其分解为简单的问题。根据我的经验,一个复杂的问题是一个被误解的问题。
      • 我已经非常详细地描述了我的场景。感谢您努力帮助我。
      • 我分享了源代码。问题中添加了下载源代码的链接。谢谢你
      • 这本书很棒,每个刚开始 Rx 的人都应该读一读!但是,由于问题已更新,因此此答案不是很有用..
      猜你喜欢
      • 1970-01-01
      • 2023-03-05
      • 2017-07-15
      • 2016-03-05
      • 1970-01-01
      • 1970-01-01
      • 2020-02-19
      • 1970-01-01
      • 2019-11-21
      相关资源
      最近更新 更多