【问题标题】:Async Queue Processing With Reactive Extensions使用响应式扩展的异步队列处理
【发布时间】:2011-06-30 16:55:35
【问题描述】:

有几篇关于这个的文章,我有这个工作......但我想知道如何一次为我的 Observable 订阅设置最大数量的任务线程。

我有以下方法来并行化日志条目的异步保存:

private BlockingCollection<ILogEntry> logEntryQueue;

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);

要安排我的保存...但是如何指定调度程序一次使用的最大线程数?

【问题讨论】:

  • 如果您创建调度程序只是为了限制最大并发线程数,那么请考虑查看 TPL 数据流。它是专门为创建管道而构建的,其中管道中的每个块都有不同的并发限制。至少当我对这两种方法进行原型设计时,它对我来说更易于理解和维护。

标签: .net system.reactive


【解决方案1】:

这不是 Observable 的功能,而是 Scheduler 的功能。 Observable 定义what,调度器定义where

您需要传入一个自定义调度程序。一个简单的方法是继承 TaskScheduler 并覆盖“MaximumConcurrencyLevel”属性。

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

我实际上在 MSDN 上找到了一个示例:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

编辑:您询问了如何从 TaskScheduler 转到 IScheduler。另一位开发者刚刚给了我一点信息:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);

【讨论】:

  • 太棒了,谢谢...但是如何为我的新 TaskScheduler 获得 Reactive IScheduler?我不想重新实现整个功能。
【解决方案2】:

如果您将“工作”创建为 IObservable&lt;T&gt; 并延迟执行(即他们想要在订阅之前做任何事情),您可以使用接受多个最大并发订阅数的 Merge 重载:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize();

queue
    .Select(item => StartWork(item))
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes
    .Subscribe();

// To enqueue:
synchronizedQueue.OnNext(new QueueItem());

【讨论】:

  • 如何处理队列处理时添加到队列的其他线程?枚举过程中集合被修改不会受到影响吗?
  • Merge 运算符是线程安全的,但我已更新我的答案以使用线程安全主题以防万一。
  • 我想这只是我的困惑......这不是我关心的线程安全,而是队列正在被修改(其他项目正在排队),因为它正在被枚举。 ..这不是问题吗?
  • 本例中的队列是一个反应式“集合”,它的存在是为了随着时间的推移接受新值
  • Richard 遗漏了一个关键假设,即您的 StartWork 函数具有签名“IObservable StartWork(QueueItem item)” - 您正在寻找的合并重载仅适用于 IObservable>
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-09-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-05
相关资源
最近更新 更多