【问题标题】:ReactiveX onComplete triggered before the last onNotifyReactiveX onComplete 在最后一个 onNotify 之前触发
【发布时间】:2015-02-26 17:06:49
【问题描述】:

考虑以下管道:

  1. 将物品缓冲到包装中
  2. 在线程池线程中观察这些包
  3. 对这些包进行一些异步处理

如果进程已经完成,将源 observable 设置为 complete 将导致缓冲区按原样发出当前包。但是,处理部分将在获得最后一个包之前获得完整的事件。

我的想法是等待最后一个包被处理,但由于我在 OnNext 之前得到 OnComplete,我似乎无法通过 ReactiveX 机制来做到这一点。

有什么方法可以让 OnComplete 在最后一个 OnNext 之后发生

这是一个模拟此行为的示例 (available as a .NET Fiddle):

    var publications = 
        obs
        .Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
        .Buffer(2)
        .Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
        .ObserveOn(ThreadPoolScheduler.Instance)
        .Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
        .SelectMany(x => Test(x).ToEnumerable())
        .Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));

    publications
        .Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
        .Subscribe()
        ;

    obs.OnNext(1);
    obs.OnNext(2);
    obs.OnNext(3);

    var nextpub = publications.FirstAsync();

    obs.OnCompleted();

    nextpub.Wait();

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    这段代码解码起来有点乱……我想你只是订阅了publications observable 两次:

    • 一旦你明确调用Subscribe()
    • 一旦您通过FirstAsync() 隐式调用它

    如果您重新排列如下。将您第一次订阅出版物的行替换为:

    var tcs = new TaskCompletionSource<Unit>();
    
    var nextpub = publications
        .Do(x => Console.WriteLine("publications notify {0}", x),
            () => Console.WriteLine("publications complete"))
        .Subscribe(_ => {}, () => tcs.SetResult(Unit.Default));
    

    删除带有FirstAsync() 的行并将对nextpub.Wait() 的调用替换为:

    tcs.Task.Wait();
    

    这不是编写 Rx 代码的推荐方法 - 只是修复您拥有的代码的最快方法。您通常应该在订阅者中处理您的结果,而不是阻止完成。例如:

    SomeObservable.Subscribe(x => /* handle result */);
    

    【讨论】:

    • 我知道阻塞不是 reactive 的事情,但我正在处理的实际情况是刷新管道并在终止进程之前等待其完成。我想我在尝试使用 Rx 方法做所有事情时走得太远了。
    【解决方案2】:

    在 Rx 中,重要的是要了解 observable 存在行为契约。您将始终从内置运算符中获取此序列:

    OnNext*(OnCompleted|OnError)?

    因此,零个或多个(可能无限)“OnNext”调用之后,可选地,可以是“OnCompleted”或“OnError”调用。

    对于单个 observable,您将永远不会在“OnNext”之前获得“OnCompleted”。

    现在,您的代码似乎表现不同 - 但事实并非如此。

    您实际上有两个独立订阅源 observable。

    这是一个订阅的样子:

        var publications = 
            obs
            .Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
            .Buffer(2)
            .Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
            .SelectMany(x => Test(x).ToEnumerable())
            .Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
    
        publications
            .Do(x => Console.WriteLine("publications notify {0}", x), () => Console.WriteLine("publications complete"))
            .Subscribe(); /* Subscription #1 here! */
    
        obs.OnNext(1);
        obs.OnNext(2);
        obs.OnNext(3);
    
        obs.OnCompleted();
    

    这是另一个:

        var publications = 
            obs
            .Do(x => Console.WriteLine("to buffer {0}", x), () => Console.WriteLine("to buffer complete"))
            .Buffer(2)
            .Do(x => Console.WriteLine("from buffer {0}", ShowContent(x)), () => Console.WriteLine("from buffer complete"))
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Do(x => Console.WriteLine("to selectmany {0}", ShowContent(x)), () => Console.WriteLine("to selectmany complete"))
            .SelectMany(x => Test(x).ToEnumerable())
            .Do(x => Console.WriteLine("notify {0}", x), () => Console.WriteLine("complete"));
    
        var nextpub = publications.FirstAsync();
    
        obs.OnCompleted();
    
        nextpub.Wait(); /* Subscription #2 here! */
    

    如果我们只查看“Subscription #1”并将调度程序更改为Scheduler.Immediate,我们会得到以下执行顺序:

    to buffer 1
    to buffer 2
    from buffer [1, 2]
    to selectmany [1, 2]
    > thread: 28
    notify ()
    publications notify ()
    to buffer 3
    to buffer complete
    from buffer [3]
    to selectmany [3]
    > thread: 28
    notify ()
    publications notify ()
    from buffer complete
    to selectmany complete
    complete
    publications complete
    

    这看起来仍然像我们在值出来之前得到to buffer complete。但这是误导。专门引入了 .Do(...) 运算符以允许将副作用引入 Rx 管道。因此,这可能会使事情看起来不正常,但如果您在创建的 Rx 管道中执行每个步骤,您会发现每个步骤都完美地遵循“OnNext*(OnCompleted|OnError)”合同。

    您确实需要专注于管道中的一个步骤,您会发现这一切都正确发生。

    【讨论】:

    • OnNext*(OnCompleted|OnError) 还是 OnNext*(OnCompleted|OnError)? (包括问号) - 即终止是可选的。
    • @JamesWorld - 是的,你是对的。我试图表达可能有无限数量的“OnNext”,这意味着一个可选的终止,但我更喜欢你的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-03
    • 1970-01-01
    • 2014-12-08
    • 1970-01-01
    • 2017-02-05
    • 1970-01-01
    相关资源
    最近更新 更多