【问题标题】:Multiple producer-consumer problem sticks on last consume多个生产者 - 消费者问题坚持最后一次消费
【发布时间】:2021-05-21 09:53:46
【问题描述】:

我正在尝试使用 pthreads 和信号量解决多生产者-消费者问题,但它总是停留在最后一次消耗和停止。 它将有 NO_ITEMS 个项目并假设缓冲区的大小为 BUFFER_SIZE

下面是我当前的代码。

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100

using namespace std;

void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();

sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;

stack<int> items;
static int count = 0;



int main()
{
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_mutex_init(&mutex, nullptr);
    pthread_t p1, c1, c2, c3;

    pthread_create(&p1, nullptr, thread_producer, nullptr);
    pthread_create(&c1, nullptr, thread_consumer, nullptr);
    pthread_create(&c2, nullptr, thread_consumer, nullptr);
    pthread_create(&c3, nullptr, thread_consumer, nullptr);

    pthread_join(p1, nullptr);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);

    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    pthread_mutex_destroy(&mutex);

    return 0;
}

void* thread_consumer(void* args) {

    while (count < NO_ITEMS) {
        sem_wait(&fillCount);
        pthread_mutex_lock(&mutex);

        if (!items.empty() && count < NO_ITEMS - 1) {
            removeItem();
        }

        count++;
        pthread_mutex_unlock(&mutex);
        sem_post(&emptyCount);
    }

    return nullptr;
}

void* thread_producer(void* args) {
    for (int i = 0; i < NO_ITEMS; i++) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&mutex);

        addItem(i);
        // sleep(1);

        pthread_mutex_unlock(&mutex);
        sem_post(&fillCount);
    }

    return nullptr;

}

void addItem(int i) {
    cout << "Produced: " << i << endl;
    items.push(i);
}

void removeItem() {
    cout << "Consumed: " << items.top() << endl;
    items.pop();

}

这是输出的一部分:

Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt

【问题讨论】:

  • 你必须使用平台特定的 C 库 pthread 吗?如果您可以使用 C++11 或更高版本,则您拥有标准 C++ 线程对象和支持类。
  • @TedLyngmo 是的,我必须使用 c pthread 库
  • 即使你不调用removeItem(),为什么还要递增count?你为什么要检查countNO_ITEMS - 1
  • @AVH 我试图模仿wiki的java代码,我已经更正了计数增量和计数检查逻辑。它仍然停止。

标签: c++ pthreads semaphore producer-consumer


【解决方案1】:

有缺陷的逻辑

您的代码存在逻辑问题。假设NO_ITEMS 是 100,到目前为止已经消耗了 99。让两个消费者线程在该点到达while 循环的顶部,并假设两者都将count 读取为99(但见下文),因此进入循环体。两个消费者都会阻塞sem_wait(),但最多还有一个项目要生产,所以生产者最多会再增加一次信号量,让至少一个消费者无限期地阻塞。

未定义的行为

此外,您的 thread_consumer() 函数包含数据竞争,导致程序的行为未定义。具体来说,while条件下共享变量count的读取没有正确同步。尽管人们无法可靠地预测 UB 将如何表现(否则它不会是“未定义的”),但非同步访问显示一个线程的明显失败以查看其他线程的共享变量更新是相当普遍的。这种故障模式将完全解释您观察到的特定行为。

很可能,对这个同步问题的正确修复也可以解决逻辑问题。

解决方案

有多种可能的解决方案。以下是一些有希望的:

  1. 信号量并不是特别适合解决这个问题。无论如何,您都需要一个互斥体,而它通常的信号对应物是条件变量。我会将两个信号量转换为两个(或者可能只是一个)普通整数变量,并在生产者和消费者中使用标准互斥锁 + CV 模式。这将包括为消费者中count 的读取添加互斥保护。

  2. 另一方面,如果你有义务使用信号量,那么你可以

    • 为消费者阅读count添加适当的互斥保护
    • 请务必保留消费者在成功减少信号量后是否可以实际消费商品的测试
    • 在加入生产者线程之后但在尝试加入消费者之前让主程序两次(消费者数量 - 1 次)发布到fillCount。这将取消阻止任何认为他们可以消费某项但最终仍在等待最后一项被另一个消费者消费后的消费者。
  3. 或者您可以使用混合:保留emptyCount 信号量以限制在任何给定时间等待的项目数量(而不是为此目的切换到 CV),但切换到互斥锁 + CV 模式来管理消费者。

【讨论】:

  • 感谢您的清晰解释和解决方案。我添加了一个互斥锁来锁定while循环,程序将成功结束。
猜你喜欢
  • 1970-01-01
  • 2019-09-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多