【问题标题】:Async threadsafe Get from MemoryCache从 MemoryCache 获取异步线程安全
【发布时间】:2015-08-05 11:57:23
【问题描述】:

我创建了一个使用 .NET MemoryCache 的异步缓存。 这是代码:

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if(!_cache.Contains(key))
    {
        var data = await populator();
        lock(_cache)
        {
            if(!_cache.Contains(key)) //Check again but locked this time
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
        }
    }

    return (T)_cache.Get(key);
}

我认为唯一的缺点是我需要在锁外进行等待,因此填充器不是线程安全的,但由于等待不能驻留在锁内,我想这是最好的方法。有没有我遗漏的陷阱?

更新:当另一个线程使缓存无效时,它也是线程安全的 Esers 答案版本:

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = new Lazy<Task<T>>(populator, true);
    _cache.AddOrGetExisting(key, lazy, DateTimeOffset.Now.Add(expire));
    return ((Lazy<Task<T>>) _cache.Get(key)).Value;
}

但是它可能会更慢,因为它会创建永远不会执行的 Lazy 实例,并且它在完全线程安全模式下使用 Lazy LazyThreadSafetyMode.ExecutionAndPublication

更新为新的基准(越高越好)

Lazy with lock      42535929
Lazy with GetOrAdd  41070320 (Only solution that is completely thread safe)
Semaphore           64573360

【问题讨论】:

  • 假设一个新线程带有相同的键,而第一个线程等待填充。 populator 将不必要地执行两次。
  • 你可以使用计数为1的SempahoreSlim,它有异步等待msdn.microsoft.com/en-us/library/hh462805(v=vs.110).aspx
  • 是的,这是我知道的缺点,但由于 await 无法锁定,因此很难构建?
  • @ned,很好,会看的
  • 据我所知,MemoryCache 线程是安全的。还是我错过了什么?

标签: c# thread-safety async-await


【解决方案1】:

一个简单的解决方案是使用SemaphoreSlim.WaitAsync() 而不是锁,然后您可以解决在锁内等待的问题。虽然,MemoryCache 的所有其他方法都是线程安全的。

private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public async Task<T> GetAsync(
            string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if (!_cache.Contains(key))
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            if (!_cache.Contains(key))
            {
                var data = await populator();
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    return (T)_cache.Get(key);
}

【讨论】:

  • 是的,但是包含键检查然后插入缓存不是
  • 如果在您检查!_cache.Contains(key) 之后但在您检查return (T)_cache.Get 之前,由于过期策略,该项目从缓存中删除了怎么办?前者应该是AddOrGetExisting。实际上,您对内部 _cache.Contains(key) 也有同样的问题。
  • @OhadSchneider 你是对的。我的回答的主要重点不是重新定义 OP 调用的语义,而是展示如何使用 SemaphoreSlim 与异步调用结合使用。我将重申解决过期问题的代码。
  • 这不是解决方案。每次调用缓存时,它可能必须等待其他调用。如果您尝试缓存多条数据怎么办?您可能会发现一件事需要 100 毫秒,但它总是不得不等待需要 2-3 秒以上的事情。
  • 这不是一个解决方案,因为它为所有缓存共享一个锁。您至少需要每个键的信号量(即保留键字典)才能完成这项工作。其他人在这里发布了解决方案:stackoverflow.com/a/65643540/1878141
【解决方案2】:

当前答案使用有些过时的System.Runtime.Caching.MemoryCache。它们还包含微妙的竞争条件(参见 cmets)。最后,并非所有这些都允许超时取决于要缓存的值。

这是我尝试使用新的Microsoft.Extensions.Caching.Memory(由 ASP.NET Core 使用):

//Add NuGet package: Microsoft.Extensions.Caching.Memory    

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;

MemoryCache _cache = new MemoryCache(new MemoryCacheOptions());

public Task<T> GetOrAddAsync<T>(
        string key, Func<Task<T>> factory, Func<T, TimeSpan> expirationCalculator)
{    
    return _cache.GetOrCreateAsync(key, async cacheEntry => 
    {
        var cts = new CancellationTokenSource();
        cacheEntry.AddExpirationToken(new CancellationChangeToken(cts.Token));
        var value = await factory().ConfigureAwait(false);
        cts.CancelAfter(expirationCalculator(value));
        return value;
    });
}

示例用法:

await GetOrAddAsync("foo", () => Task.Run(() => 42), i  => TimeSpan.FromMilliseconds(i)));

