【问题标题】:Waking up all of the threads at once, without a mutex一次唤醒所有线程,无需互斥锁
【发布时间】:2012-12-14 20:19:30
【问题描述】:

我的程序根据行的字段之一在不同的线程中处理数据库行。 主线程产生“工人”,执行查询,然后,对于每一行,需要唤醒所有工人,以便其中一个消耗该行。

现在,使用 pthread_cond_broadcast() 似乎是最合乎逻辑的选择。但是,在这种情况下,工作人员必须都在 pthread_cond_wait() 中等待使用相同的互斥锁

在我的情况下这是次优的,因为这意味着,工人将被唤醒一次(我不需要)——而不是一次时间>。是的,我确实希望他们全部醒来——他们都会从新的 DB 行中读取一个字段,之后除了一个字段之外的所有字段都将返回等待下一行。我不需要同步它们。

我想,我会在每个线程中使用带有 pthread_cond_wait() 的虚拟线程特定互斥锁,但这不起作用(只唤醒了一个线程)。 standard 说,使用不同的互斥锁等待相同的条件变量(就像我一样)是未定义的。

那么,有没有办法一次通知所有线程?谢谢!

【问题讨论】:

  • 您是否试图让工作人员找出真正需要唤醒的人(基于行数据)?为什么不让主线程找出正确的工作线程并将数据排队呢?
  • 是的,工人们要弄清楚,他们中的哪一个会接受这份工作。我宁愿不在主线程中这样做,因为那样我需要循环比较,而工作线程只需要每个执行一个比较。
  • 你不能使用哈希表或二分搜索(而不是线性搜索)来找出收件人吗?无论如何,除非您拥有与 worker 一样多的内核,否则不会并行进行比较(并且由于上下文切换,这可能比循环还要慢)。
  • 是的,我可以使用哈希表或树——但这仍然比简单地让每个工作人员执行一次比较慢。我的计算方式是在 O(1) 时间内完成的。对于散列或树,它仍然是 O(log(n))。
  • "big-O" 是狡猾且具有误导性的。对于较小的 n 值,一个操作大约需要 30 个周期的 O(log(n)) 远远好于一个操作需要几千个周期的 O(1)。另请注意,您的 O(1) 实际上是“n 个不同线程/CPU 上的 O(1)”,这对我来说听起来很像 O(n)。

标签: c pthreads


【解决方案1】:

对于条件变量,假设有一些相关的“条件”(在您的情况下为数据行)需要专门检查和更新(因此是互斥锁)。无论您选择哪种其他机制,您都需要弄清楚如何确保对“工作队列”的独占访问(无论是单个插槽还是真正的队列)。

使用共享队列,数据结构将始终有 2 个写入者(主线程 + 预期工作线程)和 N-1 个读取者。您可以使用读写锁 (rwlock) 来确保完整性。

或者,您可以有 N 个单独的队列(每个工作人员一个)。您会将数据行的副本推送给每个工作人员。

就一次唤醒多个线程而言:您可以让您的工作人员“休眠”(例如使用 select())并使用 pthread_signal()(在循环中)唤醒它们。

你也可以使用pthread_barrier_wait()

pthread_barrier_wait() 函数应在屏障引用的屏障处同步参与线程。调用线程将阻塞,直到所需数量的线程调用了指定屏障的 pthread_barrier_wait()。

当所需数量的线程调用了指定屏障的 pthread_barrier_wait() 时,常量 PTHREAD_BARRIER_SERIAL_THREAD 应返回给一个未指定的线程,并且零应返回给每个剩余线程。此时,屏障应重置为最近引用它的 pthread_barrier_init() 函数的结果。

  1. 使用 pthread_barrier_init() 初始化屏障(计数 = 1 + 工人数)
  2. 在每个worker中,循环调用pthread_barrier_wait();当它返回时,新数据就准备好了
  3. 在主线程中,调用 pthread_barrier_wait() 向工作人员发出信号

不幸的是(如 OP 所述),在下一次迭代中,在先前激活的工作人员完成其工作之前,不会唤醒任何工作人员。

一个更简单的架构会让主线程将事件分派给适当的工作人员(而不是唤醒所有工作人员并让他们确定哪个是预期的接收者)。除非您拥有与 worker 一样多的内核,否则测试无论如何都不会真正并行发生。此外,即使您有足够的核心让工作人员并行运行,其中 N-1 也不会在完成测试之前得知“获胜者”已接手工作,因此所有核心的总工作量更高.

