【问题标题】:Implementing Observers and Subjects using IObserver/IObservable使用 IObserver/IObservable 实现观察者和主题
【发布时间】:2011-09-26 00:50:53
【问题描述】:

我想创建一个可以用来表示动态计算值的类,而另一个表示值的类可以作为这些动态计算值的源(主题)。目标是当主题发生变化时,计算值会自动更新。

在我看来,使用 IObservable/IObserver 是可行的方法。不幸的是我不能使用响应式扩展库,所以我不得不从头开始实现主题/观察者模式。

废话不多说,这是我的课:

public class Notifier<T> : IObservable<T>
{
    public Notifier();
    public IDisposable Subscribe(IObserver<T> observer);
    public void Subscribe(Action<T> action);
    public void Notify(T subject);
    public void EndTransmission();
}

public class Observer<T> : IObserver<T>, IDisposable
{
    public Observer(Action<T> action);
    public void Subscribe(Notifier<T> tracker);
    public void Unsubscribe();
    public void OnCompleted();
    public void OnError(Exception error);
    public void OnNext(T value);
    public void Dispose();
}

public class ObservableValue<T> : Notifier<T>
{
    public T Get();
    public void Set(T x);
}

public class ComputedValue<T>
{
    public T Get();
    public void Set(T x);
}

我的实现主要来自:http://msdn.microsoft.com/en-us/library/dd990377.aspx

那么“正确”的方法是什么?注意:我不关心 LINQ 或多线程甚至性能。我只是希望它简单易懂。

【问题讨论】:

  • 如果你真的不关心 LINQ、线程和其他类型的问题,那么为什么不使用 ref 变量,这可能是最简单的方法。

标签: c# system.reactive subject-observer


【解决方案1】:

如果我是你,我会尝试尽可能接近 Rx 的实现方式来实现你的类。

其中一个关键的基本原则是使用相对较少的具体类,这些类通过大量操作组合在一起。因此,您应该创建一些基本的构建块并使用组合将它们组合在一起。

我将在 Reflector.NET 下初步了解两个类:AnonymousObservable&lt;T&gt;AnonymousObserver&lt;T&gt;。特别是 AnonymousObservable&lt;T&gt; 被整个 Rx 用作实例化 observables 的基础。实际上,如果您查看派生自 IObservable&lt;T&gt; 的对象,就会发现有一些专门的实现,但只有 AnonymousObservable&lt;T&gt; 用于通用用途。

静态方法Observable.Create&lt;T&gt;() 本质上是AnonymousObservable&lt;T&gt; 的包装器。

另一个明显适合您要求的 Rx 类是 BehaviorSubject&lt;T&gt;。主题既是可观察者又是观察者,BehaviorSubject 适合您的情况,因为它会记住收到的最后一个值。

鉴于这些基本类,您几乎拥有创建特定对象所需的所有位。您的对象不应继承上述代码,而应使用组合来组合您需要的行为。

现在,我建议对您的类设计进行一些更改,以使它们与 Rx 更兼容,从而更加可组合和健壮。

我会放弃你的 Notifier&lt;T&gt; 课程,转而使用 BehaviourSubject&lt;T&gt;

我会放弃你的 Observer&lt;T&gt; 课程,转而使用 AnonymousObserver&lt;T&gt;

然后我将ObservableValue&lt;T&gt; 修改为如下所示:

public class ObservableValue<T> : IObservable<T>, IDisposable
{
    public ObservableValue(T initial) { ... }
    public T Value { get; set; }
    public IDisposable Subscribe(IObserver<T> observer);
    public void Dispose();
}

ObservableValue&lt;T&gt; 的实现将包装BehaviourSubject&lt;T&gt; 而不是从它继承,因为暴露IObserver&lt;T&gt; 成员将允许访问OnCompletedOnError 这不会有太大意义,因为这个类代表一个价值而不是计算。订阅将使用 AnonymousObservable&lt;T&gt;Dispose 将清理包装的 BehaviourSubject&lt;T&gt;

然后我会将ComputedValue&lt;T&gt; 修改为如下所示:

public class ComputedValue<T> : IObservable<T>, IDisposable
{
    public ComputedValue(IObservable<T> source) { ... }
    public T Value { get; }
    public IDisposable Subscribe(IObserver<T> observer);
    public void Dispose();
}

