【发布时间】:2015-11-17 10:36:01
【问题描述】:
我正在处理修改的生产者/消费者问题。然而,有一个竞争条件,我正在讨论最好的解决方法。可能有更清洁的方法,我想知道是否有人做过类似的事情,如果可能的话,分享一个更好的解决方案。
它从使用队列的普通生产者/消费者开始。一个生产者线程从磁盘读取项目并在共享队列中排队。然后多个消费者线程尝试将项目出列以进行处理。然而,每个项目都有一个必须与消费者标签匹配的标签(如线程 ID)。消费者线程查看队列的前面并检查项目的标签。如果不匹配消费者线程的标签,消费者必须进入睡眠状态,等待队列的最前面有一个与其标签匹配的项目。有点混乱,但下面的伪代码有望阐明算法:
struct item
{
// This is unique tag that only a specific consumer can consumer
int consumerTag;
// data for the consumer to consume
void *pData;
}
///////////////////////////////
// PRODUCER THREAD -> only 1
///////////////////////////////
// producer reads items
// each item has a tag to a specific consumer
while (item = read())
{
lock(queue)
if (queueNotFull)
{
enqueue(item);
}
else
{
// check front of the queue, notify worker.
Sleep(); // Releases Queue Mutex upon entering
// requires the mutex after it has been awaken
}
unlock(queue);
wakeUpAllConsumers();
}
-------------------------------------------------------
///////////////////////////////
// CONSUMER THREAD -> many threads
///////////////////////////////
// my tag is it like at thread id,
// each consumer has a unique tag
myTag = getThreadTAG()
while (true)
{
lock (queue);
if (queueNotEmpty)
{
item = queueFront()
if (myTag == item->id)
{
// this item is for me, let's dequeue and process
item = dequeue();
process();
}
else
{
// This is not for me let's go to sleep
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
}
else
{
Sleep(); // Releases Queue Mutex
// re-acquire mutex
}
unlock (queue);
wakeUpProducer();
}
但是上面的算法有问题。让我们考虑以下事件并假设:
item.tag=1 表示该商品只能由具有相同标签的消费者消费。我将其表示为 consumer.tag = 1
- 生产者读取
item.tag=1并入队 - 生产者唤醒所有消费者线程(
consumer.tag=1、consumer.tag=2等...现在都处于唤醒状态并检查队列的前端) - 生产者读取
item.tag=2并入队 - 生产者唤醒所有消费者线程
- 队列现在有
[item.tag=1, item.tag=2] -
consumer.tag=2 wakes up and peek at the front of the queue, butitem.tag=1 与consumer.tag=1不匹配;因此,它进入睡眠状态。consumer.tag=2现在正在睡觉。 -
consumer.tag=1醒来并偷看队列的前面,item.tag=1与consumer.tag=1匹配。出列并通知生产者它可以消耗更多。 - 生产者完成读取数据并退出。现在队列有
item.tag=2和consumer.tag=2正在休眠并且从不消耗该数据。注意可以有很多消费者。所以最后许多消费者最终可能会进入睡眠和队列
我想在生产者线程的末尾添加一个循环,不断唤醒所有睡眠线程,直到队列为空。
// PRODUCER THREAD
// Process the rest
while (queueIsNotEmpty)
{
WakeUpAllConsumer();
Sleep();
}
但我相信一定有更优雅的方式来处理这个问题。有什么想法告诉我
谢谢!
【问题讨论】:
-
将所有工作放在同一个队列上是没有意义的。因此,每个消费者都应该有自己的独立队列,并受条件变量保护。它将扩展得更好。
-
忘了说,消费者处理数据的顺序必须和生产者从磁盘读取的顺序一致(是命令列表)。如果我使用多个队列,则处理项目的顺序将与保存在磁盘上的顺序不同。
-
当前解决方案没有从并行性中受益,因为队列在处理过程中被锁定。我认为单一的生产者/单一的消费者是要走的路。
标签: multithreading algorithm producer-consumer