【问题标题】:Dynamically concatenating sequences with Reactive Extensions使用响应式扩展动态连接序列
【发布时间】:2016-07-03 20:41:27
【问题描述】:

我想创建一个序列,它连接一个或多个动态创建的序列(在运行时)。

我尝试使用mySequence = mySequence.Concat(anotherSequence),但这会破坏当前对mySequence 的订阅,因为每次都会创建一个新序列。

【问题讨论】:

    标签: c# .net system.reactive observable rx.net


    【解决方案1】:

    当您将一个可观察序列连接到另一个时,第一个序列必须在您从第二个序列中获取任何值之前结束。这听起来更像是您想要合并两个或多个序列 - 换句话说,只要该序列产生值,就从任何序列中获取值。

    所以,如果您允许我将 .Concat 更改为 .Merge,听起来您现在有这样的代码:

    IObservable<long> mySequence = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5);
    IDisposable mySequenceSubscription = mySequence.Subscribe(n => Console.WriteLine(n));
    IObservable<long> anotherSequence = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5);
    mySequence = mySequence.Merge(anotherSequence);
    

    如果我运行它,我会得到这些值:

    0 1 2 3 4

    第二个序列没有合并。

    现在,如果您在创建订阅时不知道要合并的未来可观察对象是什么,那么您可以这样做:

    Subject<IObservable<long>> sources = new Subject<System.IObservable<long>>();
    IDisposable sourceSubscription = sources.Merge().Subscribe(n => Console.WriteLine(n));
    
    sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5));
    sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5));
    

    现在结果如下所示:

    0 1 0 2 3 4 1 2 3 4

    这已正确地将订阅后添加的两个可观察对象合并在一起。很简单。

    【讨论】:

    • 如果您想在之后删除其中一个来源怎么办?这可能吗?
    • @EduardoBrites - 是的,结束源代码。使用TakeUntilTake 的其他变体之一相当容易。
    猜你喜欢
    • 2015-10-07
    • 1970-01-01
    • 2014-05-17
    • 2016-09-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-27
    相关资源
    最近更新 更多