【问题标题】:Throttling Events and Locking Methods限制事件和锁定方法
【发布时间】:2014-02-18 16:47:14
【问题描述】:

假设我有这样的东西:

<TextBox Text="{Binding Text, Mode=TwoWay}" />

还有这样的:

public class MyViewModel : INotifyPropertyChanged
{
    public MyViewModel()
    {
        // run DoWork() when this.Text changes
        Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
            .Where(x => x.EventArgs.PropertyName.Equals("Text"))
            .Subscribe(async x => await DoWork());
    }

    private async Task DoWork()
    {
        await Task.Delay(this.Text.Length * 100);
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private string _Text = "Hello World";
    public string Text
    {
        get { return _Text; }
        set
        {
            _Text = value;
            if (this.PropertyChanged != null)
                this.PropertyChanged(this, new PropertyChangedEventArgs("Text"));
        }
    }
}

在这种情况下,用户可能会非常快速地打字。我需要:

  1. DoWork() 不得在 DoWork() 已运行时运行

  2. 用户可以输入sspurs, some changes, pause, some changes

  3. DoWork() 不需要每次更改,只有最后一次更改

  4. DoWork() 的调用频率无需超过 1 秒

  5. DoWork() 不能等到最后一次更改,如果冲刺时间大于 1 秒

  6. DoWork() 不应在系统空闲时调用

  7. DoWork() 的持续时间因 this.Text 的长度而异

问题不在于 Rx 是否可以做到这一点。我知道可以。正确的语法是什么?

【问题讨论】:

  • 怪异的,我现在正在做这个确切的事情并且想知道确切的事情
  • 这不是纯 RX 解决方案的要求。
  • 想出一个更好的标题哈哈
  • @PaulBetts,好点子。
  • 所提出的解决方案都不是简单的。我认为,鉴于 SO 上最有经验的 Rx 人员显然必须努力工作才能获得可行的解决方案,这掩盖了这显然很容易的事实?我认为这里的任何代码都不容易阅读,更不用说编写了。这就是为什么我仍然倾向于在流之外做这件事。根据我的经验,在 monad 之外管理慢速消费者更加容易。

标签: winrt-xaml system.reactive


【解决方案1】:

虽然我有点同意 James World 的观点,但我认为如果我们只使用一点可变状态,你可以做得更好。如果 DoWork 看起来像这样会怎样:

AsyncSubject<Unit> doingWork;
public IObservable<Unit> DoWork()
{
    if (doingWork != null) return doingWork;

    doingWork = Observable.Start(() => {
        // XXX: Do work
        Thread.Sleep(1000);

        // We only kick off this 1sec timeout *after* we finish doing work
        Observable.Timer(TimeSpan.FromSeconds(1.0), DispatcherScheduler.Instance)
            .Subscribe(_ => doingWork = null);
    });

    return doingWork;
}

现在,DoWork 会自动消除抖动,并且我们可以摆脱这种等待订阅的愚蠢行为;我们将油门设置为 250 毫秒,以便快速但不太快。

这最初似乎违反了上面的第 5 条要求,但我们确保任何人调用 DoWork 过快只会得到上一次运行的结果 - 结果是 DoWork 将被多次调用,但不一定 do 任何东西。但是,这确保了如果我们不工作,在用户停止输入后我们不会有 1 秒的延迟,就像 Throttle(1.seconds)

    Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
        .Where(x => x.EventArgs.PropertyName.Equals("Text"))
        .Throttle(TimeSpan.FromMilliseconds(250), DispatcherScheduler.Instance)
        .SelectMany(_ => DoWork())
        .Catch<Unit, Exception>(ex => {
            Console.WriteLine("Oh Crap, DoWork failed: {0}", ex);
            return Observable.Empty<Unit>();
        })
        .Subscribe(_ => Console.WriteLine("Did work"));

【讨论】:

  • 这是一个不错的技巧 - 虽然总的来说我认为我更喜欢订阅者中的“混乱”。不过,它仍在我的剧本中:)。顺便说一句 - 我认为你想要 Sample 而不是 Throttle (参见 OP 中的第 5 点)。
  • 我实际上确实想要 Throttle - 诀窍是,SelectMany 将每 1 秒间隔触发 DoWork > 1x,但这些订阅中的 一些 不会做任何事情,因为它们'只会得到之前DoWork的AsyncSubject。
  • 其实这里有一些东西需要修复。 ThrottleSample 都不起作用。如果您使用Throttle,以小于 250 毫秒间隔的稳定事件流将持续抑制事件,不是吗?另外,doingWork 不应该被声明为IObservable&lt;Unit&gt; 吗? (为了确保我没有发疯,我确实运行了连接到按钮单击的代码,然后像疯了一样按下了该按钮,但没有出现任何事件......)
