代码都类似,看懂一个,基本都能理解了。

共有代码:

#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

static const int kItemRepositorySize = 10; // Item buffer size.
static const int kItemsToProduce = 1000;   // How many items we plan to produce.
std::mutex mutex;//多线程标准输出同步锁

单生产者单消费者模式:

  1 struct ItemRepository
  2     {
  3         int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
  4         size_t read_position; // 消费者读取产品位置.
  5         size_t write_position; // 生产者写入产品位置.
  6         std::mutex mtx; // 互斥量,保护产品缓冲区
  7         std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
  8         std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
  9     } gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量.
 10 
 11     typedef struct ItemRepository ItemRepository;
 12 
 13     void ProduceItem(ItemRepository * ir, int item)
 14     {
 15         std::unique_lock<std::mutex> lock(ir->mtx);
 16         while (((ir->write_position + 1) % kItemRepositorySize)
 17             == ir->read_position)
 18         { // item buffer is full, just wait here.
 19             {
 20                 std::lock_guard<std::mutex> lock(mutex);
 21                 std::cout << "缓冲区满,等待缓冲区不满\n";
 22             }
 23             (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
 24         }
 25 
 26         (ir->item_buffer)[ir->write_position] = item; // 写入产品.
 27         (ir->write_position)++; // 写入位置后移.
 28 
 29         if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
 30             ir->write_position = 0;
 31 
 32         (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
 33         lock.unlock(); // 解锁.
 34     }
 35 
 36     int ConsumeItem(ItemRepository *ir)
 37     {
 38         int data;
 39         std::unique_lock<std::mutex> lock(ir->mtx);
 40         // item buffer is empty, just wait here.
 41         while (ir->write_position == ir->read_position)
 42         {
 43             {
 44                 std::lock_guard<std::mutex> lock(mutex);
 45                 std::cout << "缓冲区空,等待生产者生成产品\n";
 46             }
 47             (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
 48         }
 49 
 50         data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
 51         (ir->read_position)++; // 读取位置后移
 52 
 53         if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
 54             ir->read_position = 0;
 55 
 56         (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
 57         lock.unlock(); // 解锁.
 58 
 59         return data; // 返回产品.
 60     }
 61 
 62 
 63     void ProducerTask() // 生产者任务
 64     {
 65         for (int i = 1; i <= kItemsToProduce; ++i)
 66         {
 67             // sleep(1);
 68             ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
 69             {
 70                 std::lock_guard<std::mutex> lock(mutex);
 71                 std::cout << "生产第 " << i << "个产品" << std::endl;
 72             }
 73         }
 74     }
 75 
 76     void ConsumerTask() // 消费者任务
 77     {
 78         static int cnt = 0;
 79         while (1)
 80         {
 81             std::this_thread::sleep_for(std::chrono::seconds(1));
 82             int item = ConsumeItem(&gItemRepository); // 消费一个产品.
 83             {
 84                 std::lock_guard<std::mutex> lock(mutex);
 85                 std::cout << "消费第" << item << "个产品" << std::endl;
 86             }
 87             if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
 88         }
 89     }
 90 
 91     void InitItemRepository(ItemRepository *ir)
 92     {
 93         ir->write_position = 0; // 初始化产品写入位置.
 94         ir->read_position = 0; // 初始化产品读取位置.
 95     }
 96 
 97     void test()
 98     {
 99         InitItemRepository(&gItemRepository);
100         std::thread producer(ProducerTask); // 创建生产者线程.
101         std::thread consumer(ConsumerTask); // 创建消费之线程.
102 
103         producer.join();
104         consumer.join();
105     }
View Code

相关文章: