如果您不需要Wait() 调用来返回Task,但对能够await Wait() 感到满意,那么您可以实现自定义awaiter/awaitable。
请参阅this link 了解编译器使用的等待模式的概述。
在实现自定义等待对象时,您将只处理委托,而实际的“等待”由您决定。当您想“等待”某个条件时,通常可以保留待处理的延续列表,并且只要条件满足,您就可以调用这些延续。您只需要处理来自await 可以从任意线程调用这一事实的同步。如果你知道你只会从一个线程(比如 UI 线程)await,那么你根本不需要任何同步!
我会尝试给你一个无锁的实现,但不能保证它是正确的。 如果您不明白为什么所有竞争条件都是安全的,则不应使用它并使用锁定语句或其他您知道如何调试的技术来实现 async/await 协议。
public sealed class AsyncMonitor
{
private PulseAwaitable _currentWaiter;
public AsyncMonitor()
{
_currentWaiter = new PulseAwaitable();
}
public void Pulse()
{
// Optimize for the case when calling Pulse() when nobody is waiting.
//
// This has an inherent race condition when calling Pulse() and Wait()
// at the same time. The question this was written for did not specify
// how to resolve this, so it is a valid answer to tolerate either
// result and just allow the race condition.
//
if (_currentWaiter.HasWaitingContinuations)
Interlocked.Exchange(ref _currentWaiter, new PulseAwaitable()).Complete();
}
public PulseAwaitable Wait()
{
return _currentWaiter;
}
}
// This class maintains a list of waiting continuations to be executed when
// the owning AsyncMonitor is pulsed.
public sealed class PulseAwaitable : INotifyCompletion
{
// List of pending 'await' delegates.
private Action _pendingContinuations;
// Flag whether we have been pulsed. This is the primary variable
// around which we build the lock free synchronization.
private int _pulsed;
// AsyncMonitor creates instances as required.
internal PulseAwaitable()
{
}
// This check has a race condition which is tolerated.
// It is used to optimize for cases when the PulseAwaitable has no waiters.
internal bool HasWaitingContinuations
{
get { return Volatile.Read(ref _pendingContinuations) != null; }
}
// Called by the AsyncMonitor when it is pulsed.
internal void Complete()
{
// Set pulsed flag first because that is the variable around which
// we build the lock free protocol. Everything else this method does
// is free to have race conditions.
Interlocked.Exchange(ref _pulsed, 1);
// Execute pending continuations. This is free to race with calls
// of OnCompleted seeing the pulsed flag first.
Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
}
#region Awaitable
// There is no need to separate the awaiter from the awaitable
// so we use one class to implement both parts of the protocol.
public PulseAwaitable GetAwaiter()
{
return this;
}
#endregion
#region Awaiter
public bool IsCompleted
{
// The return value of this property does not need to be up to date so we could omit the 'Volatile.Read' if we wanted to.
// What is not allowed is returning "true" even if we are not completed, but this cannot happen since we never transist back to incompleted.
get { return Volatile.Read(ref _pulsed) == 1; }
}
public void OnCompleted(Action continuation)
{
// Protected against manual invocations. The compiler-generated code never passes null so you can remove this check in release builds if you want to.
if (continuation == null)
throw new ArgumentNullException(nameof(continuation));
// Standard pattern of maintaining a lock free immutable variable: read-modify-write cycle.
// See for example here: https://blogs.msdn.microsoft.com/oldnewthing/20140516-00/?p=973
// Again the 'Volatile.Read' is not really needed since outdated values will be detected at the first iteration.
var oldContinuations = Volatile.Read(ref _pendingContinuations);
for (;;)
{
var newContinuations = (oldContinuations + continuation);
var actualContinuations = Interlocked.CompareExchange(ref _pendingContinuations, newContinuations, oldContinuations);
if (actualContinuations == oldContinuations)
break;
oldContinuations = actualContinuations;
}
// Now comes the interesting part where the actual lock free synchronization happens.
// If we are completed then somebody needs to clean up remaining continuations.
// This happens last so the first part of the method can race with pulsing us.
if (IsCompleted)
Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke();
}
public void GetResult()
{
// This is just to check against manual calls. The compiler will never call this when IsCompleted is false.
// (Assuming your OnCompleted implementation is bug-free and you don't execute continuations before IsCompleted becomes true.)
if (!IsCompleted)
throw new NotSupportedException("Synchronous waits are not supported. Use 'await' or OnCompleted to wait asynchronously");
}
#endregion
}
您通常不会打扰延续运行的线程,因为如果它们是异步方法,编译器已经插入代码(在延续中)以切换回正确的线程,无需在每个等待的实现中手动执行.
[编辑]
作为一个锁定实现外观的起点,我将提供一个使用锁定语句的实现。应该很容易用自旋锁或其他一些锁定技术替换它。通过使用结构作为可等待对象,它甚至具有除了初始对象之外不进行额外分配的优点。 (调用端的编译器魔法中的 async/await 框架中当然有分配,但你无法摆脱这些。)
请注意,迭代计数器将仅针对每个 Wait+Pulse 对递增,并最终溢出为负数,但这没关系。我们只需要连接从被调用的持续蜂直到它可以调用 GetResult 的时间。 40 亿个 Wait+Pulse 对应该有足够的时间让任何未决的继续调用其 GetResult 方法。如果您不想冒这种风险,您可以使用 long 或 Guid 来获得更独特的迭代计数器,但恕我直言,int 几乎适用于所有场景。
public sealed class AsyncMonitor
{
public struct Awaitable : INotifyCompletion
{
// We use a struct to avoid allocations. Note that this means the compiler will copy
// the struct around in the calling code when doing 'await', so for your own debugging
// sanity make all variables readonly.
private readonly AsyncMonitor _monitor;
private readonly int _iteration;
public Awaitable(AsyncMonitor monitor)
{
lock (monitor)
{
_monitor = monitor;
_iteration = monitor._iteration;
}
}
public Awaitable GetAwaiter()
{
return this;
}
public bool IsCompleted
{
get
{
// We use the iteration counter as an indicator when we should be complete.
lock (_monitor)
{
return _monitor._iteration != _iteration;
}
}
}
public void OnCompleted(Action continuation)
{
// The compiler never passes null, but someone may call it manually.
if (continuation == null)
throw new ArgumentNullException(nameof(continuation));
lock (_monitor)
{
// Not calling IsCompleted since we already have a lock.
if (_monitor._iteration == _iteration)
{
_monitor._waiting += continuation;
// null the continuation to indicate the following code
// that we completed and don't want it executed.
continuation = null;
}
}
// If we were already completed then we didn't null the continuation.
// (We should invoke the continuation outside of the lock because it
// may want to Wait/Pulse again and we want to avoid reentrancy issues.)
continuation?.Invoke();
}
public void GetResult()
{
lock (_monitor)
{
// Not calling IsCompleted since we already have a lock.
if (_monitor._iteration == _iteration)
throw new NotSupportedException("Synchronous wait is not supported. Use await or OnCompleted.");
}
}
}
private Action _waiting;
private int _iteration;
public AsyncMonitor()
{
}
public void Pulse(bool executeAsync)
{
Action execute = null;
lock (this)
{
// If nobody is waiting we don't need to increment the iteration counter.
if (_waiting != null)
{
_iteration++;
execute = _waiting;
_waiting = null;
}
}
// Important: execute the callbacks outside the lock because they might Pulse or Wait again.
if (execute != null)
{
// If the caller doesn't want inlined execution (maybe he holds a lock)
// then execute it on the thread pool.
if (executeAsync)
Task.Run(execute);
else
execute();
}
}
public Awaitable Wait()
{
return new Awaitable(this);
}
}