【问题标题】:Linearize nested for loops线性化嵌套的 for 循环
【发布时间】:2022-01-21 13:49:42
【问题描述】:

我正在研究一些繁重的算法,现在我正在尝试使其成为多线程的。它有一个带有 2 个嵌套循环的循环:

for (int i = 0; i < n; ++i) {
    for (int j = i + 1; j < n; ++j) {
        for (int k = j + 1; k < n; ++k) {
            function(i, j, k);
        }
    }
}

我知道,function 的调用次数将等于

但我还有最后一个问题:我不知道如何根据b (0 &lt;= b &lt; binom(n, 3)) 计算ijk

for (int b = start; b < end; ++b) {
    // how to calculate i, j, k?
}

如何计算这些值?

编辑: 我的主要想法是从不同的线程调用这样的函数:

void calculate(int start, int end) {
    for (int b = start; b < end; ++b) {
        int i = ...;
        int j = ...;
        int k = ...;
        function(i, j, k);
    }
}

int total = binom(n, 3);

// thread A:
calculate(0, total / 2);

// thread B:
calculate(total / 2, total);

【问题讨论】:

  • b 到底是什么?我不认为我理解这个问题......
  • @MichalBurgunder 我已经更新了问题
  • 为什么不将 3 个 for 循环留在 calculate 中,并让每个线程调用 calculate 以获取 [0, total/2)[total/2, total),就像您目前所做的那样?最后,调用次数(复杂度)是相同的,您使用增量而不是公式(更快)计算ijk
  • @congard 酷;我理所当然地认为您在将代码更改为多线程代码时正在寻找性能,但我知道情况不一定如此。
  • 我投票结束这个问题,因为这是一个似乎与编程没有直接关系的数学问题。您可能想在math.stackexchange.com 上提问

标签: c++ multithreading math


【解决方案1】:

对您的问题的另一种看法。正如 cmets 中所说,您要寻找的基本上是找到继任者和组合的排序。为此,我使用了 Kreher 和 Stinson 的“组合算法”一书中的算法。

这里是由nextunrank这两个函数以及unranking函数中需要的二项式系数的辅助函数组成的对应代码:

int binomial ( int n, int k )
{
    int mn = k;
    if ( n - k < mn )
    {
        mn = n - k;
    }

    if ( mn < 0 ) { return 0; }
    if ( mn == 0 ) { return 1; }

    int mx = k;
    if ( mx < n - k )
    {
        mx = n - k;
    }
    int value = mx + 1;

    for (int i = 2; i <= mn; ++i)
    {
        value = ( value * ( mx + i ) ) / i;
    }

    return value;
}         
        
auto unrank(int rank, int n, int k)
{    
    std::vector<int> t(k);
    
    int x = 1;
    for (int i = 0; i < k; ++i)
    {
        while (true)
        {
            int b = binomial ( n - x, k - i - 1);
            if (b > rank) break;
            rank -= b;
            ++x;
        }
    
        t[i] = x;
        ++x;
    }

  return t;
}

auto next(std::vector<int>& index, int n, int k)
{
    for (int i = k-1; i >= 0; --i)
    {
        if (index[i] < n - (k-1) + i)
        {
            ++index[i];
            for (int j = i+1; j < k; ++j)
            {
                index[j] = index[j-1]+1;
            }                    
            return true;
        }
    }
    return false;
}

然后的想法是从给定的起始地址生成初始索引配置,然后计算此索引(end-start) 次的后继。这是一个例子:

int main()
{
    int n = 7;
    int k = 4;
    
    int start = 3;
    int end = 10;

    auto index = unrank(start,n,k);        
    auto print_index = [&]()
    {
        for(auto const& ind : index)
        {
            std::cout<<ind<<"  ";  
        }
        std::cout<<std::endl;
    };

    print_index();
    for(int i=start; i<end; ++i)
    {
        next(index, n, k);
        print_index();            
    }
}

打印出来的

1  2  3  7  
1  2  4  5  
1  2  4  6  
1  2  4  7  
1  2  5  6  
1  2  5  7  
1  2  6  7  
1  3  4  5 

这里是Demo。尽情享受吧!

【讨论】:

  • 这正是我所需要的,谢谢。它比原子要快得多,而且,与普通循环相比,它似乎有一个 ≈常数的开销
  • @congard: 顺便说一句:如果你想加快这段代码的速度,即减少常量开销,请应用 memoizing 二项式函数。
  • 您好,我在您的unrank 实现中发现了一个问题,这里是demo
  • @congard:把它归咎于 Kreher/Stinson,或者我对它的实现。抱歉,我没有时间提供二级支持——您现在已经掌握了基础知识,我建议您继续自己修复。
  • @congard:你是对的,你知道为什么吗?因为我在我的试验中为优化构建了另一个错误:-) 我想避免两次计算相同的二项式系数。我现在用不同的方式修复了它,这里是正确的code
