【发布时间】:2014-08-11 20:25:46
【问题描述】:
努力寻找一段代码,它可以满足我的需求并且我很满意。阅读this 和this 帮助很大。
我有一个场景,当有新数据可用时,我需要由单个生产者通知单个消费者,但也希望无论是否有新数据可用,都可以定期通知消费者。 如果通知消费者的时间超过了重复周期,那很好,但不应减少通知的频率。
当消费者已经收到通知并正在工作时,可能会发生多个“新数据”通知。 (所以SemaphoreSlim 不太合适)。
因此,比生产者通知速度慢的消费者不会将后续通知排队,他们只会“重新发出”相同的“数据可用”标志而不会受到影响。
我还希望消费者异步等待通知(不阻塞线程)。
我已经将下面的类缝合在一起,它环绕TaskCompletionSource,并且还使用了一个内部定时器。
public class PeriodicalNotifier : IDisposable
{
// Need some dummy type since TaskCompletionSource has only the generic version
internal struct VoidTypeStruct { }
// Always reuse this allocation
private static VoidTypeStruct dummyStruct;
private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
private Timer reSendTimer;
public PeriodicalNotifier(int autoNotifyIntervalMs)
{
internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
}
public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
{
using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
{
await internalCompletionSource.Task;
// Recreate - to be able to set again upon the next wait
internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
}
}
public void Notify()
{
internalCompletionSource.TrySetResult(dummyStruct);
}
public void Dispose()
{
reSendTimer.Dispose();
internalCompletionSource.TrySetCanceled();
}
}
这个类的用户可以做这样的事情:
private PeriodicalNotifier notifier = new PeriodicalNotifier(100);
// ... In some task - which should be non-blocking
while (some condition)
{
await notifier.WaitForNotifictionAsync(_tokenSource.Token);
// Do some work...
}
// ... In some thread, producer added new data
notifier.Notify();
效率对我来说很重要,场景是高频数据流,所以我想到了:
- 等待的非阻塞性质。
- 我认为 Timer 比重新创建 Task.Delay 并在它不是要通知的情况下取消它更有效。
- 关注
TaskCompletionSource的娱乐
我的问题是:
- 我的代码能否正确解决问题?有什么隐藏的陷阱吗?
- 我是否缺少此用例的一些琐碎解决方案/现有块?
更新:
我得出的结论是,除了重新实现更精简的任务完成结构(如在here 和here 中)之外,我没有更多的优化可做。希望对遇到类似情况的人有所帮助。
【问题讨论】:
-
在我看来,Rx 应该能够很好地处理这个问题。有什么理由不使用 Rx?
-
我隐约同意,在其他一些项目中一直在摆弄 Rx,但这是一个遗留项目,为这个小作品引入 Rx 似乎是一种过度杀戮。顺便说一句,您是否考虑过合并两个流,一个 Observable.Interval 和一个“可用数据”事件?
-
是的,我就是这么想的。我发现作为一般规则,只要涉及时间(例如,您的定期通知),这就是 Rx 真正闪耀的地方。
-
谢谢,我想我会重新考虑我的方法。
-
您是否考虑过将 TPL 数据流作为一种选择?
标签: c# .net task-parallel-library async-await producer-consumer