【问题标题】:Set Observable to complete, on a bool condition在布尔条件下将 Observable 设置为完成
【发布时间】:2020-06-29 16:23:17
【问题描述】:

我必须确保完成一个 Observable。它正在观察流数据,这不是我要修改的重点。 流数据可能根本不打勾。 我遇到了一个主题成员,我可以用它来手动完成 observable。

private readonly Subject<bool> _mySubject = new Subject<bool>();

// relevant property that should be completed
public IObservable<myModel> StreamObservable { get; private set; }

IObservable<myModel> someDataObservable = ... ; // may not tick in scenario

// this doesn't work            
StreamObservable = someDataObservable
                .TakeUntil(_mySubject)
                .Finally(() => logger.LogInformation("completed!"));
_mySubject.OnNext(true);

// I tried Amb.. still not going into Finally(), also when someDataObservable doesn't tick
StreamObservable = Observable.Amb(_mySubject.SelectMany(value => Observable.Empty<myModel>()), someDataObservable)
                .Finally(() => logger.LogInformation("completed!"));
_mySubject.OnNext(true);

如何完成,最好使用 Subject.OnNext() ?

主题触发器只是一个想法。我的偏好是留在观察世界并将序列转换为 Observable.Empty(),例如当这样的回调被调用时:

private Task<bool> FinishStreamCallback()
{
    _mySubject.OnNext(true); // my first approach
    return Task.FromResult(true);
}

.. 使用 OnCompleted()-订阅另一个类中的相关公共 observable 进行定位。

【问题讨论】:

    标签: c# system.reactive subject-observer


    【解决方案1】:

    使用这样的主题并不会结束可观察性。它不像名为OnCompleted 的可观察对象的来源。使用 TakeUntil 会导致源 observable 的订阅被释放。

    就像这样的代码:

    var source = new Subject<int>();
    var subscription = source.Materialize().Subscribe(x => Console.WriteLine(x.Kind));
    source.OnNext(1);
    subscription.Dispose();
    

    这只会显示OnNext

    如果我运行这个:

    var source = Observable.Return(42);
    var subscription = source.Materialize().Subscribe(x => Console.WriteLine(x.Kind));
    

    然后我得到:

    OnNext 已完成

    显式地或通过TakeUntil 调用.Dispose() 会导致源可观察对象停止在它所在的位置。没有产生OnCompleted

    如果您想知道何时取消订阅 observable,请尝试以下扩展方法:

    public static IObservable<T> OnUnsubscribe<T>(this IObservable<T> source, Action unsubscribe) =>
        Observable
            .Create<T>(o =>
                new CompositeDisposable(
                    source.Subscribe(o),
                    Disposable.Create(unsubscribe)));
                    
    

    尝试像这样使用它:

    var source = new Subject<int>();
    var trigger = new Subject<Unit>();
    var subscription =
        source
            .Materialize()
            .OnUnsubscribe(() => Console.WriteLine("Unsubscribed!"))
            .TakeUntil(trigger)
            .Subscribe(x => Console.WriteLine(x.Kind));
            
    source.OnNext(1);
    trigger.OnNext(Unit.Default);
    

    这给了我:

    OnNext 退订!

    【讨论】:

    • 很好的解释。但随后我将不得不在 OnUnsusbscribe 中使用类似 bool unsubscribed 的属性,从而将 observable 丢弃。我的偏好实际上是将 observable 修改为已完成。请看我上面的编辑。谢谢。
    • @deafjeff - 我不确定我是否理解您所说的“在 OnUnsusbscribe 中取消订阅 bool 之类的属性,将 observable 丢弃”是什么意思?
    • OnSubscribed() 正在工作并被触发。但实际上我正在订阅一个刚刚打勾的可观察对象。继续往下走,我将不得不使用新的公共 bool 属性向其他类说明某些事情发生了变化。但是 Observale 是对其他类公开的属性,并且是更大管道的一部分。我喜欢那个得到改造,(这里:到 OnCompleted())
    • @deafjeff - 如果你的 observable 被完全封装,那么在课堂上可能只需要一个 Subject&lt;T&gt; - 将它公开为你的公共 observable。那么你就可以完全控制了。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-07-31
    • 2019-06-11
    • 2018-04-01
    • 2017-04-11
    • 1970-01-01
    • 2022-10-25
    • 2017-01-10
    相关资源
    最近更新 更多