【问题标题】:How to add elements to an observable stream by Observable.Timer based on stream values?如何通过 Observable.Timer 基于流值向可观察流添加元素?
【发布时间】:2014-06-05 14:39:54
【问题描述】:

我希望标题足够清楚。我正在玩弄 Rx 的想法,遇到了一个我不知道如何解决的问题。

我有一个原始形式的事件输入,例如 { 1, 2, 3, 4, 5, 1, 2, 3, 4 } 被转换为某种新形式的输入,例如 IType。假设每当观察到数字 2 时,计时器就会开始计时并将新的转换值(或者说,数字 6)插入到流中,除非遇到数字 5,在这种情况下,计时器会被取消。

我正在考虑构建一个从各种来源获取输入的状态机。如果我也可以将“定时”事件作为流的一部分,那就太好了。

代码如下,但基本上我目前坚持使用Observable.Generate 的一些想法(即让它看起来像Observable.Timer,但要向输入序列返回/插入一个新事件)和/或也许像Observable.TakeUntilObservable.Create 和合并流之类的东西,但我不确定是否有更好的方法,甚至不确定如何在代码方面实现这一目标。

就代码而言,它可能是这样的

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;


namespace RxTesting
{

    public interface IType
    {
        int OriginalInput { get; set; }
    }


    public class SomeEvent: IType
    {
        public int OriginalInput { get; set; }
    }


    public class TimeEvent: IType
    {
        public int OriginalInput { get; set; }
    }

     class  Program
     {
         static  void  Main(string[] args)
         {
             //Here we should have one TimerEvent after the last "2" in the sequence.
             var input = new Subject<int>();
             input.Select(i => new SomeEvent { OriginalInput = i }).SomeTimerOperatorHere().Subscribe(Console.WriteLine);

             input.OnNext(1);
             input.OnNext(2);
             input.OnNext(3);
             input.OnNext(4);
             input.OnNext(5);
             input.OnNext(1);
             input.OnNext(2);
             input.OnNext(3);
             input.OnNext(4);

             input.Dispose();
             Console.ReadKey();
        }
    }
}

在控制台这里应该打印一个RxTesting.TimerEvent,因为有一个2,之后没有数字5取消它的效果。

【问题讨论】:

    标签: c# system.reactive reactive-programming


    【解决方案1】:

    应该这样做 - 我认为您的 TakeUntil 想法是正确的。

    我们正在过滤事件以仅包含 OriginalInput 为 2 的事件,并将每个事件投射到一个值为 6 的单值延迟流,如果 OriginalInput 为 5 的事件到达,该流将被切断。生成的流被SelectMany 展平。

    public static IObservable<IType> SomeTimerOperatorHere(
      this IObservable<SomeEvent> source)
    {
      var delay = TimeSpan.FromSeconds(1);
    
      return Observable.Create<IType>(o => {
    
        var sourcePub = source.Publish().RefCount();
    
        return sourcePub
          .Where(x => x.OriginalInput == 2)
          .SelectMany(type => 
            Observable.Return(new TimeEvent{ OriginalInput = 6 })
                      .Delay(delay)
                      .TakeUntil(sourcePub.Where(x => x.OriginalInput == 5))
          ).Subscribe(o);
      });
    }
    

    【讨论】:

    • 抱歉耽搁了,因为我还没在电脑上(将在接下来的 12 小时左右,但可能会到明天)。 Delay 的想法是一个简洁的想法,因为我收集到阅读它(没有测试没有编译器),但是将生成的 TimeEvents 与原始流结合起来的同样好的方法是什么?嗯,也许我有这个想法。谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多