【问题标题】:ConcurrentDictionary GetOrAdd asyncConcurrentDictionary GetOrAdd 异步
【发布时间】:2019-06-04 16:07:13
【问题描述】:

我想使用GetOrAddConcurrentDictionary 之类的东西作为Web 服务的缓存。这本词典有异步版本吗? GetOrAdd 将使用 HttpClient 发出 Web 请求,所以如果此字典有一个版本,其中 GetOrAdd 是异步的,那就太好了。

为了消除一些混乱,字典的内容将是对网络服务的调用的响应。

ConcurrentDictionary<string, Response> _cache
    = new ConcurrentDictionary<string, Response>();

var response = _cache.GetOrAdd("id",
    (x) => { _httpClient.GetAsync(x).GetAwaiter().GetResponse(); });

【问题讨论】:

  • 对我来说,async GetOrAdd 听起来没有多大意义。该方法只能同步执行。
  • 添加到字典不是 IO 绑定操作,它的异步版本没有意义。
  • 如果您需要等待某些东西,我建议检查密钥是否在字典中,如果没有,则等待 Http 调用,然后调用 GeOrAdd 并返回结果。最终,您必须再次检查,以防在您等待 IO 时有其他东西插入了密钥。
  • @juharr:这正是ConcurrentDictionary 所做的。它首先检查,然后生成一个新值,然后在添加之前再次检查。

标签: c# .net asynchronous concurrency concurrentdictionary


【解决方案1】:

GetOrAdd 不会成为异步操作,因为访问字典的值不是长时间运行的操作。

但是,您可以做的只是将任务存储在字典中,而不是物化结果。任何需要结果的人都可以等待该任务。

但是,您还需要确保该操作只启动一次,而不是多次。为了保证某些操作只运行一次,而不是多次运行,还需要添加Lazy

ConcurrentDictionary<string, Lazy<Task<Response>>> _cache = new ConcurrentDictionary<string, Lazy<Task<Response>>>();

var response = await _cache.GetOrAdd("id", url => new Lazy<Task<Response>>(_httpClient.GetAsync(url))).Value;

【讨论】:

  • 这会将不完整的Task 放入缓存中。如果Task 发生故障或被取消会怎样?该任务代表对远程资源的 HTTP 请求,失败的可能性不可忽略。
  • @odyss-jii 是的,他们需要处理错误情况,这很可能涉及将其从缓存中删除。
  • 这对于缓存来说绝对是可怕的设计。它完全打破了抽象。如果我从子系统中获取值,则清理其内部缓存不是我的责任,因为它的实现已损坏。
  • 它不需要是处理它的缓存的最终消费者,它可以是 OP 编写的代码的包装器。此答案中的代码不是完整的生产就绪全功能缓存。它展示了如何解决所提出的问题,OP 需要在自己的包装缓存中完成这些问题,以使其成为有价值的代码。就像您的答案有问题,使其无法完成生产就绪代码,而只是所问问题的解决方案。
  • @Darragh 但是您不止一次执行该操作。这通常是不可接受的。 Lazy 不确保操作返回更快,它确保它不会运行超过一次。
【解决方案2】:

GetOrAdd 方法不太适合用于此目的。由于它不能保证工厂只运行一次,它的唯一目的是进行次要优化(次要,因为无论如何添加都很少),因为它不需要哈希并找到正确的存储桶两次(如果你得到和设置两个单独的调用)。

我建议你先检查缓存,如果在缓存中没有找到值,然后进入某种形式的临界区(锁、信号量等),重新检查缓存,如果仍然丢失则获取值并插入缓存。

这可确保您的后备存储只被命中一次;即使多个请求同时发生缓存未命中,也只有第一个会真正获取值,其他请求将等待信号量然后提前返回,因为它们会重新检查临界区的缓存。

伪代码(使用计数为 1 的 SemaphoreSlim,因为您可以异步等待它):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from catch
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example use SemaphoreSlim 
    // which has async wait function:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}

【讨论】:

  • 如果您要显式锁定,那么您还需要显式锁定使用此集合的所有其他位置,以确保操作在逻辑上是原子的。
  • 集合是ConcurrentDictionary,集合本身是线程安全的。您在这里锁定的原因不同。
  • 该集合不会抛出某种索引越界异常或返回垃圾数据,因为它被设计为从多个线程中使用,但您现在正试图从中执行多个操作序列,并且在此期间依赖于对集合的任何更改,它不会为您提供。您不仅需要显式锁定此处,还需要使用集合在任何地方进行显式锁定,以确保其他人在您发现该值丢失或类似情况后不会添加该值。
  • 我不确定您在考虑哪种假设情况,但它不适用于这种特殊情况。这是从具有内存缓存的源中获取的,在此期间集合是否更改并不重要。锁的目的是在有多个并发缓存未命中的情况下保护源免受浪涌;目的不是同步对集合的访问。
  • 是的,如果集合发生变化确实很重要。例如,它可能会导致多次执行不应重复的工作。
【解决方案3】:

试试这个扩展方法:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}

您使用await dict.GetOrAddAsync(key,async key=&gt;await something(key)),而不是dict.GetOrAdd(key,key=&gt;something(key))。显然,在这种情况下,您只需将其写为await dict.GetOrAddAsync(key,something),但我想说清楚。

关于保持操作顺序的问题,我有以下观察:

  1. 如果您查看它的实现方式,使用普通的 GetOrAdd 将获得相同的效果。我确实使用了相同的代码并使其适用于异步。参考资料说

valueFactory 委托在锁之外被调用以避免 在锁定下执行未知代码可能引起的问题。 因此,GetOrAdd 对于所有其他操作都不是原子的 在 ConcurrentDictionary

  1. ConcurrentDictionary 不支持 SyncRoot,它们使用内部锁定机制,因此无法对其进行锁定。但是,使用您自己的锁定机制仅适用于此扩展方法。如果您使用其他流程(例如使用 GetOrAdd),您将面临同样的问题。

【讨论】:

  • GetOrAddAsync 的此实现不保留操作顺序。场景:Workflow-1 调用.GetOrAddAsync("Key", GetAsync("A")),然后Workflow-2 调用.GetOrAddAsync("Key", GetAsync("B")),然后Workflow-3 调用.TryRemove("Key", out _)。最后,字典可能最终具有值“A”或“B”,或者根本没有值。发生这种情况是因为 GetOrAddAsync 实现推迟将任何内容存储到字典中,直到异步委托完成。
  • 你的意思是如果你不等待就使用它?
  • Siderite 不,我的意思是如果你正确地等待方法。我的场景涉及三个独立的异步工作流,其中每个工作流调用 API 和 awaits 返回的任务。在这种情况下,Workflow-1 等待的任务可能需要比 Workflow-2 等待的任务更长的时间才能完成,在这种情况下,Workflow-1 将覆盖 Workflow-2 在字典中输入的值。这种行为至少可以说是令人惊讶的。
  • 我已经更新了我的答案。我很欣赏您在实施中的关注程度,但我认为由于我在答案中列出的原因,它过于复杂。 SO 问题是关于将 GetOrAdd 与异步委托一起使用,这意味着接受原始方法的限制。
  • Siderite 我明白你的意思。您的实现确实具有与本机 GetOrAdd (source code) 类似的行为。我猜典型的异步工作比典型的同步工作的持续时间更长,这可能会使原生行为的缺点更加突出。无论如何,我的反对票是毫无根据的,我撤销了它。
【解决方案4】:

可能使用专用内存缓存(如newold MemoryCache 类,或this 第三方库)应该比使用简单的ConcurrentDictionary 更好。除非你真的不需要常用的功能,例如基于时间的过期、基于大小的压缩、依赖于其他已过期条目或依赖于可变外部资源(如文件、数据库等)的条目的自动驱逐。应该注意的是,MemoryCache 可能仍需要一些工作才能正确处理异步委托,因为它的开箱即用行为is not ideal

下面是具有Task&lt;TValue&gt; 值的ConcurrentDictionarys 的自定义扩展方法GetOrAddAsync。它接受一个工厂方法,并确保该方法最多被调用一次。它还确保从字典中删除失败的任务。此实现针对频繁获取现有任务而很少发生创建新任务的情况进行了优化。

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        var newTaskTask = new Task<Task<TValue>>(async () =>
        {
            try { return await valueFactory(key).ConfigureAwait(false); }
            catch
            {
                ((ICollection<KeyValuePair<TKey, Task<TValue>>>)source)
                    .Remove(new KeyValuePair<TKey, Task<TValue>>(key, newTask));
                //source.TryRemove(KeyValuePair.Create(key, newTask)); // .NET 5
                throw;
            }
        });
        newTask = newTaskTask.Unwrap();
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask)
            newTaskTask.RunSynchronously(TaskScheduler.Default);
    }
    return currentTask;
}

使用示例:

var cache = new ConcurrentDictionary<string, Task<HttpResponseMessage>>();

var response = await cache.GetOrAddAsync("https://stackoverflow.com", async url =>
{
    return await _httpClient.GetAsync(url);
});

