【问题标题】:modified producer consumer algorithm改进的生产者消费者算法
【发布时间】:2015-11-17 10:36:01
【问题描述】:

我正在处理修改的生产者/消费者问题。然而,有一个竞争条件,我正在讨论最好的解决方法。可能有更清洁的方法,我想知道是否有人做过类似的事情,如果可能的话,分享一个更好的解决方案。

它从使用队列的普通生产者/消费者开始。一个生产者线程从磁盘读取项目并在共享队列中排队。然后多个消费者线程尝试将项目出列以进行处理。然而,每个项目都有一个必须与消费者标签匹配的标签(如线程 ID)。消费者线程查看队列的前面并检查项目的标签。如果不匹配消费者线程的标签,消费者必须进入睡眠状态,等待队列的最前面有一个与其标签匹配的项目。有点混乱,但下面的伪代码有望阐明算法:

struct item
{
   // This is unique tag that only a specific consumer can consumer
    int    consumerTag; 
    // data for the consumer to consume
    void   *pData;
}

///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
    lock(queue)
    if (queueNotFull)
    {
        enqueue(item);
    }
    else
    {
       // check front of the queue, notify worker.
       Sleep(); // Releases Queue Mutex upon entering
       // requires the mutex after it has been awaken
    }
    unlock(queue);
    wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
    lock (queue);
    if (queueNotEmpty)
    {
        item = queueFront()
        if (myTag == item->id)
        {
           // this item is for me, let's dequeue and process
           item = dequeue();
           process();
        }
        else
        {
           // This is not for me let's go to sleep
           Sleep(); // Releases Queue Mutex
          // re-acquire mutex
        }
    }
    else
    {
        Sleep();    // Releases Queue Mutex
       // re-acquire mutex
    }

    unlock (queue);
    wakeUpProducer();
}

但是上面的算法有问题。让我们考虑以下事件并假设:

item.tag=1 表示该商品只能由具有相同标签的消费者消费。我将其表示为 consumer.tag = 1

  1. 生产者读取item.tag=1 并入队
  2. 生产者唤醒所有消费者线程(consumer.tag=1consumer.tag=2 等...现在都处于唤醒状态并检查队列的前端)
  3. 生产者读取item.tag=2 并入队
  4. 生产者唤醒所有消费者线程
  5. 队列现在有[item.tag=1, item.tag=2]
  6. consumer.tag=2 wakes up and peek at the front of the queue, butitem.tag=1 与 consumer.tag=1 不匹配;因此,它进入睡眠状态。 consumer.tag=2 现在正在睡觉。
  7. consumer.tag=1 醒来并偷看队列的前面,item.tag=1consumer.tag=1 匹配。出列并通知生产者它可以消耗更多。
  8. 生产者完成读取数据并退出。现在队列有item.tag=2consumer.tag=2 正在休眠并且从不消耗该数据。注意可以有很多消费者。所以最后许多消费者最终可能会进入睡眠和队列

我想在生产者线程的末尾添加一个循环,不断唤醒所有睡眠线程,直到队列为空。

// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
     WakeUpAllConsumer();
     Sleep();
}

但我相信一定有更优雅的方式来处理这个问题。有什么想法告诉我

谢谢!

【问题讨论】:

  • 将所有工作放在同一个队列上是没有意义的。因此,每个消费者都应该有自己的独立队列,并受条件变量保护。它将扩展得更好。
  • 忘了说,消费者处理数据的顺序必须和生产者从磁盘读取的顺序一致(是命令列表)。如果我使用多个队列,则处理项目的顺序将与保存在磁盘上的顺序不同。
  • 当前解决方案没有从并行性中受益,因为队列在处理过程中被锁定。我认为单一的生产者/单一的消费者是要走的路。

标签: multithreading algorithm producer-consumer


【解决方案1】:

不久前我遇到了类似的事情(在所有线程都可以处理所有项目的设置中),我在那里使用的解决方案,尽管不是那么优雅,最后一次是在制作人完成阅读时唤醒每个人数据。
在这里,这不会真正起作用,因为如果您的队列中有第三个项目,那么该项目可能会被抛在后面。我建议的是以下两种方法之一:

  1. 每次消费者出列项目时,唤醒所有线程。这是我能想到的唯一方法,以确保不会留下任何东西。 (这种模式只有isProducerFinishedReading == true才能做到,节省一个资源/时间。

  2. 将系统重新设计为有10个队列,然后当一个项目被添加到队列n时,消费者线程n被唤醒。当它处理完元素后,它会再次检查队列以查找要处理的新项目。在任何情况下,生产者都应该在读取完成后检查所有队列的长度并唤醒相应的线程。

希望有所帮助。

编辑:

  1. 每次线程完成工作时,它都应该再次检查队列,如果项目中有“his”,那么它会完成工作。如果一个线程可以唤醒其他线程,那么它应该唤醒相应的线程。

【讨论】:

  • 还有一个我没有提到的额外问题。读取和排队的数据必须以完全相同的顺序进行处理。我不能提前处理一些问题。队列中是一个命令列表,这些命令必须按照相同的顺序处理。
  • 很难解释。假设我所描述的一切都是必需的,它是复杂系统的一部分。我刚刚起草了核心问题。
  • 好的,接受了,我的回答能解决问题吗?每一个待处理的项目在上一个处理完之后仍然会被处理。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多