【问题标题】:Eigen library with C++11 multithreading具有 C++11 多线程的特征库
【发布时间】:2020-10-30 20:14:06
【问题描述】:

我有一个代码来计算期望最大化的高斯混合模型,以便从给定的输入数据样本中识别集群。

一段代码正在重复计算这种模型以进行多次试验Ntrials(一个独立的另一个但使用相同的输入数据),以便最终获得最佳解决方案(最大化模型的总可能性)。这个概念可以推广到许多其他聚类算法(例如 k-means)。

我想通过 C++11 的多线程并行化必须重复 Ntrials 次的代码部分,以便每个线程执行一次试验。

一个代码示例,假设 (Ndimensions x Npoints) 的输入 Eigen::ArrayXXd sample 可以是以下类型:

    double bestTotalModelProbability = 0;
    Eigen::ArrayXd clusterIndicesFromSample(Npoints);
    clusterIndicesFromSample.setZero();

    for (int i=0; i < Ntrials; i++)
    {
         totalModelProbability = computeGaussianMixtureModel(sample);


         // Check if this trial is better than the previous one.
         // If so, update the results (cluster index for each point
         // in the sample) and keep them.

         if totalModelProbability > bestTotalModelProbability
         {
             bestTotalModelProbability = totalModelProbability;
             ...
             clusterIndicesFromSample = obtainClusterMembership(sample);
         }
    }

我将样本的参考值 (Eigen::Ref) 传递给函数 computeGaussianMixtureModel()obtainClusterMembership()。。 p>

我的代码主要基于 Eigen 数组,我所处理的 N 维问题可以考虑 10-100 个维度和 500-1000 个不同的样本点。我正在寻找一些示例来使用 Eigen 数组和 C++11 的 std:thread 创建此代码的多线程版本,但找不到任何东西,我正在努力制作一些用于操作 Eigen 数组的简单示例.

我什至不确定 Eigen 是否可以在 C++11 的 std::thread 中使用。 有人可以通过一些简单的例子来帮助我理解合成器吗? 我在具有 6 个内核(12 个线程)的 CPU 上使用 clang++ 作为 Mac OSX 中的编译器。

【问题讨论】:

  • 为什么不能在线程内使用特征值?直接使用 C++11 线程很痛苦,我强烈建议您使用一些语言扩展(openmp、openacc 等)或库(TBB 等)以使其更容易。
  • 看起来试用号 i 取决于前一个,因此并行化函数 computeGaussianMixtureModelobtainClusterMembership 可能更容易,正如 Marc Glisse 所说,我也会建议从更容易使用的 OpenMP 开始。

标签: c++ multithreading c++11 parallel-processing eigen


【解决方案1】:

OP 的问题引起了我的注意,因为通过多线程获得加速的数字运算是我个人列表中最重要的任务之一。

我必须承认,我对 Eigen 库的体验非常有限。 (我曾经用 3×3 旋转矩阵分解为欧拉角,这在 Eigen 库中以通用的方式非常巧妙地解决了。)

因此,我定义了另一个示例任务,其中包括对示例数据集中的值进行愚蠢的计数。

这是多次完成(使用相同的评估函数):

  1. 单线程(获取值进行比较)
  2. 在额外的线程中启动每个子任务(以一种公认的相当愚蠢的方法)
  3. 以交叉访问示例数据的方式启动线程
  4. 启动对样本数据进行分区访问的线程。

test-multi-threading.cc:

#include <cstdint>
#include <cstdlib>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>

// a sample function to process a certain amount of data
template <typename T>
size_t countFrequency(
  size_t n, const T data[], const T &begin, const T &end)
{
  size_t result = 0;
  for (size_t i = 0; i < n; ++i) result += data[i] >= begin && data[i] < end;
  return result;
}

typedef std::uint16_t Value;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::microseconds MuSecs;
typedef decltype(std::chrono::duration_cast<MuSecs>(Clock::now() - Clock::now())) Time;

Time duration(const Clock::time_point &t0)
{
  return std::chrono::duration_cast<MuSecs>(Clock::now() - t0);
}

