【问题标题】:How can I trigger buffering of an Observable data stream when a certain value or condition is met?当满足某个值或条件时,如何触发 Observable 数据流的缓冲?
【发布时间】:2012-09-27 08:30:30
【问题描述】:

我正在使用绑定到 COM 端口的 Reactive Extensions Observable 数据流,并且我正在显示来自该数据流的缓冲区,该数据流在一段时间内被占用。

这是我的基本 Rx 代码,其中以 25 毫秒的块返回字节数据。我想在第一次达到特定阈值时触发缓冲区的生成,然后仅在收集前一个缓冲区后再次执行。

var o = serialData.Buffer(TimeSpan.FromMilliseconds(25))
                  .ObserveOn(SynchronizationContext.Current);
var mySerialObserver = o.Subscribe<IList<byte>>(SubscribeAction());

serialData 对象是来自 USB COM 端口的连续字节值流的 IObservable。代码改编自 Bart De Smet 帖子:

How to implement SerialPort parser with Rx

使用 Rx Buffer(TimeSpan) 方法,我可以对 serialData 进行采样并在图表上显示缓冲区值(在我的 SubscribeAction 方法中使用 DynamicDataDisplay)。

我想将功能扩展为类似于示波器触发器,这可能涉及在 serialData 值超过给定阈值时调用 Rx Buffer 方法,但不收集重叠缓冲区(这类似于示波器时基在一定的输入电压下触发,但直到扫描完成才再次触发)

请有人给我一些关于如何实施的想法?

【问题讨论】:

  • 我认为您需要多解释一下您的问题/要求。 “临界点”? “集”?等等……
  • 我有一个来自 COM 端口的 IObservable 流 - 我从 Bart de Smet 的代码示例中获得的 serialData
  • 对不起,谜团,我在主要帖子中提供了更多细节

标签: .net c#-4.0 system.reactive


【解决方案1】:

缓冲区只会在缓冲区关闭之前释放所有值,这对于实时图表不是很有用。 您必须将值拆分为不重叠的窗口 - 从给定的触发器开始,并在扫描条件完成时关闭 - 一个完整的扫描周期的窗口。 不幸的是,窗口在启动时仍会为我们提供值,因此我们将不得不跳过触发器触发之前传入的所有值。

    static IObservable<IObservable<T>> TriggeredSweep<T>(
        this IObservable<T> source,
        Func<T, bool> triggerCondition,
        Func<T, bool> sweepEnd
        )
    {
        source = source.Publish().RefCount();
        return source.Window(() => source.Where(triggerCondition).Sample(source.Where(sweepEnd)))
                     .Select(s => s.SkipWhile(v => !triggerCondition(v)));

    }

测试这一点的最佳方法是使用它所依据的示波器模型:

        double period = 1000 / 0.5; //0.5 Hz
        int cycles = 4;             //cycles to display
        int quantization = 100;     //cycles to display            
        int amplitude = 10;         //signal peak            

        int range = quantization * cycles;    //full range 

        //Sine wave generator for n cycles
        //makes tuple of (t, sin(t))
        var source = Observable.Interval(TimeSpan.FromMilliseconds(period / range))
                               .Select(s => s % (range + 1))
                               .Select(s => Tuple.Create(s, amplitude * Math.Sin((double)s / ((double)range / (double)cycles) * 2 * Math.PI)));


        source.TriggeredSweep(
            value => value.Item2 > 5, //Trigger when Signal value > 5
            value => value.Item1 / quantization >= cycles //end sweep when all cycles are done
            )
              .Subscribe(window =>
              {
                  Console.Clear(); //Clear CRO Monitor

                  window.Subscribe(value =>
                  {
                      //Set (x, y)
                      Console.CursorLeft = (int)((double)value.Item1 / range * (Console.WindowWidth - 1));
                      Console.CursorTop = (int)((amplitude - value.Item2) / (2 * amplitude) * (Console.WindowHeight - 1));

                      //draw
                      Console.Write("x");
                  });
              });

        //prevent close
        Console.ReadLine();

输出:

    xxx                   xxxx                   xxx                   xxxx
   xx  x                  x  x                  xx  x                  x  x
   x   x                 xx   x                 x   x                 xx   x
  xx    x                x    x                xx    x                x    x
  x     x               xx     x               x     x               xx     x
  x      x              x      x               x      x              x      x
  x      x              x      x              xx      x              x      x
         x              x       x             x       x              x       x
         x             x        x             x       x             x        x
          x            x        x             x        x            x        x
          x            x        xx           x         x            x        xx
          x           x          x           x         x           x          x
           x          x          x           x          x          x          x
           x          x          x           x          x          x          x           x
           x          x          x          x           x          x          x          x
           x          x           x         x           x          x           x         x
           xx        x            x         x           xx        x            x         x
            x        x            x        x             x        x            x        x
            x        x             x       x             x        x             x       x
            x       x              x       x             x       x              x       x
             x      x              x      xx              x      x              x      xx
             x      x               x     x               x      x               x     x
             x     xx               x     x               x     xx               x     x
              x    x                x    xx                x    x                x    xx
              x   xx                 x   x                 x   xx                 x   x
               x  x                  x  xx                  x  x                  x  xx
               xxxx                   xxx                   xxxx                   xxx
                x                      x                     x                      x

我希望这段代码对使用 Rx 测试简单的信号处理功能有用。 :)

【讨论】:

  • 感谢您的解决方案,我花了一些时间来了解反应式扩展,您的示例确实很有帮助。我正在做一个简化的假设,即在一个 Buffer 中收集的所有数据在时间间隔内均匀分布。
  • @CliveG 没问题。顺便说一句,如果您同意,可以将其标记为答案。
猜你喜欢
  • 2020-01-11
  • 1970-01-01
  • 1970-01-01
  • 2020-07-16
  • 1970-01-01
  • 2021-08-21
  • 1970-01-01
  • 2021-10-13
  • 1970-01-01
相关资源
最近更新 更多