【解决方案2】:

this post 中,我分享了一个名为multi_index 的类,它基本上可以满足您的需求,即

for(auto m : multi_index(3,3,4))
{
    // now m[i] holds index of i-th loop
    // m[0] goes from 0 to 2
    // m[1] goes from 0 to 2
    // m[2] goes from 0 to 3
    std::cout<<m[0]<<" "<<m[1]<<" "<<m[2]<<std::endl;
}

但是,此代码仅适用于“正常”循环,其中每个维度从 0 运行到某个上限值。

在这篇文章中,我将尝试将此应用于反对称情况,其中m[i]&lt;m[j] 对应于i&lt;j。链接代码的基本思想保持不变,即创建一个包含循环边界的类并提供一个可与基于范围的 for 循环一起使用的迭代器。唯一的区别是我使用std::vector 而不是std::array 作为索引数组类型:

#include <iostream>
#include <numeric>
#include <vector>

struct antisym_index_t
{
    int upper_index;
    int dim;
    antisym_index_t(int upper_index, int dim) : upper_index(upper_index), dim(dim) {}

    struct iterator
    {
        struct sentinel_t {};

        int upper_index;
        int dim;
        std::vector<int> index_array = {};
        bool _end = false;

        iterator(int upper_index, int dim) : upper_index(upper_index), dim(dim), index_array(dim)
        {
            std::iota(std::begin(index_array), std::end(index_array),0);
        }

        auto& operator++()
        {
            for (int i = dim-1;i >= 0;--i)
            {
                if (index_array[i] < upper_index - 1 - (dim-1-i))
                {
                    ++index_array[i];
                    for (int j = i+1;j < dim;++j)
                    {
                        index_array[j] = index_array[j-1]+1;
                    }                    
                    return *this;
                }
            }
            _end = true;
            return *this;
        }
        auto& operator*()
        {
            return index_array;
        }
        bool operator!=(sentinel_t) const
        {
            return !_end;
        }
    };

    auto begin() const
    {
        return iterator{ upper_index, dim };
    }
    auto end() const
    {
        return typename iterator::sentinel_t{};
    }
};

auto antisym_index(int upper_index, int dim)
{
    return antisym_index_t(upper_index, dim);
}

但是请注意,到目前为止,此代码尚未经过测试(写在我的脑海中)。你可以把它当作

for(auto m : antisym_index(5,3))
{
    // now m[i] holds index of i-th loop
    std::cout<<m[0]<<" "<<m[1]<<" "<<m[2]<<std::endl;
}

编辑:到目前为止,我已经测试并更正了代码,请参阅here。给自己的备忘录:不要发布未经测试的代码。

EDIT2:顺便说一句,这在问题中回答了您的问题。我不清楚这对多任务处理有何帮助。

【讨论】:

  • 非常有趣的解决方案,但不幸的是它仅适用于“正常”循环,它只有 upper_index 但我还需要类似 lower_index 的东西(即开始索引不等于 0)。但是你给了我一个想法,我稍后会尝试实施。不确定它是否会完全解决我的问题,但我希望它至少是一个临时解决方案
  • @congard:再次阅读您的问题后,在我看来,您想要的是所谓的组合“unranking”。也就是说,您输入一个数字,该数字是给定索引(也称为组合)的地址,然后您将返回索引的组成部分。这在数字组合中是相当标准的,但如果你不明白,请告诉我,我可以发布一些代码。
  • 如果可以,请发布一些代码。我将不胜感激
  • @congard:没问题,但你必须等到明天......我必须从我的另一台电脑上获取代码。如果我忘记了,请给我一个提示。
  • * 只是提醒你 *
【解决方案3】:

我没有完整的答案,但有 2 个循环的解决方案。我睡眠不足的大脑无法将其概括为 3 个循环,但也许其他人可以。

在 2D 中,问题变成了从扁平索引中找出三角矩阵的行和列索引。这使得很容易看出“逐渐减少”的末端包含在较大的末端中。在 ASCII 艺术中是这样的:

      n
 ___________
|_          |
| |_        |
|   |_      |
|   | |_    |
|   |   |_  |
|___|_____|_|
  i   ^
      |
     binom(n-i, 2)

所以,让我们定义

  • n循环结束索引(矩阵行/列数)
  • i 外循环计数器范围 [0, n)。如图:列索引
  • j 内循环计数器范围 [0, i)。如图所示:自下而上的行索引
  • a 扁平循环计数器范围 [0, binom(n, 2))

然后可以从binom(n, 2) - binom(n-i, 2) = a 计算出i。通过 Wolfram Alpha 的一次往返给我们:

  • i = trunc(-0.5 * sqrt((1 - 2 n)**2 - 8 a) + n - 0.5)

