【问题标题】:Synchronization mechanism for an observable object可观察对象的同步机制
【发布时间】:2013-12-20 01:19:21
【问题描述】:

假设我们必须同步对共享资源的读/写访问。多个线程将以读取和写入的方式访问该资源(大部分时间用于读取,有时用于写入)。我们还假设每次写入都会触发一次读取操作(对象是可观察的)。

对于这个例子,我会想象一个这样的类(请原谅语法和风格,这只是为了说明目的):

class Container {
    public ObservableCollection<Operand> Operands;
    public ObservableCollection<Result> Results;
}

我很想为此使用ReadWriterLockSlim,而且我会把它放在Container 级别(想象对象不是那么简单,一个读/写操作可能涉及多个对象):

public ReadWriterLockSlim Lock;

OperandResult 的实现对于本示例没有任何意义。 现在让我们想象一些代码观察到Operands,并将产生一个结果放入Results

void AddNewOperand(Operand operand) {
    try {
        _container.Lock.EnterWriteLock();
        _container.Operands.Add(operand);
    }
    finally {
        _container.ExitReadLock();
    }
}

我们假设的观察者会做类似的事情,但要消耗一个新元素,它会用EnterReadLock() 锁定来获取操作数,然后用EnterWriteLock() 来添加结果(让我省略代码)。由于递归,这将产生一个异常,但如果我设置LockRecursionPolicy.SupportsRecursion,那么我只会将我的代码打开死锁(来自MSDN):

默认情况下,ReaderWriterLockSlim 的新实例是使用 LockRecursionPolicy.NoRecursion 标志创建的,并且不允许递归。建议对所有新开发使用此默认策略,因为递归会引入不必要的复杂性并使您的代码更容易出现死锁

为了清楚起见,我重复相关部分:

递归 [...] 使您的代码更容易出现死锁。

如果我对LockRecursionPolicy.SupportsRecursion 没有错,如果我从同一个线程问一个,比如说,读锁,然后 有人 否则要求一个写锁,那么我就会死锁那么MSDN所说的有道理。此外,递归也会以可衡量的方式降低性能(如果我使用ReadWriterLockSlim 而不是ReadWriterLockMonitor,这不是我想要的)。

问题

最后我的问题是(请注意,我不是在寻找关于一般同步机制的讨论,我会知道这个生产者/可观察者/观察者场景有什么问题):

  • 在这种情况下有什么更好的方法?避免ReadWriterLockSlim 支持Monitor(即使在现实世界中,代码读取将远多于写入)?
  • 放弃如此粗略的同步?这甚至可能会产生更好的性能,但它会使代码更加复杂(当然不是在这个例子中,而是在现实世界中)。
  • 我应该让通知(来自观察到的集合)异步吗?
  • 还有什么我看不到的?

我知道没有最佳同步机制,所以我们使用的工具必须适合我们的情况,但我想知道是否有一些最佳实践,或者我只是忽略线程和观察者之间非常重要的事情(想象一下使用Microsoft Reactive Extensions,但问题是一般性的,与该框架无关)。

可能的解决方案?

我会尝试(以某种方式)推迟事件:

第一种解决方案
每个更改都不会触发任何CollectionChanged 事件,它保存在队列中。当提供者(推送数据的对象)完成后,它将手动强制刷新队列(按顺序引发每个事件)。这可以在另一个线程中完成,甚至可以在调用者线程中完成(但在锁之外)。

它可能有效,但它会让一切变得不那么“自动”(每个更改通知都必须由生产者自己手动触发,要编写更多代码,周围有更多错误)。

第二个解决方案
另一种解决方案可能是为我们的 lock 提供对 observable 集合的引用。如果我将ReadWriterLockSlim 包装在自定义对象中(有助于将其隐藏在易于使用的IDisposable 对象中),我可以添加ManualResetEvent 以通知所有锁都已以这种方式释放,集合本身可能会引发事件(再次在同一个线程或另一个线程中)。

第三种解决方案
另一个想法可能是让事件异步。如果事件处理程序需要一个锁,那么它将停止等待它的时间框架。为此,我担心可能会使用大量线程(特别是如果来自线程池)。

