【问题标题】:Async version of Monitor.Pulse/WaitMonitor.Pulse/Wait 的异步版本
【发布时间】:2016-04-19 23:06:13
【问题描述】:

我正在尝试优化类似于 Monitor.Wait 和 Monitor.Pulse 方法的异步版本(在基本功能上)。这个想法是通过异步方法使用它。

要求: 1)我有一个任务正在运行,它负责等到有人给我的显示器发脉冲。 2) 该任务可能会计算复杂的(即:耗时的)操作。同时,可以多次调用pulse方法而不做任何事情(因为主要任务已经在做一些处理)。 3) 一旦主任务完成,它就会再次开始等待,直到另一个 Pulse 进来。

最坏的情况是等待>脉冲>等待>脉冲>等待...,但通常我每次等待都有十分之一/数百个脉冲。

所以,我有以下课程(工作,但我认为可以根据我的要求对其进行一些优化)

internal sealed class Awaiter
{
    private readonly ConcurrentQueue<TaskCompletionSource<byte>> _waiting = new ConcurrentQueue<TaskCompletionSource<byte>>();

    public void Pulse()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryDequeue(out tcs))
        {
            tcs.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        TaskCompletionSource<byte> tcs;
        if (_waiting.TryPeek(out tcs))
        {
            return tcs.Task;
        }

        tcs = new TaskCompletionSource<byte>();
        _waiting.Enqueue(tcs);
        return tcs.Task;
    }
}

上述课程的问题是我仅用于同步的包袱。由于我将只等待一个线程,因此实际上不需要 ConcurrentQueue,因为我总是只有一个项目。

所以,我稍微简化了一下,写了以下内容:

internal sealed class Awaiter2
{
    private readonly object _mutex = new object();
    private TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w == null)
            {
                return;
            }

            _waiting = null;
            w.TrySetResult(1);
        }
    }

    public Task Wait()
    {
        var w = _waiting;
        if (w != null)
        {
            return w.Task;
        }

        lock (_mutex)
        {
            w = _waiting;
            if (w != null)
            {
                return w.Task;
            }

            w = _waiting = new TaskCompletionSource<byte>();
            return w.Task;
        }
    }
}

这个新版本也可以正常工作,但我仍然认为它可以通过移除锁进行更多优化。

我正在寻找有关如何优化第二个版本的建议。有什么想法吗?

【问题讨论】:

  • 您是否对您的代码进行了概要分析,它实际花费了多少时间等待锁定?不要猜测性能问题出在哪里,在进行诸如编写无锁代码之类的微优化时,您确实需要进行分析以查看是否真的值得这样做。
  • 这不是锁本身的问题,但是这段代码将被打包在一个库中,其他开发人员将在野外使用,我正在尝试为这个用例创建最优化的版本。我也在尝试最小化对象分配等。
  • @CheloXL:同步原语很难正确。我有一个库,includes an AsyncMonitor 完成了详尽的单元测试,如果这对你有帮助的话。

标签: c# multithreading asynchronous


【解决方案1】:

如果您不需要Wait() 调用来返回Task,但对能够await Wait() 感到满意,那么您可以实现自定义awaiter/awaitable。

请参阅this link 了解编译器使用的等待模式的概述。

在实现自定义等待对象时,您将只处理委托,而实际的“等待”由您决定。当您想“等待”某个条件时,通常可以保留待处理的延续列表,并且只要条件满足,您就可以调用这些延续。您只需要处理来自await 可以从任意线程调用这一事实的同步。如果你知道你只会从一个线程(比如 UI 线程)await,那么你根本不需要任何同步!

我会尝试给你一个无锁的实现,但不能保证它是正确的。 如果您不明白为什么所有竞争条件都是安全的,则不应使用它并使用锁定语句或其他您知道如何调试的技术来实现 async/await 协议。

public sealed class AsyncMonitor
{
    private PulseAwaitable _currentWaiter;

    public AsyncMonitor()
    {
        _currentWaiter = new PulseAwaitable();
    }

