【问题标题】:Reactive Extensions, trigger OnNext immediately and then over an interval反应式扩展,立即触发 OnNext,然后在一段时间内触发
【发布时间】:2012-06-25 14:22:32
【问题描述】:

我有一个网络服务,我想在布尔值设置为 true 后立即查询更新,然后每 5 分钟重新检查一次。

这是我当前的 observable:

        _queryDisposable = Observable
            .Interval(TimeSpan.FromMinutes(5))
            .ObserveOn(Scheduler.ThreadPool)
            .Where(i => IsProcessing)     // IsProcessing is the bool value
            .Subscribe(GetFeeds, OnError, OnComplete);

此 observable 将每 5 分钟检查一次 IsProcessing 布尔值是真还是假,如果为真,则调用 GetFeeds

我认为获得预期效果的唯一方法是使 IsProcessing 成为属性支持的字段并使用 2 个可观察对象进行处理,如下所示:

    private bool _isProcessing;

    public bool IsProcessing
    {
        get { return _isProcessing; }
        set
        {
            if (_isProcessing == value)
                return;

            _isProcessing = value;

            if (!value)
            {
                if(_queryDisposable != null)
                    _queryDisposable.Dispose();
                _queryDisposable = null;
            }
            else
            {
                Observable
                    .Range(0,1)
                    .ObserveOn(Scheduler.ThreadPool)
                    .Subscribe(GetFeedsSafe, OnError, OnComplete);

                _queryDisposable = Observable
                    .Interval(TimeSpan.FromMinutes(5))
                    .ObserveOn(Scheduler.ThreadPool)
                    .Subscribe(GetFeedsSafe, OnError, OnComplete);
            }
        }
    }

我不认为这是一个非常优雅的解决方案,所以我想知道是否有更好的方法来实现这种效果?

【问题讨论】:

    标签: c# system.reactive observable


    【解决方案1】:

    我是否遗漏了什么,或者您不会在启动 Observable 之前立即自己调用订阅吗?

        Subscribe(GetFeeds, OnError, OnComplete);
        // Or perhaps, if you want it to be async,
        new TaskFactory().StartNew(()=>Subscribe(GetFeeds, OnError, OnComplete));
    
        _queryDisposable = Observable
            .Interval(TimeSpan.FromMinutes(5))
            .ObserveOn(Scheduler.ThreadPool)
            .Where(i => IsProcessing)     // IsProcessing is the bool value
            .Subscribe(GetFeeds, OnError, OnComplete);
    

    编辑:纯 Rx,来自http://social.msdn.microsoft.com/Forums/sa/rx/thread/c4acaf34-3136-4206-a6f9-ef5afba74b2b

    Observable.Interval(TimeSpan.FromMinutes(5))
            .StartWith(-1L)
            .ObserveOn(Scheduler.ThreadPool)
            .Where(i => IsProcessing)     // IsProcessing is the bool value
            .Subscribe(GetFeeds, OnError, OnComplete);
    

    【讨论】:

    • 大声笑,有效点!我想看看是否有一种纯粹的 Rx 方式来做类似“开火,然后在TimeSpan 间隔开火”。
    • 标准做法是将ObserveOn 作为订阅前的最后一个运算符,除非后面的运算符之一必须在特定的调度程序上运行。
    【解决方案2】:

    要立即开始排序并在给定时间内仍然产生一个值,您有两种选择:

    1. Observable.Interval(TimeSpan.FromMinutes(5)).StartWith(-1)
    2. Observable.Timer(TimeSpan.FromMinutes(0), TimeSpan.FromMinutes(5))

    接下来我假设您想在 IsProcessing 值设置为 false 时取消计时器,您还想在设置回 true 时重新启动序列。无论何时设置 IsProcessing 值,上面的两个答案都会运行 5 分钟的间隔。如果您设置 IsProcessing=true,序列将开始,如果一分钟后您设置 IsProcessing=false,然后 IsProcessing=true,您仍然需要等待 4 分钟才能再次查询。我想这不是你想要的?

    因此,我们希望在属性更改时触发它。 @SPFiredrake 在建议 INPC 并将事件转换为 Observable 序列方面做得很好。周围有很多小 sn-ps 可以帮助您做到这一点。假设我们使用的是扩展方法 PropertyChanges,我们可以编写这个非常简单的查询。

    var isProcessingValues = this.PropertyChanges(x=>x.IsProcessing);
    isProcessingValues
        .Where(isProcessing=>isProcessing)
        .SelectMany(_=> 
            Observable.Timer(TimeSpan.Zero, TimeSpan.FromMinutes(5))
                      .TakeUntil(isProcessingValues.Where(isProcessing=>!isProcessing))
        )
        .Select(_=>GetFeeds())  //What does GetFeeds actually return?
        .SubscribeOn(Scheduler.NewThread)
        .Subscribe(feedData=>Console.WriteLine(feedData), 
            ex=>Console.WriteLine (ex),
            ()=>Console.WriteLine (completed));
    

    慢慢来:

    1. 当 IsProcessing 属性发生变化时,将新值推送到序列中 (isProcessingValues)
    2. 当序列值为真时,启动一个计时器序列,该序列现在每 5 分钟产生一个值。
    3. 如果 isProcessingValues 序列产生 false 值,则计时器序列应停止
    4. 每次计时器计时(当 IsProcess 设置为 true 时每 5 分钟一次),调用 GetFeeds
    5. 确保我们的序列在不同的线程上订阅(不阻塞)。如果这不是要求,请删除。
    6. 从提要中获取数据并对其进行处理。

    【讨论】:

      【解决方案3】:

      您是否考虑过让 IsProcessing 变量触发一个道具更改事件,然后您可以收听该事件?

      var processing = Observable
          .FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
          .Where(tr => tr.EventArgs.Property == "IsProcessing" && ((Type)tr.Sender).IsProcessing);
      
      _queryDisposable = Observable
          .Interval(TimeSpan.FromMinutes(5))
          .ObserveOn(Scheduler.ThreadPool)
          .And(processing) // Will only fire when both sequences have an available value.
          .Subscribe(GetFeeds, OnError, OnComplete);
      

      这也确保如果它正在处理,它只会触发一次,因为处理 observable 一次只提供一个值(对于每个 IsProcessing 更改为 true),所以你不必担心它如果 5 分钟后仍在处理,则再次触发。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-06-01
        • 1970-01-01
        • 1970-01-01
        • 2022-01-09
        • 2021-12-14
        • 1970-01-01
        • 1970-01-01
        • 2011-04-07
        相关资源
        最近更新 更多