截断(=cast to int)“向下舍入”到最后一个完整的列。所以行索引j 可以计算为

  • j = a - (binom(n, 2) - binom(n-i, 2))
  • j = a - i*(-i + 2 n - 1) / 2

【讨论】:

    【解决方案4】:

    第三次尝试:

    我已经获取了你的代码,最后让它正常运行(在 python 中):

    def get_k(n):
        total = 0
        for i in range(3, n):
            for j in range(i + 1, n):
                for k in range(j + 1, n):
                    total += 1
                
        V = total // 2 # for 2 threads
        V_tmp = 0          
        for i in range(3, n):
            if(V_tmp > V):
                return i
            for j in range(i + 1, n):
                for k in range(j + 1, n):
                    V_tmp += 1
    
    def pseudo_thread(start, end, n):
        counter = 0
    
        for i in range(start, end):
            for j in range(i + 1, n):
                for k in range(j + 1, n):
                    counter += 1
        print(counter)
    
    
    n = 145
    k = get_k(n)
    
    pseudo_thread(3, k, n)
    pseudo_thread(k, n, n)
    

    这应该最终给你一个相对较好的分裂。即使 n=145,我们的计数器值也会得到 239260 和 227920。这显然不是一个优雅的解决方案,也不是完美的,但它在没有太多详细数学参考的情况下为您提供了正确的答案。

    【讨论】:

    • “像上面那样拆分计算会导致你的线程计算不同数量的值”但是为什么呢?由于(例如)线程 A 执行 calculate(0, total / 2) 和线程 B calculate(total / 2, total) (其中总计 = binom(n, 3))所以 end1 - start1 == end2 - start2
    • 看来 V 应该是 (n)*(n-1)*(n-2) / 6 (因为 binom(n, 3) = n!/((n-3)! * 3!)。我已经测试了你的例子,不幸的是,我无法让它工作。我写了a simple python script 进行测试,你可以看到,不幸的是,它打印了不同的值(116 和 4)。我错过了什么吗?
    【解决方案5】:

    根据您想要的并行化方式,您还可以使用原子结构并通过比较和交换操作实现迭代。大多数平台上都有一个 16 字节的 CAS。与 GCC 上的 -latomic 链接。如果我们确保正确对齐,Clang 会内联 CAS 调用。

    #include <atomic>
    #include <type_traits>
    #include <cstdio>
    
    /**
     * Index for a nested loop
     *
     * Index for loop in style
     * for(i = 0; i < n; ++i)
     *   for(j = 0; j < i; ++j)
     *     for(k = 0; k < j; ++k);
     *
     * The total number of iterations is binom(n, 3)
     *
     * Indices are int for two reasons:
     * 1. Keep overall size at or below 16 byte to allow atomic operations
     * 2. The total number of iterations reaches 2^64 at n ~ 4.8 million
     */
    struct Index {
      int i, j, k;
    
      constexpr Index() noexcept
      : i(2), j(1), k(0)
      {}
      Index& operator++() noexcept
      {
        if(k + 1 < j) {
          ++k;
          return *this;
        }
        k = 0;
        if(j + 1 < i) {
          ++j;
          return *this;
        }
        j = 0;
        ++i;
        return *this;
      }
    };
    
    /**
     * Padds Index to power of 2 alignment up to 16 byte
     *
     * This improves atomic operation performance because it avoids
     * split-locks. Not sure if GCC's std::atomic makes actual use of this
     * but clang does.
     */
    struct AlignedIndex
    {
    private:
      static constexpr std::size_t alignment =
        sizeof(Index) < 2 ? 1 :
        sizeof(Index) < 3 ? 2 :
        sizeof(Index) < 5 ? 4 :
        sizeof(Index) < 9 ? 8 :
        16;
    public:
      union {
        std::aligned_storage<sizeof(Index), alignment>::type pod;
        Index index;
      };
      constexpr AlignedIndex() noexcept
      : index()
      {}
    };
    
    Index increment(std::atomic<AlignedIndex>& index) noexcept
    {
      AlignedIndex last = index.load(std::memory_order_relaxed);
      AlignedIndex next;
      do {
        next = last;
        ++next.index;
      } while(! index.compare_exchange_weak(last, next, std::memory_order_relaxed));
      return last.index;
    }
    
    int main()
    {
      std::atomic<AlignedIndex> index(AlignedIndex{});
      int n = 5;
      for(Index cur; (cur = increment(index)).i < n; ) {
        std::printf("%d %d %d\n", cur.i, cur.j, cur.k);
      }
    }
    

    【讨论】:

    • 它可以工作,但不像我预期的那样:我需要一个组合生成器,但在第二次迭代中,您的解决方案给出了3 0 0。但是,经过一些修改后,它将按预期工作。我对互斥锁有类似的想法,但看起来你的代码会更快。无论如何,+1
    【解决方案6】:

    不是从 1..binom(n, 3) 迭代,而是从 1..n^3 迭代(概念上是数字 1..n 与自身 2x 的笛卡尔积,而不是3 个元素,不重复)。这样做,我们可以很容易地从 M 计算 i/j/k:

    k = (M / N^0) % N = M % N
    j = (M / N^1) % N
    i = (M / N^2) % N = M / N^2
    

    当然,这会导致重复,但我们不会一一跳过重复。一旦我们达到k&gt;=j 的数字,我们需要将b 增加(N-k)*N^0 = N-k 以使其再次“环绕”到0j&gt;=i 也是如此 - 将 b 增加 (N-j)*N^1,以环绕。

    这样做,我们只返回了原始数字集。除法和模数计算有一些开销,每个变量最多可以重复一次(减去第一个变量),所以是的,有一些开销,但它是恒定的,对于恒定数量的变量。

    【讨论】:

    • 它会起作用,但主要目标是在线程之间拆分工作,这样每个线程应该做相同数量的工作。如果只是将 n^3 分成 4 个相等的范围,那么在第一个中将比在最后一个中做更多的工作
    • @congard 不要将它们分成 3 个大“块”,将它们分成 3 个切片(即使用 % 而不是 /)。这样线程 1 得到 1, 4, 7, 10...,线程 2 ``2, 5, 8, ...` 等等。这样,它就会平衡
    • 你能举个例子吗?
    • @congard 对于 0N^2+jN+k 和 N=10,我们将迭代从 [0, 10^3)​​ 超过 M。我们将 M_b 的这些值分配给 3 个线程。而不是 {01,2,..,N^3/3}, {1+N^3/3, 2+N^3/3, ..., 2*N^3/3}, {1 +2*N^3/3, 2+2*N^3/3, ..., N^3} (连续块),我们有点“交错”每个集合的元素 - {0,3,6, ..., N^3-2}, {1,4,7, ..., N^3-1}, {2,5,8,..., N^3}。 (那里可能有一些错误,但你明白了)
    • 实际上,这种模式无论如何都不是最理想的——我在两天前发布我的答案后的某个时候意识到了这一点。它增加了很多开销做这个数学 - 最好的策略可能是解决 i_a*(i_a-1)*(i_a-2) = N*(N-1)*(N-2)*X_a i_0i_1X_1=1/3X_2=2/3 找到 i 的两个值来拆分最外层环形。然后,在每个线程中,以适当的时间间隔 ({0&lt;=i&lt;i_1}, {i_1&lt;=i&lt;i&lt;i_2}, {i_2&lt;=i&lt;N}) 遍历 jk 的所有有效值。开销最小,线程之间相对平衡(渐近地说)。
    【解决方案7】:

    这是基于Dillon Daviscomments的另一种解决方案。

    auto divide = [](float pos, int len) -> float {
        auto n = static_cast<float>(len);
    
        if (pos == 1) {
            return n;
        }
    
        if (pos == 0) {
            return 0;
        }
    
        // solve   x * (x - 1) * (x - 2) = n * (n - 1) * (n - 2) * pos   for x
        // https://en.wikipedia.org/wiki/Bisection_method
    
        float d = n * (n - 1) * (n - 2) * (1 - pos);
    
        auto f = [d](float x) {
            return std::pow(x, 3) - 3 * std::pow(x, 2) + 2 * x - d;
        };
    
        float a = 0;
        float b = n;
        float epsilon = 0.1f;
    
        float x = 0;
    
        while (std::abs(a - b) > epsilon) {
            x = (a + b) / 2;
    
            if (std::abs(f(x)) <= epsilon) {
                break;
            } else if (f(x) * f(a) < 0) {
                b = x;
            } else {
                a = x;
            }
        }
    
        return std::ceil(n - x);
    };
    

    它非常快且开销最小,但不如 davidhigh 的解决方案准确,后者允许将“工作”分成相等的部分。

    例子:

    auto testRun = [](int begin, int end, int n) {
        int counter = 0;
    
        for (int i = begin; i < end; ++i) {
            for (int j = i + 1; j < n; ++j) {
                for (int k = j + 1; k < n; ++k) {
                    ++counter;
                }
            }
        }
    
        std::cout << counter << "\n";
    };
    
    int n = 1200;
    int ranges = 4;
    
    for (int i = 0; i < ranges; ++i) {
        auto begin = static_cast<int>(divide((float) i / (float) ranges, n));
        auto end = static_cast<int>(divide((float) (i + 1) / (float) ranges, n));
        testRun(begin, end, n);
    }
    

    输出:

    72035920
    71897080
    71619380
    71728020
    

    【讨论】:

      猜你喜欢
      • 2012-01-27
      • 2020-05-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多