【问题标题】:Serially process ConcurrentQueue and limit to one message processor. Correct pattern?串行处理 ConcurrentQueue 并限制为一个消息处理器。正确的模式?
【发布时间】:2011-12-23 19:08:26
【问题描述】:

我正在.net 中构建一个多线程应用程序。

我有一个监听连接(抽象、串行、tcp...)的线程。

当它收到一条新消息时,它会通过 AddMessage 将它添加到。然后调用 startSpool。 startSpool 检查假脱机是否已经在运行,如果是,则返回,否则,在新线程中启动它。原因是,消息必须串行处理,先进先出。

所以,我的问题是... 我会以正确的方式解决这个问题吗? 有没有更好、更快、更便宜的模式?

如果我的代码中有错字,我很抱歉,我在复制和粘贴时遇到了问题。

    ConcurrentQueue<IMyMessage > messages = new ConcurrentQueue<IMyMessage>();

    const int maxSpoolInstances = 1;

    object lcurrentSpoolInstances;
    int currentSpoolInstances = 0;

    Thread spoolThread;

    public void AddMessage(IMyMessage message)
    {
        this.messages.Add(message);

        this.startSpool();
    }

    private void startSpool()
    {
        bool run = false;

        lock (lcurrentSpoolInstances)
        {
            if (currentSpoolInstances <= maxSpoolInstances)
            {
                this.currentSpoolInstances++;
                run = true;
            }
            else
            {
                return;
            }
        }

        if (run)
        {
            this.spoolThread = new Thread(new ThreadStart(spool));
            this.spoolThread.Start();
        }
    }

    private void spool()
    {
        Message.ITimingMessage message;

        while (this.messages.Count > 0)
        {
            // TODO: Is this below line necessary or does the TryDequeue cover this?
            message = null;

            this.messages.TryDequeue(out message);

            if (message != null)
            {
                // My long running thing that does something with this message.
            }
        }


        lock (lcurrentSpoolInstances)
        {
            this.currentSpoolInstances--;
        }
    }

【问题讨论】:

  • 队列将是先进先出的,但如果你有多个线程填充它,你怎么知道 IN's 的顺序是正确的?
  • @AustinSalonen 问题的描述表明有一个生产者线程添加项目(即:从 TCP 连接)和一个单独的消费者线程。这是一个经典的 FIFO 生产者/消费者场景……
  • 你的 startSpool 方法中有一个 ug。你永远不会增加currentSpoolInstances,所以每次调用startSpool 都会启动一个新线程。你需要在释放锁之前增加currentSpoolInstances
  • @JimMischel - 正确。我解决了这个问题。这是一个转录问题,RE:无法复制和粘贴。

标签: c# multithreading c#-4.0 concurrency


【解决方案1】:

使用BlockingCollection&lt;T&gt; 而不是ConcurrentQueue&lt;T&gt; 会更容易。

这样的事情应该可以工作:

class MessageProcessor : IDisposable
{
    BlockingCollection<IMyMessage> messages = new BlockingCollection<IMyMessage>();

    public MessageProcessor()
    {
       // Move this to constructor to prevent race condition in existing code (you could start multiple threads...
       Task.Factory.StartNew(this.spool, TaskCreationOptions.LongRunning);
    }

    public void AddMessage(IMyMessage message)
    {
        this.messages.Add(message);
    }

    private void Spool()
    {
         foreach(IMyMessage message in this.messages.GetConsumingEnumerable())
         {
               // long running thing that does something with this message.
         }
    }

    public void FinishProcessing()
    {
         // This will tell the spooling you're done adding, so it shuts down
         this.messages.CompleteAdding();
    }

    void IDisposable.Dispose()
    {
        this.FinishProcessing();
    }
}

编辑:如果您想支持多个消费者,您可以通过单独的构造函数来处理。我会将其重构为:

    public MessageProcessor(int numberOfConsumers = 1)
    {
        for (int i=0;i<numberOfConsumers;++i)
            StartConsumer();
    }

    private void StartConsumer()
    {
       // Move this to constructor to prevent race condition in existing code (you could start multiple threads...
       Task.Factory.StartNew(this.spool, TaskCreationOptions.LongRunning);
    }

这将允许您启动任意数量的消费者。请注意,这违反了严格 FIFO 的规则 - 处理可能会处理带有此更改的块中的“numberOfConsumer”元素。

已经支持多个生产者。以上是线程安全的,因此任意数量的线程都可以并行调用Add(message),无需更改。

【讨论】:

  • 感谢您的反馈。您确定了一个竞争条件,您能否提供更多关于它如何启动 2 个线程的详细信息?
  • @MichaelRice 如果您想启动多个消费者,您可以将其作为构造函数参数传入:
  • @MichaelRice 已编辑以向您展示如何支持任意数量的消费者线程。
  • 我认为您误解了我的问题,或者我误解了您的回答。在您的代码注释中“//将此移动到构造函数以防止现有代码中的竞争条件(您可以启动多个线程......”您能否详细说明竞争条件是如何发生的?我没有看到它。谢谢!
  • @MichaelRice 哦 - 在你的代码中,你没有显示你设置 currentSpoolInstances 的位置 - 如果没有在锁内正确设置,你可能会打开一个竞争条件。
【解决方案2】:

我认为 Reed 的答案是最好的方法,但是为了学术,这里是一个使用并发队列的例子——你在你发布的代码中有一些比赛(取决于你如何处理递增的 currnetSpoolInstances )

我所做的更改(如下)是:

  • 切换到任务而不是线程(使用线程池而不是产生创建新线程的成本)
  • 添加了代码以增加/减少您的假脱机实例计数
  • 将“if currentSpoolInstances
  • 更改了处理空队列以避免竞争的方式:我认为您有竞争,您的 while 循环可能测试为 false,(您的线程开始退出),但在那一刻,添加了一个新项目(因此您的假脱机线程正在退出,但您的假脱机计数 > 0,因此您的队列停止)。
private ConcurrentQueue<IMyMessage> messages = new ConcurrentQueue<IMyMessage>(); const int maxSpoolInstances = 1; object lcurrentSpoolInstances = new object(); int currentSpoolInstances = 0; public void AddMessage(IMyMessage message) { this.messages.Enqueue(message); this.startSpool(); } private void startSpool() { lock (lcurrentSpoolInstances) { if (currentSpoolInstances < maxSpoolInstances) { this.currentSpoolInstances++; Task.Factory.StartNew(spool, TaskCreationOptions.LongRunning); } } } private void spool() { IMyMessage message; while (true) { // you do not need to null message because it is an "out" parameter, had it been a "ref" parameter, you would want to null it. if(this.messages.TryDequeue(out message)) { // My long running thing that does something with this message. } else { lock (lcurrentSpoolInstances) { if (this.messages.IsEmpty) { this.currentSpoolInstances--; return; } } } } }

【讨论】:

    【解决方案3】:

    检查“管道模式”:http://msdn.microsoft.com/en-us/library/ff963548.aspx

    • 对“缓冲区”使用 BlockingCollection。
    • 每个处理器(例如 ReadStrings、CorrectCase 等)都应在任务中运行。

    HTH..

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-09-23
      • 1970-01-01
      • 2010-10-12
      • 2017-02-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多