【问题标题】:How do I get list of observers from a Subject in C#如何从 C# 的主题中获取观察者列表
【发布时间】:2017-11-21 21:59:36
【问题描述】:

我有一个订阅多个观察者的主题 (System.Reactive.Subjects)。是否可以从这个主题中获取观察者列表?

基本上我需要将观察者从主题 A 复制到主题 B。

var newSubject = new Subject();
if (oldSubject.HasObservers)
{
    var observers = oldSubject. <-- not quite sure what to do here.
    foreach(var obs in observers)
    {
        newSubject.Subscribe(obs);
    }
}

Update1 - 为什么我需要这样做

另一个库,我无法控制对 oldSubject 的 onCompleted 调用。然后他们会打另一个电话询问主题。如果我返回 oldSubject,所有订阅更新的观察者将不再获得更新。所以,我正在考虑创建一个新的主题,将活跃的观察者粘在上面,然后返回这个新主题。

Update2 - 示例代码问题的完整上下文

我正在使用一个公开 Observable 的库。然而,这个 observable 只接受一个观察者(请不要问我为什么以及如何,这是我不能在公共论坛上讨论的信息)。所以,我正在编写一个 Observable 层(使用 Subject),它允许多个观察者订阅更新。

我传递给另一个库的单个观察者如下:

public class DataObserver<T> : IObserver<IList<ChangeEvent<T>>>
{
    public DataObserver(string id, Subject<Tuple<string, IList<ChangeEvent<T>>>> subject)
    {
        Id = id;
        Subject = subject;
    }

    public Subject<Tuple<string, IList<ChangeEvent<T>>>> Subject { get; set; }

    public string Id { get; }

    public void OnCompleted()
    {
        Subject?.OnCompleted();
    }

    public void OnError(Exception error)
    {
        Subject?.OnError(error);
    }

    public void OnNext(IList<ChangeEvent<T>> value)
    {
        Subject?.OnNext(new Tuple<string, IList<ChangeEvent<T>>>(Id, value));
    }
}

现在其他库通过下面 MySingletonClass 中的 TryGetObserver 方法获取该观察者:

public MySingletonClass 
{
    public MySingletonClass() 
    {
        DataObservables = new ConcurrentDictionary<string, Tuple<Type, dynamic>>();
    }   

    public ConcurrentDictionary<string, Tuple<Type, dynamic>> DataObservables { get; set; }

    public (bool Success, IObserver<IList<ChangeEvent<T>>> Observer) TryGetObserver<T>(string id)
    {
        var dataSubject = new Subject<Tuple<string, IList<ChangeEvent<T>>>>();
        var dataObserver = new DataObserver<T>(id, dataSubject);
        var hasObserver = DataObservables.ContainsKey(id);
        if (hasObserver)
        {
            var val = DataObservables[id];
            if (val.Item2 is DataObserver<T> extantObserver && extantObserver.Subject.HasObservers)
            {
                // var observers = extantObserver.Subject. <-- This is not going to work.
                // foreach(var obs in observers)
                // {
                //    dataObserver.Subject.Subscribe(obs);
                // }
            }
        }

        DataObservables.TryAdd(id, new Tuple<Type, dynamic>(typeof(T), dataObserver));

        return (true, dataObserver);
    }
}

所以,其他库将第一次调用 TryGetObserver,我将返回正确的对象。然后它会在某个时候从该对象中的 Subject 调用 onCompleted。随后,它将再次调用 TryGetObserver,现在我必须返回一个包含所有先前订阅的观察者的新主题。

【问题讨论】:

  • 除非您创建自己的装饰器主题并自己跟踪观察者,否则您不能这样做。但是为什么你需要这样做呢?
  • 为什么需要复制观察者?他们将积极订阅您的原始主题,并且不会自动获得新主题的第二个订阅。你想做什么?请编辑您的问题以获得解释。
  • 更新问题。
  • @Enigmativity 谢谢。正在考虑制作一个包装主题类。
  • @Manas - 不要包装主题 - 它对你不起作用。订阅结束后,任何获得OnCompleted 的观察者都将不再接受任何值。一旦调用了OnCompleted,就好像在订阅中调用了.Dispose()。你需要想出另一种方法来获得你需要的行为。

标签: c# system.reactive reactive-programming


【解决方案1】:

我通过以下方式解决了这个问题:

  • 创建一个环绕原生主题的自定义主题。
  • 在调用 OnCompleted 时未调用 subject.OnCompleted(请参阅上面的问题描述)。相反,当流被标记为完成时,我使用我自己的自定义跟踪器来做必要的事情。这确保订阅的观察者不会收到流停止的信号,我可以重用它们。事实证明,由于框架将私有成员 isStopped(在每个观察者上)标记为 1,因此每当在 observable 上调用 OnCompleted 时,复制观察者的最初想法永远不会奏效。

这并不理想,但就我们的软件而言,这是一个可以接受的解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-09-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多