【问题标题】:C++ wait notify in threads with synchronized queues具有同步队列的线程中的 C++ 等待通知
【发布时间】:2019-11-22 08:58:37
【问题描述】:

我有一个这样的程序结构:一个线程接收任务并将它们写入输入队列,多个线程处理它们并写入输出队列,一个响应结果。当队列为空时,线程会休眠几毫秒。队列里面有互斥体,push 做 lock(),popping 做 try_lock(),如果队列中没有任何内容则返回。

这是处理线程例如:

//working - atomic bool
while (working) {
    if (!inputQue_->pop(msg)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        continue;
    } else {
        string reply = messageHandler_->handle(msg);
        if (!reply.empty()) {
            outputQue_->push(reply);
        }
    }
}

我不喜欢的是,从收到任务到响应的时间,正如我用 high_resolution_clock 测量的那样,几乎是 0,此时没有睡眠。当有睡眠时,它会变得更大。 我不希望浪费 cpu 资源并想做这样的事情:当接收线程获取任务时,它通知一个处理线程,它执行 wait_for,当处理任务完成时,它以相同的方式通知响应线程。因此,我认为我会花费更少的时间,并且不会浪费 cpu 资源。我有一些问题:

  1. 这会按我认为的方式工作吗,唯一的区别是在通知时醒来?
  2. 为此,我必须创建 2 个条件变量:第一个相同用于接收线程和所有处理,第二个相同用于所有处理和响应?处理线程中的互斥锁必须对所有线程都是通用的还是唯一的?
  3. 我可以在 if 分支中创建 unique_lock(mutex) 和 wait_for() 来代替 sleep_for 吗?
  4. 如果某些处理线程很忙,是否可能 notify_one() 可以尝试唤醒其中一个,但不能唤醒空闲线程?我需要使用 notify_all() 吗?
  5. 是否有可能 notify 不会唤醒任何线程?如果有,概率大吗?

【问题讨论】:

  • 你有没有想过使用带有轮询机制的消息队列而不是调用std::this_thread::sleep_for?这样,您的线程仅在有要处理的事件时才会唤醒。一个伟大的消息队列的例子是ZeroMQ
  • std::condition_variable 的方法对我来说似乎是合理的。生产者必须锁定/修改/解锁/通知,消费者必须锁定/等待/解锁。如果你有一个线程“管道”,那么中间的线程既是消费者也是生产者。因此,它必须同时实现。
  • 如果某些处理线程很忙,notify_one() 是否可以尝试唤醒其中一个 :-) 一个忙线程不在队列中操作系统选择一个唤醒。我没有看到这个问题。另一个话题,我曾经偶然发现:如果所有消费者都很忙,通知会发生什么?我得出的结论是,通知只是“无处可去”,但不会造成任何伤害。并且不要忘记可能会有“spurios 唤醒”......
  • 是否有可能notify 不会唤醒任何线程? 如果有等待线程,notify 应该唤醒一个或全部。如果不是这样,整个condition_variable 事情就会被打破,恕我直言。我假设condition_variable 是建立在信号量之上的。如果它们不能按预期工作,我会认真考虑切换到另一个操作系统。 ;-)

标签: c++ multithreading


【解决方案1】:

这会按照我认为的方式工作吗,唯一的区别是在通知时醒来?

是的,假设你做得对。

为此,我必须创建 2 个条件变量:第一个用于接收线程和所有处理,第二个用于所有处理和响应?并且处理线程中的互斥锁必须对所有线程都是通用的还是唯一的?

您可以使用单个互斥锁和单个条件变量,但这会使其复杂一些。我建议使用单个互斥体,但线程可能要等待的每个条件都有一个条件变量。

我可以在 if 分支中创建 unique_lock(mutex) 和 wait_for() 来代替 sleep_for 吗?

绝对不是。您需要在检查队列是否为空时持有互斥锁并继续持有它,直到您调用wait_for。否则,您将破坏条件变量的整个逻辑。与条件变量关联的互斥锁必须保护线程将要等待的条件,在这种情况下,队列是非空的。

如果某些处理线程很忙,是否可能 notify_one() 可以尝试唤醒其中一个,但不能唤醒空闲线程?我需要使用 notify_all() 吗?

我不知道您所说的“免费线程”是什么意思。作为一般规则,如果无法在无法处理条件的条件变量上阻塞线程,则可以使用notify_one。如果可能需要唤醒多个线程,或者可能会在条件变量上阻塞多个线程并且可能唤醒“错误线程”,则应该使用notify_all,也就是说,可能在至少有一个线程不能做任何需要做的事情。

是否有可能 notify 不会唤醒任何线程?如果有,概率大吗?

当然,这很有可能。但这意味着在这种情况下没有线程被阻塞。在这种情况下,没有线程可以阻塞条件,因为线程必须在wait 之前检查条件,并且它们在持有互斥锁的同时执行此操作。提供这种原子的“解锁和等待”语义是条件变量的全部目的。

【讨论】:

    【解决方案2】:

    您拥有的机制称为轮询。线程反复检查(轮询)是否有可用数据。正如你提到的,它有浪费时间的缺点。 (但这很简单)。你提到的你想使用的东西叫做阻塞机制。这会取消线程调度,直到工作可用为止。

    1) 是的(虽然我不知道你在想什么)

    2) a) 是的,2 个条件变量是一种方法。 b) 普通互斥锁是最好的

    3) 您可能会将它们放在pop 中,而调用pop 可能会被阻止。

    4) 不会。notify_one 只会唤醒当前正在等待调用 wait 的线程。此外,如果有多个正在等待,则不一定能保证哪个会收到通知。 (操作系统/库依赖)

    5) 不。如果有 1+ 个线程在等待,notify_one 保证会唤醒一个。但是如果没有线程在等待,则通知被消耗(并且没有效果)。请注意,在某些边缘条件下,notify_one 实际上可能唤醒不止一个。此外,线程可能会从wait 唤醒,而无需任何人调用notify_one(“虚假唤醒”)。这种可能发生的事实意味着您总是需要对其进行额外的检查。

    顺便说一句,这称为生产者/消费者问题。

    【讨论】:

      【解决方案3】:

      一般来说,您对条件变量的考虑是正确的。我的建议更多地与此类功能的设计和可重用性相关。 主要思想是实现 ThreadPool 模式,它具有带有工作线程数的构造函数,方法 submitTask,shutdown,join。 拥有这样的类,您将使用 2 个池实例:一个用于处理多线程,第二个(由您选择单线程)用于结果发送。 该池由任务阻塞队列和工作线程数组组成,每个执行相同的“弹出任务并运行”循环。阻塞队列封装了互斥体和cond_var。 Task 是通用函子。 这也将您的设计带入了面向任务的方法,这在您的应用程序的未来有很多优势。 如果您喜欢这个想法,欢迎您提出有关实施细节的更多问题。 最好的问候,丹尼尔

      【讨论】:

        猜你喜欢
        • 2012-02-22
        • 1970-01-01
        • 2015-11-06
        • 2011-02-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多