【问题标题】:How to use named mutexes and async/await in .NET Core 3.1?如何在 .NET Core 3.1 中使用命名互斥锁和异步/等待?
【发布时间】:2021-04-08 21:16:22
【问题描述】:

我正在为我的 ASP.NET Core 3.1 Web API 实现一个缓存层。

开始实施

public interface ICache
{
    T Get<T>(string key);
    void Set<T>(string key, T value);
}

public static class ICacheExtensions
{
    public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
    {
        var value = cache.Get<T>(key);

        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            value = factory();
            if (!EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                cache.Set(key, value);
            }
        }

        return value;
    }

    public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
    {
        var value = cache.Get<T>(key);

        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            value = await factory().ConfigureAwait(false);
            if (!EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                cache.Set(key, value);
            }
        }

        return value;
    }
}

这很好用,但我试图解决的一个已知问题是它容易受到缓存踩踏的影响。如果我的 API 正在处理许多请求,这些请求都尝试同时使用 GetOrCreate 方法之一访问相同的密钥,那么它们将各自运行工厂函数的并行实例。这意味着多余的工作和资源的浪费。

我试图做的是引入互斥锁,以确保每个缓存键只能运行一个工厂函数实例。

引入互斥体

public interface ICache
{
    T Get<T>(string key);
    void Set<T>(string key, T value);
}

public static class ICacheExtensions
{
    public static T GetOrCreate<T>(this ICache cache, string key, Func<T> factory)
    {
        using var mutex = new Mutex(false, key);
        var value = cache.Get<T>(key);

        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            mutex.WaitOne();

            try
            {
                var value = cache.Get<T>(key);

                if (EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    value = factory();
                    if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                    {
                        cache.Set(key, value);
                    }
                }
            }
            finally
            {
                mutex.ReleaseMutex();
            }
        }

        return value;
    }

    public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
    {
        using var mutex = new Mutex(false, key);
        var value = cache.Get<T>(key);

        if (EqualityComparer<T>.Default.Equals(value, default(T)))
        {
            mutex.WaitOne();

            try
            {
                var value = cache.Get<T>(key);

                if (EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    value = await factory().ConfigureAwait(false);
                    if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                    {
                        cache.Set(key, value);
                    }
                }
            }
            finally
            {
                mutex.ReleaseMutex();
            }
        }

        return value;
    }
}

这对GetOrCreate() 非常有效,但GetOrCreateAsync() 会引发异常。原来互斥锁是线程绑定的,所以如果在不同的线程上调用WaitOne()ReleaseMutex()(这往往发生在异步/等待中),互斥锁不喜欢这样并抛出异常。我发现 this other SO question 描述了一些解决方法,并决定使用自定义任务调度程序。 SingleThreadedTaskScheduler 使用仅包含一个线程的线程池调度任务。而且我打算仅从该线程与互斥锁进行交互。

SingleThreadedTaskScheduler

internal sealed class SingleThreadedTaskScheduler : TaskScheduler, IDisposable
{
    private readonly Thread _thread;
    private BlockingCollection<Task> _tasks;

