没有一种神奇的技术可以对共享数据进行高性能的并行访问,但有一些您会经常看到的通用技术。
我将使用并行求和数组的示例来回答我的问题,但这些技术非常普遍地适用于许多并行算法。
1) 首先避免共享数据
这可能是最安全、最快的方法。与其让您的工作线程直接更新共享状态,不如让它们每个都使用自己的本地状态,然后让您的主线程组合结果。对于数组求和示例,这可能如下所示:
int main() {
std::vector<int> toSum = getSomeVector();
std::vector<int> sums(NUM_THREADS);
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd](int& result) mutable {
for (; chunkBegin != chunkEnd; ++chunkBegin) {
result += *chunkBegin;
}
}, std::ref(sums[i]));
}
for (std::thread& thd : threads) {
thd.join();
}
int finalSum = 0;
for (int partialSum : sums) {
finalSum += partialSum;
}
std::cout << finalSum << '\n';
}
由于每个线程只对自己的部分和进行操作,因此它们不会相互干扰,也不需要额外的同步。您必须在最后做一些额外的工作才能将所有部分总和相加,但部分结果的数量很少,所以这个开销应该很小。
2) 互斥
您可以使用锁定机制来保护共享状态,而不是让每个线程在自己的状态上运行。通常,这是一个互斥锁,但有许多不同的锁定原语,它们的作用略有不同。这里的重点是确保一次只有一个线程使用共享状态。使用此技术时要非常小心,以避免在紧密循环中访问共享状态。由于一次只有一个线程可以持有锁,因此很容易意外地将您喜欢的并行代码转换回单线程代码,方法是使一次只能有一个线程工作。
例如,考虑以下情况:
int main() {
std::vector<int> toSum = getSomeVector();
int sum = 0;
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
std::mutex mtx;
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd, &mtx, &sum]() mutable {
for (; chunkBegin != chunkEnd; ++chunkBegin) {
std::lock_guard guard(mtx);
sum += *chunkBegin;
}
});
}
for (std::thread& thd : threads) {
thd.join();
}
std::cout << sum << '\n';
}
由于每个线程在其循环中锁定mtx,因此一次只能有一个线程在做任何工作。这里没有并行化,由于分配线程以及锁定和解锁互斥体的额外开销,此代码可能慢 比等效的单线程代码。
而是尽量独立地做,并尽可能少地访问你的共享状态。对于此示例,您可以执行与 (1) 中的示例类似的操作,并在每个线程中建立部分总和,最后只将它们添加到共享总和中一次:
int main() {
std::vector<int> toSum = getSomeVector();
int sum = 0;
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
std::mutex mtx;
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd, &mtx, &sum]() mutable {
int partialSum = 0;
for (; chunkBegin != chunkEnd; ++chunkBegin) {
partialSum += *chunkBegin;
}
{
std::lock_guard guard(mtx);
sum += partialSum;
}
});
}
for (std::thread& thd : threads) {
thd.join();
}
std::cout << sum << '\n';
}
3) 原子变量
原子变量是可以在线程之间“安全”共享的变量。它们非常强大,但也很容易出错。您必须担心诸如内存排序约束之类的事情,当您弄错它们时,可能很难调试并找出您做错了什么。
在其核心,原子变量可以实现为一个简单的变量,其操作由互斥锁或类似物保护。神奇之处在于实现,它经常使用特殊的 CPU 指令来协调对 CPU 级别的变量的访问,以避免大量的锁定和解锁开销。
虽然原子不是灵丹妙药。仍然涉及开销,并且您仍然可以通过过于频繁地访问您的原子来击中自己的脚。您的 CPU 进行了大量缓存,并且让多个线程写入原子变量可能意味着将内容溢出回内存,或者至少溢出到更高级别的缓存。再一次,如果您可以避免在线程中使用紧密循环访问共享状态,您应该这样做:
int main() {
std::vector<int> toSum = getSomeVector();
std::atomic<int> sum(0);
std::vector<std::thread> threads;
int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
for (int i = 0; i < NUM_THREADS; ++i) {
auto chunkBegin = toSum.begin() + (i * chunkSize);
auto chunkEnd = chunkBegin + chunkSize;
threads.emplace_back([chunkBegin, chunkEnd, &sum]() mutable {
int partialSum = 0;
for (; chunkBegin != chunkEnd; ++chunkBegin) {
partialSum += *chunkBegin;
}
// Since we don't care about the order that the threads update the sum,
// we can use memory_order_relaxed. This is a rabbit-hole I won't get
// too deep into here though.
sum.fetch_add(partialSum, std::memory_order_relaxed);
});
}
for (std::thread& thd : threads) {
thd.join();
}
std::cout << sum << '\n';
}