ComputedValue&lt;T&gt; 类将为所有订阅者包装AnonymousObservable&lt;T&gt;,并使用source 获取Value 属性值的本地副本。 Dispose 方法将用于取消订阅 source observable。

这最后两个类是您的设计似乎需要的唯一真正具体的实现——这仅仅是因为Value 属性。

接下来,您的扩展方法需要一个静态 ObservableValues 类:

public static class ObservableValues
{
    public static ObservableValue<T> Create<T>(T initial)
    { ... }

    public static ComputedValue<V> Compute<T, U, V>(
        this IObservable<T> left,
        IObservable<U> right,
        Func<T, U, V> computation)
    { ... }
}

Compute 方法将使用AnonymousObservable&lt;V&gt; 执行计算并生成一个IObservable&lt;V&gt; 以传递给该方法返回的ComputedValue&lt;V&gt; 的构造函数。

一切就绪后,您现在可以编写以下代码:

var ov1 = ObservableValues.Create(1);
var ov2 = ObservableValues.Create(2);
var ov3 = ObservableValues.Create(3);

var cv1 = ov1.Compute(ov2, (x, y) => x + y);
var cv2 = ov3.Compute(cv1, (x, y) => x * y);

//cv2.Value == 9

ov1.Value = 2;
ov2.Value = 3;
ov3.Value = 4;

//cv2.Value == 20

如果这有帮助和/或有什么我可以详细说明的,请告诉我。


编辑:还需要一些一次性用品。

您还需要实现AnonymousDisposableCompositeDisposable 来管理您的订阅,尤其是在Compute 扩展方法中。看看使用 Reflector.NET 的 Rx 实现,或者使用下面我的版本。

public sealed class AnonymousDisposable : IDisposable
{
    private readonly Action _action;
    private int _disposed;

    public AnonymousDisposable(Action action)
    {
        _action = action;
    }

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
        {
            _action();
        }
    }
}

public sealed class CompositeDisposable : IEnumerable<IDisposable>, IDisposable
{
    private readonly List<IDisposable> _disposables;
    private bool _disposed;

    public CompositeDisposable()
        : this(new IDisposable[] { })
    { }

    public CompositeDisposable(IEnumerable<IDisposable> disposables)
    {
        if (disposables == null) { throw new ArgumentNullException("disposables"); }
        this._disposables = new List<IDisposable>(disposables);
    }

    public CompositeDisposable(params IDisposable[] disposables)
    {
        if (disposables == null) { throw new ArgumentNullException("disposables"); }
        this._disposables = new List<IDisposable>(disposables);
    }

    public void Add(IDisposable disposable)
    {
        if (disposable == null) { throw new ArgumentNullException("disposable"); }
        lock (_disposables)
        {
            if (_disposed)
            {
                disposable.Dispose();
            }
            else
            {
                _disposables.Add(disposable);
            }
        }
    }

    public IDisposable Add(Action action)
    {
        if (action == null) { throw new ArgumentNullException("action"); }
        var disposable = new AnonymousDisposable(action);
        this.Add(disposable);
        return disposable;
    }

    public IDisposable Add<TDelegate>(Action<TDelegate> add, Action<TDelegate> remove, TDelegate handler)
    {
        if (add == null) { throw new ArgumentNullException("add"); }
        if (remove == null) { throw new ArgumentNullException("remove"); }
        if (handler == null) { throw new ArgumentNullException("handler"); }
        add(handler);
        return this.Add(() => remove(handler));
    }

    public void Clear()
    {
        lock (_disposables)
        {
            var disposables = _disposables.ToArray();
            _disposables.Clear();
            Array.ForEach(disposables, d => d.Dispose());
        }
    }

    public void Dispose()
    {
        lock (_disposables)
        {
            if (!_disposed)
            {
                this.Clear();
            }
            _disposed = true;
        }
    }

    public IEnumerator<IDisposable> GetEnumerator()
    {
        lock (_disposables)
        {
            return _disposables.ToArray().AsEnumerable().GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public bool IsDisposed
    {
        get
        {
            return _disposed;
        }
    }
}

【讨论】:

  • 我正在消化所有这些(我需要在接受之前先睡一觉),但它非常详细且经过深思熟虑。非常感谢!
  • 这非常彻底。我希望我前段时间找到了这个。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-09-25
  • 1970-01-01
  • 1970-01-01
  • 2013-01-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多