【问题标题】:Are these two Observable Operations Equivalent?这两个 Observable 操作是等价的吗?
【发布时间】:2014-04-07 16:49:05
【问题描述】:

我不知道为什么,但是由于某种原因,当使用通过 concat 创建的 observable 时,我总是会得到从我的列表中推送的所有值(按预期工作)。与普通订阅一样,似乎某些值永远不会到达那些订阅了 observable 的人(仅在某些条件下)。

这是我正在使用的两种情况。任何人都可以尝试解释为什么在某些情况下订阅第二个版本时没有收到所有值吗?它们不相等吗?这里的目的是倒带流。有哪些原因可以解释为什么案例 2 失败而案例 1 没有。

此处的重播只是正在进行的流的列表。

案例 1。

let observable = 
        Observable.Create(fun (o:IObserver<'a>) ->
                        let next b =
                            for v in replay do
                                o.OnNext(v.Head)
                            o.OnNext(b)
                            o.OnCompleted()
                        someOtherObs.Subscribe(next, o.OnError, o.OnCompleted))
let toReturn = observable.Concat(someOtherObs).Publish().RefCount()

案例 2。

let toReturn = 
    Observable.Create(fun (o:IObserver<'a>) ->
        for v in replay do
            o.OnNext(v.Head)
        someOtherObs.Subscribe(o)
    ).Publish().RefCount()

【问题讨论】:

  • 这两种情况下传递给Observable.Create的函数是不同的。如果您在两种情况下使用完全相同的函数,您是否也会看到相同的行为?
  • @MarkSeemann 见下方评论。

标签: f# system.reactive


【解决方案1】:

注意!我没有经常使用 F#,无法 100% 适应语法,但我想我明白发生了什么。

也就是说,这两种情况在我看来都很奇怪,这在很大程度上取决于 someOtherObs 是如何实现的,以及事情在哪里(就线程而言)运行。

案例一分析

您将 concat 应用于看起来像这样工作的源流:

  • 它订阅 someOtherObs,并响应第一个事件 (a) 将重播元素推送给观察者。
  • 然后它将事件 (a) 发送给观察者。
  • 然后就完成了。此时流已完成,不再发送任何事件。
  • 如果 someOtherObs 为空或只有一个错误,则会将其传播给观察者。

现在,当这个流完成时, someOtherObs 被连接到它上面。现在发生的事情有点不可预测 - 如果 someOtherObs 是冷的,那么第一个事件将被第二次发送,如果 someOtherObs 是热的,那么第一个事件不会被重新发送,但是有一个潜在的竞争条件,其余的事件将下一步,这取决于 someOtherObs 是如何实现的。如果天气很热,您很容易错过活动。

案例2分析

您重播所有重播事件,然后发送 someOtherObs 的所有事件 - 但如果 someOtherObs 很热,则再次存在竞争条件,因为您仅在推送重播后订阅,因此可能会错过一些事件。

评论

无论哪种情况,我都觉得很乱。

这看起来像是尝试合并世界状态 (sotw) 和直播。在这种情况下,您需要先订阅直播,并在获取并推送 sotw 事件时缓存所有事件。一旦 sotw 被推送,你就推送缓存的事件 - 小心删除可能在 sotw 中读取的事件 - 直到你赶上 live 时,你可以传递 live 事件。

您通常可以摆脱在实时流订阅的 OnNext 处理程序中刷新实时缓存的幼稚实现,在您刷新时有效地阻塞源 - 但如果出现以下情况,您将面临对实时源施加过多背压的风险您拥有丰富的历史和/或快速移动的直播。

您需要考虑的一些因素有望使您走上正确的道路。

作为参考,这里是 一个非常幼稚和简单的 C# 实现,我敲了敲它在带有 rx-main nuget 包的 LINQPad 中编译。我过去完成的生产就绪实施可能会变得相当复杂:

void Main()
{
    // asynchronously produce a list from 1 to 10
    Func<Task<List<int>>> sotw =
        () => Task<List<int>>.Run(() => Enumerable.Range(1, 10).ToList());
    
    // a stream of 5 to 15
    var live = Observable.Range(5, 10);
    
    // outputs 1 to 15
    live.MergeSotwWithLive(sotw).Subscribe(Console.WriteLine);
}

// Define other methods and classes here
public static class ObservableExtensions
{
    public static IObservable<TSource> MergeSotwWithLive<TSource>(
        this IObservable<TSource> live,
        Func<Task<List<TSource>>> sotwFactory)
    {
        return Observable.Create<TSource>(async o =>
        {       
            // Naïve indefinite caching, no error checking anywhere             
            var liveReplay = new ReplaySubject<TSource>();
            live.Subscribe(liveReplay);
            // No error checking, no timeout, no cancellation support
            var sotw = await sotwFactory();
            foreach(var evt in sotw)
            {
                o.OnNext(evt);
            }                               
                        
            // note naive disposal
            // and extremely naive de-duping (it really needs to compare
            // on some unique id)
            // we are only supporting disposal once the sotw is sent            
            return liveReplay.Where(evt => !sotw.Any(s => s.Equals(evt)))
                    .Subscribe(o);                  
        });
    }
}

【讨论】:

  • 感谢您的解释。数据很热,但通过同步主题(这是实时流)以受控方式进入。我不知道为什么,但是当我从案例 2 中删除 Publish().RefCount() 时,我得到了预期的结果。
  • 我不会依赖那个修复。我添加了一个 C# 示例,它显示了更多的内容。我试图指出它过于简单化的地方。我认为对于这个论坛来说,一个真实的例子会过于复杂。也许我在某个时候写过一篇博客!
  • 直播回放不应该订阅观察者吗?
  • 是否有一个重播主题只缓存直到它被订阅?你会认为这很常见。
  • 没有。不过,您可以指定缓存的限制。在我的生产实现中,我不使用主题。在这里演示要点很容易。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-02-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-21
  • 1970-01-01
相关资源
最近更新 更多