【问题标题】:How to complete a Rx Observable depending on a condition in a event如何根据事件中的条件完成 Rx Observable
【发布时间】:2011-11-15 06:03:38
【问题描述】:

我有一个我无法控制的事件,它为我提供了数据。 eventArgs 看起来像这样:

class MyEventArg {
  bool IsLastItem {get;}
  Data DataItem {get;}
}

我使用 Rx 将此事件转换为 IObservable。但如果 IsLastItem 为真,我想完成 observable。

有什么优雅的想法吗?一种方法是通过一个我可以更多控制的主题来传输数据,以便在条件发生时设置 OnComplete 事件......

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    如果您希望包含最后一个元素,您可以将仅包含最后一个元素的流与结合TakeWhile 的常规流合并。 这是一个简单的控制台应用程序来证明这一点:

    var subject = new List<string>
    {                            
    "test",
    "last"
    }.ToObservable();
    
    var my = subject
                .Where(x => x == "last").Take(1)
                .Merge(subject.TakeWhile(x => x != "last"));
    
    my.Subscribe(
        o => Console.WriteLine("On Next: " + o), 
        () => Console.WriteLine("Completed"));
    
    Console.ReadLine();
    

    打印出来:

    On Next: test
    On Next: last
    Completed
    

    更新 如果底层的 Observable 实际上没有完成,则会出现一个错误,它会抑制 OnCompleted 消息。我更正了代码以确保调用OnCompleted

    如果您想避免多次订阅底层序列以获得冷可观察对象,您可以像这样重构代码:

    var my = subject.Publish(p => p
                .Where(x => x == "last").Take(1)
                .Merge(p.TakeWhile(x => x != "last")));
    

    【讨论】:

    • 不错!我花了几秒钟来看看它是如何完成的。也许更好写: subject.TakeWhile(x => x != "last").Merge(subject.Where(x => x == "last").Take(1));冷的可观察尖端是+。谢谢
    【解决方案2】:
    public static IObservable<TSource> TakeWhileInclusive<TSource>(
            this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        return Observable
            .Create<TSource>(o => source.Subscribe(x =>
                                                       {
                                                           o.OnNext(x);
                                                           if (!predicate(x))
                                                               o.OnCompleted();
                                                       },
                                                   o.OnError,
                                                   o.OnCompleted
                                      ));
    }
    

    【讨论】:

      【解决方案3】:

      你在寻找这样的东西吗?

      IObservable<MyEventArg> result =
          myEventArgObservable.TakeWhile(arg => !arg.IsLastItem);
      

      【讨论】:

      • 哇,这样简单又好。如果谓词为真,你知道 Observable 是否完成吗?
      • 是的,它会通知 OnCompleted() 如果你不希望它通知 OnCompleted 你可以简单地使用 Where(arg => !arg.IsLastItem)
      • 是的,Observable 似乎完成了,但是现在我有一个问题是我不会收到最后一个项目...
      猜你喜欢
      • 1970-01-01
      • 2016-06-26
      • 1970-01-01
      • 2021-04-07
      • 2020-01-30
      • 1970-01-01
      • 2017-10-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多