    public void Pulse()
    {
        // Optimize for the case when calling Pulse() when nobody is waiting.
        //
        // This has an inherent race condition when calling Pulse() and Wait()
        // at the same time. The question this was written for did not specify
        // how to resolve this, so it is a valid answer to tolerate either
        // result and just allow the race condition.
        //
        if (_currentWaiter.HasWaitingContinuations)
            Interlocked.Exchange(ref _currentWaiter, new PulseAwaitable()).Complete();
    }

    public PulseAwaitable Wait()
    {
        return _currentWaiter;
    }
}

// This class maintains a list of waiting continuations to be executed when
// the owning AsyncMonitor is pulsed.
public sealed class PulseAwaitable : INotifyCompletion
{
    // List of pending 'await' delegates.
    private Action _pendingContinuations;

    // Flag whether we have been pulsed. This is the primary variable
    // around which we build the lock free synchronization.
    private int _pulsed;

    // AsyncMonitor creates instances as required.
    internal PulseAwaitable()
    {
    }

    // This check has a race condition which is tolerated.
    // It is used to optimize for cases when the PulseAwaitable has no waiters.
    internal bool HasWaitingContinuations
    {
        get { return Volatile.Read(ref _pendingContinuations) != null; }
    }

    // Called by the AsyncMonitor when it is pulsed.
    internal void Complete()
    {
        // Set pulsed flag first because that is the variable around which
        // we build the lock free protocol. Everything else this method does
        // is free to have race conditions.
        Interlocked.Exchange(ref _pulsed, 1);

        // Execute pending continuations. This is free to race with calls
        // of OnCompleted seeing the pulsed flag first.
        Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
    }

    #region Awaitable

    // There is no need to separate the awaiter from the awaitable
    // so we use one class to implement both parts of the protocol.
    public PulseAwaitable GetAwaiter()
    {
        return this;
    }

    #endregion

    #region Awaiter

    public bool IsCompleted
    {
        // The return value of this property does not need to be up to date so we could omit the 'Volatile.Read' if we wanted to.
        // What is not allowed is returning "true" even if we are not completed, but this cannot happen since we never transist back to incompleted.
        get { return Volatile.Read(ref _pulsed) == 1; }
    }

    public void OnCompleted(Action continuation)
    {
        // Protected against manual invocations. The compiler-generated code never passes null so you can remove this check in release builds if you want to.
        if (continuation == null)
            throw new ArgumentNullException(nameof(continuation));

        // Standard pattern of maintaining a lock free immutable variable: read-modify-write cycle.
        // See for example here: https://blogs.msdn.microsoft.com/oldnewthing/20140516-00/?p=973
        // Again the 'Volatile.Read' is not really needed since outdated values will be detected at the first iteration.
        var oldContinuations = Volatile.Read(ref _pendingContinuations);
        for (;;)
        {
            var newContinuations = (oldContinuations + continuation);
            var actualContinuations = Interlocked.CompareExchange(ref _pendingContinuations, newContinuations, oldContinuations);
            if (actualContinuations == oldContinuations)
                break;

            oldContinuations = actualContinuations;
        }

        // Now comes the interesting part where the actual lock free synchronization happens.
        // If we are completed then somebody needs to clean up remaining continuations.
        // This happens last so the first part of the method can race with pulsing us.
        if (IsCompleted)
            Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
    }

    public void GetResult()
    {
        // This is just to check against manual calls. The compiler will never call this when IsCompleted is false.
        // (Assuming your OnCompleted implementation is bug-free and you don't execute continuations before IsCompleted becomes true.)
        if (!IsCompleted)
            throw new NotSupportedException("Synchronous waits are not supported. Use 'await' or OnCompleted to wait asynchronously");
    }

    #endregion
}

您通常不会打扰延续运行的线程,因为如果它们是异步方法,编译器已经插入代码(在延续中)以切换回正确的线程,无需在每个等待的实现中手动执行.

[编辑]

