【问题标题】:How can I route Observable values to different Subscribers?如何将 Observable 值路由到不同的订阅者?
【发布时间】:2016-09-23 15:24:00
【问题描述】:

这都是伪代码...

好的,这是我的场景,我有一个传入的数据流,它被解析成数据包。

我有一个IObservable<Packets> Packets

每个数据包都有一个数据包ID,即1、2、3、4

我想创建只接收特定 ID 的 observable。

所以我这样做:

Packets.Where(p=>p.Id == 1)

例如...这给了我一个 IObservable<Packets>,它只给我 Id 1 的数据包。

我可能有以下几种:

Packets.Where(p=>p.Id == 2)
Packets.Where(p=>p.Id == 3)
Packets.Where(p=>p.Id == 4)
Packets.Where(p=>p.Id == 5)

这基本上是可行的,但我想选择的 Id 越多,需要的处理就越多,即 p=>p.Id 将为每个 Id 运行,即使在找到目标 Observable 之后也是如此。

如何进行路由以使其更高效,类似的东西:

字典侦听器;

listeners.GetValue(packet.Id).OnDataReceived(packet)

这样,只要我的一个 IObservables 获取了一个 id,其他人就看不到它了吗?

更新

根据 Lee Campbell 的 groupby 建议添加了一个扩展:

public static class IObservableExtensions
{
    class RouteTable<TKey, TSource>
    {
        public static readonly ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>> s_routes = new ConditionalWeakTable<IObservable<TSource>, IObservable<IGroupedObservable<TKey, TSource>>>();
    }

    public static IObservable<TSource> Route<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> selector, TKey id)
    {
        var grouped = RouteTable<TKey, TSource>.s_routes.GetValue(source, s => s.GroupBy(p => selector(p)).Replay().RefCount());
        return grouped.Where(e => e.Key.Equals(id)).SelectMany(e => e);
    }
}

它会这样使用:

Subject<Packet> packetSubject = new Subject<Packet>();

        var packets = packetSubject.AsObservable();

        packets.Route((p) => p.Id, 5).Subscribe((p) =>
        {
            Console.WriteLine("5");
        });

        packets.Route((p) => p.Id, 4).Subscribe((p) =>
        {
            Console.WriteLine("4");
        });

        packets.Route((p) => p.Id, 3).Subscribe((p) =>
        {
            Console.WriteLine("3");
        });

        packetSubject.OnNext(new Packet() { Id = 1 });
        packetSubject.OnNext(new Packet() { Id = 2 });
        packetSubject.OnNext(new Packet() { Id = 3 });
        packetSubject.OnNext(new Packet() { Id = 4 });
        packetSubject.OnNext(new Packet() { Id = 5 });
        packetSubject.OnNext(new Packet() { Id = 4 });
        packetSubject.OnNext(new Packet() { Id = 3 });

输出是: 3、4、5、4、3

它只在看到新的数据包 ID 时检查每个组的 ID。

【问题讨论】:

  • 为什么您认为当前的方法意味着需要更多的处理?意义重大吗?另外,“即使在找到目标 Observable 之后”是什么意思?
  • 进一步考虑这一点,.Where 运算符对于这个用例来说是完全正确的。除非每次订阅 observable 的设置成本巨大,否则没有更好的方法。如果设置成本很高,那么您可以使用.Publish() 共享订阅,但您仍然可以使用.Where(...) 来过滤值。

标签: c# .net system.reactive


【解决方案1】:

这是我很久以前写的一个运算符,但我认为它可以满足您的需求。我仍然认为简单的.Where 可能更好——即使有多个订阅者。

尽管如此,我想要一个用于可观察对象的 .ToLookup,其操作类似于可枚举对象的相同运算符。

它的内存效率不高,但它实现了IDisposable,以便之后可以清理它。它也不是线程安全的,因此可能需要进行一些强化。

这里是:

public static class ObservableEx
{
    public static IObservableLookup<K, V> ToLookup<T, K, V>(this IObservable<T> source, Func<T, K> keySelector, Func<T, V> valueSelector, IScheduler scheduler)
    {
        return new ObservableLookup<T, K, V>(source, keySelector, valueSelector, scheduler);
    }

    internal class ObservableLookup<T, K, V> : IDisposable, IObservableLookup<K, V>
    {
        private IDisposable _subscription = null; 
        private readonly Dictionary<K, ReplaySubject<V>> _lookups = new Dictionary<K, ReplaySubject<V>>();

