代码都类似,看懂一个,基本都能理解了。
共有代码:
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; // Item buffer size.
static const int kItemsToProduce = 1000; // How many items we plan to produce.
std::mutex mutex;//多线程标准输出同步锁
单生产者单消费者模式:
1 struct ItemRepository 2 { 3 int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列. 4 size_t read_position; // 消费者读取产品位置. 5 size_t write_position; // 生产者写入产品位置. 6 std::mutex mtx; // 互斥量,保护产品缓冲区 7 std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满. 8 std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空. 9 } gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量. 10 11 typedef struct ItemRepository ItemRepository; 12 13 void ProduceItem(ItemRepository * ir, int item) 14 { 15 std::unique_lock<std::mutex> lock(ir->mtx); 16 while (((ir->write_position + 1) % kItemRepositorySize) 17 == ir->read_position) 18 { // item buffer is full, just wait here. 19 { 20 std::lock_guard<std::mutex> lock(mutex); 21 std::cout << "缓冲区满,等待缓冲区不满\n"; 22 } 23 (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生. 24 } 25 26 (ir->item_buffer)[ir->write_position] = item; // 写入产品. 27 (ir->write_position)++; // 写入位置后移. 28 29 if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置. 30 ir->write_position = 0; 31 32 (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空. 33 lock.unlock(); // 解锁. 34 } 35 36 int ConsumeItem(ItemRepository *ir) 37 { 38 int data; 39 std::unique_lock<std::mutex> lock(ir->mtx); 40 // item buffer is empty, just wait here. 41 while (ir->write_position == ir->read_position) 42 { 43 { 44 std::lock_guard<std::mutex> lock(mutex); 45 std::cout << "缓冲区空,等待生产者生成产品\n"; 46 } 47 (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生. 48 } 49 50 data = (ir->item_buffer)[ir->read_position]; // 读取某一产品 51 (ir->read_position)++; // 读取位置后移 52 53 if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位. 54 ir->read_position = 0; 55 56 (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满. 57 lock.unlock(); // 解锁. 58 59 return data; // 返回产品. 60 } 61 62 63 void ProducerTask() // 生产者任务 64 { 65 for (int i = 1; i <= kItemsToProduce; ++i) 66 { 67 // sleep(1); 68 ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品. 69 { 70 std::lock_guard<std::mutex> lock(mutex); 71 std::cout << "生产第 " << i << "个产品" << std::endl; 72 } 73 } 74 } 75 76 void ConsumerTask() // 消费者任务 77 { 78 static int cnt = 0; 79 while (1) 80 { 81 std::this_thread::sleep_for(std::chrono::seconds(1)); 82 int item = ConsumeItem(&gItemRepository); // 消费一个产品. 83 { 84 std::lock_guard<std::mutex> lock(mutex); 85 std::cout << "消费第" << item << "个产品" << std::endl; 86 } 87 if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出. 88 } 89 } 90 91 void InitItemRepository(ItemRepository *ir) 92 { 93 ir->write_position = 0; // 初始化产品写入位置. 94 ir->read_position = 0; // 初始化产品读取位置. 95 } 96 97 void test() 98 { 99 InitItemRepository(&gItemRepository); 100 std::thread producer(ProducerTask); // 创建生产者线程. 101 std::thread consumer(ConsumerTask); // 创建消费之线程. 102 103 producer.join(); 104 consumer.join(); 105 }