【发布时间】:2011-01-07 06:58:52
【问题描述】:
我正在寻找在 C/CPP 中正确实现工作窃取队列的方法。我环顾了谷歌,但没有发现任何有用的东西。
也许有人熟悉一个好的开源实现? (我不喜欢实现取自原始学术论文的伪代码)。
【问题讨论】:
标签: c++ multithreading algorithm queue work-stealing
我正在寻找在 C/CPP 中正确实现工作窃取队列的方法。我环顾了谷歌,但没有发现任何有用的东西。
也许有人熟悉一个好的开源实现? (我不喜欢实现取自原始学术论文的伪代码)。
【问题讨论】:
标签: c++ multithreading algorithm queue work-stealing
这个开源库https://github.com/cpp-taskflow/cpp-taskflow 自 2018 年 12 月起支持工作窃取线程池。
查看WorkStealingQueue 类,该类实现了论文“Dynamic Circular Work-stealing Deque”中所述的工作窃取队列,SPAA,2015。
【讨论】:
OpenMP 可能很好地支持工作窃取,尽管它被称为递归并行
OpenMP 规范定义了任务构造(可以嵌套,因此非常适合递归并行),但没有详细说明它们是如何实现的。 OpenMP 实现,包括 gcc,通常使用某种形式的工作窃取任务,尽管确切的算法(以及由此产生的性能)可能会有所不同!
见#pragma omp task和#pragma omp taskwait
更新
C++ Concurrency in Action 一书的第 9 章描述了如何实现“池线程的工作窃取”。我自己没有阅读/实现它,但看起来并不难。
【讨论】:
我已将 this C project 移植到 C++。
原来的Steal在数组展开时可能会遇到脏读。我试图修复这个错误,但最终放弃了,因为我实际上并不需要动态增长的堆栈。 Push 方法不尝试分配空间,而是简单地返回 false。然后调用者可以执行旋转等待,即while(!stack->Push(value)){}。
#pragma once
#include <atomic>
// A lock-free stack.
// Push = single producer
// Pop = single consumer (same thread as push)
// Steal = multiple consumer
// All methods, including Push, may fail. Re-issue the request
// if that occurs (spinwait).
template<class T, size_t capacity = 131072>
class WorkStealingStack {
public:
inline WorkStealingStack() {
_top = 1;
_bottom = 1;
}
WorkStealingStack(const WorkStealingStack&) = delete;
inline ~WorkStealingStack()
{
}
// Single producer
inline bool Push(const T& item) {
auto oldtop = _top.load(std::memory_order_relaxed);
auto oldbottom = _bottom.load(std::memory_order_relaxed);
auto numtasks = oldbottom - oldtop;
if (
oldbottom > oldtop && // size_t is unsigned, validate the result is positive
numtasks >= capacity - 1) {
// The caller can decide what to do, they will probably spinwait.
return false;
}
_values[oldbottom % capacity].store(item, std::memory_order_relaxed);
_bottom.fetch_add(1, std::memory_order_release);
return true;
}
// Single consumer
inline bool Pop(T& result) {
size_t oldtop, oldbottom, newtop, newbottom, ot;
oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
ot = oldtop = _top.load(std::memory_order_acquire);
newtop = oldtop + 1;
newbottom = oldbottom - 1;
// Bottom has wrapped around.
if (oldbottom < oldtop) {
_bottom.store(oldtop, std::memory_order_relaxed);
return false;
}
// The queue is empty.
if (oldbottom == oldtop) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
// Make sure that we are not contending for the item.
if (newbottom == oldtop) {
auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
else {
result = ret;
_bottom.store(newtop, std::memory_order_release);
return true;
}
}
// It's uncontended.
result = _values[newbottom % capacity].load(std::memory_order_acquire);
return true;
}
// Multiple consumer.
inline bool Steal(T& result) {
size_t oldtop, newtop, oldbottom;
oldtop = _top.load(std::memory_order_acquire);
oldbottom = _bottom.load(std::memory_order_relaxed);
newtop = oldtop + 1;
if (oldbottom <= oldtop)
return false;
// Make sure that we are not contending for the item.
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
return false;
}
result = _values[oldtop % capacity].load(std::memory_order_relaxed);
return true;
}
private:
// Circular array
std::atomic<T> _values[capacity];
std::atomic<size_t> _top; // queue
std::atomic<size_t> _bottom; // stack
};
Full Gist (including unit tests). 我只在强架构 (x86/64) 上运行测试,所以就弱架构而言,如果您尝试在例如霓虹灯/PPC。
【讨论】:
我发现这个工作窃取算法的最接近的实现是 Karl-Filip Faxén 称为 Wool 的东西。 src / report / comparison
【讨论】:
如果您正在寻找基于 pthread 或 boost::thread 的 C++ 中独立的工作窃取队列类实现,那么祝您好运,据我所知没有。
但是,正如其他人所说,Cilk、TBB 和 Microsoft 的 PPL 都在幕后实现了工作窃取。
问题是您要使用工作窃取队列还是实施一个?如果您只想使用一个,那么上面的选择是很好的起点,只需在其中任何一个中安排一个“任务”就足够了。
正如 BlueRaja 所说,PPL 中的 task_group 和structured_task_group 将执行此操作,还请注意,这些类也可用于最新版本的英特尔 TBB。并行循环(parallel_for、parallel_for_each)也通过工作窃取来实现。
如果您必须查看源代码而不是使用实现,TBB 是开源的,Microsoft 为其 CRT 提供源代码,因此您可以继续探索。
您也可以在 Joe Duffy 的博客上查看 C# 实现(但它是 C# 并且内存模型不同)。
-瑞克
【讨论】:
实现“偷工减料”在理论上并不难。您需要一组包含任务的队列,这些任务通过结合计算和生成其他任务来完成更多工作。您需要对队列进行原子访问,以将新生成的任务放入这些队列中。最后,您需要每个任务最后调用的过程,以便为执行任务的线程找到更多工作;该过程需要在工作队列中查找工作。
大多数此类工作窃取系统都假设存在少量线程(通常由真实处理器内核支持),并且每个线程只有一个工作队列。然后你首先尝试从自己的队列中窃取工作,如果它是空的,则尝试从其他人那里窃取。棘手的是知道要查看哪些队列;连续扫描它们以寻找工作非常昂贵,并且可能会在寻找工作的线程之间产生大量争用。
到目前为止,这都是非常通用的东西,只有两个主要例外:1) 不能在纯 C 或 C++ 中说明切换上下文(例如,设置处理器上下文寄存器,例如“堆栈”)。您可以通过同意在目标平台特定的机器代码中编写包的一部分来解决此问题。 2) 对多处理器队列的原子访问不能仅在 C 或 C++ 中完成(忽略 Dekker 算法),因此您需要使用汇编语言同步原语(如 X86 LOCK XCH 或比较和交换)对这些进行编码。现在,一旦您获得安全访问权限,更新队列所涉及的代码并不是很复杂,您可以轻松地用几行 C 语言编写。
但是,我想您会发现,尝试使用混合汇编程序在 C 和 C++ 中编写这样的包仍然是相当低效的,而且您最终还是会在汇编程序中编写整个代码。剩下的都是 C/C++ 兼容的入口点:-}
我为我们的PARLANSE 并行编程语言做了这个,它提供了任意大量并行计算在任何时刻实时和交互(同步)的想法。它是在 X86 上的幕后实现的,每个 CPU 一个线程,并且完全在汇编程序中实现。窃取工作的代码总共可能有 1000 行,而且它的代码很棘手,因为您希望它在非争用情况下非常快。
C 和 C++ 真正的美中不足之处在于,当您创建一个代表工作的任务时,您分配了多少堆栈空间?串行 C/C++ 程序通过简单地过度分配大量(例如,10Mb)一个线性堆栈来避免这个问题,并且没有人关心浪费了多少堆栈空间。但是,如果您可以创建数千个任务并让它们都在特定时刻运行,那么您就无法合理地为每个任务分配 10Mb。所以现在你要么需要静态确定一个任务需要多少堆栈空间(图灵硬),要么你需要分配堆栈块(例如,每个函数调用),这是广泛可用的 C/C++ 编译器不做的(例如,您可能使用的那个)。最后的出路是限制任务创建,使其在任何时刻限制在几百个,并在实时任务中复用几百个非常巨大的堆栈。如果任务可以互锁/暂停状态,则不能执行最后一个,因为您会遇到阈值。因此,只有当任务only 进行计算时,您才能这样做。这似乎是一个非常严格的约束。
对于 PARLANSE,我们构建了一个编译器,为每个函数调用在堆上分配激活记录。
【讨论】:
有一种工具可以以一种非常优雅的方式简单地完成它。这是在很短的时间内并行化您的程序的一种非常有效的方法。
HPC 挑战奖
我们参加 HPC 挑战赛的 Cilk 参赛作品 2006年荣获二等奖 ``优雅与优雅的最佳结合 表现''。该奖项于 SC'06 于 2006 年 11 月 14 日在坦帕举行。
【讨论】:
没有免费的午餐。
请看the original work stealing paper。这篇论文很难理解。我知道那篇论文包含理论证明而不是伪代码。但是,根本没有比 TBB 更简单的版本更多。如果有的话,它不会提供最佳性能。工作窃取本身会产生一些开销,因此优化和技巧非常重要。特别是,出队必须是线程安全的。实现高度可扩展和低开销的同步具有挑战性。
我真的很想知道你为什么需要它。我认为正确的实现意味着类似于 TBB 和 Cilk。同样,工作窃取很难实施。
【讨论】:
PPL 的structured_task_group 类在其实现中使用了工作窃取队列。如果您需要 WSQ 进行线程处理,我建议您这样做。
如果你真的在找源码,我不知道ppl.h中是否给出了代码,或者是否有预编译的对象;今晚回家我得看看。
【讨论】:
我不认为JobSwarm 使用工作窃取,但这是第一步。我不知道其他用于此目的的开源库。
【讨论】: