【问题标题】:Reactive Extensions (RX) Custom Scheduler C#反应式扩展 (RX) 自定义调度程序 C#
【发布时间】:2015-08-28 20:37:49
【问题描述】:

是否可以构建一个自定义调度程序,它可以检查通过 IObservable 的每个元素的值,以确定哪个线程来处理该项目?

我需要按顺序处理具有相同键但并行处理不同键的项目。让 RX 进行调度而不是比我想要的更早离开 observable 以便将每个值分配给线程是有意义的。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    您是否尝试过GroupBy 后跟ObserveOn

    类似:

    source
        .GroupBy(item => item.Key)
        .SelectMany(group => group
            .ObserveOn(Scheduler.NewThread)
            .Select(item => process(item))
        )
        .Subscribe(processResult => ...);
    

    这将按键对流进行分区,为每个键启动一个新线程,并为该键中的每个项目运行process()

    【讨论】:

    • 那很好。我刚刚在 LINQPad 中运行了一个测试,它运行良好。我必须在某个时候使用它。
    • 优雅但如果有数千个键怎么办?理想情况下,调度程序可以使用线程池线程上的任务顺序处理具有相同键的项目?
    • 当然。您可能想改用Scheduler.Default,这将根据您的平台使用线程池或任务。
    • 我发现使用哪个调度程序(NewThread、TaskPoolScheduler、ThreadPoolScheduler)并不重要,不同的线程用于处理每个键的项目,所以对于 500 个键,我看到了 500 个不同的线程身份证。我猜每个调度程序都在创建一个长时间运行的任务或线程池线程,它会在可观察期间保持它。
    • 是的,这可能是ObserveOn 的工作方式。您可以改用GroupByUntil(item => item.Key, group => group.Throttle(TimeSpan.FromSeconds(30)),这将导致密钥(及其线程)在最近没有看到密钥时关闭(如果密钥再次出现,它将重新启动)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-04
    • 1970-01-01
    • 2011-09-25
    • 1970-01-01
    • 2023-03-25
    相关资源
    最近更新 更多