请注意,不能保证只调用一次工厂方法(请参阅https://github.com/aspnet/Caching/issues/240)。

【讨论】:

    【解决方案3】:

    虽然有一个已经接受的答案,但我会用Lazy&lt;T&gt; 方法发布一个新答案。想法是:尽量减少lock 块的持续时间,如果缓存中不存在密钥,则将Lazy&lt;T&gt; 放入缓存中。这样所有线程同时使用相同的键将等待相同的Lazy&lt;T&gt; 的值

    public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
    {
        if (parameters != null)
            key += JsonConvert.SerializeObject(parameters);
    
        lock (_cache)
        {
            if (!_cache.Contains(key))
            {
                var lazy = new Lazy<Task<T>>(populator, true);
                _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
            }
        }
    
        return ((Lazy<Task<T>>)_cache.Get(key)).Value;
    }
    

    版本 2

    public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
    {
        if (parameters != null)
            key += JsonConvert.SerializeObject(parameters);
    
        var lazy = ((Lazy<Task<T>>)_cache.Get(key));
        if (lazy != null) return lazy.Value;
    
        lock (_cache)
        {
            if (!_cache.Contains(key))
            {
                lazy = new Lazy<Task<T>>(populator, true);
                _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
                return lazy.Value;
            }
            return ((Lazy<Task<T>>)_cache.Get(key)).Value;
        }
    }
    

    版本 3

    public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
    {
        if (parameters != null)
            key += JsonConvert.SerializeObject(parameters);
    
        var task = (Task<T>)_cache.Get(key);
        if (task != null) return task;
    
        var value = populator();
        return 
         (Task<T>)_cache.AddOrGetExisting(key, value, DateTimeOffset.Now.Add(expire)) ?? value;
    }
    

    【讨论】:

    • 使用您的新模式,您可以通过删除Contains 检查并将Add 调用切换到AddOrGetExisting 来完全摆脱锁定。
    • 加上 Scotts 后,在删除条目时它的线程安全(不是我的也不是 Yuvals)
    • @Anders 我在 Scott 的评论之后尝试了它,但它需要创建不必要的 Lazy(尽管它们没有被评估)。所以我没有发。
    • @Anders 也不需要 async/await :)
    • @Mike 谢谢,我假设AddOrGetExistingConcurrentDictionary 中的GetOrAdd 具有相同的合同(这在IMO 中更有意义)。无论如何,您提出的修复包含一个竞争条件(想象一下添加了该值,然后在您到达 _cache.Get 之前立即将其删除)所以我修复它略有不同。
    【解决方案4】:

    这是对 Eser 的 answer(版本 2)的尝试改进。 Lazy 类默认是线程安全的,所以lock 可以被删除。可能会为给定键创建多个Lazy 对象,但只有一个对象会查询它的Value 属性,从而导致重Task 的开始。其他 Lazys 将保持未使用状态,并且将超出范围并很快被垃圾回收。

    第一个重载是灵活和通用的,并接受Func&lt;CacheItemPolicy&gt; 参数。对于最常见的绝对到期和滑动到期情况,我添加了另外两个重载。为了方便,可以添加更多的重载。

    using System.Runtime.Caching;
    
    static partial class MemoryCacheExtensions
    {
        public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
            Func<Task<T>> valueFactory, Func<CacheItemPolicy> cacheItemPolicyFactory = null)
        {
            var lazyTask = (Lazy<Task<T>>)cache.Get(key);
            if (lazyTask == null)
            {
                var newLazyTask = new Lazy<Task<T>>(valueFactory);
                var cacheItem = new CacheItem(key, newLazyTask);
                var cacheItemPolicy = cacheItemPolicyFactory?.Invoke();
                var existingCacheItem = cache.AddOrGetExisting(cacheItem, cacheItemPolicy);
                lazyTask = (Lazy<Task<T>>)existingCacheItem?.Value ?? newLazyTask;
            }
            return ToAsyncConditional(lazyTask.Value);
        }
    
        private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
        {
            if (task.IsCompleted) return task;
            return task.ContinueWith(t => t,
                default, TaskContinuationOptions.RunContinuationsAsynchronously,
                TaskScheduler.Default).Unwrap();
        }
    
        public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
            Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
        {
            return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
            {
                AbsoluteExpiration = absoluteExpiration,
            });
        }
    
        public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
            Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
        {
            return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
            {
                SlidingExpiration = slidingExpiration,
            });
        }
    }
    

    使用示例:

    string html = await MemoryCache.Default.GetOrCreateLazyAsync("MyKey", async () =>
    {
        return await new WebClient().DownloadStringTaskAsync("https://stackoverflow.com");
    }, DateTimeOffset.Now.AddMinutes(10));
    

    此站点的 HTML 已下载并缓存 10 分钟。多个并发请求会await完成同一个任务。

    System.Runtime.Caching.MemoryCache 类易于使用,但对缓存条目优先级的支持有限。基本上只有two optionsDefaultNotRemovable,这意味着它对于高级场景来说是不够的。较新的Microsoft.Extensions.Caching.Memory.MemoryCache 类(来自this 包)提供了关于缓存优先级的more optionsLowNormalHighNeverRemove),但在其他方面不太直观且使用起来更麻烦。它提供异步功能,但不是懒惰的。所以这里是这个类的 LazyAsync 等效扩展:

    using Microsoft.Extensions.Caching.Memory;
    
    static partial class MemoryCacheExtensions
    {
        public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
            Func<Task<T>> valueFactory, MemoryCacheEntryOptions options = null)
        {
            if (!cache.TryGetValue(key, out Lazy<Task<T>> lazy))
            {
                var entry = cache.CreateEntry(key);
                if (options != null) entry.SetOptions(options);
                var newLazy = new Lazy<Task<T>>(valueFactory);
                entry.Value = newLazy;
                entry.Dispose(); // Dispose actually inserts the entry in the cache
                if (!cache.TryGetValue(key, out lazy)) lazy = newLazy;
            }
            return ToAsyncConditional(lazy.Value);
        }
    
        private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
        {
            if (task.IsCompleted) return task;
            return task.ContinueWith(t => t,
                default, TaskContinuationOptions.RunContinuationsAsynchronously,
                TaskScheduler.Default).Unwrap();
        }
    
        public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
            Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
        {
            return cache.GetOrCreateLazyAsync(key, valueFactory,
                new MemoryCacheEntryOptions() { AbsoluteExpiration = absoluteExpiration });
        }
    
        public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
            Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
        {
            return cache.GetOrCreateLazyAsync(key, valueFactory,
                new MemoryCacheEntryOptions() { SlidingExpiration = slidingExpiration });
        }
    }
    

    使用示例:

    var cache = new MemoryCache(new MemoryCacheOptions());
    string html = await cache.GetOrCreateLazyAsync("MyKey", async () =>
    {
        return await new WebClient().DownloadStringTaskAsync("https://stackoverflow.com");
    }, DateTimeOffset.Now.AddMinutes(10));
    

    更新:我刚刚意识到peculiar featureasync-await 机制。当一个不完整的Task 被同时等待多次时,延续将一个接一个地同步运行(在同一个线程中)(假设没有同步上下文)。这对于GetOrCreateLazyAsync 的上述实现可能是一个问题,因为在等待调用GetOrCreateLazyAsync 之后可能会立即存在阻塞代码,在这种情况下,其他等待者将受到影响(延迟,甚至死锁)。此问题的一个可能解决方案是返回延迟创建的Task 的异步延续,而不是任务本身,但前提是任务不完整。这就是上面引入ToAsyncConditional方法的原因。


    注意:此实现缓存异步 lambda 调用期间可能发生的任何错误。一般来说,这可能不是一个理想的行为。 我可能的解决方案是将 Lazy&lt;Task&lt;T&gt;&gt; 替换为 Stephen Cleary 的 Nito.AsyncEx.Coordination 包中的 AsyncLazy&lt;T&gt; 类型,并使用 RetryOnFailure 选项进行实例化。

    【讨论】:

    • 我在我的场景中对此进行了测试,它确实有效。
    • @ChristianFindlay 我知道的这个答案的唯一问题是,如果出现异常(在异步委托中),错误会被缓存。这可能不是一个理想的行为。
    • @ChristianFindlay 现在我再次检查它,我的答案的第二部分(与Microsoft.Extensions.Caching.Memory 相关的部分)有缺陷。通过异步 lambda 传递的选项将被忽略(absoluteExpirationslidingExpiration)。我可能不得不重新考虑我的方法。
    • @ChristianFindlay 我修复了 Microsoft.Extensions.Caching.Memory 相关实现的两个问题。缓存失败任务的行为问题仍然存在。
    • 回想起来,我不再对Lazy&lt;Task&lt;T&gt;&gt; 组合感到满意。它是阻塞和异步组件的危险组合,在某些情况下可能会导致线程被阻塞并失去可伸缩性。目前我赞成使用嵌套任务 (Task&lt;Task&gt;) 实现相同的功能,如 here 所示。
    【解决方案5】:

    这个帖子有点老了,但我最近遇到了这个问题,我想我会留下这个答案希望它有所帮助。

    对于异步,有几点需要牢记:

    1. “超级锁定”方法并不快,因为它会在对键执行操作时阻止对其他键的工厂操作。
    2. “每把钥匙锁”(SemaphoreSlim) 发生了两件事:它是一次性的,因此可以在比赛之后将其丢弃。湾。不要丢弃它。

    我选择使用锁池来解决它。不需要每个键都有一个锁,但只要有足够的锁作为可能的最大活动线程即可。然后我通过散列将相同的锁分配给密钥。池大小是ProcessorCount 的函数。 valueFactory 只执行一次。由于多个键映射到一个锁(一个键总是映射到同一个锁),具有相同散列的键的操作将得到同步。所以这失去了一些并行性,这种妥协可能不适用于所有情况。我同意这种妥协。这是LazyCacheFusionCache(其众多方法之一)使用的方法,等等。所以我会使用其中的一个,但很高兴知道这个技巧,因为它非常漂亮。

    private readonly SemaphoreSlimPool _lockPool = new SemaphoreSlimPool(1, 1);
    
    private async Task<TValue> GetAsync(object key, Func<ICacheEntry, Task<TValue>> valueFactory)
    {
        if (_cache.TryGetValue(key, out var value))
        {
            return value;
        }
    
        // key-specific lock so as to not block operations on other keys
        var lockForKey = _lockPool[key];
        await lockForKey.WaitAsync().ConfigureAwait(false);
        try
        {
            if (_cache.TryGetValue(key, out value))
            {
                return value;
            }
    
            value = await _cache.GetOrCreateAsync(key, valueFactory).ConfigureAwait(false);
            return value;
        }
        finally
        {
            lockForKey.Release();
        }
    }
    
    // Dispose SemaphoreSlimPool
    

    这是SemaphoreSlimPool impl (source, nuget)。

    /// <summary>
    /// Provides a pool of SemaphoreSlim objects for keyed usage.
    /// </summary>
    public class SemaphoreSlimPool : IDisposable
    {
        /// <summary>
        /// Pool of SemaphoreSlim objects.
        /// </summary>
        private readonly SemaphoreSlim[] pool;
    
        /// <summary>
        /// Size of the pool.
        /// <para></para>
        /// Environment.ProcessorCount is not always correct so use more slots as buffer,
        /// with a minimum of 32 slots.
        /// </summary>
        private readonly int poolSize = Math.Max(Environment.ProcessorCount << 3, 32);
    
        private const int NoMaximum = int.MaxValue;
    
        /// <summary>
        /// Ctor.
        /// </summary>
        public SemaphoreSlimPool(int initialCount)
            : this(initialCount, NoMaximum)
        { }
    
        /// <summary>
        /// Ctor.
        /// </summary>
        public SemaphoreSlimPool(int initialCount, int maxCount)
        {
            pool = new SemaphoreSlim[poolSize];
            for (int i = 0; i < poolSize; i++)
            {
                pool[i] = new SemaphoreSlim(initialCount, maxCount);
            }
        }
    
        /// <inheritdoc cref="Get(object)" />
        public SemaphoreSlim this[object key] => Get(key);
    
        /// <summary>
        /// Returns a <see cref="SemaphoreSlim"/> from the pool that the <paramref name="key"/> maps to.
        /// </summary>
        /// <exception cref="ArgumentNullException"></exception>
        public SemaphoreSlim Get(object key)
        {
            _ = key ?? throw new ArgumentNullException(nameof(key));
            return pool[GetIndex(key)];
        }
    
        private uint GetIndex(object key)
        {
            return unchecked((uint)key.GetHashCode()) % (uint)poolSize;
        }
    
        private bool disposed = false;
    
        public void Dispose()
        {
            Dispose(true);
        }
    
        public void Dispose(bool disposing)
        {
            if (!disposed)
            {
                if (disposing)
                {
                    if (pool != null)
                    {
                        for (int i = 0; i < poolSize; i++)
                        {
                            pool[i].Dispose();
                        }
                    }
                }
    
                disposed = true;
            }
        }
    }
    

    由于 ttl 低,我已经在这个问题上抛出了很多线程,而且它没有被轰炸。到目前为止,它对我来说看起来不错,但我想看看是否有人能找到错误。

    【讨论】:

    • 我不喜欢这个解决方案。 MemoryCache 中存储的条目数量与处理器数量无关。 MemoryCache 可以存储数千个密钥,在具有 4 个内核的机器中。该解决方案的作用是对可能具有相同key.GetHashCode() % poolSize 值的所有不同键使用相同的同步原语(SemaphoreSlim)。因此GetAsync("Light") 可能必须等待不相关的GetAsync("Heavy") 完成,以防“Light”和“Heavy”键碰巧具有相同的哈希码模 32。
    • 另外,valueFactory 可能不会在当前上下文中被调用,因为 ConfigureAwait(false),会产生未知的后果。
    • 是的,这是一个适合我的折衷方案。缓存通常不是写繁重的。到目前为止,我看到的所有其他解决方案要么与 valueFactory 竞争,要么不处理,这更糟糕。
    • 消费代码不能使用ConfigureAwait(false)。就我而言,我需要它。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-16
    • 2021-11-06
    • 2013-12-07
    • 1970-01-01
    • 2017-05-06
    相关资源
    最近更新 更多