        internal ObservableLookup(IObservable<T> source, Func<T, K> keySelector, Func<T, V> valueSelector, IScheduler scheduler)
        {
            _subscription = source.ObserveOn(scheduler).Subscribe(
                t => this.GetReplaySubject(keySelector(t)).OnNext(valueSelector(t)),
                ex => _lookups.Values.ForEach(rs => rs.OnError(ex)),
                () => _lookups.Values.ForEach(rs => rs.OnCompleted()));
        }

        public void Dispose()
        {
            if (_subscription != null)
            {
                _subscription.Dispose();
                _subscription = null;
                _lookups.Values.ForEach(rs => rs.Dispose());
                _lookups.Clear();
            }
        }

        private ReplaySubject<V> GetReplaySubject(K key)
        {
            if (!_lookups.ContainsKey(key))
            {
                _lookups.Add(key, new ReplaySubject<V>());
            }
            return _lookups[key];
        }

        public IObservable<V> this[K key]
        {
            get
            {
                if (_subscription == null) throw new ObjectDisposedException("ObservableLookup");
                return this.GetReplaySubject(key).AsObservable();
            }
        }
    }
}

public interface IObservableLookup<K, V> : IDisposable
{
    IObservable<V> this[K key] { get; }
}

你会这样使用它:

IObservable<Packets> Packets = ...

IObservableLookup<int, Packets> lookup = Packets.ToLookup(p => p.Id, p => p, Scheduler.Default);

lookup[1].Subscribe(p => { });
lookup[2].Subscribe(p => { });
// etc

这样做的好处是,您可以在 源 observable 生成具有该键的值之前通过键订阅值。

完成清理资源后不要忘记致电lookup.Dispose()

【讨论】:

  • _lookups.Values.ForEach 方法未找到?
  • @DanUK86 - 抱歉,我也在使用交互式扩展 - NuGet "System.Interactive"。
【解决方案2】:

我建议查看GroupBy,然后检查是否有性能回报。我认为有,但它重要吗?

Packets.GroupBy(p=>p.Id)

关于如何使用GroupBy 作为一种路由器的测试示例代码

var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
    ReactiveTest.OnNext(100, 1),
    ReactiveTest.OnNext(200, 2),
    ReactiveTest.OnNext(300, 3),
    ReactiveTest.OnNext(400, 4),
    ReactiveTest.OnNext(500, 5),
    ReactiveTest.OnNext(600, 6),
    ReactiveTest.OnNext(700, 7),
    ReactiveTest.OnNext(800, 8),
    ReactiveTest.OnNext(900, 9),
    ReactiveTest.OnNext(1000, 10),
    ReactiveTest.OnNext(1100, 11)
    );

var router = source.GroupBy(i=>i%4)
    .Publish()
    .RefCount();

var zerosObserver = scheduler.CreateObserver<int>();
router.Where(grp=>grp.Key == 0)
    .Take(1)
    .SelectMany(grp=>grp)
    .Subscribe(zerosObserver);

var onesObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 1)
    .Take(1)
    .SelectMany(grp => grp)
    .Subscribe(onesObserver);

var twosObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 2)
        .Take(1)
        .SelectMany(grp => grp)
        .Subscribe(twosObserver);

var threesObserver = scheduler.CreateObserver<int>();
router.Where(grp => grp.Key == 3)
        .Take(1)
        .SelectMany(grp => grp)
        .Subscribe(threesObserver);

scheduler.Start();

ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(400, 4), ReactiveTest.OnNext(800, 8)}, zerosObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(100, 1), ReactiveTest.OnNext(500, 5), ReactiveTest.OnNext(900, 9)}, onesObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(200, 2), ReactiveTest.OnNext(600, 6), ReactiveTest.OnNext(1000, 10) }, twosObserver.Messages);
ReactiveAssert.AreElementsEqual(new[] { ReactiveTest.OnNext(300, 3), ReactiveTest.OnNext(700, 7), ReactiveTest.OnNext(1100, 11)}, threesObserver.Messages);

【讨论】:

  • 我也想过.GroupBy,但是只有当密钥正确并且因此实际上是.Where 时,OP 才需要订阅内部可观察对象,但在这种情况下它正在构建一个不必要的内部字典? OP 正在根据密钥进行过滤,我想不出任何方法来避免 .Where
  • 您仍然需要Where,但是每个订阅只评估一次。 GroupBy 每个值执行一次分支。因此,您有 O1 成本与 On 成本(其中 n 是订阅者数量)
  • 我认为这是一个在性能上令人毛骨悚然的问题。我认为真正了解性能更高的唯一方法是对各种方法进行计时。
  • 同意,并在我的回答中发布。然而,在交易应用程序中,我们在客户端上每秒看到数十到数百条消息,如果检查每个订阅者的每条消息,这无疑是一种浪费。那只是把 CPU 周期扔在地上。
  • 我希望看到不同替代方案之间的比较。我刚刚发布的答案是以 CPU 换取内存,所以我认为这将归结为意见问题,而不是任何硬科学。