std::vector<Time> makeTest()
{
  const Value SizeGroup = 4, NGroups = 10000, N = SizeGroup * NGroups;
  const size_t NThreads = std::thread::hardware_concurrency();
  // make a test sample
  std::vector<Value> sample(N);
  for (Value &value : sample) value = (Value)rand();
  // prepare result vectors
  std::vector<size_t> results4[4] = {
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0),
    std::vector<size_t>(NGroups, 0)
  };
  // make test
  std::vector<Time> times{
    [&]() { // single threading
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[0];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment single-threaded
      for (size_t i = 0; i < NGroups; ++i) {
        results[i] = countFrequency(data.size(), data.data(),
          (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - stupid aproach
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[1];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value i = 0; i < NGroups;) {
        size_t nT = 0;
        for (; nT < NThreads && i < NGroups; ++nT, ++i) {
          threads[nT] = std::move(std::thread(
            [i, &results, &data, SizeGroup]() {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }));
        }
        for (size_t iT = 0; iT < nT; ++iT) threads[iT].join();
      }
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - interleaved
      // make a copy of test sample
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[2];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT; i < NGroups; i += NThreads) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }(),
    [&]() { // multi-threading - grouped
      std::vector<Value> data(sample);
      std::vector<size_t> &results = results4[3];
      // remember start time
      const Clock::time_point t0 = Clock::now();
      // do experiment multi-threaded
      std::vector<std::thread> threads(NThreads);
      for (Value iT = 0; iT < NThreads; ++iT) {
        threads[iT] = std::move(std::thread(
          [iT, &results, &data, NGroups, SizeGroup, NThreads]() {
            for (Value i = iT * NGroups / NThreads,
              iN = (iT + 1) * NGroups / NThreads; i < iN; ++i) {
              size_t result = countFrequency(data.size(), data.data(),
                (Value)(i * SizeGroup), (Value)((i + 1) * SizeGroup));
              results[i] = result;
            }
          }));
      }
      for (std::thread &threadI : threads) threadI.join();
      // done
      return duration(t0);
    }()
  };
  // check results (must be equal for any kind of computation)
  const unsigned nResults = sizeof results4 / sizeof *results4;
  for (unsigned i = 1; i < nResults; ++i) {
    size_t nErrors = 0;
    for (Value j = 0; j < NGroups; ++j) {
      if (results4[0][j] != results4[i][j]) {
        ++nErrors;
#ifdef _DEBUG
        std::cerr
          << "results4[0][" << j << "]: " << results4[0][j]
          << " != results4[" << i << "][" << j << "]: " << results4[i][j]
          << "!\n";
#endif // _DEBUG
      }
    }
    if (nErrors) std::cerr << nErrors << " errors in results4[" << i << "]!\n";
  }
  // done
  return times;
}

int main()
{
  std::cout << "std::thread::hardware_concurrency(): "
    << std::thread::hardware_concurrency() << '\n';
  // heat up
  std::cout << "Heat up...\n";
  for (unsigned i = 0; i < 3; ++i) makeTest();
  // repeat NTrials times
  const unsigned NTrials = 10;
  std::cout << "Measuring " << NTrials << " runs...\n"
    << "   "
    << " | " << std::setw(10) << "Single"
    << " | " << std::setw(10) << "Multi 1"
    << " | " << std::setw(10) << "Multi 2"
    << " | " << std::setw(10) << "Multi 3"
    << '\n';
  std::vector<double> sumTimes;
  for (unsigned i = 0; i < NTrials; ++i) {
    std::vector<Time> times = makeTest();
    std::cout << std::setw(2) << (i + 1) << ".";
    for (const Time &time : times) {
      std::cout << " | " << std::setw(10) << time.count();
    }
    std::cout << '\n';
    sumTimes.resize(times.size(), 0.0);
    for (size_t j = 0; j < times.size(); ++j) sumTimes[j] += times[j].count();
  }
  std::cout << "Average Values:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(1)
      << sumTime / NTrials;
  }
  std::cout << '\n';
  std::cout << "Ratio:\n   ";
  for (const double &sumTime : sumTimes) {
    std::cout << " | "
      << std::setw(10) << std::fixed << std::setprecision(3)
      << sumTime / sumTimes.front();
  }
  std::cout << '\n';
  // done
  return 0;
}

在 Windows 10 上的 cygwin64 上编译和测试:

$ g++ --version
g++ (GCC) 7.3.0

$ g++ -std=c++11 -O2 -o test-multi-threading test-multi-threading.cc

$ ./test-multi-threading
std::thread::hardware_concurrency(): 8
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     384008 |    1052937 |     130662 |     138411
 2. |     386500 |    1103281 |     133030 |     132576
 3. |     382968 |    1078988 |     137123 |     137780
 4. |     395158 |    1120752 |     138731 |     138650
 5. |     385870 |    1105885 |     144825 |     129405
 6. |     366724 |    1071788 |     137684 |     130289
 7. |     352204 |    1104191 |     133675 |     130505
 8. |     331679 |    1072299 |     135476 |     138257
 9. |     373416 |    1053881 |     138467 |     137613