作为一个锁定实现外观的起点,我将提供一个使用锁定语句的实现。应该很容易用自旋锁或其他一些锁定技术替换它。通过使用结构作为可等待对象,它甚至具有除了初始对象之外不进行额外分配的优点。 (调用端的编译器魔法中的 async/await 框架中当然有分配,但你无法摆脱这些。)

请注意,迭代计数器将仅针对每个 Wait+Pulse 对递增,并最终溢出为负数,但这没关系。我们只需要连接从被调用的持续蜂直到它可以调用 GetResult 的时间。 40 亿个 Wait+Pulse 对应该有足够的时间让任何未决的继续调用其 GetResult 方法。如果您不想冒这种风险,您可以使用 long 或 Guid 来获得更独特的迭代计数器,但恕我直言,int 几乎适用于所有场景。

public sealed class AsyncMonitor
{
    public struct Awaitable : INotifyCompletion
    {
        // We use a struct to avoid allocations. Note that this means the compiler will copy
        // the struct around in the calling code when doing 'await', so for your own debugging
        // sanity make all variables readonly.
        private readonly AsyncMonitor _monitor;
        private readonly int _iteration;

        public Awaitable(AsyncMonitor monitor)
        {
            lock (monitor)
            {
                _monitor = monitor;
                _iteration = monitor._iteration;
            }
        }

        public Awaitable GetAwaiter()
        {
            return this;
        }

        public bool IsCompleted
        {
            get
            {
                // We use the iteration counter as an indicator when we should be complete.
                lock (_monitor)
                {
                    return _monitor._iteration != _iteration;
                }
            }
        }

        public void OnCompleted(Action continuation)
        {
            // The compiler never passes null, but someone may call it manually.
            if (continuation == null)
                throw new ArgumentNullException(nameof(continuation));

            lock (_monitor)
            {
                // Not calling IsCompleted since we already have a lock.
                if (_monitor._iteration == _iteration)
                {
                    _monitor._waiting += continuation;

                    // null the continuation to indicate the following code
                    // that we completed and don't want it executed.
                    continuation = null;
                }
            }

            // If we were already completed then we didn't null the continuation.
            // (We should invoke the continuation outside of the lock because it
            // may want to Wait/Pulse again and we want to avoid reentrancy issues.)
            continuation?.Invoke();
        }

        public void GetResult()
        {
            lock (_monitor)
            {
                // Not calling IsCompleted since we already have a lock.
                if (_monitor._iteration == _iteration)
                    throw new NotSupportedException("Synchronous wait is not supported. Use await or OnCompleted.");
            }
        }
    }

    private Action _waiting;
    private int _iteration;

    public AsyncMonitor()
    {
    }

    public void Pulse(bool executeAsync)
    {
        Action execute = null;

        lock (this)
        {
            // If nobody is waiting we don't need to increment the iteration counter.
            if (_waiting != null)
            {
                _iteration++;
                execute = _waiting;
                _waiting = null;
            }
        }

        // Important: execute the callbacks outside the lock because they might Pulse or Wait again.
        if (execute != null)
        {
            // If the caller doesn't want inlined execution (maybe he holds a lock)
            // then execute it on the thread pool.
            if (executeAsync)
                Task.Run(execute);
            else
                execute();
        }
    }

    public Awaitable Wait()
    {
        return new Awaitable(this);
    }
}

【讨论】:

  • 很好!,是的,我实际上不需要任务,因为我总是等待等待。也许我可以在脉冲中使用自旋锁,例如 while(Interlocked.CompareExchange(ref _waiting, null, w) != w) { } 和等待中的 Interlocked.Exchange?
  • @CheloXL - 是的,您可以使用自旋锁,但如果您正在执行显式锁定,您可以通过使用结构优化每个等待/脉冲对的分配(在无锁时不可能)。我添加了一个使用锁语句的示例,它很容易切换到自旋锁。
  • @CheloXL 另外,作为一种语法噱头,如果您希望能够编写 await monitor 而不是 await monitor.Wait() 您可以将 Wait 方法重命名为 GetAwaiter(并从 awaitable 中删除 GetAwaiter) .
  • 在 OnCompleted 中,您在锁内使 continuation 无效,然后执行 continuation()... 这将导致 NRE。这是复制/粘贴错误吗?而不是将 continuation 设置为 null .. 你能简单地做一个 return 吗?
  • @CheloXL 是的,我确实在早期版本中使用了 return 并打算将调用更改为 continuation?.Invoke() 因为在阅读代码时返回更容易被忽略 - 所以是的,这是一个错误,而修改代码,将修复