【解决方案3】:

您可以使用GroupBy 来拆分数据。我建议您先设置所有订阅,然后激活您的来源。这样做会导致一个巨大的嵌套GroupBy 查询,但也可以多播您的组并单独订阅它们。我在下面编写了一个小型辅助实用程序来执行此操作。

因为在源被激活(通过Connect 完成)后您可能仍想添加新路由,所以我们使用Replay 重播组。 Replay 也是一个多播运算符,所以我们不需要Publish 来进行多播。

public sealed class RouteData<TKey, TSource>
{
    private IConnectableObservable<IGroupedObservable<TKey, TSource>> myRoutes;

    public RouteData(IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        this.myRoutes = source.GroupBy(keySelector).Replay();
    }

    public IDisposable Connect()
    {
        return this.myRoutes.Connect();
    }

    public IObservable<TSource> Get(TKey id)
    {
        return myRoutes.FirstAsync(e => e.Key.Equals(id)).Merge();
    }
}

public static class myExtension
{
    public static RouteData<TKey, TSource> RouteData<TKey, TSource>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        return new RouteData<TKey, TSource>(source, keySelector);
    }
}

示例用法:

public class myPackage
{
    public int Id;

    public myPackage(int id)
    {
        this.Id = id;
    }
}

class program
{
    static void Main()
    {
        var source = new[] { 0, 1, 2, 3, 4, 5, 4, 3 }.ToObservable().Select(i => new myPackage(i));
        var routes = source.RouteData(e => e.Id);
        var subscription = new CompositeDisposable(
            routes.Get(5).Subscribe(Console.WriteLine),
            routes.Get(4).Subscribe(Console.WriteLine),
            routes.Get(3).Subscribe(Console.WriteLine),
            routes.Connect());
        Console.ReadLine();
    }
}

【讨论】:

    【解决方案4】:

    您可能需要考虑编写一个自定义 IObserver 来满足您的需求。我在下面提供了一个示例。

    void Main()
    {
        var source = Observable.Range(1, 10);
        var switcher = new Switch<int, int>(i => i % 3);
        switcher[0] = Observer.Create<int>(val => Console.WriteLine($"{val} Divisible by three"));
        source.Subscribe(switcher);
    }
    
    class Switch<TKey,TValue> : IObserver<TValue>
    {
        private readonly IDictionary<TKey, IObserver<TValue>> cases;
        private readonly Func<TValue,TKey> idExtractor;
    
        public IObserver<TValue> this[TKey decision]
        {
            get
            {
                return cases[decision];
            }
            set
            {
                cases[decision] = value;
            }
        }
    
        public Switch(Func<TValue,TKey> idExtractor)
        {
            this.cases = new Dictionary<TKey, IObserver<TValue>>();
            this.idExtractor = idExtractor;
        }
    
        public void OnNext(TValue next)
        {
            IObserver<TValue> nextCase;
            if (cases.TryGetValue(idExtractor(next), out nextCase))
            {
                nextCase.OnNext(next);
            }
        }
    
        public void OnError(Exception e)
        {
            foreach (var successor in cases.Values)
            {
                successor.OnError(e);
            }
        }
    
        public void OnCompleted()
        {
            foreach (var successor in cases.Values)
            {
                successor.OnCompleted();
            }
        }
    }
    

    您显然需要实现 idExtractor 来从您的数据包中提取 id。

    【讨论】:

    • 我喜欢这个,准备试试看……乍一看还不错。
    • 这样做没有任何好处。为什么不创建过滤 1s、2s、3s 等的 observables 并订阅它们呢?您的代码现在突然变得复杂且难以维护,因为我不知道性能增益是什么
    • 如果你发现自己实现了IObserver&lt;TValue&gt;(或IObservable&lt;TValue&gt;),那么你可能做错了什么。
    • @DanUK86 - 不,丹,这很糟糕。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-10
    • 2019-09-15
    • 2019-03-09
    • 2018-09-08
    • 1970-01-01
    • 1970-01-01
    • 2023-04-02
    相关资源
    最近更新 更多