【问题标题】:Producer Consumer problem using mutexes in cpp在 cpp 中使用互斥锁的生产者消费者问题
【发布时间】:2020-10-15 04:00:18
【问题描述】:

我有一个生产者和两个消费者线程试图访问共享缓冲区。消费者和生产者之间使用互斥锁。消费者应该并行运行。如果缓冲区为空,则消费者休眠,生产者必须唤醒它们。如果缓冲区已满,生产者不做任何事情。下面是我正在处理的代码 sn-ps: 生产者线程:

void *writer(void*)

{
     // Initialising the seed
         srand(time(NULL));
     while(1)
     {
         pthread_mutex_lock(&rallow);
         if (Q.size() < MAX && item < MAX)
         {
            // Getting the random number
            int num = rand() % 10 + 1;
            // Pushing the number into queue
            Q.push(num);
            
            item++;
            cout << "Produced: " << num << " item: "<<item<<endl;
            pthread_cond_broadcast(&dataNotProduced); 
         }
         else if (item == MAX) {
            pthread_mutex_unlock(&rallow);
            continue;
        }
        pthread_mutex_unlock(&rallow);
    }
}

消费者 1:

void *reader1(void*)

{
    while(1)
    {
         pthread_mutex_lock(&mread);

         rc++;

         if(rc==1)
            pthread_mutex_lock(&rallow);

         pthread_mutex_unlock(&mread);


         if (Q.size() > 0) {
            // Get the data from the front of queue
            int data = Q.front();
      
            // Pop the consumed data from queue
            Q.pop();

            item--;
            cout << "B thread consumed: " << data <<endl;


            pthread_cond_signal(&dataNotConsumed);
        }
        else
        {
            cout << "B is in wait.." << endl;
            pthread_cond_wait(&dataNotProduced, &rallow);
            cout<<"B woke up"<<endl;
        }

         pthread_mutex_lock(&mread);

         rc--;

         if(rc==0)
            pthread_mutex_unlock(&rallow);

         pthread_mutex_unlock(&mread);
        sleep(1);

    } 
}

消费者2:


void *reader2(void*)

{
    while(1)
    {
         pthread_mutex_lock(&mread);

         rc++;

         if(rc==1)
            pthread_mutex_lock(&rallow);

         pthread_mutex_unlock(&mread);


         if (Q.size() > 0) {
            // Get the data from the front of queue
            int data = Q.front();
  
            // Pop the consumed data from queue
            Q.pop();

            item--;
            cout << "C thread consumed: " << data <<endl;
            pthread_cond_signal(&dataNotConsumed);
        }
        else
        {
            cout << "C is in wait.." << endl;
            pthread_cond_wait(&dataNotProduced, &rallow);
            cout<<"C woke up"<<endl;
        }

         pthread_mutex_lock(&mread);

         rc--;

         if(rc==0)
            pthread_mutex_unlock(&rallow);

         pthread_mutex_unlock(&mread);
        sleep(1);

    }
}

输出看起来像这样:

C is in wait..
B is in wait..
Produced: 8 item: 1
Produced: 4 item: 2
Produced: 2 item: 3
Produced: 4 item: 4
Produced: 2 item: 5
Produced: 8 item: 6
Produced: 5 item: 7
Produced: 2 item: 8
Produced: 10 item: 9
Produced: 3 item: 10
>> Producer is in wait..
B woke up
B thread consumed: 8
B thread consumed: 4
B thread consumed: 2
B thread consumed: 4
B thread consumed: 2
B thread consumed: 8
B thread consumed: 5
B thread consumed: 2
B thread consumed: 10
B thread consumed: 3
B is in wait..
C woke up
C is in wait..
Producer woke up

我怀疑为什么线程 B 和 C 没有显示并行执行。以及为什么生产者一次将值填充到缓冲区 10 中,而不是提供少量,然后消费者消费它,然后再次产生少量。任何线索将不胜感激。

【问题讨论】:

  • 考虑操作系统为每个线程分配时间片。生产者可以在其中一个消费者获得时间片之前将多个项目放入队列中。类似地,单个消费者可能会在另一个消费者有机会之前将多个项目出列。即使每个线程都在单独的内核上运行,也不能保证它们都以完全相同的速度运行。
  • 顺便说一句,如果您有两个几乎相同的函数,请考虑将公共部分分解为一个两个都可以调用的函数。另外,您正在编译哪个 c++ 版本?从c++11开始就有std::thread,不需要联系那些可怕的void*s
  • @G.Sliepen 我明白你所说的。但是看看输出程序给了我,我看到一旦一个线程锁定到互斥锁,它也会在接下来的几次中获得互斥锁。当前线程在传递给任何其他线程之前获得了显着的互斥量。我该如何克服这个问题?

