【发布时间】:2015-08-28 20:37:49
【问题描述】:
是否可以构建一个自定义调度程序,它可以检查通过 IObservable 的每个元素的值,以确定哪个线程来处理该项目?
我需要按顺序处理具有相同键但并行处理不同键的项目。让 RX 进行调度而不是比我想要的更早离开 observable 以便将每个值分配给线程是有意义的。
【问题讨论】:
标签: c# system.reactive
是否可以构建一个自定义调度程序,它可以检查通过 IObservable 的每个元素的值,以确定哪个线程来处理该项目?
我需要按顺序处理具有相同键但并行处理不同键的项目。让 RX 进行调度而不是比我想要的更早离开 observable 以便将每个值分配给线程是有意义的。
【问题讨论】:
标签: c# system.reactive
您是否尝试过GroupBy 后跟ObserveOn?
类似:
source
.GroupBy(item => item.Key)
.SelectMany(group => group
.ObserveOn(Scheduler.NewThread)
.Select(item => process(item))
)
.Subscribe(processResult => ...);
这将按键对流进行分区,为每个键启动一个新线程,并为该键中的每个项目运行process()。
【讨论】:
Scheduler.Default,这将根据您的平台使用线程池或任务。
ObserveOn 的工作方式。您可以改用GroupByUntil(item => item.Key, group => group.Throttle(TimeSpan.FromSeconds(30)),这将导致密钥(及其线程)在最近没有看到密钥时关闭(如果密钥再次出现,它将重新启动)。