【问题标题】:Thread-safe memoization线程安全的记忆
【发布时间】:2010-11-18 07:21:22
【问题描述】:

让我们以Wes Dyer's的函数记忆方法为出发点:

public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
{
  var map = new Dictionary<A, R>();
  return a =>
    {
      R value;
      if (map.TryGetValue(a, out value))
        return value;
      value = f(a);
      map.Add(a, value);
      return value;
    };
}

问题是,当从多个线程中使用它时,我们可能会遇到麻烦:

Func<int, int> f = ...
var f1 = f.Memoize();
...
in thread 1:
var y1 = f1(1);
in thread 2:
var y2 = f1(1);
// We may be recalculating f(1) here!

让我们尽量避免这种情况。锁定map

public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
{
  var map = new Dictionary<A, R>();
  return a =>
    {
      R value;
      lock(map) 
      {
        if (map.TryGetValue(a, out value))
          return value;
        value = f(a);
        map.Add(a, value);
      }
        return value;
    };
}

显然是一个可怕的想法,因为它阻止我们同时计算多个不同 参数的f1。如果a 具有值类型,则锁定a 将不起作用(无论如何都是一个坏主意,因为我们不控制a,外部代码也可能锁定它)。

以下是我能想到的两个选项:

假设有一个 Lazy&lt;T&gt; 类用于惰性求值(参见 here):

public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
{
  var map = new Dictionary<A, Lazy<R>>();
  return a =>
    {
      Lazy<R> result;
      lock(map) 
      {
        if (!map.TryGetValue(a, out result))
        {  
          result = () => f(a);
          map.Add(a, result);
        }
      }
      return result.Value;
    };
}

或者保留一个额外的对象字典用于同步:

public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
{
  var map = new Dictionary<A, R>();
  var mapSync = new Dictionary<A, object>();
  return a =>
    {
      R value;
      object sync;
      lock(mapSync)
      { 
        if (!mapSync.TryGetValue(a, out sync))
        { 
          sync = new object();
          mapSync[a] = sync;
        }
      }
      lock(map)
      {
        if (map.TryGetValue(a, out value))
          return value;
      }
      lock(sync)
      {
        value = f(a);
        lock(map)
        {
          map[a] = value;
        }
        return value;
      }
    };
}

还有更好的选择吗?