【解决方案2】:

我认为解决您的问题的更简单且可重用的方法实际上可能是基于异步/等待而不是基于 RX。查看我得到的单线程EventThrottler 类实现作为对'Is there such a synchronization tool as “single-item-sized async task buffer”?' question 的回答。有了它,您可以简单地重写您的 DoWork() 方法:

private void DoWork()
{
    EventThrottler.Default.Run(async () =>
    {
        await Task.Delay(1000);
        //do other stuff
    });
}

并在每次文本更改时调用它。不需要 RX。此外,如果您已经在使用 WinRT XAML 工具包 - 类是 in there

这里是节流器类代码的副本作为快速参考:

public class EventThrottler
{
    private Func<Task> next = null;
    private bool isRunning = false;

    public async void Run(Func<Task> action)
    {
        if (isRunning)
            next = action;
        else
        {
            isRunning = true;

            try
            {
                await action();

                while (next != null)
                {
                    var nextCopy = next;
                    next = null;
                    await nextCopy();
                }
            }
            finally
            {
                isRunning = false;
            }
        }
    }

    private static Lazy<EventThrottler> defaultInstance =
        new Lazy<EventThrottler>(() => new EventThrottler());
    public static EventThrottler Default
    {
        get { return defaultInstance.Value; }
    }
}

【讨论】:

  • 这是一个非常简洁的解决方案。从远处观察时,方法再次相似,但我认为从处理背压的角度来看最“有意义”。
【解决方案3】:

您可能会惊讶于纯 RX 解决方案的难度。它与提交限制搜索以响应文本框更改的类似(和典型的 Rx 101 示例)略有不同 - 在这种情况下,可以触发并发搜索,取消除最新搜索之外的所有搜索。

在这种情况下,一旦 DoWork() 关闭并运行,它就不能被替换或中断。

问题在于 Rx 流在一个方向流动并且不能“反向对话”——因此事件排队等待慢速消费者。在 Rx 中,由于消费者缓慢而丢弃事件是相当困难的。

DoWork() 可以在新的(可能受到限制的)事件到达时被取消和替换,这要容易得多。

首先我提出一个纯 Rx 解决方案。最后,一个更简单的方法是通过 Rx 之外的调度机制来处理慢消费者。

对于纯粹的方法,您将需要这个辅助扩展方法来丢弃针对慢速消费者 which you can read about here 排队的事件:

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;

        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

有了这个,您可以执行以下操作:

// run DoWork() when this.Text changes
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
          .Where(x => x.EventArgs.PropertyName.Equals("Text"))
          .Sample(TimeSpan.FromSeconds(1)) // get the latest event in each second
          .ObservableLatestOn(Scheduler.Default) // drop all but the latest event
          .Subscribe(x => DoWork().Wait()); // block to avoid overlap

备注

说实话,您最好避免在这里使用纯 Rx 解决方案,而不要直接从订阅者那里致电 DoWork()。我将使用从 Subscribe 方法调用的中间调度机制来包装它,如果它已经在运行,则处理不调用它 - 代码将更易于维护。

编辑:

在考虑了几天之后,我并没有比这里的其他一些答案做得更好 - 我将把上面的内容留给大家感兴趣,但我认为我最喜欢 Filip Skakun 的方法。

【讨论】:

  • 在几行纯RX中为此编写扩展实际上非常容易。不需要布尔门或线程锁定。请参阅我的答案以获得一些组合器的帮助。
  • 不确定您指的是哪条评论。您建议由于消费者缓慢而在 RX 中删除事件是很难做到的。这不是真的。如果这就是你想要做的,那么这很容易。您只需要创建一个 Replay(1) 缓冲区并使用由消费者完成触发的采样器来使用它。
  • 您使用_running 标志的建议也不起作用。它将错过最后一个事件。
  • 评论以“以上都不是...”开头。如果你认为你刚才说的对普通人来说似乎很容易,那么你就已经失去了接触 Rx 的现实!!!
  • 实际上,如果您从足够远的地方看,所提出的每个解决方案实际上都具有相同的想法,即必须有状态地跟踪正在运行的任务。由于任何组合器都只是订阅上游,我相信我最终会将它重新打包成类似的东西。现在必须继续日常工作,但我很快会重新审视这一点。不过,您的解决方案正在我身上成长!
【解决方案4】:

