【发布时间】:2016-09-23 15:24:00
【问题描述】:
这都是伪代码...
好的,这是我的场景,我有一个传入的数据流,它被解析成数据包。
我有一个IObservable<Packets> Packets
每个数据包都有一个数据包ID,即1、2、3、4
我想创建只接收特定 ID 的 observable。
所以我这样做:
Packets.Where(p=>p.Id == 1)
例如...这给了我一个 IObservable<Packets>,它只给我 Id 1 的数据包。
我可能有以下几种:
Packets.Where(p=>p.Id == 2)
Packets.Where(p=>p.Id == 3)
Packets.Where(p=>p.Id == 4)
Packets.Where(p=>p.Id == 5)
这基本上是可行的,但我想选择的 Id 越多,需要的处理就越多,即 p=>p.Id 将为每个 Id 运行,即使在找到目标 Observable 之后也是如此。
如何进行路由以使其更高效,类似的东西:
字典侦听器;
listeners.GetValue(packet.Id).OnDataReceived(packet)
这样,只要我的一个 IObservables 获取了一个 id,其他人就看不到它了吗?
更新
根据 Lee Campbell 的 groupby 建议添加了一个扩展:
public static class IObservableExtensions
{
class RouteTable<TKey, TSource>
{
public static readonly ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>> s_routes = new ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>>();
}
public static IObservable<TSource> Route<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> selector, TKey id)
{
var grouped = RouteTable<TKey, TSource>.s_routes.GetValue(source, s => s.GroupBy(p => selector(p)).Replay().RefCount());
return grouped.Where(e => e.Key.Equals(id)).SelectMany(e => e);
}
}
它会这样使用:
Subject<Packet> packetSubject = new Subject<Packet>();
var packets = packetSubject.AsObservable();
packets.Route((p) => p.Id, 5).Subscribe((p) =>
{
Console.WriteLine("5");
});
packets.Route((p) => p.Id, 4).Subscribe((p) =>
{
Console.WriteLine("4");
});
packets.Route((p) => p.Id, 3).Subscribe((p) =>
{
Console.WriteLine("3");
});
packetSubject.OnNext(new Packet() { Id = 1 });
packetSubject.OnNext(new Packet() { Id = 2 });
packetSubject.OnNext(new Packet() { Id = 3 });
packetSubject.OnNext(new Packet() { Id = 4 });
packetSubject.OnNext(new Packet() { Id = 5 });
packetSubject.OnNext(new Packet() { Id = 4 });
packetSubject.OnNext(new Packet() { Id = 3 });
输出是: 3、4、5、4、3
它只在看到新的数据包 ID 时检查每个组的 ID。
【问题讨论】:
-
为什么您认为当前的方法意味着需要更多的处理?意义重大吗?另外,“即使在找到目标 Observable 之后”是什么意思?
-
进一步考虑这一点,
.Where运算符对于这个用例来说是完全正确的。除非每次订阅 observable 的设置成本巨大,否则没有更好的方法。如果设置成本很高,那么您可以使用.Publish()共享订阅,但您仍然可以使用.Where(...)来过滤值。
标签: c# .net system.reactive