【问题标题】:Observing a threadsafe collection using the Reactive Framework使用响应式框架观察线程安全集合
【发布时间】:2012-05-09 21:39:38
【问题描述】:

这将归结为一个相当高级的问题。 我有一个大型数据集,需要(独立地)验证每一行。 我想使用 Parallel.Foreach,并在该行上调用验证方法。假设此方法是线程安全的。 如果验证返回错误,我需要使用此错误更新数据行。 我显然不能从后台线程执行此操作。但是,我不确定实现此错误处理的最佳方法是什么。 我实现这一点的想法是将行 ID 和错误存储在 BlockingCollection 中,因为这是编写线程安全的。然后我会不断地轮询(从后台线程),当后台线程找到数据时,调用表单并更新当前行。

我想知道是否有更简单的方法来做到这一点,使用响应式框架?基本上,我需要一个多生产者线程安全集合,它可以被“观察”,当一个新值被添加到集合中时,“OnNext”将在主线程上执行——这可能吗?理想情况下,我还可以控制这种情况发生的频率(比如每 2-3 秒一次,这样主线程上的回调每 2-3 秒就会更新多行),所以我不会经常调用主线程。

感谢您的宝贵时间。

【问题讨论】:

标签: c# multithreading c#-4.0 system.reactive


【解决方案1】:

这让你觉得如何:

IObservable<bool> ValidateAsync(Row item)
{
    return Observable.Start(() => {
        // TODO: Figure out if the row is valid
        return true;
    }, Scheduler.TaskPoolScheduler);
}

myBigDataTable.ToObservable()
    .Select(x => ValidateAsync(x).Select(y => new { Row = x, IsValid = y }))
    .Merge(10 /* rows concurrently */)
    .ObserveOn(SynchronizationContext.Current /*assuming WinForms */)
    .Subscribe(x => {
        Console.WriteLine("Row {0} validity: {1}", x.Row, x.IsValid);
    });

没有锁,没有愚蠢的容器,没有阻塞,100% 线程安全。

【讨论】:

  • 小问题 - 我是否缺少将 DataTable 对象转换为 Observable 的扩展?
  • 一般来说,任何 IEnumerable 都可以转换为 IObservable - 不过我对 DataTable 不是很熟悉。您想要的大多数扩展方法都存在于System.Reactive.Linq
猜你喜欢
  • 1970-01-01
  • 2011-12-02
  • 1970-01-01
  • 2014-09-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-12-10
相关资源
最近更新 更多