【问题标题】:Zipping a repeated stream with RX使用 RX 压缩重复的流
【发布时间】:2018-01-29 18:44:28
【问题描述】:

我有两个热门的 observables 我不想错过任何通知

第一个 observable 产生数字

1-2-3-4

和第二个字符串

a-b

我正在寻找一种方法来压缩它们以产生下一个输出

a-1 b-2 a-3 b-4 a-5

所以我想要的是无限期地重复第二个流,直到第一个流被取消订阅。我尝试了类似下一个测试的方法,它产生了所需的输出。但是我永远不会Repeat,因为我不知道如何停下来测试结果。此外,我使用了ReplaySubject,我手动完成了这意味着我无法收到任何新通知。

    class MyClass:ReactiveTest{
        [Fact]
        public void MethodName4() {
            var strings = new ReplaySubject<string>();
            var testScheduler = new TestScheduler();
            testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
                strings.OnNext("a");
                strings.OnNext("b");
                strings.OnCompleted();
            });

            var numbers = testScheduler.CreateHotObservable(OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5));
            numbers.Zip(Observable.Defer(() => strings).Repeat(3), (i, s) => (i, s)).Subscribe();
            testScheduler.AdvanceBy(500);
        }

    }

【问题讨论】:

    标签: c# system.reactive rx.net


    【解决方案1】:

    我想我找到了一个我将尝试描述的解决方案。

    重复一个 hot observable 是没有意义的,因为它永远不会完成。但是,您可以重复这个 hot observable 的窗口。我从@Enigmativity 重构了这个answer 以重复所有值而不是最后一个。

        public static IObservable<T> RepeatAllDuringSilence<T>(this IObservable<T> source, TimeSpan maxQuietPeriod,
            IScheduler scheduler=null, IObservable<T> inner=null) {
            if (scheduler==null)
                scheduler=Scheduler.Default;
            if (inner == null)
                inner = source;
            return Observable.Create<T>(observer => {
                var replay = inner.Replay(scheduler);
                var sequence = source.Select(x => Observable.Interval(maxQuietPeriod, scheduler).SelectMany(l => replay).StartWith(x)).Switch();
                return new CompositeDisposable(replay.Connect(), sequence.Subscribe(observer));
            });
        }
    

    然后在我的测试中使用这种方法,我设法在没有完成 ReplaySubject 或丢失未来通知的情况下获得预期结果,因为我改变了我原来的 observable 的温度。

    class MyClass : ReactiveTest {
        [Fact]
        public void MethodName4() {
            var strings = new Subject<string>();
            var testScheduler = new TestScheduler();
            testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
                strings.OnNext("a");
                strings.OnNext("b");
            });
            testScheduler.Schedule("", TimeSpan.FromTicks(20), (scheduler, s) => {
                strings.OnNext("c");
            });
    
            var numbers = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), OnNext(40, 4), OnNext(50, 5));
            numbers.Zip(strings.RepeatAllDuringSilence(TimeSpan.FromTicks(10),testScheduler), (i, s) => (i, s)).Subscribe(tuple => WriteLine(tuple));
            testScheduler.AdvanceBy(50);
        }
    
    }
    

    (1, a)
    (2, b)
    (3, c)
    (4、一)
    (5, b)

    【讨论】:

      【解决方案2】:

      这不能解决您的问题吗? :

        void TestMergedPairs()
          {
              var source1 = Observable.Range(1,9999);
              var source2 = Observable.Range(65, 2).Repeat();
      
      
              var z = source1.Zip(source2,(x,y)=> { return new { left = x, right = y }; })
                   .TakeWhile((v)=>v.left<1000);
      
      
              z.Subscribe(x => { Console.WriteLine( x.left +" "+x.right ); });
          }
      

      如果不是,那么不清楚您的问题是什么。如果您希望能够不断重复第二个可观察的历史内容,从某种意义上说,您的“a”、“b”可能稍后会变成“a”、“b”、“c”,那么这个 3 字母序列应该进行迭代,那么您的源可能是聚合历史窗口的 .Scan 的结果。

      【讨论】:

      • 感谢您的回答,您的假设符合我的要求。然而,正如我在原始帖子中所写,当使用 Subject 而不是 cold Observable 时,Range 是,Repeat 运算符什么也不做。如果您可以通过查看我发布的更好地描述该案例的答案来提供使用我原始帖子的两个流的解决方案,我将不胜感激。
      • 所以,澄清一下,您希望您的可观察字符串是热的,并且您希望“迄今为止的历史”不断重复(与数字流一起压缩)?因此,例如,如果您的字符串 observable 有“a”,“b”的历史,它应该用数字压缩它,直到下一个“c”到达,从那时起它应该用 {“a”,“b” 压缩,"c"}. 重复?我了解您的问题吗?
      • 正好另外数字也很火
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-03-31
      • 2021-01-19
      相关资源
      最近更新 更多