OP 的问题引起了我的注意,因为通过多线程获得加速的数字运算是我个人列表中最重要的任务之一。
我必须承认,我对 Eigen 库的体验非常有限。 (我曾经用 3×3 旋转矩阵分解为欧拉角,这在 Eigen 库中以通用的方式非常巧妙地解决了。)
因此,我定义了另一个示例任务,其中包括对示例数据集中的值进行愚蠢的计数。
这是多次完成(使用相同的评估函数):
- 单线程(获取值进行比较)
- 在额外的线程中启动每个子任务(以一种公认的相当愚蠢的方法)
- 以交叉访问示例数据的方式启动线程
- 启动对样本数据进行分区访问的线程。
test-multi-threading.cc:
#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>
// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
size_t n, const T data[], const T &begin, const T &end)
{
size_t result = 0;
for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
return result;
}
typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;
Time duration(const Clock::time_point &t0)
{
return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}
std::vector<Time> makeTest()
{
const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
const size_t NThreads = std::thread::hardware_concurrency();
// make a test sample
std::vector<Value> sample(N);
for (Value &value : sample) value = (Value)rand();
// prepare result vectors
std::vector<size_t> results4[4] = {
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0),
std::vector<size_t>(NGroups, 0)
};
// make test
std::vector<Time> times{
[&]() { // single threading
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[0];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment single-threaded
for (size_t i = 0; i < NGroups; ++i) {
results[i] = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
}
// done
return duration(t0);
}(),
[&]() { // multi-threading - stupid aproach
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[1];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value i = 0; i < NGroups;) {
size_t nT = 0;
for (; nT < NThreads && i < NGroups; ++nT, ++i) {
threads[nT] = std::move(std::thread(
[i, &results, &data, SizeGroup]() {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}));
}
for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
}
// done
return duration(t0);
}(),
[&]() { // multi-threading - interleaved
// make a copy of test sample
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[2];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT; i < NGroups; i += NThreads) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}
}));
}
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
}(),
[&]() { // multi-threading - grouped
std::vector<Value> data(sample);
std::vector<size_t> &results = results4[3];
// remember start time
const Clock::time_point t0 = Clock::now();
// do experiment multi-threaded
std::vector<std::thread> threads(NThreads);
for (Value iT = 0; iT < NThreads; ++iT) {
threads[iT] = std::move(std::thread(
[iT, &results, &data, NGroups, SizeGroup, NThreads]() {
for (Value i = iT * NGroups / NThreads,
iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
size_t result = countFrequency(data.size(), data.data(),
(Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
results[i] = result;
}
}));
}
for (std::thread &threadI : threads) threadI.join();
// done
return duration(t0);
}()
};
// check results (must be equal for any kind of computation)
const unsigned nResults = sizeof results4 / sizeof *results4;
for (unsigned i = 1; i < nResults; ++i) {
size_t nErrors = 0;
for (Value j = 0; j < NGroups; ++j) {
if (results4[0][j] != results4[i][j]) {
++nErrors;
#ifdef _DEBUG
std::cerr
<< "results4[0][" << j << "]: " << results4[0][j]
<< " != results4[" << i << "][" << j << "]: " << results4[i][j]
<< "!\n";
#endif // _DEBUG
}
}
if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!\n";
}
// done
return times;
}
int main()
{
std::cout << "std::thread::hardware_concurrency(): "
<< std::thread::hardware_concurrency() << '\n';
// heat up
std::cout << "Heat up...\n";
for (unsigned i = 0; i < 3; ++i) makeTest();
// repeat NTrials times
const unsigned NTrials = 10;
std::cout << "Measuring " << NTrials << " runs...\n"
<< " "
<< " | " << std::setw(10) << "Single"
<< " | " << std::setw(10) << "Multi 1"
<< " | " << std::setw(10) << "Multi 2"
<< " | " << std::setw(10) << "Multi 3"
<< '\n';
std::vector<double> sumTimes;
for (unsigned i = 0; i < NTrials; ++i) {
std::vector<Time> times = makeTest();
std::cout << std::setw(2) << (i + 1) << ".";
for (const Time &time : times) {
std::cout << " | " << std::setw(10) << time.count();
}
std::cout << '\n';
sumTimes.resize(times.size(), 0.0);
for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
}
std::cout << "Average Values:\n ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(1)
<< sumTime / NTrials;
}
std::cout << '\n';
std::cout << "Ratio:\n ";
for (const double &sumTime : sumTimes) {
std::cout << " | "
<< std::setw(10) << std::fixed << std::setprecision(3)
<< sumTime / sumTimes.front();
}
std::cout << '\n';
// done
return 0;
}
在 Windows 10 上的 cygwin64 上编译和测试:
$ g++ --version
g++ (GCC) 7.3.0
$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc
$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
| Single | Multi 1 | Multi 2 | Multi 3
1. | 384008 | 1052937 | 130662 | 138411
2. | 386500 | 1103281 | 133030 | 132576
3. | 382968 | 1078988 | 137123 | 137780
4. | 395158 | 1120752 | 138731 | 138650
5. | 385870 | 1105885 | 144825 | 129405
6. | 366724 | 1071788 | 137684 | 130289
7. | 352204 | 1104191 | 133675 | 130505
8. | 331679 | 1072299 | 135476 | 138257
9. | 373416 | 1053881 | 138467 | 137613
10. | 370872 | 1096424 | 136810 | 147960
Average Values:
| 372939.9 | 1086042.6 | 136648.3 | 136144.6
Ratio:
| 1.000 | 2.912 | 0.366 | 0.365
我在 coliru.com 上做了同样的事情。 (我不得不减少加热周期和样本量,因为我超过了原始值的时间限制。):
g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
| Single | Multi 1 | Multi 2 | Multi 3
1. | 224684 | 297729 | 48334 | 39016
2. | 146232 | 337222 | 66308 | 59994
3. | 195750 | 344056 | 61383 | 63172
4. | 198629 | 317719 | 62695 | 50413
5. | 149125 | 356471 | 61447 | 57487
6. | 155355 | 322185 | 50254 | 35214
7. | 140269 | 316224 | 61482 | 53889
8. | 154454 | 334814 | 58382 | 53796
9. | 177426 | 340723 | 62195 | 54352
10. | 151951 | 331772 | 61802 | 46727
Average Values:
| 169387.5 | 329891.5 | 59428.2 | 51406.0
Ratio:
| 1.000 | 1.948 | 0.351 | 0.303
Live Demo on coliru
我有点想知道 coliru(只有 4 个线程)上的比率甚至比我的 PC(有 8 个线程)上的要好。实际上,我不知道如何解释这一点。
但是,这两种设置中存在许多其他差异,这些差异可能负责也可能不负责。至少,这两个测量结果都显示了 3rd 和 4th 方法的粗略加速,其中 2nd 唯一地消耗每个潜在的速度-up(可能通过启动和加入所有这些线程)。
查看示例代码,您会发现没有互斥锁或任何其他显式锁定。这是故意的。正如我所了解到的(很多很多年前),每次尝试并行化都可能导致一定的额外通信开销(对于必须交换数据的并发任务)。如果通信开销变得很大,它只会消耗并发的速度优势。因此,可以通过以下方式实现最佳加速:
- 最少的通信开销,即并发任务对独立数据进行操作
- 后合并并发计算结果的工作量最少。
在我的示例代码中,我
- 在启动线程之前准备好所有数据和存储,
- 线程运行时读取的共享数据永远不会改变,
- 以线程本地方式写入的数据(没有两个线程写入相同的数据地址)
- 在所有线程加入后评估计算结果。
关于 3. 我有点挣扎这是否合法,即是否允许写入线程的数据在加入后正确出现在主线程中。 (似乎工作正常的事实通常是虚幻的,但在多线程方面尤其虚幻。)
cppreference.com 提供以下解释
在 Stack Overflow 中,我找到了以下相关的 Q/A:
这说服了我,没关系。
但缺点是
另一种方法是使用线程池来克服这个问题。我用谷歌搜索了一下,发现例如Jakob Progsch's ThreadPool on github。然而,我猜想,有了线程池,锁定问题又回到了“游戏中”。
这是否也适用于 Eigen 函数,取决于 resp.实现了特征函数。如果其中有对全局变量的访问(当同时调用同一个函数时,这些全局变量变为共享),这将导致数据竞争。
谷歌了一下,我找到了以下文档。
Eigen and multi-threading – Using Eigen in a multi-threaded application:
如果您自己的应用程序是多线程的,并且多个线程调用Eigen,那么您必须在创建线程之前通过调用以下例程来初始化Eigen:
#include <Eigen/Core>
int main(int argc, char** argv)
{
Eigen::initParallel();
...
}
注意
使用Eigen 3.3 和完全符合C++11 的编译器(即thread-safe static local variable initialization),则调用initParallel() 是可选的。
警告
请注意,所有生成随机矩阵的函数都不是可重入的,也不是线程安全的。这些包括DenseBase::Random() 和DenseBase::setRandom(),尽管调用了 Eigen::initParallel()。这是因为这些函数基于不可重入的 std::rand。对于线程安全的随机生成器,我们推荐使用 boost::random 或 c++11 随机特性。