【问题标题】:Allocating a large memory block in C++在 C++ 中分配大内存块
【发布时间】:2019-02-03 17:01:39
【问题描述】:

我正在尝试为浮点值的 C++ 中的 3D 矩阵分配一个大内存块。它的尺寸是 44100x2200x2。这应该占用大约 7.7gb 的 44100x2200x2x4 字节的内存。我正在使用 Ubuntu 的 64 位 x86 机器上使用 g++ 编译我的代码。当我使用 htop 查看进程时,我看到内存使用量增长到 32gb 并立即被杀死。我的内存计算有误吗?

这是我的代码:

#include <iostream>

using namespace std;
int main(int argc, char* argv[]) {
  int N = 22000;
  int M = 44100;
  float*** a = new float**[N];
  for (int m = 0; m<N; m+=1) {
    cout<<((float)m/(float)N)<<endl;
    a[m] = new float*[M - 1];
    for (int n = 0; n<M - 1; n+=1) {
      a[m][n] = new float[2];
    }
  }
}

编辑:我的计算不正确,我分配的内存接近 38gb。我现在将代码修复为分配 15gb。

#include <iostream>

using namespace std;
int main(int argc, char* argv[]) {
  unsigned long  N = 22000;
  unsigned long  M = 44100;
  unsigned long blk_dim = N*(M-1)*2;
  float* blk = new float[blk_dim];
  unsigned long b = (unsigned long) blk;

  float*** a = new float**[N];
  for (int m = 0; m<N; m+=1) {
    unsigned long offset1 = m*(M - 1)*2*sizeof(float);
    a[m] = new float*[M - 1];
    for (int n = 0; n<M - 1; n+=1) {
      unsigned long offset2 = n*2*sizeof(float);
      a[m][n] = (float*)(offset1 + offset2 + b);
    }
  }
}

【问题讨论】:

  • 这意味着 7.7GB 的 连续 内存。这对于分配器来说是一个相当困难的要求,而且许多系统都无法达到。你能把它分成许多小块吗?
  • @Alejandro 连续内存如何(如在 impl. 中)?不是每个“新”都是独立的/不连续的(因此可能效率较低)?或者“相当困难的任务”指的是它不是这样的?
  • @Alejandro 实际上是连续的,是的,但在物理上不是连续的。这是很多内存,但在 Linux 上,它会直接转到 mmap(2)
  • @SpentDeath 1) 多次说错误的事情并不能使它正确[8 不可能正确,原因如上所述]; 2) 代码中的 N 是 22,000 - 不是 2,200; (3. 如果有任何注意括号..)
  • 你能解释一下为什么你说“我正在尝试分配一个大内存块”然后发布你实际分配数百万个小内存块的代码吗?

标签: c++ memory-management


【解决方案1】:

您忘记了一个维度,以及分配内存的开销。所示代码在三维空间中分配内存的效率非常低,导致开销太大。

float*** a = new float**[N];

这将分配大约22000 * sizeof(float **),大约为176kb。可以忽略不计。

a[m] = new float*[M - 1];

这里的单个分配将用于44099 * sizeof(float *),但您将获得其中的 22000 个。 22000 * 44099 * sizeof(float *),或大约 7.7gb 的额外内存。这是您停止计数的地方,但您的代码尚未完成。还有很长的路要走。

a[m][n] = new float[2];

这是 8 字节的单次分配,但此分配将执行 22000 * 44099 次。这是另一个 7.7gb 被冲进下水道。您现在大约需要分配超过 15 GB 的应用程序所需的内存。

但是每个分配都不是免费的,而且new float[2] 需要超过 8 个字节。每个单独分配的块必须由您的 C++ 库在内部跟踪,以便delete 可以回收它。最简单的基于链表的堆分配实现需要一个前向指针、一个后向指针以及分配块中​​有多少字节的计数。假设出于对齐目的不需要填充任何内容,那么在 64 位平台上,每次分配至少需要 24 字节的开销。

现在,由于您的第三维分配 22000 * 44099 次,第二维分配 22000 次,第一维分配一次:如果我指望我的手指,这将需要 (22000 * 44099 + 22000 + 1) * 24 或另外 22 GB 的内存,只是为了消耗最简单、基本的内存分配方案的开销。

