【问题标题】:Scheduling a IEnumerable periodically with .NET reactive extensions使用 .NET 响应式扩展定期调度 IEnumerable
【发布时间】:2012-09-23 10:07:32
【问题描述】:

比如说我有一个可枚举的

dim e = Enumerable.Range(0, 1024)

我希望能够做到

dim o = e.ToObservable(Timespan.FromSeconds(1))

这样 observable 会每秒生成值 直到可枚举项用完为止。我想不出一个简单的方法来 这样做。

【问题讨论】:

  • 您能描述一下您想要完成的高级场景吗?要么你误解了 Rx,要么你想做一些相当奇怪的事情......
  • 为了测试目的模拟 UDP 连接。我有一个生成器作为 IEnumerable 生成字节帧,我希望模拟它们实时而不是尽可能快地到达。我的应用程序的其余部分将在生成器模拟的网络的情况下正常运行。 UI 会在收到数据包时自行更新,以便在没有真实来源的情况下检查行为,我需要伪实时生成数据。
  • 你看过TestScheduler和朋友吗?这允许您模拟时间线 - 代码将给出与延迟相同的结果,但测试将立即运行。
  • 我不想模拟时间。我想模拟连接。我希望我的应用程序在我使用它时像真正的连接一样运行。

标签: system.reactive enumerable


【解决方案1】:

您可以将 Interval 与 Zip 一起使用以获得所需的功能:

var sequence = Observable.Interval(TimeSpan.FromSeconds(1))
                         .Zip(e.ToObservable(), (tick, index) => index)

【讨论】:

  • Zip 有一个超载,包含 IEnumerable
  • 这是否危险,例如无限序列。 zip不会尝试缓冲无限序列并快速炸毁内存还是比这更聪明?
  • 是的 Zip 将尝试缓冲值,直到它们与来自另一个序列的值作为一对被推送。所以在这个例子中,它将缓冲 1024 个值并需要 1024 秒才能完成。
【解决方案2】:

我也在寻找解决方案,在阅读了intro to rx 之后,我成为了自己的一个: 有一个Observable.Generate() 重载,我用它来制作我自己的ToObservable() 扩展方法,以TimeSpan 为句点:

public static class MyEx {
    public static IObservable<T> ToObservable<T>(this IEnumerable<T> enumerable, TimeSpan period) 
    {
        return Observable.Generate(
            enumerable.GetEnumerator(), 
            x => x.MoveNext(),
            x => x, 
            x => x.Current, 
            x => period);
    }
    public static IObservable<T> ToObservable<T>(this IEnumerable<T> enumerable, Func<T,TimeSpan> getPeriod) 
    {
        return Observable.Generate(
            enumerable.GetEnumerator(), 
            x => x.MoveNext(),
            x => x, 
            x => x.Current, 
            x => getPeriod(x.Current));
    }
}

已在 LINQPad 中测试。只关心在结果 observable 之后枚举器实例会发生什么,例如处置。任何更正表示赞赏。

【讨论】:

  • 这个解决方案只在第一次订阅返回的 IObservable 时才有效。这是因为 IEnumerator 实例被重用,导致未定义的行为。此外,它永远不会被丢弃,可能会泄漏。
【解决方案3】:

你需要一些东西来安排通知观察者从 Enumerable 中获取的每个值。 您可以使用recursive Schedule overload on an Rx scheduler

Public Shared Function Schedule ( _
    scheduler As IScheduler, _
    dueTime As TimeSpan, _
    action As Action(Of Action(Of TimeSpan)) _
) As IDisposable

在每次预定调用时,只需调用enumerator.MoveNext(),然后调用OnNext(enumerator.Current),最后在MoveNext() 返回false 时调用OnCompleted。这几乎是最简单的方法。

表达您的要求的另一种方法是将其重新表述为“对于一个序列,每个值之间有一个最小间隔”。

this answer。测试用例类似于您的原始问题。

【讨论】:

    【解决方案4】:

    你总是可以用这个非常简单的方法:

    dim e = Enumerable.Range(0, 1024)
    dim o = e.ToObservable().Do(Sub (x) Thread.Sleep(1000))
    

    当您订阅 o 时,值需要一秒钟才能生成。

    【讨论】:

    • 每次在观察者块内执行Thread.Sleep,杰弗里梵高都会杀死一只小猫。
    • j/k 我一直用它来测试。
    【解决方案5】:

    我只能假设您正在使用 Range 来简化您的问题。

    您是否希望 Enumerable 推送的每个值都延迟一秒?

    var e = Enumerable.Range(0, 10);
    var o = Observable.Interval(TimeSpan.FromSeconds(1))
                      .Zip(e, (_,i)=>i);
    

    或者您是否希望每秒钟只推送 Enumerable 的最后一个值。即从 Enumerable 中读取,在您枚举它时进行评估(可能是一些 IO)。在这种情况下,CombineLatestZip 更有用。

    或者您可能只想每秒获取一个值,在这种情况下,只需使用 Observable.Interval 方法

    var o = Observable.Interval(TimeSpan.FromSeconds(1));
    

    如果您解释您的问题空间,那么社区将能够更好地帮助您。

    *请原谅 C# 的答案,但我不知道等效的 VB.NET 代码是什么。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-11-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多