标签: c++ multithreading mutex


【解决方案1】:
     else if (item == MAX) {
        pthread_mutex_unlock(&rallow);
        cout << ">> Producer is in wait.." << endl;
        pthread_cond_wait(&dataNotConsumed, &rallow);

您解锁互斥锁,然后等待。你不能那样做。这会创建一个窗口,在此窗口中,您等待的事情可能发生在您等待之前。您必须在持有互斥锁的同时调用pthread_cond_wait,以确保您等待的事情不会在您决定等待之后但在您开始等待之前发生。

您的消费者中有另一个巨大的错误。一个线程可以锁定rallow,然后另一个线程可以尝试解锁它。这是不允许的——获取互斥锁的线程必须是释放它的线程。您不需要两个互斥锁 - 只需使用一个保护所有状态的互斥锁即可。

【讨论】:

  • 我的要求是如果生产者被锁定,那么任何消费者都无法访问缓冲区。但是,如果一个消费者被锁定,那么所有消费者都可以并行读取缓冲区。我对您指出为代码的上述部分进行了小修改。请告诉我你的想法
  • This article 可能会有所帮助。使用条件变量来等待所有读者完成后再写入。
【解决方案2】:

首先,不能保证所有线程都会一直并发运行。如果它们在单核上运行,操作系统会给每个线程几十毫秒的时间片。如果它们在不同的内核上运行,那么在一个调用pthread_cond_broadcast() 的线程和另一个从pthread_cond_wait() 唤醒的线程之间存在延迟。这很容易解释编写器线程能够在另一个线程唤醒之前将 10 个项目推送到队列。

下一个问题是,为什么 B 消耗了所有物品,而 C 什么也得不到?问题是因为这个:

pthread_mutex_lock(&mread);

rc++;

if(rc == 1)
    pthread_mutex_lock(&rallow);

pthread_mutex_unlock(&mread);

考虑线程 B 和 C 一个接一个地执行这个块。两者都可以锁定mread,两者都会增加rc,但只有一个会锁定rallow。接下来发生的事情是不确定的,因为他们都试图访问队列,即使其中一个不会持有锁。

应该没有必要有两个互斥锁。两个消费者线程都应该无条件锁定rallow,检查队列中是否有东西,如果没有则调用pthread_cond_wait()

由于您使用的是 C++,因此您应该真正使用 C++11 的线程支持,而不是使用 C pthread 函数。您的代码应如下所示:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex rallow;
std::condition_variable dataProduced;
std::condition_variable dataConsumed;

void writer() {
    while(true) {
        // Generate the random number
        int num = rand() % 10 + 1;
        std::cout << "Produced: " << num << "\n";

        // Push it to the queue
        {
            std::lock_guard<std::mutex> lock(rallow);
            dataConsumed.wait(rallow, [](){return Q.size() < MAX;});
            Q.push(num);
        }
    }
}

void reader(int id) {
    while(true) {
        int data;

        // Pop an item from the queue
        {
            std::lock_guard<std::mutex> lock(rallow);
            dataProduced.wait(rallow, [](){return Q.size() > 0;});
            data = Q.front();
            Q.pop();
        }

        // Process the data
        std::cout << "Consumer thread " << id << " consumed: " << data << "\n";
    }
}

您甚至可以创建一个线程安全的队列类来处理互斥锁和条件变量本身,因此生产者和消费者代码将简化为:

void writer() {
    while(true) {
        int num = rand() % 10 + 1;
        std::cout << "Produced: " << num << "\n";
        Q.push(num);
    }
}

void reader(int id) {
    while(true) {
        int data = Q.pop();
        std::cout << "Consumer thread " << id << " consumed: " << data << "\n";
    }
}

【讨论】:

  • 这段代码被设计为生产者或消费者可以一次访问队列。当消费者访问时,两个消费者都可以并行访问队列(如生产者-消费者问题)。这就是为什么只有一个消费者必须获得锁。但是如果一个消费者获得了锁,那么所有消费者可以并行地从队列中读取。您编写的代码看起来一次只能访问 1 个线程,这不是我的要求。请让我知道你的想法。 @G.Sliepen
  • 我编写的代码序列化了对队列的访问,但允许并发处理从队列中弹出的数据。在您的代码中,当多个线程同时尝试 push 或 pop 时,如何确保队列结构本身不会损坏?
猜你喜欢
  • 2014-03-26
  • 1970-01-01
  • 2020-01-21
  • 2015-09-25
  • 1970-01-01
  • 1970-01-01
  • 2019-05-18
  • 2021-12-26
  • 1970-01-01
相关资源
最近更新 更多