【问题标题】:Rx - start buffering onNextRx - 开始缓冲 onNext
【发布时间】:2016-06-14 22:16:56
【问题描述】:

使用Observable.Buffer(TimeSpan timeSpan) Method,它将流分成10分钟的块并返回为IList,效果很好

var stream = Observable.FromEventPattern<*>(*,*);
stream.Buffer(TimeSpan.FromSeconds(10));

尝试实现更复杂的行为

  • 新事件被推送到流中时开始块(缓冲事件列表) (而不是每 10 秒)
  • 继续缓冲事件,直到 x 秒内没有事件被推送到流中

【问题讨论】:

  • 不知道这里有什么问题

标签: c# system.reactive reactive-programming


【解决方案1】:

试试这个:

var query =
    stream.Publish(
        ps => ps.Window(
            () => ps.Delay(TimeSpan.FromSeconds(1.0)).Take(1)));

【讨论】:

    【解决方案2】:

    我相信有很多方法可以做到这一点。

    我在这里有一个经过测试的示例

    void Main()
    {
        var scheduler = new TestScheduler();
        var stream = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(1.Seconds(), 'A'),
            ReactiveTest.OnNext(2.Seconds(), 'B'),
            ReactiveTest.OnNext(13.Seconds(), 'C')
            );
    
        var observer = scheduler.CreateObserver<string>();
    
        var query = stream.Publish(s => {
            return s.Timeout(TimeSpan.FromSeconds(10), Observable.Empty<char>(), scheduler)
                .ToList()
                .Where(buffer=>buffer.Any())    
                //Project to string to make equality test easier for the example.       
                .Select(buffer=>string.Join(",", buffer))
                .Repeat();
        });
    
        query.Subscribe(observer);
    
        scheduler.AdvanceBy(100.Seconds());
    
        ReactiveAssert.AreElementsEqual(
            new []{
                ReactiveTest.OnNext(12.Seconds(), "A,B"),
                ReactiveTest.OnNext(23.Seconds(), "C")
            },
            observer.Messages);
    }
    
    // Define other methods and classes here
    public static class TimeEx
    {
        public static long Seconds(this int seconds)
        {
            return TimeSpan.FromSeconds(seconds).Ticks;
        }
    }
    

    请注意,我只是将 Buffered 列表设为字符串,以便更轻松地验证相等性。即"A,B" 而不是{'A', 'B'}

    要考虑的其他选项是 WindowGroupJoin 运算符来执行此操作 - 请参阅 http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html。我确信可以将其他运算符拼接在一起,例如SwitchSelectTimerTimeout 等以获得您的结果。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多