【发布时间】:2019-07-20 11:05:36
【问题描述】:
这是一个与缓存和异步函数有关的问题。为了给我的问题提供一些背景信息,我将解释一下为什么我会在缓存和异步函数方面遇到这个问题。
最近,我正在重构一段旧代码,旨在根据用户 ID 提供一些用户信息。为了修正这个想法,想象一下这样的签名:
public interface IUserInfoRetriever
{
UserInfo GetUserInfo(Guid userId);
}
public class UserInfoRetriever : IUserInfoRetriever
{
// implementation is discussed below
}
UserInfoRetriever 类有一个 ConcurrentDictionary<Guid, UserInfo> 的实例,用作缓存(其中用户 ID 是键)。
第一次调用GetUserInfo 方法时,会预先填充此内存缓存。当前采用的方法需要在每次调用GetUserInfo 时锁定,以检查缓存初始化是否已经完成。
锁被释放后,调用 ConcurrentDictionary 的方法GetOrAdd 以获取请求的用户信息。传递给 GetOrAdd 的第二个参数(在缓存未命中的情况下使用的值工厂)是一个 lambda 表达式,它调用能够提供用户信息的远程 Web 服务。此调用当前以阻塞方式(使用Task<T>.Result)完成,因为ConcurrentDictionary 不支持异步调用。
这里有一些元代码可以更好地理解场景:
public class UserInfoRetriever : IUserInfoRetriever
{
private readonly ConcurrentDictionary<Guid, UserInfo> userCache = new ConcurrentDictionary<Guid, UserInfo>();
private bool isCacheInitialized = false;
private readonly object locker = new object();
public UserInfo GetUserInfo(Guid userId)
{
this.InitializeCache();
return this.cache.GetOrAdd(userId, () => this.GetUserFromWebService(userId));
}
private void InitializeCache()
{
lock(this.locker)
{
if (this.isCacheInitialized)
{
return;
}
// read all the users available in the Users table of a database
// put each user in the cache by using ConcurrentDictionary<Guid, UserInfo>.TryAdd()
this.isCacheInitialized = true;
}
}
private UserInfo GetUserFromWebService(Guid userId)
{
// performs a call to a web service in a blocking fashion instead of using async methods of HttpClient class
// this is due to the signature of ConcurrentDictionary<TKey,TValue>.GetOrAdd not supporting async functions as the second parameter
}
}
这显然是一团糟。这个类做的太多了,可以使用框架类Lazy<T>替换方法InitializeCache,以摆脱锁和标志isCacheInitialized。
现在是时候结束介绍部分并转到真正的问题了。 此场景中的难点在于 ConcurrentDictionary 及其对 GetOrAdd 方法中的异步函数的支持。这就是我问题的重点。
据我所知,有四种可能的方法:
依赖阻塞调用,就像当前的实现一样。我不喜欢以阻塞方式进行 http 调用的想法。此外,不能保证提供给 ConcurrentDictionary.GetOrAdd 的回调只被调用一次。我的 Web 服务调用是 GET,因此它是幂等的,但在我看来这并不理想。
使用
Task<UserInfo>而不是UserInfo作为字典值。这个技巧对于解决缺乏对异步函数的支持很有用。然而,这并不能解决回调函数多次执行的问题(这是由于 ConcurrentDictionary 的实现方式造成的)使用 asp.net core memory cache 而不是 ConcurrentDictionary。这样做的好处是,GetOrCreateAsync 方法完全支持异步函数。不幸的是,不能保证只调用一次回调(有关更多信息,请参阅here)。
使用 LazyCache 的 nuget 包解决了这两个问题,因为它支持 GetOrAdd 缓存方法中的异步函数,并保证异步回调被调用一次。我不知道是否有办法永远缓存一个项目,但我可以使用一个非常长的绝对过期时间来模拟一个项目永不过期的 ConcurrentDictionary。
您对 ConcurrentDictionary 有什么建议?当用于提供缺失项的工厂方法是 async 时,除了我上面列出的方法之外,还有其他方法可以使用 ConcurrentDictionary 吗?
我的问题只涉及管理对 ConcurrentDictionary 的异步功能缺乏支持的问题。我正在寻找有关此的建议,因为有几种可能的方法可以解决它。我解释了整个重构场景,唯一的目的是为了更清楚并为我的问题提供一些上下文。
2019 年 7 月 24 日更新
对于感兴趣的人,这是我的最终实现:
public sealed class InMemoryUserCache : IUserCache, IDisposable
{
private readonly IBackEndUsersRepository _userRepository;
private readonly Lazy<ConcurrentDictionary<Guid, Task<BackEndUserInfo>>> _cache;
private readonly SemaphoreSlim _initializeCacheLocker;
public InMemoryUserCache(IBackEndUsersRepository userRepository)
{
_userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
_cache = new Lazy<ConcurrentDictionary<Guid, Task<BackEndUserInfo>>>(
InitializeCache,
LazyThreadSafetyMode.PublicationOnly);
_initializeCacheLocker = new SemaphoreSlim(2); // allows concurrency, but limit to 2 the number of threads that can concurrently initialize the lazy instance
}
public Task<BackEndUserInfo> GetOrAdd(
Guid userId,
Func<Guid, Task<BackEndUserInfo>> userInfoFactory)
{
if (userInfoFactory == null)
throw new ArgumentNullException(nameof(userInfoFactory));
return _cache.Value.GetOrAdd(userId, ToSafeUserInfoFactory(userInfoFactory));
}
private ConcurrentDictionary<Guid, Task<BackEndUserInfo>> InitializeCache()
{
_initializeCacheLocker.Wait();
try
{
var cache = new ConcurrentDictionary<Guid, Task<BackEndUserInfo>>();
foreach (var user in _userRepository.FindAll())
{
cache[user.Id] = Task.FromResult(user);
}
return cache;
}
finally
{
_initializeCacheLocker.Release();
}
}
private Func<Guid, Task<BackEndUserInfo>> ToSafeUserInfoFactory(
Func<Guid, Task<BackEndUserInfo>> userInfoFactory) =>
userId => TryExecuteUserInfoFactory(userInfoFactory, userId);
private async Task<BackEndUserInfo> TryExecuteUserInfoFactory(
Func<Guid, Task<BackEndUserInfo>> userInfoFactory,
Guid userId)
{
try
{
return await userInfoFactory(userId).ConfigureAwait(false);
}
catch (Exception)
{
_ = _cache.Value.TryRemove(userId, out var _);
throw;
}
}
public void Dispose()
{
_initializeCacheLocker?.Dispose();
}
}
【问题讨论】:
-
我的第一个问题是......为什么需要缓存中的所有用户?您甚至在寻找正确的东西吗
-
@AndreiDragotoniu 缓存初始化将本地数据库中所有可用的用户放入缓存中。他们不是所有的系统用户,而是过去已经执行过至少一项操作的所有用户。考虑到我不知道以前团队成员做出的历史决定(几年后我加入了该项目)
-
Furthermore, there is no guarantee that the callback provided to ConcurrentDictionary.GetOrAdd is called only once.对此的标准解决方案是GetOrAdd和Lazy。您可以对MemoryCache执行相同操作。 -
@mjwills 我读了几篇关于使用
Lazy<T>作为字典值的文章,以便您确定回调只被调用一次。惰性的主要问题是默认情况下它会缓存异常。 LazyCache 库的人也修复了这个问题,他们保证异常永远不会保存在缓存中 -
这可能会提供一些有用的信息ConcurrentDictionary GetOrAdd async
标签: c# multithreading .net-core async-await concurrentdictionary