【发布时间】:2013-07-12 04:59:12
【问题描述】:
所以我的要求是让我的函数等待第一个实例 event Action<T> 来自另一个类和另一个线程,并在我的线程上处理它,允许等待被超时或 CancellationToken 中断。
我想创建一个可以重用的通用函数。我设法创建了几个(我认为)我需要的选项,但两者似乎都比我想象的要复杂。
用法
为了清楚起见,此函数的示例使用如下所示,其中serialDevice 在单独的线程上吐出事件:
var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
cancellationToken,
statusPacket => OnStatusPacketReceived(statusPacket),
a => serialDevice.StatusPacketReceived += a,
a => serialDevice.StatusPacketReceived -= a,
5000,
() => serialDevice.RequestStatusPacket());
选项 1——ManualResetEventSlim
这个选项还不错,但是Dispose 对ManualResetEventSlim 的处理比它看起来应该的要混乱。它使 ReSharper 适合我在闭包中访问修改/处置的东西,而且真的很难理解,所以我什至不确定它是否正确。也许我缺少一些可以清理它的东西,这将是我的偏好,但我没有立即看到它。这是代码。
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var eventOccurred = false;
var eventResult = default(TEvent);
var o = new object();
var slim = new ManualResetEventSlim();
Action<TEvent> setResult = result =>
{
lock (o) // ensures we get the first event only
{
if (!eventOccurred)
{
eventResult = result;
eventOccurred = true;
// ReSharper disable AccessToModifiedClosure
// ReSharper disable AccessToDisposedClosure
if (slim != null)
{
slim.Set();
}
// ReSharper restore AccessToDisposedClosure
// ReSharper restore AccessToModifiedClosure
}
}
};
subscribe(setResult);
try
{
if (initializer != null)
{
initializer();
}
slim.Wait(msTimeout, token);
}
finally // ensures unsubscription in case of exception
{
unsubscribe(setResult);
lock(o) // ensure we don't access slim
{
slim.Dispose();
slim = null;
}
}
lock (o) // ensures our variables don't get changed in middle of things
{
if (eventOccurred)
{
handler(eventResult);
}
return eventOccurred;
}
}
选项 2 — 不带 WaitHandle 的轮询
这里的WaitForSingleEvent 函数更简洁。我可以使用ConcurrentQueue,因此甚至不需要锁。但我只是不喜欢轮询功能Sleep,而且我看不出用这种方法有什么办法。我想传入WaitHandle 而不是Func<bool> 来清理Sleep,但第二次这样做我又要清理整个Dispose。
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var q = new ConcurrentQueue<TEvent>();
subscribe(q.Enqueue);
try
{
if (initializer != null)
{
initializer();
}
token.Sleep(msTimeout, () => !q.IsEmpty);
}
finally // ensures unsubscription in case of exception
{
unsubscribe(q.Enqueue);
}
TEvent eventResult;
var eventOccurred = q.TryDequeue(out eventResult);
if (eventOccurred)
{
handler(eventResult);
}
return eventOccurred;
}
public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
var start = DateTime.Now;
while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
{
token.ThrowIfCancellationRequested();
Thread.Sleep(1);
}
}
问题
我并不特别关心这些解决方案中的任何一个,我也不能 100% 确定它们中的任何一个都是 100% 正确的。这些解决方案中的任何一个是否比另一个更好(惯用性、效率等),还是有更简单的方法或内置功能来满足我在这里需要做的事情?
更新:迄今为止的最佳答案
以下TaskCompletionSource 解决方案的修改。无需长时间关闭、锁或任何需要的东西。看起来很简单。这里有错误吗?
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var tcs = new TaskCompletionSource<TEvent>();
Action<TEvent> handler = result => tcs.TrySetResult(result);
var task = tcs.Task;
subscribe(handler);
try
{
if (initializer != null)
{
initializer();
}
task.Wait(msTimeout, token);
}
finally
{
unsubscribe(handler);
// Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
}
if (task.Status == TaskStatus.RanToCompletion)
{
onEvent(task.Result);
return true;
}
return false;
}
更新 2:另一个很好的解决方案
原来BlockingCollection 的工作方式与ConcurrentQueue 一样,但也有接受超时和取消令牌的方法。这个解决方案的一个好处是它可以很容易地更新为WaitForNEvents:
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
var q = new BlockingCollection<TEvent>();
Action<TEvent> add = item => q.TryAdd(item);
subscribe(add);
try
{
if (initializer != null)
{
initializer();
}
TEvent eventResult;
if (q.TryTake(out eventResult, msTimeout, token))
{
handler(eventResult);
return true;
}
return false;
}
finally
{
unsubscribe(add);
q.Dispose();
}
}
【问题讨论】:
-
听起来你想要
AutoResetEvent之类的东西。您是否考虑过使用它的可能性? -
@KendallFrey 是的,这似乎让我陷入了与
ManualResetEventSlim相同的Dispose混乱,或者你有办法解决它吗? -
Resharper 抱怨的原因是因为传入了订阅和取消订阅操作,它无法分析流程。如果您通过事件(通过反射)或以某种方式使流更可验证。无论如何,我个人并不高度重视 Resharper 警告。
-
这些都没有任何意义。如果您无法控制事件源,则无法拦截在线程池线程上触发的 DataReceived 之类的事件。让它只触发一次也没有任何意义,它只意味着调用 Read() 不会阻塞。它不保证您会获得想要阅读的所有内容。
-
@HansPassant 你能澄清一下吗?当我测试它们时,这两个选项都没有错误(到目前为止)。我什至没有
Read问题中的函数,所以我不确定你的意思。serialDevice.RequestStatusPacket()将命令发送到我的设备以响应状态数据包,我需要等待响应,该响应来自另一个线程,超时或取消,并在我自己的线程上处理事件。不知道为什么这没有意义。
标签: c# multithreading events asynchronous dispose