【解决方案2】:

这是我在项目中使用的简单异步实现:

internal sealed class Pulsar
{
    private static TaskCompletionSource<bool> Init() => new TaskCompletionSource<bool>();

    private TaskCompletionSource<bool> _tcs = Init();

    public void Pulse()
    {
        Interlocked.Exchange(ref _tcs, Init()).SetResult(true);
    }

    public Task AwaitPulse(CancellationToken token)
    {
        return token.CanBeCanceled ? _tcs.Task.WithCancellation(token) : _tcs.Task;
    }
}

TaskCreationOptions.RunContinuationsAsynchronously 添加到 TCS 以实现异步继续。

如果您不需要取消,当然可以省略WithCancellation

【讨论】:

  • 请注意,等待AwaitPulse的任务将作为CancellationToken注册的一部分在同一线程上同步执行,因此运行Pulse()的线程将阻塞。
【解决方案3】:

因为你只有一个任务等待你的功能可以简化为

internal sealed class Awaiter3
{
    private volatile TaskCompletionSource<byte> _waiting;

    public void Pulse()
    {
        var w = _waiting;
        if (w == null)
        {
            return;
        }
        _waiting = null;
#if NET_46_OR_GREATER
        w.TrySetResult(1);
#else
        Task.Run(() => w.TrySetResult(1));
#endif

    }

    //This method is not thread safe and can only be called by one thread at a time.
    // To make it thread safe put a lock around the null check and the assignment,
    // you do not need to have a lock on Pulse, "volatile" takes care of that side.
    public Task Wait()
    {
        if(_waiting != null)
            throw new InvalidOperationException("Only one waiter is allowed to exist at a time!");

#if NET_46_OR_GREATER
        _waiting = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously);
#else
        _waiting = new TaskCompletionSource<byte>();
#endif
        return _waiting.Task;
    }
}

我确实改变了一种行为。如果您使用的是 .NET 4.6 或更高版本,请使用 #if NET_46_OR_GREATER 块中的代码,如果在使用 else 块。当您调用TrySetResult 时,您可以让延续同步运行,这可能导致Pulse() 需要很长时间才能完成。通过在 .NET 4.6 中使用 TaskCreationOptions.RunContinuationsAsynchronously 或将 TrySetResult 包装在 4.6 之前的 Task.Run 中,将确保 Puse() 不会被任务的继续阻塞。

请参阅 SO 问题 Detect target framework version at compile time,了解如何在您的代码中创建 NET_46_OR_GREATER 定义。

【讨论】:

  • 我假设 .NET 6.0 是指 .NET 4.6?
  • 是的,抱歉,错字,错误修复
  • 关于延续:这意味着“等待”的任务有可能在触发 setresult 的同一线程上运行? (所以,Pulse方法直到Task等待再次进入等待状态才会退出??)...
  • @CheloXL 是的,请参阅stackoverflow.com/questions/22579206/… 了解更多深入信息。
【解决方案4】:

一个简单的方法是使用SemaphoreSlim,它使用监视器。

public class AsyncMonitor
{
    private readonly SemaphoreSlim signal = new SemaphoreSlim(0, 1);

    public void Pulse()
    {
        try
        {
            signal.Release();
        }
        catch (SemaphoreFullException) { }
    }

    public async Task WaitAsync(CancellationToken cancellationToken)
    {
        await signal.WaitAsync(cancellationToken).ConfigureAwait(false);
    }
}

【讨论】:

    猜你喜欢
    • 2012-08-20
    • 2014-06-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-11
    • 1970-01-01
    相关资源
    最近更新 更多