【问题标题】:Implementation of a work stealing queue in C/C++? [closed]在 C/C++ 中实现工作窃取队列? [关闭]
【发布时间】:2011-01-07 06:58:52
【问题描述】:

我正在寻找在 C/CPP 中正确实现工作窃取队列的方法。我环顾了谷歌,但没有发现任何有用的东西。

也许有人熟悉一个好的开源实现? (我不喜欢实现取自原始学术论文的伪代码)。

【问题讨论】:

    标签: c++ multithreading algorithm queue work-stealing


    【解决方案1】:

    这个开源库https://github.com/cpp-taskflow/cpp-taskflow 自 2018 年 12 月起支持工作窃取线程池。

    查看WorkStealingQueue 类,该类实现了论文“Dynamic Circular Work-stealing Deque”中所述的工作窃取队列,SPAA,2015。

    【讨论】:

      【解决方案2】:

      OpenMP 可能很好地支持工作窃取,尽管它被称为递归并行

      OpenMP forum post

      OpenMP 规范定义了任务构造(可以嵌套,因此非常适合递归并行),但没有详细说明它们是如何实现的。 OpenMP 实现,包括 gcc,通常使用某种形式的工作窃取任务,尽管确切的算法(以及由此产生的性能)可能会有所不同!

      #pragma omp task#pragma omp taskwait

      更新

      C++ Concurrency in Action 一书的第 9 章描述了如何实现“池线程的工作窃取”。我自己没有阅读/实现它,但看起来并不难。

      【讨论】:

        【解决方案3】:

        我已将 this C project 移植到 C++。

        原来的Steal在数组展开时可能会遇到脏读。我试图修复这个错误,但最终放弃了,因为我实际上并不需要动态增长的堆栈。 Push 方法不尝试分配空间,而是简单地返回 false。然后调用者可以执行旋转等待,即while(!stack->Push(value)){}

        #pragma once
        #include <atomic>
        
          // A lock-free stack.
          // Push = single producer
          // Pop = single consumer (same thread as push)
          // Steal = multiple consumer
        
          // All methods, including Push, may fail. Re-issue the request
          // if that occurs (spinwait).
        
          template<class T, size_t capacity = 131072>
          class WorkStealingStack {
        
          public:
            inline WorkStealingStack() {
              _top = 1;
              _bottom = 1;
            }
        
            WorkStealingStack(const WorkStealingStack&) = delete;
        
            inline ~WorkStealingStack()
            {
        
            }
        
            // Single producer
            inline bool Push(const T& item) {
              auto oldtop = _top.load(std::memory_order_relaxed);
              auto oldbottom = _bottom.load(std::memory_order_relaxed);
              auto numtasks = oldbottom - oldtop;
        
              if (
                oldbottom > oldtop && // size_t is unsigned, validate the result is positive
                numtasks >= capacity - 1) {
                // The caller can decide what to do, they will probably spinwait.
                return false;
              }
        
              _values[oldbottom % capacity].store(item, std::memory_order_relaxed);
              _bottom.fetch_add(1, std::memory_order_release);
              return true;
            }
        
            // Single consumer
            inline bool Pop(T& result) {
        
              size_t oldtop, oldbottom, newtop, newbottom, ot;
        
              oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
              ot = oldtop = _top.load(std::memory_order_acquire);
              newtop = oldtop + 1;
              newbottom = oldbottom - 1;
        
              // Bottom has wrapped around.
              if (oldbottom < oldtop) {
                _bottom.store(oldtop, std::memory_order_relaxed);
                return false;
              }
        
              // The queue is empty.
              if (oldbottom == oldtop) {
                _bottom.fetch_add(1, std::memory_order_release);
                return false;
              }
        
              // Make sure that we are not contending for the item.
              if (newbottom == oldtop) {
                auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
                if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
                  _bottom.fetch_add(1, std::memory_order_release);
                  return false;
                }
                else {
                  result = ret;
                  _bottom.store(newtop, std::memory_order_release);
                  return true;
                }
              }
        
              // It's uncontended.
              result = _values[newbottom % capacity].load(std::memory_order_acquire);
              return true;
            }
        
            // Multiple consumer.
            inline bool Steal(T& result) {
              size_t oldtop, newtop, oldbottom;
        
              oldtop = _top.load(std::memory_order_acquire);
              oldbottom = _bottom.load(std::memory_order_relaxed);
              newtop = oldtop + 1;
        
              if (oldbottom <= oldtop)
                return false;
        
              // Make sure that we are not contending for the item.
              if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
                return false;
              }
        
              result = _values[oldtop % capacity].load(std::memory_order_relaxed);
              return true;
            }
        
          private:
        
            // Circular array
            std::atomic<T> _values[capacity];
            std::atomic<size_t> _top; // queue
            std::atomic<size_t> _bottom; // stack
          };
        

        Full Gist (including unit tests). 我只在强架构 (x86/64) 上运行测试,所以就弱架构而言,如果您尝试在例如霓虹灯/PPC。

        【讨论】:

          【解决方案4】:

          我发现这个工作窃取算法的最接近的实现是 Karl-Filip Faxén 称为 Wool 的东西。 src / report / comparison

          【讨论】:

            【解决方案5】:

            如果您正在寻找基于 pthread 或 boost::thread 的 C++ 中独立的工作窃取队列类实现,那么祝您好运,据我所知没有。

            但是,正如其他人所说,Cilk、TBB 和 Microsoft 的 PPL 都在幕后实现了工作窃取。

            问题是您要使用工作窃取队列还是实施一个?如果您只想使用一个,那么上面的选择是很好的起点,只需在其中任何一个中安排一个“任务”就足够了。

            正如 BlueRaja 所说,PPL 中的 task_group 和structured_task_group 将执行此操作,还请注意,这些类也可用于最新版本的英特尔 TBB。并行循环(parallel_for、parallel_for_each)也通过工作窃取来实现。

            如果您必须查看源代码而不是使用实现,TBB 是开源的,Microsoft 为其 CRT 提供源代码,因此您可以继续探索。

            您也可以在 Joe Duffy 的博客上查看 C# 实现(但它是 C# 并且内存模型不同)。

            -瑞克

            【讨论】:

              【解决方案6】:

              实现“偷工减料”在理论上并不难。您需要一组包含任务的队列,这些任务通过结合计算和生成其他任务来完成更多工作。您需要对队列进行原子访问,以将新生成的任务放入这些队列中。最后,您需要每个任务最后调用的过程,以便为执行任务的线程找到更多工作;该过程需要在工作队列中查找工作。

              大多数此类工作窃取系统都假设存在少量线程(通常由真实处理器内核支持),并且每个线程只有一个工作队列。然后你首先尝试从自己的队列中窃取工作,如果它是空的,则尝试从其他人那里窃取。棘手的是知道要查看哪些队列;连续扫描它们以寻找工作非常昂贵,并且可能会在寻找工作的线程之间产生大量争用。

              到目前为止,这都是非常通用的东西,只有两个主要例外:1) 不能在纯 C 或 C++ 中说明切换上下文(例如,设置处理器上下文寄存器,例如“堆栈”)。您可以通过同意在目标平台特定的机器代码中编写包的一部分来解决此问题。 2) 对多处理器队列的原子访问不能仅在 C 或 C++ 中完成(忽略 Dekker 算法),因此您需要使用汇编语言同步原语(如 X86 LOCK XCH 或比较和交换)对这些进行编码。现在,一旦您获得安全访问权限,更新队列所涉及的代码并不是很复杂,您可以轻松地用几行 C 语言编写。

              但是,我想您会发现,尝试使用混合汇编程序在 C 和 C++ 中编写这样的包仍然是相当低效的,而且您最终还是会在汇编程序中编写整个代码。剩下的都是 C/C++ 兼容的入口点:-}

              我为我们的PARLANSE 并行编程语言做了这个,它提供了任意大量并行计算在任何时刻实时和交互(同步)的想法。它是在 X86 上的幕后实现的,每个 CPU 一个线程,并且完全在汇编程序中实现。窃取工作的代码总共可能有 1000 行,而且它的代码很棘手,因为您希望它在非争用情况下非常快。

              C 和 C++ 真正的美中不足之处在于,当您创建一个代表工作的任务时,您分配了多少堆栈空间?串行 C/C++ 程序通过简单地过度分配大量(例如,10Mb)一个线性堆栈来避免这个问题,并且没有人关心浪费了多少堆栈空间。但是,如果您可以创建数千个任务并让它们都在特定时刻运行,那么您就无法合理地为每个任务分配 10Mb。所以现在你要么需要静态确定一个任务需要多少堆栈空间(图灵硬),要么你需要分配堆栈块(例如,每个函数调用),这是广泛可用的 C/C++ 编译器不做的(例如,您可能使用的那个)。最后的出路是限制任务创建,使其在任何时刻限制在几百个,并在实时任务中复用几百个非常巨大的堆栈。如果任务可以互锁/暂停状态,则不能执行最后一个,因为您会遇到阈值。因此,只有当任务only 进行计算时,您才能这样做。这似乎是一个非常严格的约束。

              对于 PARLANSE,我们构建了一个编译器,为每个函数调用在堆上分配激活记录。

              【讨论】:

              • 或者你做的是理智的事情,在任务实际运行之前不要为任务分配空间,不要将任务视为暂停和恢复的事情,而是从执行中运行完成。
              • 您的解决方案不健全。如果您构建复杂的系统,当一项工作可以调用任意其他工作时,您不能保证您的任务不需要暂停。您当然可以强制此属性为真;然后,您将很难构建复杂的系统。我们在 PARLANSE 中构建了百万行并行程序。
              • Linux 在处理 10,000 个线程的进程时表现如何? Windows 以每个进程约 15,000 个线程的速度爆炸。 blogs.technet.com/b/markrussinovich/archive/2009/07/08/…。我想拥有字面上数百万个单独需要等待事件的“线程”。 PARLANSE 可以做到这一点。我不认为 Linux 或 Windows 操作系统被配置为很好地处理一百万个线程。我预计会出现各种资源问题,包括仅管理线程句柄。
              • @PSkocik:你的意思是“如果我是 CPU k,我应该寻找其他队列 1..N 来窃取工作吗?”如果 k 为空,则 k 简单地扫描所有其他队列,这是一种糟糕的方式。对于 4 个队列,这可能没问题,但对 32-64 个队列没有吸引力。增加一些开销的更好方法是在一个单词中保留一个位向量,以跟踪哪些队列有工作;它可以用 OR 和 AND 廉价地更新。 ...
              • ... 如果您锁定操作,您可以使该位向量准确,但这会使更新破坏其目的变得昂贵。所以我这样做是不同步的,这意味着它只是建议性的。不过,这是一个很好的提示,首先要看哪里。
              【解决方案7】:

              有一种工具可以以一种非常优雅的方式简单地完成它。这是在很短的时间内并行化您的程序的一种非常有效的方法。

              Cilk project

              HPC 挑战奖

              我们参加 HPC 挑战赛的 Cilk 参赛作品 2006年荣获二等奖 ``优雅与优雅的最佳结合 表现''。该奖项于 SC'06 于 2006 年 11 月 14 日在坦帕举行。

              【讨论】:

                【解决方案8】:

                没有免费的午餐。

                请看the original work stealing paper。这篇论文很难理解。我知道那篇论文包含理论证明而不是伪代码。但是,根本没有比 TBB 更简单的版本更多。如果有的话,它不会提供最佳性能。工作窃取本身会产生一些开销,因此优化和技巧非常重要。特别是,出队必须是线程安全的。实现高度可扩展和低开销的同步具有挑战性。

                我真的很想知道你为什么需要它。我认为正确的实现意味着类似于 TBB 和 Cilk。同样,工作窃取很难实施。

                【讨论】:

                【解决方案9】:

                PPLstructured_task_group 类在其实现中使用了工作窃取队列。如果您需要 WSQ 进行线程处理,我建议您这样做。
                如果你真的在找源码,我不知道ppl.h中是否给出了代码,或者是否有预编译的对象;今晚回家我得看看。

                【讨论】:

                  【解决方案10】:

                  我不认为JobSwarm 使用工作窃取,但这是第一步。我不知道其他用于此目的的开源库。

                  【讨论】:

                    猜你喜欢
                    • 2013-03-27
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 2018-01-22
                    • 1970-01-01
                    • 1970-01-01
                    • 2011-09-07
                    相关资源
                    最近更新 更多