【问题标题】:In a SPSC framework condition_variable.notify_one() is not signalling consistently在 SPSC 框架中,condition_variable.notify_one() 信号不一致
【发布时间】:2019-12-19 17:56:46
【问题描述】:

生产者在每个推入队列后,通过 conditionVar.notify_one() 向消费者发出信号。

然而,消费者在一些随机数的推送后醒来(因此随后的 notify_one()s)发生,有时是 21,有时是 2198,等等。 在生产者中插入延迟(sleep_for() 或 yield())没有帮助。

我怎样才能让这个 SPSC 同步运行?

如何将此示例扩展到多个消费者(即 SPMC)?

void singleProducerSingleConsumer() {
    std::condition_variable conditionVar;
    std::mutex mtx;
    std::queue<int64_t> messageQueue;
    bool stopped = false;
    const std::size_t workSize = 4096;

    std::function<void(int64_t)> producerLambda = [&](int64_t id) {
        // Prepare a random number generator and push to the queue
        std::default_random_engine randomNumberGen{};
        std::uniform_int_distribution<int64_t> uniformDistribution{};

        for (auto count = 0; count < workSize; count++){
            //Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
            std::lock_guard<std::mutex> lockGuard{ mtx };

            //Push a random number onto the queue
            messageQueue.push(uniformDistribution(randomNumberGen));

            //Notify the consumer
            conditionVar.notify_one();
            //std::this_thread::yield();
            /*std::this_thread::sleep_for(std::chrono::seconds(2));
            std::cout << "Producer woke " << std::endl;*/
        }
        //Production finished
        //Acquire the lock, set the stopped flag, inform the consumer
        std::lock_guard<std::mutex> lockGuard {mtx };

        std::cout << "Producer is done!" << std::endl;

        stopped = true;
        conditionVar.notify_one();
    };

    std::function<void(int64_t)> consumerLambda = [&](int64_t id) {

        do {
            std::unique_lock<std::mutex> uniqueLock{ mtx };
            //Acquire the lock only if stopped or the queue isn't empty
            conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });

            //This thread owns the mutex here; pop the queue until it is empty
            std::cout << "Consumer received " << messageQueue.size() << " items" << std::endl;
            while (!messageQueue.empty()) {
                const auto val = messageQueue.front(); messageQueue.pop();
                std::cout << "Consumer obtained: " << val << std::endl;
            }
            uniqueLock.unlock();

            if (stopped) {
                //Producer has signaled a stop
                std::cout << "Consumer is done!" << std::endl;
                break;
            }

        } while (true);
    };

    std::thread consumer{ consumerLambda, 1 };
    std::thread producer{ producerLambda, 2 };
    consumer.join();
    producer.join();

    std::cout << "singleProducerSingleConsumer() finished" << std::endl;
}

【问题讨论】:

  • 在你的for 循环中没有延迟当你仍然持有互斥锁时可以帮助你的消费者,他们正在耐心地等待同一个互斥锁。将您的yield 移动到lockGuard 收购之前,您可能会看到“更公平”的行为(更多“连续”和更少“批量”)。我愿意。

标签: multithreading mutex producer-consumer condition-variable thread-synchronization


【解决方案1】:

如果我理解正确,您希望在生产者生产下一个数字之前消耗生产者生产的每个数字。那基本上是顺序执行,而不是并发执行。您可以通过简单地让生产者在正常函数调用中将值传递给消费者来最有效地完成顺序执行。结果不是生产者消费者模式的一个很好的例子。

线程由操作系统调度,与同时在您的计算机上执行的所有其他线程和进程竞争。您的生产者和消费者线程可能在同一个 cpu 内核上运行,这意味着它们必须按照操作系统的调度轮流执行。由于消费者在生产者写入数据之前无法消费数据,因此生产者将在其第一个执行窗口期间使用多个值填充消息队列才有意义,然后消费者将有时间消费消息队列中的值。因此,messageQueue 将分批填充和取消填充,直到程序完成。

您的解决方案应该向上扩展以处理多个消费者。

【讨论】:

  • 感谢您的意见。此代码将演变为 ASIC 模拟器;生产者会将工作分派给不同的子系统(消费者线程)。
猜你喜欢
  • 1970-01-01
  • 2015-04-21
  • 2015-02-27
  • 1970-01-01
  • 2016-08-06
  • 2013-03-18
  • 1970-01-01
  • 2018-06-07
  • 1970-01-01
相关资源
最近更新 更多