如果我的数学计算正确的话,我们现在需要使用最简单、可能的堆分配跟踪大约 38 GB 的 RAM。您的 C++ 实现可能会使用更复杂的堆分配逻辑,但开销更大。

摆脱new float[2]。计算矩阵的大小,new 一个 7.7gb 块,然后计算其余指针应该指向的位置。此外,为矩阵的第二维分配一块内存,并计算第一维的指针。

您的分配代码应该恰好执行三个new 语句。一个用于第一维指针,一个用于第二维指针。还有一个用于构成您的第三维的大量数据。

【讨论】:

  • 任何对 8 字节块使用这么多开销的分配方案都是垃圾。一个合理的分配方案每次分配最多使用 8 个字节,而一个好的分配方案将只在每页内存中使用一小块专用于特定大小的块,因此 8 字节分配只有 1-2% 的开销。
  • 这正是我的问题!我更新了我的代码以使用 15gb,并且 htop 验证我确实只使用了 15gb。有什么办法可以提高我的内存使用率以使用 7.7gb 吗?我能想到的唯一 hack 就是使用 blk 数组和索引 [m][n] 通过线性变换到正确的索引。我想我可以使用宏来使它更好,但除了骇人听闻的方式之外,还有什么其他的吗?
  • 有什么理由不能只做class BigArray {public: float _bigArray[44100][22000][2];}; BigArray * myBigArray = new BigArray; 吗?那将使用“仅”7.7GiB。
  • 在这种情况下,您可以使用 float * bigArray = new float[M*N*O]; 并编写 getter 和 setter 方法来计算一维数组的正确偏移量。最主要的是,由于数组是规则的,你只需要对new进行一次调用。
  • @SpentDeath 我的意思是您正在为大量单元分配空间。如果实际上不需要其中的大多数(否则它们无论如何都会带有一些默认值)并且您不需要将部分或全部这个怪物提供给需要连续内存的某些 API(这您发布的有问题的代码无论如何都没有),您可以使用其他选项。例如,将 int 映射到 int 到 2 单元数组的映射。性能不会那么好(显然,在最好的情况下,我们正在讨论通过无序映射进行散列),但内存利用率可能会相当更好。
【解决方案2】:

只是为了完善已经给出的答案,下面的示例基本上是在how to create a contiguous 2D array 上给出的答案的扩展,并说明了仅对new[] 进行 3 次调用的用法。

优点是您保留了通常与三指针一起使用的[][][] 语法(尽管我强烈建议不要像这样使用“3 星”来编写代码,但我们有我们所拥有的)。缺点是为指针分配了更多的内存,并为数据添加了单个内存池。

#include <iostream>
#include <exception>

template <typename T>
T*** create3DArray(unsigned pages, unsigned nrows, unsigned ncols, const T& val = T())
{
    T*** ptr = nullptr;  // allocate pointers to pages
    T** ptrMem = nullptr;
    T* pool = nullptr;
    try 
    {
        ptr = new T**[pages];  // allocate pointers to pages
        ptrMem = new T*[pages * nrows]; // allocate pointers to pool
        pool = new T[nrows*ncols*pages]{ val };  // allocate pool

        // Assign page pointers to point to the pages memory,
        // and pool pointers to point to each row the data pool
        for (unsigned i = 0; i < pages; ++i, ptrMem += nrows)
        {
            ptr[i] = ptrMem;
            for (unsigned j = 0; j < nrows; ++j, pool += ncols)
                ptr[i][j] = pool;
        }
        return ptr;
     }
     catch(std::bad_alloc& ex)
     {
         // rollback the previous allocations
        delete [] ptrMem;
        delete [] ptr;
        throw ex; 
    }
}

template <typename T>
void delete3DArray(T*** arr)
{
    delete[] arr[0][0]; // remove pool
    delete[] arr[0];  // remove the pointers
    delete[] arr;     // remove the pages
}

int main()
{
    double ***dPtr = nullptr;
    try 
    {
        dPtr = create3DArray<double>(4100, 5000, 2);
    }
    catch(std::bad_alloc& )
    {
        std::cout << "Could not allocate memory";
        return -1;
    }
    dPtr[0][0][0] = 10;  // for example
    std::cout << dPtr[0][0][0] << "\n";
    delete3DArray(dPtr);  // free the memory
}

Live Example

