【问题标题】:Using Rx to throttle callbacks from non-asynchronous calls使用 Rx 限制来自非异步调用的回调
【发布时间】:2011-05-10 16:36:58
【问题描述】:

我的问题与this 问题类似,但与往常一样略有不同。

我目前从事一个项目,该项目使用一种简单的模式来异步接收底层数据更改的通知。

类 Foo 负责订阅这些数据更改,并通过注册实现给定接口的类的实例,为类提供一种方法来注册它们对这些更改的兴趣:

public Guid SubscribeToChanges(ISomeCallbackInterface callback)

Class Bar 实现了这个回调并注册了自己:

public class Bar : ISomeCallbackInterface
{
    ...
    //initialise instance of Foo class and subscribe to updates
    _foo.SubscribeToChanges(this);
    ...

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        ...
    }
}

理想情况下,我们希望限制这些回调,例如,我们可以获得一个回调,将特定数据的值从 x 更改为 y,然后在一秒钟内返回 x。查看 Rx 文档,使用 DistinctUntilChanged 运算符将是微不足道的。

但是,问题是如何创建一个 IObservable 集合,然后我可以将运算符应用到上面给出的回调实现。文档非常清楚如何从标准 .Net 事件或 Begin.../End... 方法对创建 IObservable。

更新:正如 Richard Szalay 在他的评论中指出的那样,我需要结合使用 DistinctUntilChanged 和 Throttle 来实现我的需要。

再次感谢 Richard,我还应该提到我需要能够取消订阅回调。在当前模型中,我只是在 Foo 的实例上调用 Unscubscribe(Guid subscriptionToken)。

【问题讨论】:

  • 如果一个值从 x 变为 y 再变为 x,DistinctUntilChanged 将发出所有三个值。
  • @Richard Szalay - 好点。我忘了提到我需要将 DistinctUntilChanged 与 Throttle 运算符一起使用。将更新我的问题。

标签: c# c#-4.0 system.reactive


【解决方案1】:

我想到的唯一方法是使用ISomeCallbackInterfaceObservable.Create 的自定义实现。尽管如此,它还是觉得解决方案是硬生生的:

public static IObservable<Tuple<string,IEnumerable<Tuple<string, int, object>>> 
    FromCustomCallbackPattern(ISomeCallbackPublisher publisher)
{
    return Observable.CreateWithDisposable<T>(observer =>
    {
        var subject = new Subject<
            Tuple<string,IEnumerable<Tuple<string, int, object>>>();

        var callback = new ObservableSomeCallback(subject);

        Guid subscription = publisher.SubscribeToChanges(callback);

        return new CompositeDisposable(
            subject.Subscribe(observer),
            Disposable.Create(() => publisher.UnsubscribeFromChanges(subscription))
        );
    });
}

private class ObservableSomeCallback : ISomeCallbackInterface
{
    private IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer;

    public ObservableSomeCallback(
        IObserver<Tuple<string,IEnumerable<Tuple<string, int, object>>> observer)
    {
        this.observer = observer;
    }

    public void CallbackMethod(string id, IEnumerable<Tuple<string, int, object>> data)
    {
        this.observer.OnNext(new Tuple<
            string,IEnumerable<Tuple<string, int, object>>(id, data));
    }
}

【讨论】:

  • 我会用一个主题,你越来越喜欢了 :)
  • 非常感谢。我会试试这个建议。也感谢您了解我错过了我也需要取消订阅回调的事实 - 我忘了包括这一点!
猜你喜欢
  • 2017-12-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多