【问题标题】:Observable.Delay calling Dispose before OnNext is firedObservable.Delay 在 OnNext 被触发之前调用 Dispose
【发布时间】:2010-10-20 01:39:05
【问题描述】:

我无法理解 Observable.Delay 的工作原理以及何时调用 Dispose()。有熟悉 Rx 的人可以帮忙吗?

以下代码sn-p:

    static void Main(string[] args)
    {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));

        Console.ReadKey();
    }

    public class SomeObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> o)
        {
            for (var i = 0; i < 2; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();

            return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
    }

    public class DisposableAction : IDisposable
    {
        public DisposableAction(Action dispose)
        {
            this.dispose = dispose;
        }

        readonly Action dispose;

        public void Dispose()
        {
            dispose();
        }
    }

产生以下结果:

0
1
弃置
弃置
弃置
...0...
...1...
......0......
......1......

我期待它更像:

0
1
弃置
...0...
...1...
弃置
......0......
......1......
弃置

有什么想法吗??

【问题讨论】:

  • 如果去掉“SubscribeOn(Threadpool)”会发生什么?
  • 注意:您的 DisposableAction 是内置 Disposable.Create 静态方法的副本。

标签: c# system.reactive reactive-programming


【解决方案1】:

Rx 的标准功能是在序列完成时释放订阅(即使它的值仍在通过另一个序列传递)。

考虑到这一点,Delay 无法控制从源序列发出值的速度,它只能将值延迟到它自己的观察者。

【讨论】:

    【解决方案2】:

    如果我不得不猜测,我会说 Delay 将来自原始 observable 的项目排队,然后根据指定的延迟在它认为合适的时候分派它们。因此,即使原来的 observable 早就被处理掉了,由 Delay 方法创建的 observable 仍然是活跃的。您观察到的行为非常符合这个解释。

    【讨论】:

    • 嗯,这有点道理,但为什么延迟的 observables 在它们开始之前就被处理掉了?据我了解,Subscribe(onNext) 扩展方法创建的 IObserver 只调用 Dispose OnCompleted 或 OnException。还是我还没有完全理解?
    • 处理的不是延迟的 observable,而是原始的 observable。尝试将 () => Console.WriteLine("completed") 添加到订阅调用。您会看到已按预期调用完成。
    • 我也在另一个论坛(Rx 论坛)上发布了这个并得到了很好的解释social.msdn.microsoft.com/Forums/en-US/rx/thread/… 谢谢
    【解决方案3】:

    没有 ThreadPool 的行为是相同的:

    0 1 弃置 弃置 ...0... ...1... ......0...... ......1......

    【讨论】:

      猜你喜欢
      • 2020-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-02-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多