    public SingleThreadedTaskScheduler()
    {
        _tasks = new BlockingCollection<Task>();
        _thread = new Thread(() =>
        {
            foreach (var t in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(t);
            }
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    public void Dispose()
    {
        _tasks?.CompleteAdding();
        _thread?.Join();
        _tasks?.Dispose();
        _tasks = null;
    }
}

GetOrCreateAsync 与 SingleThreadedTaskScheduler

private static readonly TaskScheduler _mutexTaskScheduler = new SingleThreadedTaskScheduler();

public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
    using var mutex = new Mutex(false, key);
    var value = cache.Get<T>(key);

    if (EqualityComparer<T>.Default.Equals(value, default(T)))
    {
        await Task.Factory
            .StartNew(() => mutex.WaitOne(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
            .ConfiureAwait(false);

        try
        {
            var value = cache.Get<T>(key);

            if (EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                value = await factory().ConfigureAwait(false);
                if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    cache.Set(key, value);
                }
            }
        }
        finally
        {
            await Task.Factory
                .StartNew(() => mutex.ReleaseMutex(), CancellationToken.None, TaskCreationOptions.None, _mutexTaskScheduler)
                .ConfiureAwait(false);
        }
    }

    return value;
}

通过这个实现,异常被解决了,但是GetOrCreateAsync在缓存踩踏场景中仍然多次调用工厂函数。我错过了什么吗?

我也尝试过使用SemaphoreSlim 而不是Mutex,它应该与async/await 配合得更好。这里的问题是 Linux 不支持命名信号量,所以我必须将所有信号量保存在 Dictionary&lt;string, SemaphoreSlim&gt; 中,这样管理起来太麻烦了。

【问题讨论】:

  • 你为什么要写caching layer?为了娱乐?有很多pre-existing libraries 要做缓存。
  • 让我换个说法。缓存层存在并且正在使用中,我正在尝试解决踩踏问题。
  • 这有点回避了这一点。您遇到问题的原因是您(或任何人)推出了自己的缓存层。你正在尝试重新发明轮子。为什么不利用已经解决了这些问题并经过数千名其他开发人员测试的现有缓存框架呢?交换它真的不应该做太多的工作,而且它可以让你避免维护大量复杂的代码。
  • 感谢您的意见。
  • @mason:我不知道有任何 .NET 缓存库支持异步共享创建函数调用结果。我有 started one,但它还没有准备好。

标签: c# asp.net-core .net-core async-await mutex


【解决方案1】:

linked solution 仅在使用命名互斥锁同步异步代码跨进程时有效。在 same 进程中同步代码是行不通的。互斥体允许递归获取,因此通过在同一个线程上移动所有获取,就好像互斥体根本不存在一样。

我必须将所有信号量保存在 Dictionary 中,这样管理起来太麻烦了。

如果您需要一个非递归的命名互斥体,命名为Semaphores(不适用于 Linux)或管理您自己的字典确实是唯一的方法。

我有一个AsyncCache&lt;T&gt;,我一直在努力,但还没有准备好。它试图看起来Task&lt;T&gt; 实例的缓存,但实际上是TaskCompletionSource&lt;T&gt; 实例的缓存。

【讨论】:

  • 感谢您的回复。我害怕这个。啊,好吧....所以我对似乎可行的 ConcurrentDictionary 做了一些试验。剩下的问题是如何防止字典无限增长。在某个时候,我将不得不为陈旧的密钥清除信号量。
  • 查看我的答案,了解基于斯蒂芬的回答我最终得到的结果。
【解决方案2】:

使用信号量似乎有效。感谢 Stephen Cleary 确认这是比 Mutexes 更好的路线。

public static async Task<T> GetOrCreateAsync<T>(this ICache cache, string key, Func<Task<T>> factory)
{
    using var mutex = new Mutex(false, key);
    var value = cache.Get<T>(key);

    if (EqualityComparer<T>.Default.Equals(value, default(T)))
    {
        WaitOne(key);

        try
        {
            var value = cache.Get<T>(key);

            if (EqualityComparer<T>.Default.Equals(value, default(T)))
            {
                value = await factory().ConfigureAwait(false);
                if (!EqualityComparer<T>.Default.Equals(value, default(T)))
                {
                    cache.Set(key, value);
                }
            }

            ReleaseAll(key);
        }
        catch (Exception)
        {
            ReleaseOne(key);
            throw;
        }
    }

    return value;
}

private static readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphores = new ConcurrentDictionary<string, SemaphoreSlim>();

private static void WaitOne(string key)
{
    var semaphore = _semaphores.GetOrAdd(key, k => new SemaphoreSlim(1, int.MaxValue));
    semaphore.Wait();
}

private static void ReleaseOne(string key)
{
    var semaphore = _semaphores.GetOrAdd(key, k => new SemaphoreSlim(0, int.MaxValue));
    semaphore.Release();
}

private static void ReleaseAll(string key)
{
    var semaphore = default(SemaphoreSlim);
    _semaphores.Remove(key, out semaphore);
    semaphore?.Release(int.MaxValue);
    semaphore?.Dispose();
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-09-24
    • 1970-01-01
    • 2010-12-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多