【问题标题】:Why is my RX chain blocking?为什么我的 RX 链阻塞?
【发布时间】:2014-07-09 02:38:17
【问题描述】:

所以我有以下 RX 更改,但它似乎阻止了选择,好像是为了保留顺序。我的理解是它应该继续委托给任务池?

var observable = Observable.Interval(TimeSpan.FromMilliseconds(10));

observable.ObserveOn(Scheduler.TaskPool)
    .Select(
    i =>
    {
        Console.WriteLine("Here" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        System.Threading.Thread.Sleep(5000);
        return i;
    })
    .ObserveOn(Scheduler.TaskPool)
    .SubscribeOn(Scheduler.TaskPool)
    .Subscribe(i => { Console.WriteLine(i); });

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    确保事件按顺序传递给订阅者是 Rx 语法的核心部分,也是正确操作的基础。它在大多数 Rx 运算符中都强制执行,您不应违反此规定。

    ObserveOn 和 SubscribeOn 的机制得到了充分的解决 here

    ObserveOn 的目的是避免阻塞正在调度事件的 observable 线程和/或控制订阅者接收事件的线程(在您的情况下使用任务池来传递事件)。

    它不允许订阅者同时接收事件 - 正如我所说,这将违反 Rx 的规则。

    您可能会发现this 的问题也值得一读。

    【讨论】:

    • Ensuring events delivered to a subscriber serially is a core part of the Rx grammar - 哇,我的理解是巨大的失败 - 有没有例子不是这种情况?
    • 是的,通常当有人试图自己实现IObservable<T> 时... :) 但是没有正确的方法来做到这一点。请注意,将单个事件同时传递给多个订阅者是例行公事。
    • 很抱歉,这很痛苦,您能否扩展一下 Note that concurrent delivery of a single event to multiple subscribers is routine....您是在谈论热门的可观察对象吗?
    • 很抱歉再次提出问题 - 但是是否存在 Observables/RX 像多个订阅者的队列一样的情况?
    • @Cheetah - 使用各种 SubjectsPublishMulticast 创建可交付给多个订阅者的 observables。 Publish().RefCount().ObserveOn() 是同时交付给多个订阅者的好方法。此外,在您的 OP 中,如果您使用 SelectMany 而不是 Select 并返回 Observable.Return(i).Delay(TimeSpan.FromSeconds(5)) 而不是 Thread.Sleep 您将有效地删除该块并允许您的睡眠全部同时运行(并且可能会乱序交付如果你有比持续延迟更复杂的事情)。
    猜你喜欢
    • 2011-07-04
    • 1970-01-01
    • 2012-10-31
    • 2020-01-05
    • 2012-04-15
    • 2012-08-23
    • 2012-10-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多