【讨论】:

  • 我喜欢这个概括性。比我编辑的 hacky 版本好得多。有没有办法重载原始类型的构造函数和析构函数?我有更好的东西想要。
  • 您可以将其打包在一个类中,因为链接的答案状态可以/应该这样做以利用 RAII(即,当对象超出范围时,数组会自动释放)。请注意,您需要向类添加复制构造函数和赋值运算符以及析构函数。我有点懒得不创建一个成熟的类,而只想展示如何开始组装这样的东西的内部工作原理。
  • 我正在考虑将它推广到任何类型 T 的 n 维。我意识到我的想法行不通。有没有办法将其推广到 N 维?因为这是我在不久的将来需要用到的二维向量的 N 维矩阵。
  • @SpentDeath:假设 N 是一个编译时常量,这并不难,但需要一些模板递归。在顶层,您创建池 T[dim1*...*dimN],在第 1 层创建 T* [dim1*...*dimN-1],直到在最后一层您只创建 T****[dim1]
  • @PaulMcKenzie 我提交了一个简单(嗯,相当简单)的类实现。
【解决方案3】:

这可能是您的问题的简化版本,但是您使用的数据结构(“三星”数组)几乎从来都不是您想要的。如果您要创建像这里这样的密集矩阵,并分配每个元素的空间,进行数百万次微小分配根本没有任何优势。如果你想要一个稀疏矩阵,你通常需要一种像压缩稀疏行这样的格式。

如果数组是“矩形的”(或者我认为 3-D 的数组是“方形的”),并且所有行和列的大小相同,那么与分配单个内存块相比,这种数据结构纯粹是浪费.您执行了数百万次微小的分配,为数百万个指针分配空间,并丢失了内存的局部性。

此样板为动态 3-D 数组创建了零成本抽象。 (好吧,几乎:存储底层一维std::vector 的长度和各个维度是多余的。)API 使用a(i, j, k) 作为a[i][j][k]a.at(i,j,k) 的等价物作为具有边界的变体-检查。

此 API 还具有使用索引函数f(i,j,k) 填充数组的选项。如果您调用a.generate(f),它会设置每个a(i,j,k) = f(i,j,k)。理论上,这会降低内部循环内的偏移计算,使其更快。 API 还可以将生成函数作为array3d&lt;float&gt;(M, N, P, f) 传递给构造函数。请随意扩展。

#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <iomanip>
#include <iostream>
#include <vector>

using std::cout;
using std::endl;
using std::ptrdiff_t;
using std::size_t;

/* In a real-world implementation, this class would be split into a
 * header file and a definitions file.
 */
