【问题标题】:Is there something like ThrottleOrMax in rx?rx 中有类似 ThrottleOrMax 的东西吗?
【发布时间】:2015-08-25 22:12:50
【问题描述】:

用例: 我正在写一个监视更改并自动保存的东西。 我想节流,这样我保存的频率不会超过每五秒一次。 如果有连续的变化,我想每 30 秒保存一次。

在文档中找不到 observable.Throttle(mergeTime, maxTime) 并且只能想到编写自己的丑陋方式,因此提出了这个问题。

【问题讨论】:

  • 所以你问的是那里有Throttle 还是有人会给你写信?
  • @TonyTheLion 是的,这有什么奇怪的?

标签: c# events system.reactive throttling


【解决方案1】:

这是一种使用GroupByUntil的方法:

public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    return source
        .GroupByUntil(
            t => 0, // they all get the same key
            t => t, // the element is the element
            g =>
            {
                // expire the group when it slows down for throttle
                // or when it exceeds maxTime
                return g
                    .Throttle(throttle, scheduler ?? Scheduler.Default)
                    .Timeout(maxTime, Observable.Empty<T>(), scheduler ?? Scheduler.Default);
            })
        .SelectMany(g => g.LastAsync());
}

这是一种使用Window的方法:

public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    return source.Publish(p => p
            .Window(() =>
            {
                // close the window when p slows down for throttle
                // or when it exceeds maxTime.
                // do not start throttling or the maxTime timer
                // until the first p of the new window arrives
                var throttleTimer = p.Throttle(throttle, scheduler ?? Scheduler.Default);
                var timeoutTimer = p.Delay(maxTime, scheduler ?? Scheduler.Default);
                // signal when either timer signals
                return throttleTimer.Amb(timeoutTimer);
            })
            .SelectMany(w => w.TakeLast(1)));
}

这是一个交互式弹珠图(拖动输入弹珠):

Examples["throttleWithMax"] = {
  category: "Custom",
  label: "throttleWithMax(5, 10)",
  inputs: [
    [1, 4, 8, 12, 20, 24, 28, 50].map(function(i) {
      return {
        d: i,
        t: i
      };
    }).concat([55])
  ],
  apply: function(inputs, scheduler, Rx) {
    Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
      var s = scheduler || Rx.Scheduler.timeout;
      return this
        .publish(function(p) {
          return p
            .window(function() {
              var throttleTimer = p.debounce(throttle, s);
              var timeoutTimer = p.delay(maxTime, s);
              return Rx.Observable.amb(throttleTimer, timeoutTimer);
            })
            .flatMap(function(w) {
              return w.takeLast(1);
            });
        });
    };

    return inputs[0].throttleWithMax(5, 10, scheduler);
  }
};

var d = document.createElement("div");
document.body.appendChild(d);
d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';
<script src="http://bman654.github.io/samples/rxmarbles-old/element.js"></script>
<!--[if lt IE 7]>
  <p class="browsehappy">You are using an <strong>outdated</strong> browser. Please <a href="http://browsehappy.com/">upgrade your browser</a> to improve your experience.</p>
<![endif]-->

这是一个单元测试,它使用TestScheduler 来控制时钟并从中取出系统时钟的随机性:

private const int _THROTTLE = 50;
private const int _TIMEOUT = 100;
private const int _COMPLETE = 100000;
[TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "g1")]
[TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "g2")]
[TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "g3")]
[TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "w1")]
[TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "w2")]
[TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "w3")]
public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
{
    var scheduler = new TestScheduler();
    var completeEvent = new[] { ReactiveTest.OnCompleted(_COMPLETE, _COMPLETE) };
    var source = scheduler.CreateColdObservable(pattern.Select(v => ReactiveTest.OnNext(v, v)).Concat(completeEvent).ToArray());
    var throttled = source.ThrottleWithMax(which, TimeSpan.FromTicks(_THROTTLE), TimeSpan.FromTicks(_TIMEOUT), scheduler);
    var observer = scheduler.CreateObserver<int>();
    throttled.Subscribe(observer);

    // start the clock
    scheduler.Start();

    // check the results
    var expected = expectedPattern.Zip(expectedTimes, (v, t) => ReactiveTest.OnNext(t, v)).Concat(completeEvent).ToList();
    CollectionAssert.AreEqual(expected, observer.Messages);
}

这是complete unit test 代码。

【讨论】:

  • 您应该学习如何使用 Rx 调度程序,而不是尝试通过 StopWatch 进行操作。你应该了解TestScheduler,它可以让你以一种确定的方式编写这样的时序测试。我今晚会转换你的测试,我们会看看会发生什么......
  • 是的,很好的建议。我已经 learn.Delay() 太久了。顺便说一句,感谢您的努力,只是在我确认它有效之前不想 +1 并标记为答案。
  • 如果窗口中没有值,这不会抛出吗?
  • @JohanLarsson 原来的Window 版本是错误的。 GroupBy 版本似乎是正确的,但您对第二个测试用例的期望并不高(因为您的油门时间是 50,所以您只会看到 60 的结果,而不是 40 和 60。
  • 谢谢先生!我有一些阅读要做,rx 变得很快。 (我会尽快删除我上面的​​ cmets,因为它们现在只是噪音)
猜你喜欢
  • 2020-04-10
  • 1970-01-01
  • 2021-08-04
  • 2012-05-31
  • 2012-11-27
  • 2017-11-21
  • 2012-08-03
  • 1970-01-01
  • 2015-11-14
相关资源
最近更新 更多