老实说,我不知道这些是否适用于现实世界的应用程序(就个人而言 - 从用户的角度来看 - 我更喜欢第二个,但这意味着对所有内容进行自定义收集,它使收集意识到线程,我会避免如果可能的话)。我不想让代码变得过于复杂。

【问题讨论】:

  • 我只想提一下,我发现EnterReadLockAdd 的组合非常可怕。该代码打算只读取,但它也写入集合。您确定不想在该特定时间点使用EnterWriteLock
  • @Caramiriel 你是对的,我固定了例子!
  • 如果你让你的方法更粗略一些,例如将 Operands 和 Result 设为只读属性并添加 AddOperand 和 AddResult 方法,您将能够将 Lock 设为私有并更好地控制发生的事情。还是我完全没有抓住重点?
  • @flup 你说得对。我的问题是它会使一切变得更加复杂,并且模型会意识到线程(如果可能的话,我会避免这种情况,因为当它在单个线程中使用时会影响性能设想)。此外,模型本身当然比我的例子复杂得多。可能是一个线程安全层使用您建议的方法构建的模型?
  • 你不能使用 ConcurrentQueue 和/或 BlockingCollection 吗? ObservableCollections 用于您需要以某种方式处理整个集合的情况,但如果您只是在添加新操作数时添加新结果,那听起来像是基于流的操作。或者,或者,使用成对的操作数结果集合怎么样?同样,您也许可以使用某种现有的 Concurrent-collection 类,并且不会出现所有这些问题。

标签: c# .net multithreading thread-synchronization


【解决方案1】:

跨线程集合同步

将 ListBox 绑定到 ObservableCollection,当数据更改时,您会更新 ListBox,因为 INotifyCollectionChanged 已实现。 dell'ObservableCollection 的缺陷是数据只能由创建它的线程更改。

SynchronizedCollection不存在多线程问题但不更新ListBox,因为没有实现INotifyCollectionChanged,即使实现了INotifyCollectionChanged,也只能调用CollectionChanged(this, e)从创建它的线程..所以它不起作用。

结论

-如果您想要一个自动更新单线程的列表,请使用 ObservableCollection

-如果您想要一个不是自动更新而是多线程的列表,请使用 SynchronizedCollection

-如果两者都需要,请使用 Framework 4.5,BindingOperations.EnableCollectionSynchronization 和 ObservableCollection() 这样的方式:

/ / Creates the lock object somewhere
private static object _lock = new object () ;
...
/ / Enable the cross acces to this collection elsewhere
BindingOperations.EnableCollectionSynchronization ( _persons , _lock )

完整示例 http://10rem.net/blog/2012/01/20/wpf-45-cross-thread-collection-synchronization-redux

【讨论】:

  • 谢谢,供参考!问题是SyncrhonizedCollection 使用监视器作为同步机制。我的问题是避免监视器支持更多并发(如ReadWriterLockSlim),用于具有多个并发读取和少量写入(实际上与用户界面没有直接关系)的场景。
【解决方案2】:

我不确定这是否是完全相同的问题,但是在处理相对少量的数据(2k-3k 条目)时,我使用下面的代码来促进对绑定到 UI 的集合的跨线程读/写访问.此代码最初找到here

public class BaseObservableCollection<T> : ObservableCollection<T>
{
  // Constructors
  public BaseObservableCollection() : base() { }
  public BaseObservableCollection(IEnumerable<T> items) : base(items) { }
  public BaseObservableCollection(List<T> items) : base(items) { }

  // Evnet
  public override event NotifyCollectionChangedEventHandler CollectionChanged;

  // Event Handler
  protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
  {
    // Be nice - use BlockReentrancy like MSDN said
    using (BlockReentrancy())
    {
      if (CollectionChanged != null)
      {
        // Walk thru invocation list
        foreach (NotifyCollectionChangedEventHandler handler in CollectionChanged.GetInvocationList())
        {
          DispatcherObject dispatcherObject = handler.Target as DispatcherObject;

          // If the subscriber is a DispatcherObject and different thread
          if (dispatcherObject != null && dispatcherObject.CheckAccess() == false)
          {
            // Invoke handler in the target dispatcher's thread
            dispatcherObject.Dispatcher.Invoke(DispatcherPriority.DataBind, handler, this, e);
          }
          else
          {
            // Execute handler as is
            handler(this, e);
          }
        }
      }
    }
  }
}