template <typename T>
  class array3d {
    public:
    using value_type = T;
    using size_type = size_t;
    using difference_type = ptrdiff_t;
    using reference = T&;
    using const_reference = const T&;
    using pointer = T*;
    using const_pointer = const T*;
    using iterator = typename std::vector<T>::iterator;
    using const_iterator = typename std::vector<T>::const_iterator;
    using reverse_iterator = typename std::vector<T>::reverse_iterator;
    using const_reverse_iterator = typename
      std::vector<T>::const_reverse_iterator;

/* For this trivial example, I don’t define a default constructor or an API
 * to resize a 3D array.
 */
    array3d( const ptrdiff_t rows,
             const ptrdiff_t cols,
             const ptrdiff_t layers )
    {
      const ptrdiff_t nelements = rows*cols*layers;

      assert(rows > 0);
      assert(cols > 0);
      assert(layers > 0);
      assert(nelements > 0);

      nrows = rows;
      ncols = cols;
      nlayers = layers;
      storage.resize(static_cast<size_t>(nelements));
    }

/* Variant that initializes an array with bounds and then fills each element
 * (i,j,k) with a provided function f(i,j,k).
 */
    array3d( const ptrdiff_t rows,
             const ptrdiff_t cols,
             const ptrdiff_t layers,
             const std::function<T(ptrdiff_t, ptrdiff_t, ptrdiff_t)> f )
    {
      const ptrdiff_t nelements = rows*cols*layers;

      assert(rows > 0);
      assert(cols > 0);
      assert(layers > 0);
      assert(nelements > 0);

      nrows = rows;
      ncols = cols;
      nlayers = layers;
      storage.reserve(static_cast<size_t>(nelements));

      for ( ptrdiff_t i = 0; i < nrows; ++i )
        for ( ptrdiff_t j = 0; j < ncols; ++j )
          for ( ptrdiff_t k = 0; k < nlayers; ++k )
            storage.emplace_back(f(i,j,k));

      assert( storage.size() == static_cast<size_t>(nelements) );
    }

    // Rule of 5:
    array3d( const array3d& ) = default;
    array3d& operator= ( const array3d& ) = default;
    array3d( array3d&& ) = default;
    array3d& operator= (array3d&&) = default;

    /* a(i,j,k) is the equivalent of a[i][j][k], except that the indices are
     * signed rather than unsigned.  WARNING: It does not check bounds!
     */
    T& operator() ( const ptrdiff_t i,
                    const ptrdiff_t j,
                    const ptrdiff_t k ) noexcept
    {
      return storage[make_index(i,j,k)];
    }

    const T& operator() ( const ptrdiff_t i,
                          const ptrdiff_t j,
                          const ptrdiff_t k ) const noexcept
    {
      return const_cast<array3d&>(*this)(i,j,k);
    }

    /* a.at(i,j,k) checks bounds.  Error-checking is by assertion, rather than
     * by exception, and the indices are signed.
     */
    T& at( const ptrdiff_t i, const ptrdiff_t j, const ptrdiff_t k )
    {
      bounds_check(i,j,k);
      return (*this)(i,j,k);
    }

    const T& at( const ptrdiff_t i,
                 const ptrdiff_t j,
                 const ptrdiff_t k ) const
    {
      return const_cast<array3d&>(*this).at(i,j,k);
    }

/* Given a function or function object f(i,j,k), fills each element of the
 * container with a(i,j,k) = f(i,j,k).
 */
    void generate( const std::function<T(ptrdiff_t,
                                         ptrdiff_t,
                                         ptrdiff_t)> f )
    {
      iterator it = storage.begin();

      for ( ptrdiff_t i = 0; i < nrows; ++i )
        for ( ptrdiff_t j = 0; j < ncols; ++j )
          for ( ptrdiff_t k = 0; k < nlayers; ++k )
            *it++ = f(i,j,k);

      assert(it == storage.end());
    }

/* Could define a larger API, e.g. begin(), end(), rbegin() and rend() from the STL.
 * Whatever you need.
 */

    private:
    ptrdiff_t nrows, ncols, nlayers;
    std::vector<T> storage;

    constexpr size_t make_index( const ptrdiff_t i,
                                 const ptrdiff_t j,
                                 const ptrdiff_t k ) const noexcept
    {
      return static_cast<size_t>((i*ncols + j)*nlayers + k);
    }

    // This could instead throw std::out_of_range, like STL containers.
    constexpr void bounds_check( const ptrdiff_t i,
                                 const ptrdiff_t j,
                                 const ptrdiff_t k ) const
    {
      assert( i >=0 && i < nrows );
      assert( j >= 0 && j < ncols );
      assert( k >= 0 && k < nlayers );
    }
};

// In a real-world scenario, this test driver would be in another source file:

constexpr float f( const ptrdiff_t i, const ptrdiff_t j, const ptrdiff_t k )
{
  return static_cast<float>( k==0 ? 1.0 : -1.0 *
                             ((double)i + (double)j*1E-4));
}

int main(void)
{
  constexpr ptrdiff_t N = 2200, M = 4410, P = 2;
  const array3d<float> a(N, M, P, f);

  // Should be: -1234.4321
  cout << std::setprecision(8) << a.at(1234,4321,1) << endl;

  return EXIT_SUCCESS;
}

值得注意的是,这段代码在技术上包含未定义的行为:它假设有符号整数乘法溢出产生一个负数,但实际上如果程序在运行时请求一些荒谬的内存量,编译器有权生成完全损坏的代码.

当然,如果数组边界是常量,只需声明它们constexpr 并使用具有固定边界的数组。

不幸的是,每个新的 C++ 程序员都首先了解char** argv,因为这使人们认为“二维”数组是指向行的指针的“参差不齐”数组。

在现实世界中,这几乎不是最适合这项工作的数据结构。

【讨论】:

  • 尽管像你说的那样不推荐使用这样的类,但我认为只要尝试理解它就可以学到很多关于 C++ 的知识,因此这是一个非常有趣的 POC。
猜你喜欢
  • 1970-01-01
  • 2015-05-16
  • 2010-10-10
  • 1970-01-01
  • 2021-03-09
  • 2016-02-13
  • 2017-11-14
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多