这就是我所拥有的(代码已经过测试,顺便说一句)。它基于事件限制扩展I created a few years ago。我认为它的一个好名字是Ouroboros。 最重要的是,与使用油门时相反,如果冷却时间过去了,它会立即开始工作。

public static IObservable<TResult> CombineVeryLatest<TLeft, TRight, TResult>(
    this IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}

public static IObservable<TWork> WorkSequencer<T, TWork>(
    this IObservable<T> source, Func<Task<TWork>> work)
{
    return source.Publish(src =>
    {
        var fire = new Subject<T>();
        var fireCompleted = fire.SelectMany(x => work()).Publish();
        fireCompleted.Connect();
        var whenCanFire = fireCompleted.StartWith(default(TWork));

        var subscription = src
            .CombineVeryLatest(whenCanFire, (x, flag) => x)
            .Subscribe(fire);

        return fireCompleted.Finally(subscription.Dispose);
    });
}

那么用法是:

    private int _counter;

    public MainWindow()
    {
        InitializeComponent();
        var clicks = Observable
            .FromEventPattern(TestBn, "Click")
            .Do(_ =>
            {
                Console.WriteLine("click");
                _counter++;
            });
        clicks.WorkSequencer(DoWork).Subscribe();
    }

    private async Task<int> DoWork()
    {
        var workNumber = _counter;
        Console.WriteLine("Work Start " + workNumber);
        await Task.WhenAll(Task.Delay(_counter*100), Task.Delay(1000));
        Console.WriteLine("Work Done " + workNumber);
        return _counter;
    }

【讨论】:

    【解决方案5】:

    为此,我在 UI 中使用了几个名为 SubscribeWithoutOverlap 的组合器。所有传入的事件都被丢弃,除了最后一个被丢弃,直到工作完成。当工作完成时,事件缓冲区被要求用于下一个事件。

        /// <summary>
        /// Subscribe to the observable whilst discarding all events that are
        /// recieved whilst the action is being processed. Can be
        /// used to improve resposiveness of UI's for example 
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="source"></param>
        /// <param name="action"></param>
        /// <param name="scheduler"></param>
        /// <returns></returns>
        public static IDisposable SubscribeWithoutOverlap<T>
        (this IObservable<T> source, Action<T> action, IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Replay(1);
    
            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });
    
            var connection = p.Connect();
            sampler.OnNext(Unit.Default);
    
            return new CompositeDisposable(connection, subscription);
        }
    

        public static IDisposable SubscribeWithoutOverlap<T>
        (this IObservable<T> source, Func<T,Task> action, IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Replay(1);
    
            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(async l =>
                {
                    await action(l);
                    sampler.OnNext(Unit.Default);
                });
    
            var connection = p.Connect();
            sampler.OnNext(Unit.Default);
    
            return new CompositeDisposable(connection, subscription);
        }
    

    所以以下应该满足您的要求。

    IObservable<string> source;
    
    source
       .Throttle(TimeSpan.FromMilliSeconds(100))
       .Merge(source.Sample(TimeSpan.FromSeconds(1))
       .SubscribeWithoutOverlap(DoWork)
    

    注意混合使用 Throttle 和 Sample 以获得问题中要求的两种计时行为。

    关于其他一些答案。如果您发现自己将复杂的 RX 逻辑放入业务逻辑中,请提取到具有明确目的的自定义组合器中。当你试图理解它的作用时,你会感谢自己。

    【讨论】:

    • 这与我的回答类似。我可能弄错了,但是如果在上一个操作仍在执行期间,最后一个操作请求可能会“卡住”。在我的解决方案中,最后的工作是在冷却之后保证的。
    • 这很有趣。我应该缓冲最后一个事件,而不是丢弃所有事件。我会看看并更新。
    • 我认为这只是将var p = source.Publish() 更改为var p = source.Replay(1) 的问题。来自源的最后一个事件现在被缓冲了。你认为这能解决问题吗?
    • 另外一个来自我,尽管我仍然不相信这是最好的(参见 OP cmets)——这是一种非常有趣和有用的方法。
    • 我想你最终会从 RX 团队看到类似的东西。如果你浏览 googleverse,你会发现backpressure 的概念在四处飘荡。这不是一个已解决的问题,所以它还没有在库中,但我希望它会在某个时候出现。
    猜你喜欢
    • 2012-07-08
    • 2010-10-16
    • 1970-01-01
    • 2020-01-12
    • 1970-01-01
    • 1970-01-01
    • 2017-11-23
    • 2013-12-03
    • 1970-01-01
    相关资源
    最近更新 更多