【发布时间】:2019-12-16 23:43:49
【问题描述】:
我很少有生产者会继续填写有限容量为 200 的 BlockingCollection。我正在使用 Reactive Extensions 并像这样调用下面的异步方法,我想知道这是否是实现它的正确方法(使用GetConsumingEnumerable 是否会有任何性能/并发问题,因为每个订阅者都会获得单独的订阅者)。我使用FromAsync 来避免根据 GitHub 上的其他文档在订阅者上调用异步。 userActivites 是 BlockingCollection。此代码在 ASP.NET Web REST API 的后台线程上运行。仅供参考,我们使用 BlockingCollection 作为多线程,我们希望确保按顺序订阅它接收数据的方式并使用基于 [Github] 的 FromAsync(
https://github.com/dotnet/reactive/issues/459) 问题。
IObservable<EventData> eventObservable = userActivites.
GetConsumingEnumerable().
ToObservable(TaskPoolScheduler.Default);
eventObservable.Select(number => Observable.FromAsync(async () =>
await SendDataToEventHubWithBatchAsync(number))).Concat().Subscribe();
【问题讨论】:
标签: c# asp.net async-await system.reactive blockingcollection