为了移除故障任务,此实现使用显式实现的ICollection&lt;T&gt;.Remove API。有关此 API 的更多信息,请访问 here。从 .NET 5 开始,可以改用新的 TryRemove(KeyValuePair&lt;TKey, TValue&gt; item) 方法。

顺便说一句,如果需要最高性能,您可能需要查看BitFaster.Caching 第三方库。我从未亲自使用过它,但带有基准的图表看起来令人印象深刻。

【讨论】:

  • 我对您的解决方案有一些疑问。 Unwrapawait newTaskTask 一样吗?为什么不直接使用source.TryRemove(key, out _) 而不是将source 转换为ICollection&lt;&gt;?我对newTaskTask.RunSynchronously(TaskScheduler.Default) 有点困惑。这看起来有点奇怪。我们需要这个吗?调用者将执行await。这会确保任务被调用吗?
  • 问题是:this implementation 或多或少等同于您的示例?是的,我知道执行被“推迟”(不是真的),直到第一个调用者等待返回的任务。
  • @SebastianSchumann 当然,感谢您的提问。 newTaskTask.Unwrap() 确实与await newTaskTask 相同,前提是该任务很热(即它已经开始)。这不是这里的情况,如果我们尝试await 任务将会出现死锁。任务是故意冷的,因为我们只想在任务成功插入字典后启动任务。否则,如果更新字典的竞赛失败了,冷任务将被丢弃。
  • source.TryRemove(key, out _)不够用的原因是GetOrAddAsync只是一个扩展方法,它并不能完全控制字典的内容。所以有可能在任务运行时,一些其他代码可能会用另一个任务替换该任务。万一我们的任务失败,我们只想在它仍然存在的情况下从字典中删除它,而不是删除我们不知道的其他任务。
  • newTaskTask.RunSynchronously(TaskScheduler.Default) 启动外部任务,该任务调用valueFactory 委托。在达到这一点之前,valueFactory 尚未被调用。 valueFactory 必须只调用一次,以防多个线程竞相将此键插入字典。 TaskScheduler.Default 参数确保valueFactory 将由众所周知的TaskScheduler 同步调用,并且我们不受TaskScheduler.Current(参数的默认值)的摆布,无论它可能是什么。
【解决方案5】:

我早在ConcurrentDictionary 和TPL 诞生之前就解决了这个问题。我在一家咖啡馆,没有那个原始代码,但它是这样的。

这不是一个严格的答案,但可能会激发您自己的解决方案。重要的是返回刚刚添加的值或已经存在的值以及布尔值,以便您可以分叉执行。

该设计让您可以轻松地将比赛获胜逻辑与失败逻辑分叉。

public bool TryAddValue(TKey key, TValue value, out TValue contains)
{
    // guards etc.

    while (true)
    {
        if (this.concurrentDic.TryAdd(key, value))
        {
            contains = value;
            return true;
        }
        else if (this.concurrentDic.TryGetValue(key, out var existing))
        {
            contains = existing;
            return false;
        }
        else
        {
            // Slipped down the rare path. The value was removed between the
            // above checks. I think just keep trying because we must have
            // been really unlucky.

            // Note this spinning will cause adds to execute out of
            // order since a very unlucky add on a fast moving collection
            // could in theory be bumped again and again before getting
            // lucky and getting its value added, or locating existing.

            // A tiny random sleep might work. Experiment under load.
        }
    }
}

这可以作为ConcurrentDictionary 的扩展,或者是你自己的缓存或使用锁的东西。

也许GetOrAdd(K,V) 可以与Object.ReferenceEquals() 一起使用来检查它是否被添加,而不是旋转设计。

说实话,上面的代码不是我回答的重点。强大之处在于方法签名的简单设计以及它如何提供以下功能:

static readonly ConcurrentDictionary<string, Task<Task<Thing>>> tasks = new();

//

var newTask = new Task<Task<Thing>>(() => GetThingAsync(thingId));

if (this.tasks.TryAddValue(thingId, newTask, out var task))
{
    task.Start();
}

var thingTask = await task;
var thing = await thingTask;

Task 需要如何保存Task 有点奇怪(如果您的工作是异步的),还有未使用的Tasks 的分配需要考虑。

我认为微软没有使用这种方法发布其线程安全集合或提取“并发集合”接口是一种耻辱。

我真正的实现是一个带有复杂的过期内部集合和东西的缓存。我想你可以继承 .NET Task 类并添加一个 CreatedAt 属性来帮助驱逐。

免责声明我根本没有尝试过这个,这太离谱了,但我在 2009 年在一个超高吞吐量的应用程序中使用了这种设计。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-05-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-27
    • 1970-01-01
    • 2020-11-28
    • 1970-01-01
    相关资源
    最近更新 更多