10. |     370872 |    1096424 |     136810 |     147960
Average Values:
    |   372939.9 |  1086042.6 |   136648.3 |   136144.6
Ratio:
    |      1.000 |      2.912 |      0.366 |      0.365

我在 coliru.com 上做了同样的事情。 (我不得不减少加热周期和样本量,因为我超过了原始值的时间限制。):

g++ (GCC) 8.1.0
Copyright (C) 2018 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

std::thread::hardware_concurrency(): 4
Heat up...
Measuring 10 runs...
    |     Single |    Multi 1 |    Multi 2 |    Multi 3
 1. |     224684 |     297729 |      48334 |      39016
 2. |     146232 |     337222 |      66308 |      59994
 3. |     195750 |     344056 |      61383 |      63172
 4. |     198629 |     317719 |      62695 |      50413
 5. |     149125 |     356471 |      61447 |      57487
 6. |     155355 |     322185 |      50254 |      35214
 7. |     140269 |     316224 |      61482 |      53889
 8. |     154454 |     334814 |      58382 |      53796
 9. |     177426 |     340723 |      62195 |      54352
10. |     151951 |     331772 |      61802 |      46727
Average Values:
    |   169387.5 |   329891.5 |    59428.2 |    51406.0
Ratio:
    |      1.000 |      1.948 |      0.351 |      0.303

Live Demo on coliru

我有点想知道 coliru(只有 4 个线程)上的比率甚至比我的 PC(有 8 个线程)上的要好。实际上,我不知道如何解释这一点。 但是,这两种设置中存在许多其他差异,这些差异可能负责也可能不负责。至少,这两个测量结果都显示了 3rd 和 4th 方法的粗略加速,其中 2nd 唯一地消耗每个潜在的速度-up(可能通过启动和加入所有这些线程)。

查看示例代码,您会发现没有互斥锁或任何其他显式锁定。这是故意的。正如我所了解到的(很多很多年前),每次尝试并行化都可能导致一定的额外通信开销(对于必须交换数据的并发任务)。如果通信开销变得很大,它只会消耗并发的速度优势。因此,可以通过以下方式实现最佳加速:

  • 最少的通信开销,即并发任务对独立数据进行操作
  • 后合并并发计算结果的工作量最少。

在我的示例代码中,我

  1. 在启动线程之前准备好所有数据和存储,
  2. 线程运行时读取的共享数据永远不会改变,
  3. 以线程本地方式写入的数据(没有两个线程写入相同的数据地址)
  4. 在所有线程加入后评估计算结果。

关于 3. 我有点挣扎这是否合法,即是否允许写入线程的数据在加入后正确出现在主线程中。 (似乎工作正常的事实通常是虚幻的,但在多线程方面尤其虚幻。)

cppreference.com 提供以下解释

在 Stack Overflow 中,我找到了以下相关的 Q/A:

这说服了我,没关系。

但缺点是

  • 线程的创建和连接需要额外的工作(而且成本不高)。

另一种方法是使用线程池来克服这个问题。我用谷歌搜索了一下,发现例如Jakob Progsch's ThreadPool on github。然而,我猜想,有了线程池,锁定问题又回到了“游戏中”。

这是否也适用于 Eigen 函数,取决于 resp.实现了特征函数。如果其中有对全局变量的访问(当同时调用同一个函数时,这些全局变量变为共享),这将导致数据竞争。

谷歌了一下,我找到了以下文档。

Eigen and multi-threading – Using Eigen in a multi-threaded application:

如果您自己的应用程序是多线程的,并且多个线程调用Eigen,那么您必须在创建线程之前通过调用以下例程来初始化Eigen

#include <Eigen/Core>
int main(int argc, char** argv)
{
  Eigen::initParallel();
  ...
}

注意

使用Eigen 3.3 和完全符合C++11 的编译器(即thread-safe static local variable initialization),则调用initParallel() 是可选的。

警告

请注意,所有生成随机矩阵的函数都不是可重入的,也不是线程安全的。这些包括DenseBase::Random()DenseBase::setRandom(),尽管调用了 Eigen::initParallel()。这是因为这些函数基于不可重入的 std::rand。对于线程安全的随机生成器,我们推荐使用 boost::random 或 c++11 随机特性。

【讨论】:

    猜你喜欢
    • 2012-04-30
    • 2021-08-13
    • 2014-11-18
    • 1970-01-01
    • 1970-01-01
    • 2021-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多