【发布时间】:2019-12-19 17:56:46
【问题描述】:
生产者在每个推入队列后,通过 conditionVar.notify_one() 向消费者发出信号。
然而,消费者在一些随机数的推送后醒来(因此随后的 notify_one()s)发生,有时是 21,有时是 2198,等等。 在生产者中插入延迟(sleep_for() 或 yield())没有帮助。
我怎样才能让这个 SPSC 同步运行?
如何将此示例扩展到多个消费者(即 SPMC)?
void singleProducerSingleConsumer() {
std::condition_variable conditionVar;
std::mutex mtx;
std::queue<int64_t> messageQueue;
bool stopped = false;
const std::size_t workSize = 4096;
std::function<void(int64_t)> producerLambda = [&](int64_t id) {
// Prepare a random number generator and push to the queue
std::default_random_engine randomNumberGen{};
std::uniform_int_distribution<int64_t> uniformDistribution{};
for (auto count = 0; count < workSize; count++){
//Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
std::lock_guard<std::mutex> lockGuard{ mtx };
//Push a random number onto the queue
messageQueue.push(uniformDistribution(randomNumberGen));
//Notify the consumer
conditionVar.notify_one();
//std::this_thread::yield();
/*std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Producer woke " << std::endl;*/
}
//Production finished
//Acquire the lock, set the stopped flag, inform the consumer
std::lock_guard<std::mutex> lockGuard {mtx };
std::cout << "Producer is done!" << std::endl;
stopped = true;
conditionVar.notify_one();
};
std::function<void(int64_t)> consumerLambda = [&](int64_t id) {
do {
std::unique_lock<std::mutex> uniqueLock{ mtx };
//Acquire the lock only if stopped or the queue isn't empty
conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });
//This thread owns the mutex here; pop the queue until it is empty
std::cout << "Consumer received " << messageQueue.size() << " items" << std::endl;
while (!messageQueue.empty()) {
const auto val = messageQueue.front(); messageQueue.pop();
std::cout << "Consumer obtained: " << val << std::endl;
}
uniqueLock.unlock();
if (stopped) {
//Producer has signaled a stop
std::cout << "Consumer is done!" << std::endl;
break;
}
} while (true);
};
std::thread consumer{ consumerLambda, 1 };
std::thread producer{ producerLambda, 2 };
consumer.join();
producer.join();
std::cout << "singleProducerSingleConsumer() finished" << std::endl;
}
【问题讨论】:
-
在你的
for循环中没有延迟当你仍然持有互斥锁时可以帮助你的消费者,他们正在耐心地等待同一个互斥锁。将您的yield移动到lockGuard收购之前,您可能会看到“更公平”的行为(更多“连续”和更少“批量”)。我愿意。
标签: multithreading mutex producer-consumer condition-variable thread-synchronization