【问题标题】:Observable stream from StackExchange Redis Pub Sub subscription来自 StackExchange Redis Pub Sub 订阅的可观察流
【发布时间】:2020-08-08 18:48:53
【问题描述】:

目标:

我正在使用 StackExchange Redis 客户端。我的目标是从客户端公开的 Pub Sub Subscriber 创建一个 Observable 流,然后可以反过来支持 Observables 的 1-n 订阅,每个订阅都有自己的过滤器通过 LINQ。 (发布正在按计划进行,问题完全在于订阅特定频道上的事件流。)

背景:

我正在使用 Redis Pub Sub 作为 Event Sourced CQRS 应用程序的一部分。具体用例是将事件发布给多个订阅者,然后更新各种读取模型、发送电子邮件等。

这些订阅者中的每一个都需要过滤他们处理的事件类型,为此我希望使用带有 LINQ 的 Rx .Net(反应式扩展)来 在事件流上提供过滤条件,以有效地处理仅对感兴趣的事件作出反应。使用这种方法无需使用事件总线实现注册处理程序,并允许我通过部署 1-n 个微服务来向系统添加新的投影,每个微服务都有 1-n Observables 使用自己的特定过滤器订阅事件流。

我已经尝试过:

1) 我创建了一个继承自 ObservableBase 的类,重写了 SubscribeCore 方法,该方法接收来自 Observables 的订阅请求,将它们存储在 ConcurrentDictionary 中,并且当每个 Redis 通知从通道到达时,循环通过已注册的 Observable 订阅者并调用他们的 OnNext 方法传递了 RedisValue。

2) 我创建了一个 Subject,它也接受来自 Observables 的订阅,并调用它们的 OnNext 方法。同样,主题的使用似乎被许多人所反对。

问题:

我尝试过的方法确实有效(至少在表面上),性能水平不同,但feel like a hack,而且我没有按照预期的方式使用 Rx。

我看到许多 cmets 应该尽可能使用内置的 Observable 方法,例如 Observable.FromEvent,但至少在我看来,这似乎不可能使用 StackExchange Redis 客户端订阅 API。

我也明白接收流并转发到多个观察者的首选方法是使用 ConnectableObservable,这似乎是为我的场景设计的face(每个微服务内部都会订阅 1-n 个 Observables)。目前,我无法弄清楚如何将 ConnectableObservable 连接到来自 StackExchange Redis 的通知,或者它是否比 Observable 提供了真正的好处。

更新

虽然在我的场景中完成不是问题(处置没问题),但错误处理很重要;例如隔离在一个订阅者中检测到的错误,以防止所有订阅终止。

【问题讨论】:

    标签: c# system.reactive stackexchange.redis


    【解决方案1】:

    这是一种扩展方法,可用于从 ISubscriberRedisChannel 创建 IObservable<RedisValue>

    public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel)
    {
        return Observable.Create<RedisValue>(async (obs, ct) =>
        {
            // as the SubscribeAsync callback can be invoked concurrently
            // a thread-safe wrapper for OnNext is needed
            var syncObs = Observer.Synchronize(obs);
            await subscriber.SubscribeAsync(channel, (_, message) =>
            {
                syncObs.OnNext(message);
            }).ConfigureAwait(false);
    
            return Disposable.Create(() => subscriber.Unsubscribe(channel));
        });
    }
    

    由于没有完成 Redis 通道,因此生成的 IObservable 将永远不会完成,但是您可以放弃 IDisposable 订阅以取消订阅 Redis 通道(这将由许多 Rx 操作员自动完成)。

    用法可能是这样的:

    var subscriber = connectionMultiplexer.GetSubscriber();
    
    var gotMessage = await subscriber.WhenMessageReceived("my_channel")
        .AnyAsync(msg => msg == "expected_message")
        .ToTask()
        .ConfigureAwait(false);
    

    或者按照你的例子:

    var subscriber = connectionMultiplexer.GetSubscriber();
    
    var sendEmailEvents = subscriber.WhenMessageReceived("my_channel")
        .Select(msg => ParseEventFromMessage(msg))
        .Where(evt => evt.Type == EventType.SendEmails);
    
    await sendEmailEvents.ForEachAsync(evt => 
    {
        SendEmails(evt);
    }).ConfigureAwait(false);
    

    其他微服务的过滤方式可能不同。

    【讨论】:

    • 整洁,我喜欢这种方法,+1。 (在接受答案之前,我更愿意等待几天,以便让其他人有机会。)您是否看到使用 ConnectableObservable 优于 Observable 的任何优势,正如我的问题中提到的那样?大多数来源将 ConnectableObservable 描述为设计用于读取流,然后发布到多个辅助观察者的场景。尽管在我的场景中完成不是问题(处置很好),但错误处理很重要;隔离它们以防止所有订阅终止。
    • 如果你想与多个观察者共享同一个订阅,那么你需要使用IConnectableObservable,这可以通过使用.Publish()或调用其他.Multicast()实现来完成WhenMessageReceived 的结果。它根本不应该改变 WhenMessageReceived 的实现方式。
    • 是的,通过WhenMessageReceived 返回IConnectableObservable 以及对该IConnectableObservable 的10 个并发订阅的测试和出色的性能。很好的解决方案。
    • 嘿,从表面上看,这正是我所需要的,但我正在使用 .NET Core(使用 System.Reactive.Core 3.1.1)并且我很难找到.AnyAsync() 和 .ToTask() 扩展使上面的例子对我有用。这些是否也需要与 .WhenMessageReceived() 一起实现?或者这些还没有达到 .NET Core 的实现?
    • @MarekM。感谢您的反馈,我将编辑答案以在 obs 周围添加同步包装器。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-09-03
    • 2020-09-26
    • 2023-02-12
    • 2020-02-01
    • 2019-08-15
    • 2020-11-22
    • 2011-07-15
    相关资源
    最近更新 更多