【发布时间】:2017-11-21 21:59:36
【问题描述】:
我有一个订阅多个观察者的主题 (System.Reactive.Subjects)。是否可以从这个主题中获取观察者列表?
基本上我需要将观察者从主题 A 复制到主题 B。
var newSubject = new Subject();
if (oldSubject.HasObservers)
{
var observers = oldSubject. <-- not quite sure what to do here.
foreach(var obs in observers)
{
newSubject.Subscribe(obs);
}
}
Update1 - 为什么我需要这样做
另一个库,我无法控制对 oldSubject 的 onCompleted 调用。然后他们会打另一个电话询问主题。如果我返回 oldSubject,所有订阅更新的观察者将不再获得更新。所以,我正在考虑创建一个新的主题,将活跃的观察者粘在上面,然后返回这个新主题。
Update2 - 示例代码问题的完整上下文
我正在使用一个公开 Observable 的库。然而,这个 observable 只接受一个观察者(请不要问我为什么以及如何,这是我不能在公共论坛上讨论的信息)。所以,我正在编写一个 Observable 层(使用 Subject),它允许多个观察者订阅更新。
我传递给另一个库的单个观察者如下:
public class DataObserver<T> : IObserver<IList<ChangeEvent<T>>>
{
public DataObserver(string id, Subject<Tuple<string, IList<ChangeEvent<T>>>> subject)
{
Id = id;
Subject = subject;
}
public Subject<Tuple<string, IList<ChangeEvent<T>>>> Subject { get; set; }
public string Id { get; }
public void OnCompleted()
{
Subject?.OnCompleted();
}
public void OnError(Exception error)
{
Subject?.OnError(error);
}
public void OnNext(IList<ChangeEvent<T>> value)
{
Subject?.OnNext(new Tuple<string, IList<ChangeEvent<T>>>(Id, value));
}
}
现在其他库通过下面 MySingletonClass 中的 TryGetObserver 方法获取该观察者:
public MySingletonClass
{
public MySingletonClass()
{
DataObservables = new ConcurrentDictionary<string, Tuple<Type, dynamic>>();
}
public ConcurrentDictionary<string, Tuple<Type, dynamic>> DataObservables { get; set; }
public (bool Success, IObserver<IList<ChangeEvent<T>>> Observer) TryGetObserver<T>(string id)
{
var dataSubject = new Subject<Tuple<string, IList<ChangeEvent<T>>>>();
var dataObserver = new DataObserver<T>(id, dataSubject);
var hasObserver = DataObservables.ContainsKey(id);
if (hasObserver)
{
var val = DataObservables[id];
if (val.Item2 is DataObserver<T> extantObserver && extantObserver.Subject.HasObservers)
{
// var observers = extantObserver.Subject. <-- This is not going to work.
// foreach(var obs in observers)
// {
// dataObserver.Subject.Subscribe(obs);
// }
}
}
DataObservables.TryAdd(id, new Tuple<Type, dynamic>(typeof(T), dataObserver));
return (true, dataObserver);
}
}
所以,其他库将第一次调用 TryGetObserver,我将返回正确的对象。然后它会在某个时候从该对象中的 Subject 调用 onCompleted。随后,它将再次调用 TryGetObserver,现在我必须返回一个包含所有先前订阅的观察者的新主题。
【问题讨论】:
-
除非您创建自己的装饰器主题并自己跟踪观察者,否则您不能这样做。但是为什么你需要这样做呢?
-
为什么需要复制观察者?他们将积极订阅您的原始主题,并且不会自动获得新主题的第二个订阅。你想做什么?请编辑您的问题以获得解释。
-
更新问题。
-
@Enigmativity 谢谢。正在考虑制作一个包装主题类。
-
@Manas - 不要包装主题 - 它对你不起作用。订阅结束后,任何获得
OnCompleted的观察者都将不再接受任何值。一旦调用了OnCompleted,就好像在订阅中调用了.Dispose()。你需要想出另一种方法来获得你需要的行为。
标签: c# system.reactive reactive-programming