【问题标题】:How to implement BlockingCollection to fix this Producer/Consumer issue?如何实施 BlockingCollection 来解决这个生产者/消费者问题?
【发布时间】:2011-10-09 07:36:03
【问题描述】:

我目前有一个应用程序从套接字接收数据包,处理它们并将它们添加到 ConcurrentQueue。然后我有一个单独的线程来处理这些项目。

我遇到的问题是生产者/消费者问题,即使没有任何物品,消费者也试图拿走物品,从而导致大量 cpu 使用。

ProcessPackets 在自己的线程上运行:

    private ConcurrentQueue<PrimaryPacket> Waiting = new ConcurrentQueue<PrimaryPacket>();

    private void ProcessPackets()
    {
        PrimaryPacket e;

        while (true)
        {
            if (Waiting.TryDequeue(out e))
            {
                Packets.TryAdd(((ulong)e.IPAddress << 32 | e.RequestID), e);
            }
        }
    }

    public void AddPacket(PrimaryPacket e)
    {
        Waiting.Enqueue(e);
    }

实施 BlockingCollection(T) 来处理这个问题的最佳方式是什么?还是另一种解决方案?

同样值得注意的是,每秒大约有 30,000 个项目被添加到队列中。

【问题讨论】:

  • 我试过这个方法,但是跟不上每秒添加到队列中的 30,000 个项目。
  • 我很难相信这一点。 AutoResetEvent 不是最快的方法,但 30 000 确实不是那么多。您可以尝试使用 ManualResetEventSlim,它在初始状态中使用自旋计数,但实际上这不应该是必要的,除非您在 ProcessPackets 中有代码需要很长时间(但这超出了这个问题的范围)
  • 我刚刚使用BlockingCollection&lt;int&gt; 进行了测试,每秒吞吐量约为 2,000,000。
  • 贴出来的代码肯定跟不上,烧掉100%核心。您必须让线程调度程序有机会在准备好处理数据时唤醒您的线程。就像 BlockingCollection 一样。循环会让你失去线程量。

标签: c# multithreading


【解决方案1】:

你不必实现BlockingCollection&lt;T&gt;,你可以使用它。正如文档所说,它只是IProducerConsumerCollection&lt;T&gt; 的包装器,例如ConcurrentQueue&lt;T&gt;(这是默认的)。

private BlockingCollection<PrimaryPacket> Waiting =
    new BlockingCollection<PrimaryPacket>();

private void ProcessPackets()
{
    while (true)
    {
        PrimaryPacket e = Waiting.Take();
        Packets.TryAdd(((ulong)e.IPAddress << 32 | e.RequestID), e);
    }
}

public void AddPacket(PrimaryPacket e)
{
    Waiting.Add(e);
}

Take() 会在队列为空时阻塞,因此不会不必要地消耗 CPU。并且您可能应该考虑处理完成后要做什么。

【讨论】:

  • 谢谢。我实际上在睡觉前想出了这个方法。不敢相信就这么简单。
猜你喜欢
  • 1970-01-01
  • 2015-05-07
  • 1970-01-01
  • 2011-03-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多