【问题标题】:Non blocking and reoccurring producer/consumer notifier implementation非阻塞和重复的生产者/消费者通知器实现
【发布时间】:2014-08-11 20:25:46
【问题描述】:

努力寻找一段代码,它可以满足我的需求并且我很满意。阅读thisthis 帮助很大。

我有一个场景,当有新数据可用时,我需要由单个生产者通知单个消费者,但也希望无论是否有新数据可用,都可以定期通知消费者。 如果通知消费者的时间超过了重复周期,那很好,但不应减少通知的频率。

当消费者已经收到通知并正在工作时,可能会发生多个“新数据”通知。 (所以SemaphoreSlim 不太合适)。 因此,比生产者通知速度慢的消费者不会将后续通知排队,他们只会“重新发出”相同的“数据可用”标志而不会受到影响。

我还希望消费者异步等待通知(不阻塞线程)。

我已经将下面的类缝合在一起,它环绕TaskCompletionSource,并且还使用了一个内部定时器。

public class PeriodicalNotifier : IDisposable
{
    // Need some dummy type since TaskCompletionSource has only the generic version
    internal struct VoidTypeStruct { }
    // Always reuse this allocation
    private static VoidTypeStruct dummyStruct;

    private TaskCompletionSource<VoidTypeStruct> internalCompletionSource;
    private Timer reSendTimer;

    public PeriodicalNotifier(int autoNotifyIntervalMs)
    {
        internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        reSendTimer = new Timer(_ => Notify(), null, 0, autoNotifyIntervalMs);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        using (cancellationToken.Register(() => internalCompletionSource.TrySetCanceled()))
        {
            await internalCompletionSource.Task;
            // Recreate - to be able to set again upon the next wait
            internalCompletionSource = new TaskCompletionSource<VoidTypeStruct>();
        }
    }

    public void Notify()
    {
        internalCompletionSource.TrySetResult(dummyStruct);
    }

    public void Dispose()
    {
        reSendTimer.Dispose();
        internalCompletionSource.TrySetCanceled();
    }
}

这个类的用户可以做这样的事情:

private PeriodicalNotifier notifier = new PeriodicalNotifier(100);

// ... In some task - which should be non-blocking
while (some condition)
{
    await notifier.WaitForNotifictionAsync(_tokenSource.Token);
    // Do some work...
}

// ... In some thread, producer added new data
notifier.Notify();

效率对我来说很重要,场景是高频数据流,所以我想到了:

  • 等待的非阻塞性质。
  • 我认为 Timer 比重新创建 Task.Delay 并在它不是要通知的情况下取消它更有效。
  • 关注TaskCompletionSource 的娱乐

我的问题是:

  1. 我的代码能否正确解决问题?有什么隐藏的陷阱吗?
  2. 我是否缺少此用例的一些琐碎解决方案/现有块?

更新:

我得出的结论是,除了重新实现更精简的任务完成结构(如在herehere 中)之外,我没有更多的优化可做。希望对遇到类似情况的人有所帮助。

【问题讨论】:

  • 在我看来,Rx 应该能够很好地处理这个问题。有什么理由不使用 Rx?
  • 我隐约同意,在其他一些项目中一直在摆弄 Rx,但这是一个遗留项目,为这个小作品引入 Rx 似乎是一种过度杀戮。顺便说一句,您是否考虑过合并两个流,一个 Observable.Interval 和一个“可用数据”事件?
  • 是的,我就是这么想的。我发现作为一般规则,只要涉及时间(例如,您的定期通知),这就是 Rx 真正闪耀的地方。
  • 谢谢,我想我会重新考虑我的方法。
  • 您是否考虑过将 TPL 数据流作为一种选择?

标签: c# .net task-parallel-library async-await producer-consumer


【解决方案1】:
  1. 是的,您的实现是有意义的,但 TaskCompletionSource 娱乐应该在使用范围之外,否则“旧”取消令牌可能会取消“新”TaskCompletionSource
  2. 我认为将某种AsyncManualResetEventTimer 结合使用会更简单且不易出错。在Visual Studio SDK by Microsoft 中有一个非常好的带有异步工具的命名空间。您需要 install the SDK 然后引用 Microsoft.VisualStudio.Threading 程序集。这是使用他们的 AsyncManualResetEvent 和相同 API 的实现:

public class PeriodicalNotifier : IDisposable
{
    private readonly Timer _timer;
    private readonly AsyncManualResetEvent _asyncManualResetEvent;

    public PeriodicalNotifier(TimeSpan autoNotifyInterval)
    {
        _asyncManualResetEvent = new AsyncManualResetEvent();
        _timer = new Timer(_ => Notify(), null, TimeSpan.Zero, autoNotifyInterval);
    }

    public async Task WaitForNotifictionAsync(CancellationToken cancellationToken)
    {
        await _asyncManualResetEvent.WaitAsync().WithCancellation(cancellationToken);
        _asyncManualResetEvent.Reset();
    }

    public void Notify()
    {
        _asyncManualResetEvent.Set();
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}

您通过设置重置事件进行通知,使用WaitAsync 异步等待,使用WithCancellation 扩展方法启用取消,然后重置事件。通过设置相同的重置事件来“合并”多个通知。

【讨论】:

  • 谢谢,我正在构建它,现在比较性能和效率。
  • 顺便说一句,关于娱乐场所的位置 - 这是设计使然,否则可能会错过取消。由于预计此机制将在循环中使用,因此重新创建只是 TaskCompltionSource 行为方式的一个影响。无论哪个 TaskCompletionSource 是“负责”,取消都应适用于此块。对吗?
  • 很抱歉,我无法得到您的建议以通过我的验证测试。似乎过早到达await _buffer.OutputAvailableAsync(cancellationToken) 时,通知时不再释放等待者。
  • @YoadSnapir 你说的“太快了”是什么意思?
  • @YoadSnapir 目前取消令牌参数将取消当前的WaitForNotifictionAsync 调用,但在某些情况下可能会取消下一次调用。如果你总是使用相同的标记,那没关系,但在这种情况下,我会将它传递给构造函数,而不是每次调用。
【解决方案2】:
Subject<Result> notifier = new Subject<Result)();

notifier 
    .Select(value => Observable.Interval(TimeSpan.FromMilliSeconds(100))
                                            .Select(_ => value)).Switch()
    .Subscribe(value => DoSomething(value));

//Some other thread...
notifier.OnNext(...);

此 Rx 查询将继续发送值,每 100 毫秒,直到出现新值。然后我们每 100 毫秒通知一次该值。

如果我们接收值的速度超过每 100 毫秒一次,那么我们的输出基本上与输入相同。

【讨论】:

  • 如果我理解正确,对于每个OnNext,您将获得一个DoSomething。这不是根据我的规范,即当消费者正在“工作”通知时,只需让下一个“等待”瞬间DoSomething,因此慢速消费者不会排队通知。我对吗?编辑了我的问题以澄清规范的这一方面。
  • 任何时候都应该只运行一个 DoSomeThing。
  • 我同意,但是如果一个正在飞行中并且两个通知同时到达 - 只有另一个 DoSomething 应该在当前通知完成后触发。 (不是两个)。
  • @YoadSnapir 你甚至想要 DoSomething 的什么并发模型?默认行为是一次处理一个(我今天才从 PluralSight 学到的)。还是您不想排队?
  • 没错,在这种情况下无需排队。想想一系列价格,通常您想知道某物的最后价格,因为如果您只想购买,则 n-1 价格不再有趣。
猜你喜欢
  • 2014-02-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多