【问题标题】:Add an initial value to RX Stream / IObservable为 RX Stream / IObservable 添加一个初始值
【发布时间】:2020-02-20 17:19:29
【问题描述】:

我有一个方法可以将过滤后的 RX 流作为 Iobservable 返回:

public IObservable<Price> LivePrices(Instrument instrumentDetails)
        {
            return _priceObserver.Stream
                .Where(o => o.Symbol == instrumentDetails.Symbol )
                .Select(o => GetPrice(o, instrumentDetails));
        }

问题是对于某些值,流不会经常更改,因此我需要使用第一个值对其进行初始化

我该怎么做?我读到Subject 既可以是观察者,也可以是可观察者。所以我想我需要以某种方式将其订阅为Subject,在流中添加第一条消息,然后将其设置为现在的样子。但不知道该怎么做

有什么想法吗?

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    你试过StartWith吗?

    public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
    {   
        var obs = _ratesObserver.Stream
            .Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
            .Select(o => GetFxDeal(o, negotiation));
    
        return condition ? obs.StartWith(new FxDeal()) : obs;
    }
    

    【讨论】:

    • 哦,哇,我不知道这种方法存在。它看起来正是我需要的。虽然没有条件,但我大概可以这样做:obs.StartWith(new FxDeal());返回 obs;
    • 它会添加第一个值,然后是后续流?
    • 是的。只要有订阅,它就会立即推送该值,然后是流的其余部分。
    • 试试Observable.Interval(TimeSpan.FromSeconds(10)).StartWith(-1).Subscribe(Console.WriteLine);
    • 嘿@asti,我无法让它工作。认为这可能是因为该方法是异步的。在这里发布了另一个问题:stackoverflow.com/questions/60339649/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-01-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-01
    相关资源
    最近更新 更多