【问题标题】:C# .NET - Buffer messages w/TimerC# .NET - 带定时器的缓冲区消息
【发布时间】:2017-03-19 08:21:49
【问题描述】:

我需要实现一个基于时间的消息缓冲系统。

我需要做的是存储我的类的实例,然后在我达到 100 个实例或 1 分钟过去时将它们发送出去。

基本上:

List<Message> messages;

public void GotNewMessage(Message msg)
{
    messages.add(msg);

    if (messages.count() == 100 || timer.elapsed(1 minute))
    {
        SendMessages(messages);
        messages.clear()
    }
}

我似乎无法弄清楚如何在不过度使用锁的情况下实现这一点,这会大大减慢进程。有谁知道实现这样一个系统的好方法?提前致谢。

【问题讨论】:

  • 必须自己实现吗?有一个完整的图书馆可以做这种事情。在您的伪实现中,如果两分钟内没有出现某些内容,您将失去一分钟的行为。您必须将计时器保存在方法外部,并从事件处理程序中调用它。

标签: c# .net linq timer buffering


【解决方案1】:

有一个很棒的库可以满足这些需求(结合时间和序列),它就是 Reactive Extensions。见https://github.com/Reactive-Extensions/Rx.NET

然后你可以写类似的东西

void Main()
{
    messages
        .Buffer(TimeSpan.FromMinutes(1), 100) // Buffer until 100 items or 1 minute has elapsed, whatever comes first.
        .Subscribe(msgs => SendMessages(msgs));     
}

Subject<Message> messages = new Subject<Message>();

public void GotNewMessage(Message msg)
{
    messages.OnNext(msg);
}

注意:这还没有准备好生产,但它显示了如何做到这一点的基本知识。根据你从哪里接收消息,有更好的方法来创建一个 Observable 来订阅。

更多参考:

如果您的消息是使用事件接收的,您可以将事件链接到 RX 流,请参阅 https://msdn.microsoft.com/en-us/library/hh242978(v=vs.103).aspxhttps://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.fromeventpattern(v=vs.103).aspx

【讨论】:

  • 很好的答案,不知道这个库的存在。很有用。谢谢!
【解决方案2】:

首先,您应该考虑使用 ConcurrentQueue 插入 ListConcurrentQueue 是线程安全的,不需要额外的锁。有了这个,您已经为自己节省了消息队列的锁。 互锁在不可用时提供原子性。

根据C# language specification,独立读/写是原子的(但仅适用于某些数据类型,而 long 并不总是原子的 - 这就是为什么我将 DateTime.Now.Ticks 转移到没有 int32丢失任何会影响经过时间的位)和读取-修改-写入(例如 ++i)绝不是原子的。

移位(例如)是独立的,不需要任何额外的锁定。

private ConcurrentQueue<Message> Queue = new ConcurrentQueue<Message>();
private int QueueSize = 0;
private int LastSend = (int)(DateTime.Now.Ticks >> 23);
private int LastMessage = (int)(DateTime.Now.Ticks >> 23);

public void GotNewMessage(Message Message)
{
    Queue.Enqueue(Message);

    Interlocked.Increment(ref QueueSize);
    Interlocked.Exchange(ref LastMessage, (int)(DateTime.Now.Ticks >> 23));

    if (Interlocked.CompareExchange(ref QueueSize, 0, 100) >= 100 || 
        LastMessage - LastSend >= 60)
    {
        Message Dummy;
        while (!Queue.IsEmpty)
            if (Queue.TryDequeue(out Dummy))
                SendMessage(Dummy);

        Interlocked.Exchange(ref LastSend, (int)(DateTime.Now.Ticks >> 23));
    }
}

public void SendMessage(Message Message)
{
    // ...
}

编辑:可能会发送超过 100 条消息。如果您希望发送严格的 100 条消息,您可以在循环中实现另一个原子增量。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-06-05
    • 1970-01-01
    • 1970-01-01
    • 2013-01-09
    • 2011-08-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多