【问题标题】:Change and read properties of objects in ConcurrentDictionary in thread safe manner以线程安全的方式更改和读取 ConcurrentDictionary 中对象的属性
【发布时间】:2017-11-01 14:15:07
【问题描述】:

我使用ConcurrentDictionary 在 web api 应用程序中收集内存中的数据。我使用 api 方法在ConcurrentDictionary 中添加和更新对象。并且有后台线程根据对象属性分析和清理这个字典。现在我正在考虑两种方法:
1.AddOrUpdate 方法中使用updateValueFactory 中的字典项锁定,但问题是如何正确读取属性以确保我拥有最新的它的版本,并且我没有在不稳定状态下读取属性

public class ThreadsafeService2
{
    private readonly ConcurrentDictionary<string, ThreadSafeItem2> _storage = 
        new ConcurrentDictionary<string, ThreadSafeItem2>();

    public void AddOrUpdate(string name)
    {
        var newVal = new ThreadSafeItem2();
        _storage.AddOrUpdate(name, newVal, (key, oldVal) =>
        {
            //use lock
            lock (oldVal)
            {
                oldVal.Increment();
            }
            return oldVal;
        });
    }

    public void Analyze()
    {
        foreach (var key in _storage.Keys)
        {
            if (_storage.TryGetValue(key, out var item))
            {
                //how to read it properly?
                long ticks = item.ModifiedTicks;
            }
        }
    }
}
public class ThreadSafeItem2
{
    private long _modifiedTicks;
    private int _counter;
    public void Increment()
    {
        //no interlocked here
        _modifiedTicks = DateTime.Now.Ticks;
        _counter++;
    }

    //now interlocked here
    public long ModifiedTicks => _modifiedTicks;
    public int Counter => _counter;
}

2.在没有锁定的属性级别上使用Interlockedmemory barriers,对我来说看起来有点冗长

public class ThreadsafeService1
    {
            private readonly ConcurrentDictionary<string, ThreadSafeItem1> _storage = 
                new ConcurrentDictionary<string, ThreadSafeItem1>();

            public void AddOrUpdate(string name)
            {
                var newVal = new ThreadSafeItem1();
                _storage.AddOrUpdate(name, newVal, (key, oldVal) =>
                {
                    //no lock here
                    oldVal.Increment();
                    return oldVal;
                });
            }

            public void Analyze()
            {
                foreach(var key in _storage.Keys)
                {
                    if(_storage.TryGetValue(key, out var item))
                    {
                        //reading through interloacked
                        long ticks = item.ModifiedTicks;
                    }
                }
            }
        }

        public class ThreadSafeItem1
        {
            private long _modifiedTicks;
            private int _counter;
            public void Increment()
            {
                //make changes in atomic manner
                Interlocked.Exchange(ref _modifiedTicks, DateTime.Now.Ticks);
                Interlocked.Increment(ref _counter);
            }

            public long ModifiedTicks => Interlocked.Read(ref _modifiedTicks);
            public int Counter => Thread.VolatileRead(ref _counter);
        }

这里的最佳做法是什么?

【问题讨论】:

  • 我认为这取决于Analyze 对这些值的作用。在您阅读long ticks = item.ModifiedTicks 之后使用第二种方法,此值可能已经过时(另一个线程可能在您阅读后已经更新了它)。根据您使用这些值的方式,这可能是一个问题。
  • @Evk 我将它与某个阈值进行比较并决定:我是否应该将其从字典中删除。此外,在工作项目中,我在 ThreadSafeItem 中还有另一种方法,它根据计数器和 modifiedticks 值返回布尔值。
  • 因此,如果您只增加修改的刻度并根据“ModifiedTicks > 阈值”删除项目 - 您并不关心“陈旧”的刻度值。另一方面,如果您根据“ModifiedTicks ModifiedTicks 之后,另一个线程可能会增加它。在这种情况下,您需要锁定整个操作(读取和删除)。如果您不确定 - 锁定整个操作。锁定只是阅读本身通常没有什么意义。
  • @mtkachenko Evk 是完全正确的。由于您正在根据获取的数据修改正在同步的数据,因此整个操作在逻辑上是关键部分的一部分。您应该只使用Dictionary,而不是ConcurrentDictionary,并锁定整个操作以更新值以及整个操作以分析该值。出于多种原因,您当前的解决方案都不安全。
  • @Evk 此背景检查计划每 10 分钟运行一次,我可以阅读并做出决定 - 我是否应该删除它。如果某个线程稍后会更新它,那就太晚了。我只想确保以一致且线程安全的方式读取和写入 item 中的属性。

标签: c# multithreading concurrency


【解决方案1】:

因此,您的两个实现都存在重大问题。第一种解决方案在递增时锁定,但在读取时不锁定,这意味着访问数据的其他地方可以读取无效状态。