【讨论】:

  • 但是,当其中一个忙于处理前一行时,不是所有线程都必须等待吗?我不希望主线程弄清楚要唤醒哪个工作人员-那是因为它必须按顺序进行比较(在循环中)。我的方式——如果可能的话——每个工作人员都会与其他人同时进行一次比较。
  • 是的,我确实有 8 个内核。线程数目前为 5,可能有一天会增加到 12 或 15。我很困惑,为什么 pthread API 没有解决这么简单的需求。只需将 pthread_cond_wait() 的互斥参数设为可选就可以解决问题...
  • 抱歉,就我而言,没有“两位作家”——只有一位,主线程。想象一个军官把邮件送到军营:他宣布(广播)每封信上的名字,而一名士兵声称它(在将宣布的名字与他自己的名字比较之后)。这比官员将每封信带给每个人要快得多。在我的情况下,行是“字母”,线程是“士兵”......
  • 我为您提供了几个满足您要求的架构示例。在 rwlock 示例中,“接收者线程”将从工作队列中删除“字母”(这将是第二个写入者)。可能还有其他设计,但无论如何您都需要从接收线程通知主线程。想清楚。
【解决方案2】:

我认为你需要更多地描述这个问题,以及你为什么(试图)一开始就这样做。如果最好的方法是做一些完全不同的事情,不涉及在没有互斥锁的情况下一次唤醒所有线程,我不会感到惊讶。

对我来说,你的描述听起来像:

  • 主线程产生多个线程(产生一个线程相对昂贵)
  • 主线程执行查询,而衍生线程启动时执行的很少,然后阻塞(其中启动/重新启动和阻塞相对昂贵)
  • 对于每一行,主线程唤醒每个线程(相对昂贵的重新启动和阻塞),并且除其中一个之外的每个线程都返回等待(非常浪费)

在不知道为什么要这样做的情况下,我假设根本不使用任何线程会更快(例如,主线程能够比主线程检查行更快地处理一行并告诉一个衍生线程处理它并无缘无故地打扰其他线程)。

如果处理一行需要很长时间,那么我会考虑让工作线程在 FIFO 队列上等待,这样主线程就会将“处理这一行”命令推送到队列中,然后第一个线程从中获取它队列处理该行。

当然我不知道你为什么要做你想做的事,因此任何建议都只是猜测。

TL;DR:我认为您的问题有点像想减肥的人问“砍掉自己腿的最佳方法是什么”(最实际的答案与实际提出的问题无关)。

【讨论】:

  • 在启动时,每个线程必须读取一个(可能非常大的)文件,从它的内容构建一个哈希表——n 个文件。必须检查每一行以确定是否需要更新相应的文件——这是一件相当昂贵的事情。最后,必须更新的文件需要重写。在我看来,最初的开放、检查和最终的重写——以及不同工作人员的完全独立——证明了并行化是合理的。也许,为每一行唤醒他们太贵了——我不知道。我在主线程中实现了查找...
  • 在启动时,每个线程必须从其内容中读取一个构建哈希表的文件(出于未说明的原因) - n 个文件(以及 n 个线程,每个文件一个线程,或 n/m 个文件每个线程?)。必须(以某种方式)检查每一行(具有未指定的内容)以确定是否需要更新相应的文件(可以先这样做,这样不需要更新的文件就不会被读取/processed at all?如果没有,为什么不呢?)最后,需要更新的文件(但还没有被修改过?)需要重写。
  • 某处的某事听起来确实应该并行完成。问题是确定什么,然后如何。对我来说,这听起来像是主线程应该执行查询,然后为查询的每一行启动一个工作线程;每个工作线程都会做任何事情来确定是否需要更新文件并更新文件,而线程之间根本没有任何同步(除了等待所有工作人员完成的主线程)。
  • 是的,Brendan,这正是我构建程序的方式。主线程启动工作程序,并在它们初始化(包括将现有本地表加载到内存中)时启动查询。当记录开始从数据库到达时,主线程将每个记录分派给适当的工作人员。我最初的计划是让主线程简单地更新一个全局变量(使用新的 DB 行)并通知所有线程。然后,该行所属的一个线程将处理它。我必须以不同的方式做(使用每个线程互斥锁和 cond-var)...
猜你喜欢
  • 2011-10-11
  • 1970-01-01
  • 2017-04-21
  • 1970-01-01
  • 2012-02-19
  • 1970-01-01
  • 2011-09-25
  • 2013-01-31
  • 1970-01-01
相关资源
最近更新 更多