【问题标题】:Reactive.NET Conditional ThrottleReactive.NET 条件限制
【发布时间】:2013-05-01 01:56:17
【问题描述】:

有没有办法有条件限制。

我有一个类,它有一个带有 2 个参数(控制发送者、字符串文本)的事件。我想在我的 Rx 代码中使用这个事件,并使用限制。唯一的问题是我想限制文本,只针对同一个发件人。

因此,如果 sender = textbox1,则节流 300 秒,但是如果发送者更改,则忽略节流并将事件向上发送到链中。

sender=textbox1, text = 'm' within 300 seconds (ignore)
sender textbox1, text = 'mu' within 300 seconds (ignore)
sender textbox1, text = 'muk' more than 300 seconds (process)

sender=textbox1, text = 'm' within 300 seconds(ignore)
sender=textbox2, text = 'y' within 300 seconds(process) //as the sender has changed now.

【问题讨论】:

  • 你想向上发送哪个事件?似乎您想在链上发送“textbox1”、“m”,因为现在您有来自 textbox2 的事件。但是你不想将 textbox2 发送到链上,除非 300 秒过去,否则你不会限制很多 textbox2 事件?是这个意思吗?
  • 为什么不单独限制和缓冲它们,然后在结果进入时合并?

标签: c# system.reactive


【解决方案1】:

以下代码将确保在显示值之前已经过去了 300 毫秒,除非发送者已更改。

var source = new Subject<Pair>();

// The Publish().RefCount() and Subscribe() are to make the sequence hot
var changedSender = source.DistinctUntilChanged(p => p.Sender).Publish().RefCount();
changedSender.Subscribe();

var throttled = source.Select(p =>
    Observable.Amb(changedSender, source.Skip(TimeSpan.FromMilliseconds(300))).Take(1))
    .Concat();

throttled.Subscribe(WritePair);

source.OnNext(new Pair("A", "i"));
System.Threading.Thread.Sleep(10);
source.OnNext(new Pair("A", "it"));
System.Threading.Thread.Sleep(10);
source.OnNext(new Pair("A", "bit"));
System.Threading.Thread.Sleep(500);
source.OnNext(new Pair("A", "bite"));
source.OnNext(new Pair("B", "a"));
System.Threading.Thread.Sleep(10);
source.OnNext(new Pair("A", "bitey"));
System.Threading.Thread.Sleep(500);
source.OnNext(new Pair("A", "at"));

这会产生以下输出:

A: bite
B: a
A: bitey
A: at

【讨论】:

  • 感谢您的回答。虽然我意识到我想要一些与输出不同的东西。我想要的是在 B:a 之前显示 A:咬。我想要更改之前的先前值。我对您的代码进行了一些更改以使其正常工作。
【解决方案2】:

这就是我为完成这项工作所做的工作

        var source = new Subject<Pair>();


        // The Publish().RefCount() and Subscribe() are to make the sequence hot
        //var changedSender = source.DistinctUntilChanged(p => p.Sender).Publish().RefCount();
        var changedSender = source.Zip(source.Skip(1), (previous, next) => new { Previous = previous, Next = next }).DistinctUntilChanged(arg => arg.Next.Sender).Select(o => o.Previous).Publish().RefCount();
        changedSender.Subscribe();
        var either = changedSender.Merge(source.Throttle(TimeSpan.FromMilliseconds(300))); //.Do(o => Console.WriteLine("Do {0}", o.ToString()));
        either.Subscribe(o => o.Print());


        source.OnNext(new Pair("A", "i"));
        System.Threading.Thread.Sleep(10);
        source.OnNext(new Pair("A", "it"));
        System.Threading.Thread.Sleep(10);
        source.OnNext(new Pair("A", "bit"));
        System.Threading.Thread.Sleep(500);
        source.OnNext(new Pair("A", "bite"));
        source.OnNext(new Pair("A", "bite1"));
        source.OnNext(new Pair("A", "bite2"));
        source.OnNext(new Pair("B", "a"));
        source.OnNext(new Pair("B", "ani"));
        source.OnNext(new Pair("B", "animal"));
        System.Threading.Thread.Sleep(10);
        source.OnNext(new Pair("A", "bitey"));
        System.Threading.Thread.Sleep(500);
        source.OnNext(new Pair("A", "at"));
        source.OnNext(new Pair("B", "empty"));
        source.OnNext(new Pair("A", "empty"));
        source.OnNext(new Pair("C", "new"));

        Console.ReadLine();

【讨论】:

    猜你喜欢
    • 2011-05-15
    • 2010-11-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-07-03
    • 2011-09-05
    • 2011-01-10
    • 1970-01-01
    相关资源
    最近更新 更多