【问题标题】:Fast algorithm for computing intersections of N sets计算 N 组交集的快速算法
【发布时间】:2019-02-08 16:59:26
【问题描述】:

我有 n 个集合 A0,A2,...An-1 持有集合 E 的项目。

我将配置 C 定义为由 n 位组成的整数,因此 C 的值介于 0 和 2^n-1 之间。现在,我定义以下内容:

(C)   an item e of E is in configuration C 
       <=> for each bit b of C, if b==1 then e is in Ab, else e is not in Ab

例如对于 n=3,配置 C=011 对应于 A0 和 A1 但不在 A2 中的 E 项(NOT 很重要)

C[bitmap] 是集合中恰好具有该存在/不存在模式的元素的数量。 C[001] 是 A0 中不存在于任何其他集合中的元素的数量。


另一个可能的定义是:

(V)   an item e of E is in configuration V 
       <=> for each bit b of V, if b==1 then e is in Ab

例如对于 n=3,(V) 配置 V=011 对应于 A0 和 A1 中的 E 项

V[bitmap] 是所选集合的交集计数。(即位图为真的所有集合中有多少元素的计数。)V[001] 是A0 中的元素数。 V[011] 是 A0 A1 中的元素数量,无论它们是否也在 A2 中。


以下,第一张图为A0、A1、A2组的项目,第二张图为(C)构型的尺寸,第三张图为(V)构型的尺寸。

我也可以用两个向量中的任何一个来表示配置:

C[001]= 5       V[001]=14
C[010]=10       V[010]=22
C[100]=11       V[100]=24
C[011]= 2       V[011]= 6
C[101]= 3       V[101]= 7
C[110]= 6       V[110]=10
C[111]= 4       V[111]= 4

我想要编写一个 C/C++ 函数,尽可能高效地将 C 转换为 V。一种天真的方法可能是以下明显在 O(4^n) 中的“transfo”函数:

#include <vector>
#include <cstdio>

using namespace std;

vector<size_t> transfo (const vector<size_t>& C)
{
    vector<size_t> V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
} 


int main()
{
    vector<size_t> C = { 
        /* 000 */  0,
        /* 001 */  5,
        /* 010 */ 10,
        /* 011 */  2,
        /* 100 */ 11,
        /* 101 */  3,
        /* 110 */  6,
        /* 111 */  4
    };

    vector<size_t> V = transfo (C);

    for (size_t i=1; i<V.size(); i++)  {  printf ("[%2ld]  C=%2ld   V=%2ld\n", i, C[i], V[i]);  }
}

我的问题是:有没有比简单的算法更有效的将向量 C 转换为向量 V 的算法?那么这样一个“好”算法的复杂度是多少?

请注意,我可能对任何 SIMD 解决方案感兴趣。

【问题讨论】:

  • 对于实际问题的 SIMD 优化,您是否有一个典型的n?我们是在谈论循环数以千计的C 值吗?或者所有C 值是否都适合单个SIMD 向量并且需要改组?计数是否真的需要为size_t(通常为64 位),或者像uint16_tuint8_t 这样的更窄的整数类型是否有效? (每个 SIMD 向量的更多元素通常更有效,具体取决于需要的改组量。uint8_t 的水平总和在 x86 上确实有效,psadbw 反对 0)。
  • 从实现 POV 来看,它可能会帮助编译器在内部循环中汇总为 tmp var,并将其存储到最后的 V[i]。两个相同类型的std::vectors 之间的别名分析可能并不完美。此外,对于单个位集的i 值,可能值得尝试通过跳过j 值的范围来减少工作量。但要提高效率会很困难。
  • 您可以使用类似于How to count character occurrences using SIMD 的 SIMD 来实现条件和。保留i 值(可能每个元素都相同)和j 值(例如 16 个连续值)的 SIMD 向量。 SIMD AND,然后将结果与i 向量进行比较以实现(i&amp;j) == i,并使用它来屏蔽来自C[j + 0..16] 的SIMD 负载,您将_mm_add_epi8epi16(或32 或64)转换为向量累加器。 (所以你为某些元素添加了0。)使用足够宽的元素,你可以在之后进行hsum,或者为PSADBW使用8位。
  • 有点类似的问题:stackoverflow.com/questions/54097024/…。您需要将 Hadamard 矩阵替换为 Sierpinski 矩阵(即,将 -1 替换为 0)。实际实现取决于您实际需要的输入/输出类型(您实际需要多少位)以及您的目标架构。