【问题讨论】:

    标签: c# multithreading locking thread-safety memoization


    【解决方案1】:

    使用 .net 4.0 的 ConcurrentDictionary&lt;A, R&gt; 而不使用不必要的 Lazy&lt;R&gt;
    关键是GetOrAdd(A, Func&lt;A, R&gt;),它会渲染成一个非常简单的 lambda。

    public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
    {
        var cache = new ConcurrentDictionary<A, R>();
        return a => cache.GetOrAdd(a, f);
    };
    

    更新上述解决方案确实允许多个同时读取器和写入器以最小的开销。但是,它不会阻止f(a) 对同一值多次执行(在计算期间)。

    如果这对您很重要,您可以将值包装在 Lazy&lt;R&gt; 中,但每次读取都会产生费用。

    public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
    {
        var cache = new ConcurrentDictionary<A, Lazy<R>>();
        return a => cache.GetOrAdd(a, new Lazy<R>(() => f(a))).Value;
    }
    

    更新 对预填充的 1000 项缓存的一百万次读取的计时测试显示,ConcurrentDictionary19ms -- 与常规 Dictionary 相同 -- 但720ms 对于Lazy 版本。

    如果这听起来太陡峭,您可以通过更复杂的解决方案获得两全其美的效果。

    public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
    {
        var cache = new ConcurrentDictionary<A, R>();
        var syncMap = new ConcurrentDictionary<A, object>();
        return a =>
        {
            R r;
            if (!cache.TryGetValue(a, out r))
            {
                var sync = syncMap.GetOrAdd(a, new object());
                lock (sync)
                {
                    r = cache.GetOrAdd(a, f);
                }
                syncMap.TryRemove(a, out sync);
            }
            return r;
        };
    }
    

    【讨论】:

    • 我想说这是一个很好的答案。谢谢!
    • 使用带有或不带有 ImmutableInterlocked.Update 的不可变字典怎么样?
    【解决方案2】:

    如果您已经拥有 Lazy&lt;T&gt; 类型,我假设您使用的是 .net 4.0,因此您也可以使用 ConcurrentDictionary&lt;A,R&gt;

    public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
    {
      var map = new ConcurrentDictionary<A, Lazy<R>>();
      return a =>
        {
          Lazy<R> lazy = new Lazy<R>(() => f(a), LazyExecutionMode.EnsureSingleThreadSafeExecution);
          if(!map.TryAdd(a, lazy))
          {
            return map[a].Value;
          }
          return lazy.Value;
        };
    }
    

    【讨论】:

      【解决方案3】:

      扩展 Nigel Touch 的优秀答案,我想提供一个从他的解决方案中提取的可重用组件,限制 f(a) 的调用次数。

      我称之为 SynchronizedConcurrentDictionary,它看起来像这样:

      public class SynchronizedConcurrentDictionary<TKey, TValue> : ConcurrentDictionary<TKey, TValue>
      {
          private readonly ReaderWriterLockSlim _cacheLock = new ReaderWriterLockSlim();
      
          public new TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
          {
              TValue result;
      
              _cacheLock.EnterWriteLock();
              try
              {
                  result = base.GetOrAdd(key, valueFactory);
              }
              finally
              {
                  _cacheLock.ExitWriteLock();
              }
      
              return result;
          }
      }
      

      那么Memoize函数就变成了两行:

      public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
      {
          var cache = new SynchronizedConcurrentDictionary<A, R>();
      
          return key => cache.GetOrAdd(key, f);
      }
      

      干杯!

      【讨论】:

      • 为什么不加评论就投反对票?我只是想提供一些我派生出来并发现对社区有用的东西。有什么问题?
      • 注意:“SynchronizedConcurrentDictionary”这个名字可能很糟糕! ConcurrentDictionary 实现了 ICollection,它有一个属性“IsSynchronized”,该属性获取一个值,该值指示对 ICollection 的访问是否是同步的(线程安全的)。 ConcurrentDictionary 从此属性返回 false,如果您尝试读取 SyncRoot 属性,则会引发异常。名称“SynchronizedConcurrentDictionary”可以解释为暗示集合是通过 SyncRoot 同步的,这是错误的。
      【解决方案4】:

      由于 Lazy 构造函数的 enum 参数,Thomas 的答案似乎无法在 .NET 4.0 下编译。我在下面对其进行了修改。我还添加了一个可选参数,用于提供自己的相等比较器。例如,如果 TInput 没有实现自己的 Equals,或者如果 TInput 是一个字符串并且您想让它不区分大小写,这将非常有用。

          public static Func<TInput, TResult> Memoize<TInput, TResult>(
              this Func<TInput, TResult> func, IEqualityComparer<TInput> comparer = null)
          {
              var map = comparer == null
                            ? new ConcurrentDictionary<TInput, Lazy<TResult>>()
                            : new ConcurrentDictionary<TInput, Lazy<TResult>>(comparer);
      
              return input =>
                     {
                         var lazy = new Lazy<TResult>(() => func(input), LazyThreadSafetyMode.ExecutionAndPublication);
      
                         return map.TryAdd(input, lazy)
                                    ? lazy.Value
                                    : map[input].Value;
                     };
          }
      

      我用这个作为我的测试对这个方法做了一些基本的测试:

          public void TestMemoize()
          {
              Func<int, string> mainFunc = i =>
                                           {
                                               Console.WriteLine("Evaluating " + i);
                                               Thread.Sleep(1000);
                                               return i.ToString();
                                           };
      
              var memoized = mainFunc.Memoize();
      
              Parallel.ForEach(
                  Enumerable.Range(0, 10),
                  i => Parallel.ForEach(Enumerable.Range(0, 10), j => Console.WriteLine(memoized(i))));
          }
      

      它似乎工作正常。

      【讨论】:

        【解决方案5】:

        不,它们不是更好的选择。

        具有惰性评估的版本毫无意义,因为无论如何您都会立即对其进行评估。使用同步字典的版本无法正常工作,因为您在使用之前没有保护锁内的地图字典。

        你所说的可怕的版本实际上是最好的选择。您必须在锁内保护地图字典,以便一次只有一个线程可以访问它。字典不是线程安全的,所以如果你让一个线程读取它而另一个线程正在改变它,你会遇到问题。

        请记住,在地图对象上使用锁并不能保护地图对象本身,它只是使用地图引用作为标识符来一次保留多个线程来运行锁内的代码。您必须将访问对象的所有代码都放在锁中,而不仅仅是更改对象的代码。

        【讨论】:

        • 我已经修复了惰性评估版本。
        • 和同步字典版本。
        • 惰性求值版本仍然是指针,因为值总是立即求值。同步字典版本仍然不安全,因为不同的线程可以为同一个键创建对象,一个会覆盖另一个。
        • 延迟评估:线程获得相同的Lazy&lt;R&gt; 引用,因此只有一个人会实际评估它。同步字典:lock(mapSync)下不同线程如何为同一个key创建对象?
        • 不,多个线程实际上可能会评估惰性对象,因为它没有同步,但这不是重点。使对象变得惰性是没有意义的,因为它总是在下一行进行评估。在具有同步字典的版本中,您确定对象不在映射中的锁与您创建对象的锁之间存在间隙。在该间隙中,另一个线程可以确定它不在地图中并将其添加到地图中。这是一个很小的差距,但它就在那里。
        【解决方案6】:

        您不希望两次计算相同的值,并且希望多个线程能够同时计算值和/或检索值。为此,您需要使用某种条件变量和细粒度锁定系统。

        这就是想法。当没有值存在时,您将一个值放入同步映射中,然后任何需要该值的线程将等待它,否则您将只获取当前值。这样地图的锁定被最小化到查询值和返回值。

            public static Func<A, R> Memoize<A, R>(this Func<A, R> f)
            {
                var map = new Dictionary<A, R>();
                var mapSync = new Dictionary<A, object>();
                return a =>
                {
                    R value;
                    object sync = null;
                    bool calc = false;
                    bool wait = false;
                    lock (map)
                    {
                        if (!map.TryGetValue(a, out value))
                        {
                            //its not in the map
                            if (!mapSync.TryGetValue(a, out sync))
                            {
                                //not currently being created
                                sync = new object();
                                mapSync[a] = sync;
                                calc = true;
        
                            }
                            else
                            {
                                calc = false;
                                wait = true;
                            }
                        }
                    }
                    if(calc)
                    {
                        lock (sync)
                        {
                            value = f(a);
                            lock (map)
                            {
                                map.Add(a, value);
                                mapSync.Remove(a);
                            }
                            Monitor.PulseAll(sync);
                            return value;
                        }
                    }
                    else if (wait)
                    {
                        lock (sync)
                        {
                            while (!map.TryGetValue(a, out value))
                            {
                                Monitor.Wait(sync);
                            }
                            return value;
                        }
                    }
        
                    lock (map)
                    {
                        return map[a];
                    }
        
                };
            }
        

        这只是一个快速的第一次尝试,但我认为它展示了这项技术。在这里,您是以额外的内存换取速度。

        【讨论】:

          【解决方案7】:

          你读过文章中与线程安全相关的comment from Dyer吗?

          可能使 Memoize 线程安全的最简单方法是在地图上加锁。

          这将确保被记忆的函数对于每组不同的参数只运行一次。

          在我的 RoboRally 游戏示例中,我实际上使用函数记忆来充当“代理单例”。它并不是真正的单例,因为每个工厂实例可以有一个实例(除非工厂是静态的)。但这正是我想要的。

          【讨论】:

          • 是的,这是最简单的方法。我特别说明了它的坏处:它也阻止了我们同时根据不同的参数评估函数。
          猜你喜欢
          • 2013-12-30
          • 1970-01-01
          • 2011-10-16
          • 1970-01-01
          • 2020-09-19
          • 1970-01-01
          • 2013-10-24
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多