【问题标题】:BlockingCollection ToObservable with Subscribe using Reactive ExtensionsBlockingCollection ToObservable 和订阅使用响应式扩展
【发布时间】:2019-12-16 23:43:49
【问题描述】:

我很少有生产者会继续填写有限容量为 200 的 BlockingCollection。我正在使用 Reactive Extensions 并像这样调用下面的异步方法,我想知道这是否是实现它的正确方法(使用GetConsumingEnumerable 是否会有任何性能/并发问题,因为每个订阅者都会获得单独的订阅者)。我使用FromAsync 来避免根据 GitHub 上的其他文档在订阅者上调用异步。 userActivitesBlockingCollection。此代码在 ASP.NET Web REST API 的后台线程上运行。仅供参考,我们使用 BlockingCollection 作为多线程,我们希望确保按顺序订阅它接收数据的方式并使用基于 [Github] 的 FromAsync( https://github.com/dotnet/reactive/issues/459) 问题。

IObservable<EventData> eventObservable = userActivites.
    GetConsumingEnumerable().
    ToObservable(TaskPoolScheduler.Default);

eventObservable.Select(number => Observable.FromAsync(async () =>
    await SendDataToEventHubWithBatchAsync(number))).Concat().Subscribe();

【问题讨论】:

    标签: c# asp.net async-await system.reactive blockingcollection


    【解决方案1】:

    BlockingCollection 将阻塞线程,这对于 ASP.NET 应用程序来说是非常不受欢迎的。您是否考虑过使用像 BufferBlockChannel 这样的异步队列?您可以轻松地将两者都用作可观察对象:

    var block = new BufferBlock<UserActivity>(new DataflowBlockOptions() { BoundedCapacity = 200 });
    var observable = block.AsObservable();
    

    ...或

    var channel = Channel.CreateBounded<UserActivity>(200);
    var observable = channel.Reader.ReadAllAsync().ToObservable();
    

    【讨论】:

    • @Zoulias- 谢谢...但我的要求是我们想要订阅,当项目被添加到收藏时,我们想要订阅它,所以我们不必等待收藏完成并检查计数。
    • @Punit BufferBlockChannel 都以这种方式运行。您是否检查过它们,但它们不适合您的需求?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-06-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-04
    相关资源
    最近更新 更多