标签: c++ algorithm set intersection simd


【解决方案1】:

好吧,你正在尝试计算 2n 个值,所以你不能做得比 O(2n) 更好。

朴素的方法是从观察到 V[X] 是通过固定 X 中的所有 1 位并迭代所有可能的 0 位的值来获得的。例如,

V[010] = C[010] + C[011] + C[110] + C[111]

但是这种方法对 V 的每个元素执行 O(2n) 加法,总复杂度为 O(4n)。

这是一个 O(n × 2n) 算法。我也很好奇是否存在 O(2n) 算法。

n = 4。让我们考虑 V 与 C 的完整表格。下表中的每一行对应一个 V 值,该值是通过将标有* 的列相加来计算的。 * 符号的布局可以很容易地从幼稚的方法中推导出来。

    |0000|0001|0010|0011|0100|0101|0110|0111||1000|1001|1010|1011|1100|1101|1110|1111
0000| *  | *  | *  | *  | *  | *  | *  | *  || *  | *  | *  | *  | *  | *  | *  | *  
0001|    | *  |    | *  |    | *  |    | *  ||    | *  |    | *  |    | *  |    | *  
0010|    |    | *  | *  |    |    | *  | *  ||    |    | *  | *  |    |    | *  | *  
0011|    |    |    | *  |    |    |    | *  ||    |    |    | *  |    |    |    | *  
0100|    |    |    |    | *  | *  | *  | *  ||    |    |    |    | *  | *  | *  | *  
0101|    |    |    |    |    | *  |    | *  ||    |    |    |    |    | *  |    | *  
0110|    |    |    |    |    |    | *  | *  ||    |    |    |    |    |    | *  | *  
0111|    |    |    |    |    |    |    | *  ||    |    |    |    |    |    |    | *  
-------------------------------------------------------------------------------------
1000|    |    |    |    |    |    |    |    || *  | *  | *  | *  | *  | *  | *  | *  
1001|    |    |    |    |    |    |    |    ||    | *  |    | *  |    | *  |    | *  
1010|    |    |    |    |    |    |    |    ||    |    | *  | *  |    |    | *  | *  
1011|    |    |    |    |    |    |    |    ||    |    |    | *  |    |    |    | *  
1100|    |    |    |    |    |    |    |    ||    |    |    |    | *  | *  | *  | *  
1101|    |    |    |    |    |    |    |    ||    |    |    |    |    | *  |    | *  
1110|    |    |    |    |    |    |    |    ||    |    |    |    |    |    | *  | *  
1111|    |    |    |    |    |    |    |    ||    |    |    |    |    |    |    | *  

请注意,左上角、右上角和右下角包含相同的布局。因此,我们可以按如下方式批量执行一些计算:

  1. 计算表格的下半部分(右下角)。
  2. 将值添加到上半部分。
  3. 计算左上角。

如果我们让q = 2n,则循环复杂度为

T(q) = 2T(q/2) + O(q)

使用Master Theorem来解决

T(q) = O(q log q)

或者,就n而言,

T(n) = O(n × 2n)

【讨论】:

  • 感谢您的有趣回答。但是,我将切换步骤 2 和 3,以便在步骤 2 中添加值之前处理两个递归调用。否则(至少在我从您的答案中尝试的实现中),不可能计算太多倍一些价值?如果我的原始问题被重新打开,我将提供我对您建议的实施作为答案。
  • 我们又见面了,Mr Sierpiński
  • 对于非递归算法,请查看类似的Fast Walsh-Hadamard transform。仅删除所有减法(并调整顺序)。
【解决方案2】:

根据@CătălinFrâncu 的伟大观察,我写了两个递归的转换实现(见下面的代码):

  • transfo_recursive: 非常简单的递归实现
  • transfo_avx2 :仍然是递归的,但在 n=3 的递归的最后一步使用 AVX2

我在这里建议将计数器的大小编码为 32 位,并且 n 值可以增长到 28。

我还根据自己对递归行为的观察编写了一个迭代实现 (transfo_iterative)。实际上,我猜它接近于@chtz 提出的非递归实现。

这是基准代码:

// compiled with: g++ -O3   intersect.cpp -march=native -mavx2 -lpthread -DNDEBUG

#include <vector>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <thread>
#include <algorithm>
#include <sys/times.h>
#include <immintrin.h>
#include <boost/align/aligned_allocator.hpp>

using namespace std;

////////////////////////////////////////////////////////////////////////////////

typedef u_int32_t Count;

// Note: alignment is important for AVX2
typedef std::vector<Count,boost::alignment::aligned_allocator<Count, 8*sizeof(Count)>>  CountVector;

typedef void (*callback) (CountVector::pointer C, size_t q);
typedef vector<pair<const char*, callback>> FunctionsVector;

unsigned int randomSeed = 0;

////////////////////////////////////////////////////////////////////////////////
double timestamp()
{
    struct timespec timet;
    clock_gettime(CLOCK_MONOTONIC, &timet);
    return timet.tv_sec + (timet.tv_nsec/ 1000000000.0);
}

////////////////////////////////////////////////////////////////////////////////
CountVector getRandomVector (size_t n)
{
    // We use the same seed, so we'll get the same random values
    srand (randomSeed);

    // We fill a vector of size q=2^n with random values
    CountVector C(1ULL<<n);
    for (size_t i=0; i<C.size(); i++)  { C[i] = rand() % (1ULL<<(8*sizeof(Count))); }
    return C;
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block (CountVector::pointer C, size_t q)
{
    for (size_t i=0; i<q/2; i++)   {  C[i] += C[i+q/2]; }
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block_avx2 (CountVector::pointer C, size_t q)
{
    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    size_t imax = q/(2*8);

    for (size_t i=0; i<imax; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Naive approach : O(4^n)
////////////////////////////////////////////////////////////////////////////////
CountVector transfo_naive (const CountVector& C)
{
    CountVector V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recursive (CountVector::pointer C, size_t q)
{
    if (q>1)
    {
        transfo_recursive (C+q/2, q/2);
        transfo_recursive (C,     q/2);

        copy_add_block (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Iterative approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_iterative (CountVector::pointer C, size_t q)
{
    size_t i = 0;

    for (size_t n=q; n>1; n>>=1, i++)
    {
        size_t d = 1<<i;

        for (ssize_t j=q-1-d; j>=0; j--)
        {
            if ( ((j>>i)&1)==0) { C[j] += C[j+d]; }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive AVX2 approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////

#define ROTATE1(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,7,6,5,4,3,2,1))
#define ROTATE2(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,7,6,5,4,3,2))
#define ROTATE4(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,0,0,7,6,5,4))

void transfo_avx2 (CountVector::pointer V, size_t N)
{
    __m256i k1 = _mm256_set_epi32 (0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF);
    __m256i k2 = _mm256_set_epi32 (0,0,0xFFFFFFFF,0xFFFFFFFF,0,0,0xFFFFFFFF,0xFFFFFFFF);
    __m256i k4 = _mm256_set_epi32 (0,0,0,0,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF);

    if (N==8)
    {
        __m256i* source = (__m256i*) (V);

        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE1(*source),k1));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE2(*source),k2));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE4(*source),k4));
    }
    else // if (N>8)
    {
        transfo_avx2 (V+N/2, N/2);
        transfo_avx2 (V,     N/2);

        copy_add_block_avx2  (V, N);
    }
}

#define ROTATE1_AND(s)  _mm256_srli_epi64 ((s), 32)  // odd 32bit elements
#define ROTATE2_AND(s)  _mm256_bsrli_epi128 ((s), 8) // high 64bit halves
// gcc doesn't have _mm256_zextsi128_si256
// and _mm256_castsi128_si256 doesn't guarantee zero extension
// vperm2i118 can do the same job as vextracti128, but is slower on Ryzen
#ifdef __clang__                                      // high 128bit lane
#define ROTATE4_AND(s)  _mm256_zextsi128_si256(_mm256_extracti128_si256((s),1))
#else
//#define ROTATE4_AND(s)  _mm256_castsi128_si256(_mm256_extracti128_si256((s),1))
#define ROTATE4_AND(s)  _mm256_permute2x128_si256((s),(s),0x81)  // high bit set = zero that lane
#endif

void transfo_avx2_pcordes (CountVector::pointer C, size_t q)
{
    if (q==8)
    {
        __m256i* source = (__m256i*) (C);
        __m256i tmp = *source;
        tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
        *source = tmp;
    }
    else //if (N>8)
    {
        transfo_avx2_pcordes (C+q/2, q/2);
        transfo_avx2_pcordes (C,     q/2);

        copy_add_block_avx2  (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Template specialization (same as transfo_avx2_pcordes)
////////////////////////////////////////////////////////////////////////////////
template <int n>
void transfo_template (__m256i* C)
{
    const size_t q = 1ULL << n;

    transfo_template<n-1> (C);
    transfo_template<n-1> (C + q/2);

    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    for (size_t i=0; i<q/2; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

template <>
void transfo_template<0> (__m256i* C)
{
    __m256i* source = (__m256i*) (C);
    __m256i tmp = *source;
    tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
    *source = tmp;
}

void transfo_recur_template (CountVector::pointer C, size_t q)
{
#define CASE(n)     case 1ULL<<n: transfo_template<n> ((__m256i*)C);   break;

    q = q / 8; // 8 is the number of 32 bits items in the AVX2 registers

    // We have to 'link' the dynamic value of q with a static template specialization
    switch (q)
    {
                  CASE( 1); CASE( 2); CASE( 3); CASE( 4); CASE( 5); CASE( 6); CASE( 7); CASE( 8); CASE( 9);
        CASE(10); CASE(11); CASE(12); CASE(13); CASE(14); CASE(15); CASE(16); CASE(17); CASE(18); CASE(19);
        CASE(20); CASE(21); CASE(22); CASE(23); CASE(24); CASE(25); CASE(26); CASE(27); CASE(28); CASE(29);

        default: printf ("transfo_template undefined for q=%ld\n", q);  break;
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach multithread : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recur_thread (CountVector::pointer C, size_t q)
{
    std::thread t1 (transfo_recur_template, C+q/2, q/2);
    std::thread t2 (transfo_recur_template, C,     q/2);
    t1.join();
    t2.join();

    copy_add_block_avx2 (C, q);
}

////////////////////////////////////////////////////////////////////////////////
void header (const char* title, const FunctionsVector& functions)
{
    printf ("\n");
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%s\n", title);
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%3s\t", "# n");
    for (auto fct : functions)  {  printf ("%20s\t", fct.first);  }
    printf ("\n");
}

////////////////////////////////////////////////////////////////////////////////
// Check that alternative implementations provide the same result as the naive one
////////////////////////////////////////////////////////////////////////////////
void check (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("CHECK (0 values means similar to naive approach)", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);

        CountVector reference = transfo_naive (getRandomVector(n));

        for (auto fct : functions)
        {
            // We call the (in place) transformation
            CountVector C = getRandomVector(n);
            (*fct.second) (C.data(), C.size());

            int nbDiffs= 0;
            for (size_t i=0; i<C.size(); i++)
            {
                if (reference[i]!=C[i]) { nbDiffs++; }
            }
            printf ("%20ld\t", nbDiffs);
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
// Performance test
////////////////////////////////////////////////////////////////////////////////
void performance (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("PERFORMANCE", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);
        for (auto fct : functions)
        {
            // We compute the average time for several executions
            // We use more executions for small n values in order
            // to have more accurate results
            size_t nbRuns = 1ULL<<(2+nmax-n);
            vector<double> timeValues;

            // We run the test several times
            for (size_t r=0; r<nbRuns; r++)
            {
                // We don't want to measure time for vector fill
                CountVector C = getRandomVector(n);

                double t0 = timestamp();
                (*fct.second) (C.data(), C.size());
                double t1 = timestamp();

                timeValues.push_back (t1-t0);
            }

            // We sort the vector of times in order to get the median value
            std::sort (timeValues.begin(), timeValues.end());

            double median = timeValues[timeValues.size()/2];
            printf ("%20lf\t", log(1000.0*1000.0*median)/log(2));
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
//
////////////////////////////////////////////////////////////////////////////////
int main (int argc, char* argv[])
{
    size_t nmin = argc>=2 ? atoi(argv[1]) : 14;
    size_t nmax = argc>=3 ? atoi(argv[2]) : 28;

    // We get a common random seed
    randomSeed = time(NULL);

    FunctionsVector functions = {
        make_pair ("transfo_recursive",        transfo_recursive),
        make_pair ("transfo_iterative",        transfo_iterative),
        make_pair ("transfo_avx2",             transfo_avx2),
        make_pair ("transfo_avx2_pcordes",     transfo_avx2_pcordes),
        make_pair ("transfo_recur_template",   transfo_recur_template),
        make_pair ("transfo_recur_thread",     transfo_recur_thread)
    };

    // We check for some n that alternative implementations
    // provide the same result as the naive approach
    check (functions, 5, 15);

    // We run the performance test
    performance (functions, nmin, nmax);
}

这是性能图表:

可以观察到,简单的递归实现相当不错,即使与 AVX2 版本相比也是如此。迭代实现有点令人失望,但我没有花太多精力去优化它。

最后,对于我自己使用 32 位计数器和 n 值高达 28 的用例,与 O(4^n) 中的初始“幼稚”方法相比,这些实现对我来说显然是可以的。


更新

根据@PeterCordes 和@chtz 的一些评论,我添加了以下实现:

  • transfo-avx2-pcordes :与 transfo-avx2 相同,但有一些 AVX2 优化
  • transfo-recur-template :与 transfo-avx2-pcordes 相同,但使用 C++ 模板特化来实现递归
  • transfo-recur-thread :对transfo-recur-template 的两个初始递归调用使用多线程

这是更新后的基准测试结果:

关于这个结果的几点说明:

  1. AVX2 实现在逻辑上是最好的选择,但可能不是 32 位计数器的最大潜在 x8 加速
  2. 在 AVX2 实现中,模板专业化带来了一点加速,但对于较大的 n 值,它几乎消失了
  3. 简单的双线程版本对于 n=20,总有一点加速,但远未达到潜在的 2 倍。

【讨论】:

  • 很好地使用改组,有点类似于前缀和,但掩码不同。 ROTATE4 + AND 与 k4 可能只是 vextracti128 (这使得 ymm 的高位已经归零),但我认为没有保证零扩展的内在函数。 (_mm256_castsi128_si256 未定义上行通道。如果您在_mm256_extracti128_si256 的结果上使用它,可能任何理智的编译器都会以零扩展结束,但它在技术上并不安全。)我想您可以使用vperm2i128_mm256_permute2x128_si256) 带有一个归零向量。
  • ROTATE2k2 可以通过一个通道内随机播放来完成:_mm256_bsrli_epi128(*source, 8); 在每个通道中进行单独的字节移位,移位为零。哦,ROTATE1k1 也可以随机播放。你在低车道上有4,3,2,1,但你实际上掩盖了4。所以,是的,您可以保存所有这些 AND 指令,并仅在前 2 个使用通道内随机播放,从而在 Ryzen 上显着加快速度。 (除了 vperm2i128 在 Ryzen 上与 vextracti128 相比真的很糟糕:/)agner.org/optimize
  • 好的,godbolt.org/z/F4elYD 是每次添加之间的一条指令,没有额外的存储。 _mm256_srli_epi64 ((s), 32) 然后是 _mm256_bsrli_epi128 ((s), 8),然后是 _mm256_zextsi128_si256(_mm256_extracti128_si256((s),1))_mm256_permute2x128_si256((s),(s),0x81)。 (vperm2i128 可以在 imm8 的半字节中设置高位时将通道归零。)
  • 哦,我错过了你有一个对数刻度。我在想代码中的N 与图中的n 相同,并且认为这些问题的大小非常小。 :P 顺便说一句,重新:一个小检查伤害:它已经伤害了 asm。如果两个条件都不为真,编译器必须使代码不做任何事情就返回,而不是只检查一个 cmp/jcc 或失败。建议:在调用真正的实现之前,制作一个检查N &amp; (N-1) == 0 的包装器,即 N 是 2 的幂。然后在实现中省略检查。你可能会得到加速,尤其是。 gcc -fprofile-use.
  • 我还在玩一个完全可内联的递归变体:godbolt.org/z/FYcZHl(未经测试:也许我混淆了左移和右移)。我也懒得写任何非递归的外循环。
猜你喜欢
  • 1970-01-01
  • 2011-11-27
  • 2020-02-22
  • 2010-12-17
  • 1970-01-01
  • 1970-01-01
  • 2011-05-23
  • 2020-03-21
  • 1970-01-01
相关资源
最近更新 更多