我还使用下面的代码(继承自上面的代码)来支持在集合中的项目引发PropertyChanged 时引发CollectionChanged 事件。

public class BaseViewableCollection<T> : BaseObservableCollection<T>
  where T : INotifyPropertyChanged
{
  // Constructors
  public BaseViewableCollection() : base() { }
  public BaseViewableCollection(IEnumerable<T> items) : base(items) { }
  public BaseViewableCollection(List<T> items) : base(items) { }

  // Event Handlers
  private void ItemPropertyChanged(object sender, PropertyChangedEventArgs e)
  {
    var arg = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, sender, sender);
    base.OnCollectionChanged(arg);
  }

  protected override void ClearItems()
  {
    foreach (T item in Items) { if (item != null) { item.PropertyChanged -= ItemPropertyChanged; } }
    base.ClearItems();
  }

  protected override void InsertItem(int index, T item)
  {
    if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
    base.InsertItem(index, item);
  }

  protected override void RemoveItem(int index)
  {
    if (Items[index] != null) { Items[index].PropertyChanged -= ItemPropertyChanged; }
    base.RemoveItem(index);
  }

  protected override void SetItem(int index, T item)
  {
    if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
    base.SetItem(index, item);
  }
}

【讨论】:

    【解决方案3】:

    这听起来很像多线程泡菜。在这种事件链模式中使用递归是非常具有挑战性的,同时还要避免死锁。您可能需要考虑完全围绕该问题进行设计。

    例如,您可以使添加操作数与引发事件异步:

    private readonly BlockingCollection<Operand> _additions
        = new BlockingCollection<Operand>();
    
    public void AddNewOperand(Operand operand)
    {
        _additions.Add(operand);
    }
    

    然后在后台线程中进行实际添加:

    private void ProcessAdditions()
    {
        foreach(var operand in _additions.GetConsumingEnumerable())
        {
            _container.Lock.EnterWriteLock();
            _container.Operands.Add(operand);
            _container.Lock.ExitWriteLock();
        }
    }
    
    public void Initialize()
    {
        var pump = new Thread(ProcessAdditions)
        {
            Name = "Operand Additions Pump"
        };
        pump.Start();
    }
    

    这种分离牺牲了一些一致性——在 add 方法之后运行的代码实际上并不知道 add 实际发生的时间,这可能对您的代码来说是个问题。如果是这样,可以重写它以订阅观察并使用Task 在添加完成时发出信号:

    public Task AddNewOperandAsync(Operand operand)
    {
        var tcs = new TaskCompletionSource<byte>();
    
        // Compose an event handler for the completion of this task
        NotifyCollectionChangedEventHandler onChanged = null;
        onChanged = (sender, e) =>
        {
            // Is this the event for the operand we have added?
            if (e.NewItems.Contains(operand))
            {
                // Complete the task.
                tcs.SetCompleted(0);
    
                // Remove the event-handler.
                _container.Operands.CollectionChanged -= onChanged;
            }
        }
    
        // Hook in the handler.
        _container.Operands.CollectionChanged += onChanged;
    
        // Perform the addition.
        _additions.Add(operand);
    
        // Return the task to be awaited.
        return tcs.Task;
    }
    

    事件处理程序逻辑在后台线程上引发添加消息,因此它不可能阻塞您的前台线程。如果您在窗口的消息泵上等待添加,则同步上下文足够智能,可以在消息泵线程上安排继续。

    无论您是否走Task 路线,此策略都意味着您可以安全地从可观察事件中添加更多操作数,而无需重新进入任何锁。

    【讨论】:

    • 我应该检查一下它是如何与 AddAsync() 方法一起工作的,但我喜欢这个解决方案!如果消费者需要确定他们添加的内容可用,他们将等待。
    猜你喜欢
    • 2022-11-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-25
    • 2019-09-09
    • 2019-01-30
    相关资源
    最近更新 更多