【问题标题】:Reactive extension Timer/Interval reset无功扩展定时器/间隔重置
【发布时间】:2016-01-19 20:36:12
【问题描述】:

我有一个项目,我需要每 10 秒发送一次状态消息,除非在此期间有更新。这意味着,每次有更新时,计时器都会重置。

var res = Observable
  .Interval(TimeSpan.FromSeconds(10))
  .Where(_ => condition);

res.Subscribe(_ => Console.WriteLine("Status sent."));

现在我知道“Where”只会在计时器结束时应用,所以它没有帮助。但是,我想知道是否有办法重置间隔;或使用带有重复的 Timer()。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    这很容易使用标准的 Rx 操作符来实现。

    您的问题不清楚的是“更新”究竟是什么。我将假设您有某种 observable 可在每次更新时触发,或者您可以创建一个主题,当有更新时您将调用 .OnNext(...)。如果没有可观察到的更新,很难知道何时重置计时器。

    代码如下:

    var update = new Subject<bool>();
    
    var res =
        update
            .Select(x => Observable.Interval(TimeSpan.FromSeconds(10.0)))
            .Switch();
    
    res
        .Subscribe(_ => Console.WriteLine("Status sent."));
    
    update.OnNext(true);
    

    res 查询现在一直等待,直到它从 update 获得一个值,然后它选择一个新的 Observable.Interval。这意味着Select 之后的类型是IObservable&lt;IObservable&lt;long&gt;&gt;,因此需要.Switch() 才能将其转换为IObservable&lt;long&gt;.Switch() 通过仅传递最新观察到的可观察对象的值并处理任何先前的可观察对象来做到这一点。换句话说,对于每个update,都会启动一个新的计时器,并取消前一个计时器。这意味着,如果您的更新频率超过 10 秒,则计时器将永远不会触发。

    现在,如果 res observable 本身就是一个更新,那么您可以这样做:

    res
        .Subscribe(_ =>
        {
            update.OnNext(true);
            Console.WriteLine("Status sent.");
        });
    

    没关系 - 它仍然有效,但是对于每个触发 res 的计时器,都会创建一个新计时器。这意味着任何依赖于您的 update observable/subject 的东西仍然可以正常运行。

    【讨论】:

    • 非常干净的实现。我将它用于“注销”计时器,如果状态在一段时间内没有改变,它依赖于 Redux 存储(即流)来触发注销。正是这种东西让 Rx 的使用变得如此有趣。
    【解决方案2】:

    我随身携带这个小辅助方法:

    public static IObservable<long> CreateAutoResetInterval<TSource>(IObservable<TSource> resetter, TimeSpan timeSpan, bool immediate = false)
    {
        return resetter.Select(_ => immediate ? Observable.Interval(timeSpan).StartWith(0) : Observable.Interval(timeSpan)).Switch();
    }
    

    基本上和Enigmativity的回答机制是一样的

    【讨论】:

      【解决方案3】:

      我认为您也可以在这里使用 Throttle。 Throttle 的目的不是让元素在给定的时间跨度内收到另一个元素。因此,在您的情况下,如果在 10 秒内收到更新消息,则不要发送状态。请参阅下面的单元测试,它使用 200 个滴答作为节流周期。

      [TestMethod]
          public void Publish_Status_If_Nothing_Receieved()
          {
              //Arrange
              var scheduler = new TestScheduler();
              var statusObserver = scheduler.CreateObserver<Unit>();
              var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3),
                  OnNext(700, 4));
      
              var waitTime = TimeSpan.FromTicks(200);
      
              //Act
              updateStream.Throttle(waitTime, scheduler)
                  .Select(_ => Unit.Default)
                  .Subscribe(statusObserver);
      
              //Verify no status received
              scheduler.AdvanceTo(100);
              Assert.AreEqual(0, statusObserver.Messages.Count);
      
              //Verify no status received
              scheduler.AdvanceTo(200);
              Assert.AreEqual(0, statusObserver.Messages.Count);
      
              //Assert status received
              scheduler.AdvanceTo(400);
              statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
      
              //Verify no more status received
              scheduler.AdvanceTo(700);
              statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
          }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-02-23
        • 1970-01-01
        相关资源
        最近更新 更多