【问题标题】:How to specify dependencies between observable sequences?如何指定可观察序列之间的依赖关系?
【发布时间】:2021-08-20 14:58:54
【问题描述】:

我正在编写一个应用程序,它使用许多可观察对象计算数据,并且可观察对象之间存在依赖关系。 我有一个共同的可观察对象,其他可观察对象正在订阅它。 每次刷新公共可观察对象时,我都希望重新计算数据。 问题是,由于许多其他人都观察到了这个 observable,因此数据被重新计算了很多次而不是一次。

这是一个重现问题的单元测试。

应该如何声明modelObservable,以便最后nbComputations = 1? 我应该放弃关于我正在尝试做的事情的可观察性吗?

public void Test_Observable_Dependency()
{
    var nbComputations = 0;

    var commonObservable = new Subject<int>();

    var subject1 = new Subject<int>();
    var subject2 = new Subject<int>();
    var subject3 = new Subject<int>();

    var forecastCurves = new List<IObservable<int>>
    {
        subject1.CombineLatest(commonObservable, (a, b) => 0),
        subject2.CombineLatest(commonObservable, (a, b) => 0),
        subject3.CombineLatest(commonObservable, (a, b) => 0),
        commonObservable
    };

    var modelObservable = forecastCurves
        .CombineLatest()
        .CombineLatest(commonObservable, (x, y) =>
        {
            nbComputations++;
            return true;
        })
        .Replay(1).RefCount();
    _ = modelObservable.Subscribe();

    subject1.OnNext(1);
    subject2.OnNext(1);
    subject3.OnNext(1);

    //Before receiving all updates
    nbComputations.Should().Be(0);

    commonObservable.OnNext(1);

    //After receiving all updates
    nbComputations.Should().Be(1);

    nbComputations = 0;
    commonObservable.OnNext(1);

    nbComputations.Should().Be(1);
}

【问题讨论】:

  • edit 你的问题包含某种时间图,它显示了哪个 observable 应该在什么时候生成一个新值。您有四个不同的来源,目前还不清楚您要在哪里生成“尚未但稍后”的信号。
  • 你得到的计算是正确的。当commonObservable 获得一个值时,每个subject1.CombineLatest(commonObservable, (a, b) =&gt; 0) observables 都会触发。所以你得到了你的第一个,然后每次调用commonObservable.OnNext(1),你都会得到另一个4。正如预期的那样。
  • 你能描述一下你的 observables 的真正依赖吗?几乎总有办法让它干净利落地工作。

标签: c# system.reactive reactivex


【解决方案1】:

这可能是因为CombineLatest() 的广泛使用。每当任何可观察对象有更新时,操作员都会打勾。

我试着简化了一点,但我认为结果应该是一样的。 是你需要的吗?

public void Test_Observable_Dependency()
{
    var nbComputations = 0; 
    
    var commonObservable = new Subject<int>();
    var subject1 = new Subject<int>();
    var subject2 = new Subject<int>();
    var subject3 = new Subject<int>();
    
    
    var forecastCurves = Observable.CombineLatest(
        subject1,
        subject2,
        subject3,
        commonObservable,
        (s1, s2, s3, common) => new {
            // OP's queries inserted below
            s1 = 0, // (a, b) => 0 
            s2 = 0, // (a, b) => 0 
            s3 = 0, // (a, b) => 0 
            common});
    
    // No need to take WithLatestFrom commonObservable, 
    // because the latest value is available as f.common    
    var modelObservable = forecastCurves.Select(f =>
        {
            
            nbComputations++;
            return true;
        })
        .Replay(1).RefCount();
    _ = modelObservable.Subscribe();

    subject1.OnNext(1);
    subject2.OnNext(1);
    subject3.OnNext(1);

    //Before receiving all updates
    nbComputations.Should().Be(0);

    commonObservable.OnNext(1);

    //After receiving all updates
    nbComputations.Should().Be(1);

    nbComputations = 0;
    commonObservable.OnNext(1);

    nbComputations.Should().Be(1);
}

编辑,考虑到省略的查询。

【讨论】:

  • 您确实修改了 OP 的查询,因此结果不一样。 OP 里面有所有subject1.CombineLatest(commonObservable, (a, b) =&gt; 0)
  • 是的,我发帖后才意识到。我会尝试编辑它...
  • 已编辑。现在它包括了这些查询,不利的一面是每次任何主题打勾时都会计算它们。但它通过了测试(之前也通过了)? Red->Green-Refactor
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-08-31
  • 2012-10-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多