【问题标题】:How to pass an async function to ConcurrentDictionary.GetOrAdd method?如何将异步函数传递给 ConcurrentDictionary.GetOrAdd 方法?
【发布时间】:2019-07-20 11:05:36
【问题描述】:

这是一个与缓存和异步函数有关的问题。为了给我的问题提供一些背景信息,我将解释一下为什么我会在缓存和异步函数方面遇到这个问题。

最近,我正在重构一段旧代码,旨在根据用户 ID 提供一些用户信息。为了修正这个想法,想象一下这样的签名:

public interface IUserInfoRetriever 
{
  UserInfo GetUserInfo(Guid userId);
}

public class UserInfoRetriever : IUserInfoRetriever 
{
   // implementation is discussed below
}

UserInfoRetriever 类有一个 ConcurrentDictionary<Guid, UserInfo> 的实例,用作缓存(其中用户 ID 是键)。

第一次调用GetUserInfo 方法时,会预先填充此内存缓存。当前采用的方法需要在每次调用GetUserInfo 时锁定,以检查缓存初始化是否已经完成。

锁被释放后,调用 ConcurrentDictionary 的方法GetOrAdd 以获取请求的用户信息。传递给 GetOrAdd 的第二个参数(在缓存未命中的情况下使用的值工厂)是一个 lambda 表达式,它调用能够提供用户信息的远程 Web 服务。此调用当前以阻塞方式(使用Task<T>.Result)完成,因为ConcurrentDictionary 不支持异步调用

这里有一些元代码可以更好地理解场景:

public class UserInfoRetriever : IUserInfoRetriever 
{
    private readonly ConcurrentDictionary<Guid, UserInfo> userCache = new ConcurrentDictionary<Guid, UserInfo>();
    private bool isCacheInitialized = false;
    private readonly object locker = new object();

    public UserInfo GetUserInfo(Guid userId) 
    {
        this.InitializeCache();

        return this.cache.GetOrAdd(userId, () => this.GetUserFromWebService(userId));
    }

    private void InitializeCache()
    {
        lock(this.locker)
        {
            if (this.isCacheInitialized)
            {
                return;
            }

            // read all the users available in the Users table of a database
            // put each user in the cache by using ConcurrentDictionary<Guid, UserInfo>.TryAdd()

            this.isCacheInitialized = true;
        }
    }

    private UserInfo GetUserFromWebService(Guid userId) 
    {
        // performs a call to a web service in a blocking fashion instead of using async methods of HttpClient class
        // this is due to the signature of ConcurrentDictionary<TKey,TValue>.GetOrAdd not supporting async functions as the second parameter
    }
}

这显然是一团糟。这个类做的太多了,可以使用框架类Lazy&lt;T&gt;替换方法InitializeCache,以摆脱锁和标志isCacheInitialized

现在是时候结束介绍部分并转到真正的问题了。 此场景中的难点在于 ConcurrentDictionary 及其对 GetOrAdd 方法中的异步函数的支持。这就是我问题的重点。

据我所知,有四种可能的方法:

  • 依赖阻塞调用,就像当前的实现一样。我不喜欢以阻塞方式进行 http 调用的想法。此外,不能保证提供给 ConcurrentDictionary.GetOrAdd 的回调只被调用一次。我的 Web 服务调用是 GET,因此它是幂等的,但在我看来这并不理想。

  • 使用Task&lt;UserInfo&gt; 而不是UserInfo 作为字典值。这个技巧对于解决缺乏对异步函数的支持很有用。然而,这并不能解决回调函数多次执行的问题(这是由于 ConcurrentDictionary 的实现方式造成的)

  • 使用 asp.net core memory cache 而不是 ConcurrentDictionary。这样做的好处是,GetOrCreateAsync 方法完全支持异步函数。不幸的是,不能保证只调用一次回调(有关更多信息,请参阅here)。

  • 使用 LazyCache 的 nuget 包解决了这两个问题,因为它支持 GetOrAdd 缓存方法中的异步函数,并保证异步回调被调用一次。我不知道是否有办法永远缓存一个项目,但我可以使用一个非常长的绝对过期时间来模拟一个项目永不过期的 ConcurrentDictionary。

您对 ConcurrentDictionary 有什么建议?当用于提供缺失项的工厂方法是 async 时,除了我上面列出的方法之外,还有其他方法可以使用 ConcurrentDictionary 吗?

我的问题只涉及管理对 ConcurrentDictionary 的异步功能缺乏支持的问题。我正在寻找有关此的建议,因为有几种可能的方法可以解决它。我解释了整个重构场景,唯一的目的是为了更清楚并为我的问题提供一些上下文。

2019 年 7 月 24 日更新

对于感兴趣的人,这是我的最终实现:

public sealed class InMemoryUserCache : IUserCache, IDisposable
  {
    private readonly IBackEndUsersRepository _userRepository;
    private readonly Lazy<ConcurrentDictionary<Guid, Task<BackEndUserInfo>>> _cache;
    private readonly SemaphoreSlim _initializeCacheLocker;

    public InMemoryUserCache(IBackEndUsersRepository userRepository)
    {
      _userRepository = userRepository ?? throw new ArgumentNullException(nameof(userRepository));
      _cache = new Lazy<ConcurrentDictionary<Guid, Task<BackEndUserInfo>>>(
        InitializeCache,
        LazyThreadSafetyMode.PublicationOnly);
      _initializeCacheLocker = new SemaphoreSlim(2); // allows concurrency, but limit to 2 the number of threads that can concurrently initialize the lazy instance
    }

    public Task<BackEndUserInfo> GetOrAdd(
      Guid userId,
      Func<Guid, Task<BackEndUserInfo>> userInfoFactory)
    {
      if (userInfoFactory == null)
        throw new ArgumentNullException(nameof(userInfoFactory));

      return _cache.Value.GetOrAdd(userId, ToSafeUserInfoFactory(userInfoFactory));
    }

    private ConcurrentDictionary<Guid, Task<BackEndUserInfo>> InitializeCache()
    {
      _initializeCacheLocker.Wait();

      try
      {
        var cache = new ConcurrentDictionary<Guid, Task<BackEndUserInfo>>();

        foreach (var user in _userRepository.FindAll())
        {
          cache[user.Id] = Task.FromResult(user);
        }

        return cache;
      }
      finally
      {
        _initializeCacheLocker.Release();
      }
    }

    private Func<Guid, Task<BackEndUserInfo>> ToSafeUserInfoFactory(
      Func<Guid, Task<BackEndUserInfo>> userInfoFactory) =>
        userId => TryExecuteUserInfoFactory(userInfoFactory, userId);

    private async Task<BackEndUserInfo> TryExecuteUserInfoFactory(
      Func<Guid, Task<BackEndUserInfo>> userInfoFactory,
      Guid userId)
    {
      try
      {
        return await userInfoFactory(userId).ConfigureAwait(false);
      }
      catch (Exception)
      {
        _ = _cache.Value.TryRemove(userId, out var _);
        throw;
      }
    }

    public void Dispose()
    {
      _initializeCacheLocker?.Dispose();
    }
  }

【问题讨论】:

  • 我的第一个问题是......为什么需要缓存中的所有用户?您甚至在寻找正确的东西吗
  • @AndreiDragotoniu 缓存初始化将本地数据库中所有可用的用户放入缓存中。他们不是所有的系统用户,而是过去已经执行过至少一项操作的所有用户。考虑到我不知道以前团队成员做出的历史决定(几年后我加入了该项目)
  • Furthermore, there is no guarantee that the callback provided to ConcurrentDictionary.GetOrAdd is called only once. 对此的标准解决方案是GetOrAddLazy。您可以对 MemoryCache 执行相同操作。
  • @mjwills 我读了几篇关于使用Lazy&lt;T&gt; 作为字典值的文章,以便您确定回调只被调用一次。惰性的主要问题是默认情况下它会缓存异常。 LazyCache 库的人也修复了这个问题,他们保证异常永远不会保存在缓存中
  • 这可能会提供一些有用的信息ConcurrentDictionary GetOrAdd async

标签: c# multithreading .net-core async-await concurrentdictionary


【解决方案1】:

GetOrAdd 中支持异步回调似乎是个坏主意。显然,回调正在进行大量的工作或状态更改,或者您已经使用了提前准备新项目的版本......但是,这仍然是对我们的并发集合的同步访问'正在谈论。

如果在回调完成之前访问了 ConcurrentDictionary 的相同元素会发生什么?使用线程,您最多可以在每个线程的 ConcurrentDictionary 上进行一个操作,除非您对重入做了一些非常疯狂的事情。使用 Tasks 和 continuation-passing-style,您最终可能会得到无限制的重叠字典请求,甚至无限制的递归,甚至无需尝试。

考虑如果在第一次完成之前对同一用户进行第二次缓存查找会发生什么。在您的情况下,两个查找都应该从一个网络请求中返回数据。要实现这一点,请将您的 ConcurrentDictionary&lt;UserId, User&gt; 更改为 ConcurrentDictionary&lt;UserId, Task&lt;User&gt;&gt; 并继续传递您的 async 函数作为回调。当异步代码阻塞时,它的Task 将在Dictionary 中注册,而不是在它完成时......这允许后续查找await 相同的已经在运行的Task。这是您的第二个项目符号,但我不确定您为什么得出结论认为它不能很好地处理多个查找。它不会 100% 防止重复查找导致独立的网络请求,但竞争窗口比您当前的实现要小得多。

图书馆行为的设计应该使正确的操作变得容易,并且直接的操作正确(有时称为“成功的坑”)。 GetOrAddAsync 将使逻辑推理变得相当复杂,这与提供通用的预构建线程安全集合类背道而驰。

如果您需要对共享集合进行长时间运行的元素初始化,那么通用的集合不是一个好的选择,并且您需要构建一个针对您的特定用例进行优化的线程安全集合.

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-03
    • 2015-12-01
    • 1970-01-01
    • 2021-12-29
    • 2018-11-09
    相关资源
    最近更新 更多