一个非技术问题,但仍然是一个主要问题,是您已将您的类命名为ThreadSaveItem,但它实际上并未设计为可从多个线程安全访问。在此实现中,调用者 的职责是确保不会从多个线程访问该项目。如果我看到一个名为ThreadSafeItem 的类,我会假设从多个线程访问它是安全的,并且我不需要同步对它的访问,只要我执行的每个操作都是唯一需要的在逻辑上是原子的。

您的Interlocked 解决方案存在问题,因为您必须修改您正在修改的字段,这些字段在概念上是捆绑在一起的,但您没有将它们的更改同步在一起,这意味着有人可以观察到对其中一个而不是另一个的修改,这是该代码的问题。

接下来,您在这两种解决方案中都使用 AddOrUpdate 并不合适。方法调用的重点是添加一个项目或用另一个项目替换它,而不是改变提供的项目(这就是它需要返回值的原因;你应该生成一个新项目 )。如果您想采用获取可变项并对其进行变异的方法,则可以调用GetOrAdd 来获取现有项或创建新项,然后以线程安全的方式对其进行变异使用返回值。

通过简单地使ThreadSafeItem 不可变,整个解决方案得到了极大的简化。它允许您在ConcurrentDictionary 上使用AddOrUpdate 进行更新,这意味着唯一需要完成的同步是更新ConcurrentDictionary 的值,并且它已经处理了自己状态的同步,访问ThreadSafeItem 时根本不需要进行同步,因为所有对数据的访问都是固有线程安全的,因为它是不可变的。这意味着您实际上根本不需要编写任何同步代码,而这正是您想要尽可能争取的目标。

最后,我们得到了实际的代码:

public class ThreadsafeService3
{
    private readonly ConcurrentDictionary<string, ThreadSafeItem3> _storage =
        new ConcurrentDictionary<string, ThreadSafeItem3>();

    public void AddOrUpdate(string name)
    {
        _storage.AddOrUpdate(name, _ => new ThreadSafeItem3(), (_, oldValue) => oldValue.Increment());
    }

    public void Analyze()
    {
        foreach (var pair in _storage)
        {
            long ticks = pair.Value.ModifiedTicks;
            //Note, the value may have been updated since we checked; 
            //you've said you don't care and it's okay for a newer item to be removed here if it loses the race.
            if (isTooOld(ticks))
                _storage.TryRemove(pair.Key, out _);  
        }
    }
}

public class ThreadSafeItem3
{
    public ThreadSafeItem3()
    {
        Counter = 0;
    }
    private ThreadSafeItem3(int counter)
    {
        Counter = counter;
    }
    public ThreadSafeItem3 Increment()
    {
        return new ThreadSafeItem3(Counter + 1);
    }

    public long ModifiedTicks { get; } = DateTime.Now.Ticks;
    public int Counter { get; }
}

【讨论】:

  • 由于非常密集的内存分配,这里是否可能出现 GC 问题?
  • @mtkachenko 随意分析代码,看看您是否因使用此解决方案而遇到性能问题。一旦它变成不可变的,这个对象就值得考虑作为一个值类型。如果您的分析器得出可证明的证据表明额外分配导致足够的额外 GC 压力成为问题(我非常怀疑这种情况,但并非不可能),那么您可以考虑将 ThreadSafeItem 设为值类型;它当然是一体的。除非有可衡量的问题,否则我不会搞砸它。
【解决方案2】:

我无法评论你到底在做什么,但联锁和并发字典比你自己做的锁要好。

我会质疑这种方法。您的数据足够重要,但不那么重要,无法持久保存它?根据应用程序的使用情况,这种方法会在一定程度上减慢它的速度。同样,不知道您在做什么,您可以将每个“添加”放入 MSMQ,然后按某个计划运行外部 exe 来处理这些项目。该网站只会触发并忘记,没有线程要求。

【讨论】:

  • 客户端经常调用这个 api,所以我在内存中进行了一些预聚合,然后再将其放入数据库。
  • 基于此,如果您使用队列将其聚合然后将其放入数据库中,您将不会遇到任何缩放或性能问题。它还可以与数据库进行事务处理。
  • 此服务具有“可扩展性”的客户端库。
  • 使用任何线程锁,不会使任何应用程序规模化。您告诉所有线程停止并等待直到操作完成。使用 Interlock 和 Concurrent Dictionary 比手动操作要好。
  • @acivic2nv 线程仅在存在争用时停止和等待。在绝大多数情况下,锁不是高度上下文化的,因此对性能的影响几乎为零。此外,ConcurrentDictionary 也使用锁,它只是尽可能不频繁地使用它们。同样,Interlocked 在高争用情况下也有自己的开销。但当然,在您找到可行的解决方案之前,所有这些都是无关紧要的。 两种 OP 的解决方案实际上并不安全。唯一的正确解决方案是锁定整个两个操作。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-27
  • 2017-02-27
  • 2012-08-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多