【问题标题】:Dependencies with OpenMP on a simple codeOpenMP 对简单代码的依赖
【发布时间】:2023-01-12 10:35:44
【问题描述】:

我已经很长时间没有使用 OpenMP 了,我在优化这段代码时遇到了问题:

#define SIZE 100000000

typedef struct {
  float a,b,c,d,e,f,g,h;
} s_t;
  
void compute(s_t *a) {
  int i;
  for (i=4; i<SIZE; i++) {
    a[i].a=(a[i-1].b * 0.42 + a[i-3].d * .32);
    a[i].b = a[i].c * 3.56 - a[i-3].f;
    a[i].c = a[i].d + a[i].g*a[i-3].d;
    a[i].d = .3334/sqrt(a[i].f*a[i].f + a[i].c*a[i].c);
    if (a[i].f + a[i].a>1e-3) 
      a[i].f = (a[i].f - a[i].a)*(a[i].f + a[i].a);

  }
}

int main() {
  int i;
  s_t *a;
  a=(s_t *)malloc(sizeof(s_t)*SIZE);
  /* Initialization */
  for (i=0; i<SIZE; i++) 
    a[i].a=a[i].b=a[i].c=a[i].d=a[i].e=a[i].f=a[i].g=a[i].h=1./(i+1);
  /* Computation */
  for(i=0;i<100;i++) {
    compute(a);
    fprintf(stderr,".");
  }
  fprintf(stderr,"%f ",a[10].a);
  free(a);
  return 0;
}

我想在计算函数的循环中使用“#pragma omp parallel for”,但有几个依赖项。

我尝试使用 depend 子句,但我认为让 a[i] 依赖于 a[i-1] 和 a[i-3] 只会使代码顺序化。我真的不知道如何用 OpenMP 处理这个问题。你能给我一些想法和/或指导如何去做吗?

我添加了 main 以便你们可以看到计算函数是如何调用的。如果您对如何使用 OpenMP 或任何其他方法优化代码有任何其他想法,请告诉我。

【问题讨论】:

  • SIZE有多大?
  • 这是一个循环,因此这样的代码不能简单地并行化。您可以尝试递归加倍。也许如果你退后一步并描述你实际想要做什么?可能有一种完全不同的方式来表达这一点。
  • 您可能希望使用 sqrtffloat 常量(例如 0.42f)。
  • 请注意,1/sqrt(x) 可以以较低的精度计算得更快。也就是说,100_000_000 次操作的长链肯定会导致相当大的数值错误。由于这段代码是本质上是顺序的,您需要专注于关键路径以使其更快。更具体地说,您当然需要减少关键路径上指令的延迟。
  • 另一个观察。您的代码看起来像是重复出现的,但如果您查看单独的组件则不是。例如,正文的第一行根据先前的i值计算a[i].a组件,但该.a组件未在循环的其他地方使用,因此您可以创建一个单独的完全并行循环来计算@987654329 @价值观。 (还有那个 if 语句的问题。我认为它也可以移到一个单独的循环中。)但是你需要仔细解决这个问题。这并不简单。

标签: c optimization parallel-processing openmp


【解决方案1】:

由于依赖关系,这段代码的优化非常困难。有没有真正有效的方法来使用多线程并行化此代码在现代主流处理器上。也就是说,一些操作是独立的,并且在多个数据上是相同的。因此,我们可以受益于指令级并行SIMD指令.实际上,现代处理器内核可以在给定周期内同时执行多条指令,并且该指令可以同时对多个值进行操作。

此外,我们还可以从快速平方根倒数主流提供x86-64 处理器.事实上,这就是使 -ffast-math 标志更快以及使用 FMA 指令以减少关键路径指令延迟的原因。也就是说,这条指令(以及使用 -ffast-math 导致结果可能因一台机器而异,也可能不太精确.最重要的是,双精度常量可以按照@Laci 的建议替换为浮点常量(更不用说 sqrtf 的使用了,尽管 x86-64 处理器上没有双精度倒数平方根指令) ).


执行

为了能够使用 SIMD 指令,需要跟踪依赖关系并在 C 代码中找到多个处理不同值的相同指令。第一步是展开循环 3 次由于 i-3 依赖。然后,第二步包括在处理依赖关系的同时收集相同的操作。结果像这样丑陋的大代码:

void compute_unroll(s_t* a)
{
    int i;

    for (i=4; i<SIZE-2; i+=3)
    {
        float dm3 = a[i-3].d;
        float dm2 = a[i-2].d;
        float dm1 = a[i-1].d;

        float fm3 = a[i-3].f;
        float fm2 = a[i-2].f;
        float fm1 = a[i-1].f;

        float bm1 = a[i-1].b;

        float a0 = a[i].a;
        float b0 = a[i].b;
        float c0 = a[i].c;
        float d0 = a[i].d;
        float e0 = a[i].e;
        float f0 = a[i].f;
        float g0 = a[i].g;

        float a1 = a[i+1].a;
        float b1 = a[i+1].b;
        float c1 = a[i+1].c;
        float d1 = a[i+1].d;
        float e1 = a[i+1].e;
        float f1 = a[i+1].f;
        float g1 = a[i+1].g;

        float a2 = a[i+2].a;
        float b2 = a[i+2].b;
        float c2 = a[i+2].c;
        float d2 = a[i+2].d;
        float e2 = a[i+2].e;
        float f2 = a[i+2].f;
        float g2 = a[i+2].g;

        b0 = c0 * 3.56f - fm3;
        b1 = c1 * 3.56f - fm2;
        b2 = c2 * 3.56f - fm1;

        a0 = bm1 * 0.42f + dm3 * 0.32f;
        a1 = b0 * 0.42f + dm2 * 0.32f;
        a2 = b1 * 0.42f + dm1 * 0.32f;

        c0 = d0 + g0*dm3;
        c1 = d1 + g1*dm2;
        c2 = d2 + g2*dm1;

        d0 = 0.3334f / sqrtf(f0*f0 + c0*c0);
        d1 = 0.3334f / sqrtf(f1*f1 + c1*c1);
        d2 = 0.3334f / sqrtf(f2*f2 + c2*c2);

        if (f0 + a0 > 1e-3f) f0 = (f0 - a0) * (f0 + a0);
        if (f1 + a1 > 1e-3f) f1 = (f1 - a1) * (f1 + a1);
        if (f2 + a2 > 1e-3f) f2 = (f2 - a2) * (f2 + a2);

        a[i].a = a0;
        a[i].b = b0;
        a[i].c = c0;
        a[i].d = d0;
        a[i].f = f0;

        a[i+1].a = a1;
        a[i+1].b = b1;
        a[i+1].c = c1;
        a[i+1].d = d1;
        a[i+1].f = f1;

        a[i+2].a = a2;
        a[i+2].b = b2;
        a[i+2].c = c2;
        a[i+2].d = d2;
        a[i+2].f = f2;
    }

    for (; i<SIZE; ++i)
    {
        a[i].a = (a[i-1].b * 0.42f + a[i-3].d * 0.32f);
        a[i].b = a[i].c * 3.56f - a[i-3].f;
        a[i].c = a[i].d + a[i].g*a[i-3].d;
        a[i].d = 0.3334f / sqrtf(a[i].f*a[i].f + a[i].c*a[i].c);

        if (a[i].f + a[i].a > 1e-3f)
            a[i].f = (a[i].f - a[i].a) * (a[i].f + a[i].a);
    }
}

现在,我们可以看到某些部分可以很容易地从使用 SIMD 指令中获益,例如:

d0 = 0.3334f / sqrtf(f0*f0 + c0*c0);
d1 = 0.3334f / sqrtf(f1*f1 + c1*c1);
d2 = 0.3334f / sqrtf(f2*f2 + c2*c2);

其他一些部分需要来自内存中多个位置的数据。从非连续位置收集数据通常效率不高。问题是输入布局效率不高首先。通过将数据存储为 a0 a1 a2 b0 b1 b2 c0 c1 c2 ... 而不是 a0 b0 c0 d0 e0 f0 g0 a1 b1 c1 ... 来存储数据会更有效。事实上,这种替代布局使处理器能够逐块加载/存储数据(SIMD 加载/存储),而不是一次加载 1 个值(标量加载)。也就是说,可能无法更改有关使用此代码的上下文的数据布局。因此,此答案的以下部分将不考虑此优化。

现在我们可以使用 SIMD 指令对代码进行矢量化。有很多方法可以做到这一点。例如,有一些相对高级的库可以做到这一点。 OpenMP 也有助于做到这一点。然而,这些解决方案的性能对于像这种病态的异常情况来说往往是相当令人失望的。因此,我选择使用低级 x86-64 SSE 内部函数。更具体地说,我考虑了 SSE4.1 指令集(>99% 的 PC 支持)和 FMA 指令集(也得到广泛支持,并且 fmadd/fmsub 指令可以轻松替换为 fmul+fadd /fsub 说明,如果需要的话)。

void compute_sse(s_t* a)
{
    int i = 4;

    if (i<SIZE-2)
    {
        __m128 vdm = _mm_setr_ps(a[i-3].d, a[i-2].d, a[i-1].d, 0.0f);
        __m128 vfm = _mm_setr_ps(a[i-3].f, a[i-2].f, a[i-1].f, 0.0f);
        float bm1 = a[i-1].b;

        for (; i<SIZE-2; i+=3)
        {
            float a0 = a[i].a, a1 = a[i+1].a, a2 = a[i+2].a;
            float b0 = a[i].b, b1 = a[i+1].b, b2 = a[i+2].b;
            float c0 = a[i].c, c1 = a[i+1].c, c2 = a[i+2].c;
            float d0 = a[i].d, d1 = a[i+1].d, d2 = a[i+2].d;
            float e0 = a[i].e, e1 = a[i+1].e, e2 = a[i+2].e;
            float f0 = a[i].f, f1 = a[i+1].f, f2 = a[i+2].f;
            float g0 = a[i].g, g1 = a[i+1].g, g2 = a[i+2].g;

            // vb[j] = vc[j] * 3.56f - vfm[j]
            __m128 vc = _mm_setr_ps(c0, c1, c2, 0.0f);
            __m128 vb = _mm_fmsub_ps(vc, _mm_set1_ps(3.56f), vfm);

            _MM_EXTRACT_FLOAT(b0, vb, 0);
            _MM_EXTRACT_FLOAT(b1, vb, 1);
            _MM_EXTRACT_FLOAT(b2, vb, 2);

            a[i].b = b0;
            a[i+1].b = b1;
            a[i+2].b = b2;

            // va[j] = vb_prev[j] * 0.42f + vdm[j] * 0.32f
            __m128 vb_prev = _mm_setr_ps(bm1, b0, b1, 0.0f);
            __m128 va = _mm_fmadd_ps(vb_prev, _mm_set1_ps(0.42f), _mm_mul_ps(vdm, _mm_set1_ps(0.32f)));

            // Store data to memory
            _MM_EXTRACT_FLOAT(a[i].a, va, 0);
            _MM_EXTRACT_FLOAT(a[i+1].a, va, 1);
            _MM_EXTRACT_FLOAT(a[i+2].a, va, 2);

            // vc[j] = vg[j] * vdm[j] + vd[j]
            __m128 vd = _mm_setr_ps(d0, d1, d2, 0.0f);
            __m128 vg = _mm_setr_ps(g0, g1, g2, 0.0f);
            vc = _mm_fmadd_ps(vg, vdm, vd);

            // Store data to memory
            _MM_EXTRACT_FLOAT(a[i].c, vc, 0);
            _MM_EXTRACT_FLOAT(a[i+1].c, vc, 1);
            _MM_EXTRACT_FLOAT(a[i+2].c, vc, 2);

            // d[j] = 0.3334f / sqrtf(vf[j]*vf[j] + vc[j]*vc[j])
            __m128 vf = _mm_setr_ps(f0, f1, f2, 0.0f);
            __m128 vf2 = _mm_mul_ps(vf, vf);
            __m128 vc2 = _mm_mul_ps(vc, vc);
            __m128 vsum = _mm_add_ps(vf2, vc2);
            vd = _mm_mul_ps(_mm_set1_ps(0.3334f), _mm_rsqrt_ps(vsum));

            // Store data to memory
            _MM_EXTRACT_FLOAT(a[i].d, vd, 0);
            _MM_EXTRACT_FLOAT(a[i+1].d, vd, 1);
            _MM_EXTRACT_FLOAT(a[i+2].d, vd, 2);

            // if(f[j] + a[j] > 1e-3f) f[j] = (f[j] - a[j]) * (f[j] + a[j]);
            __m128 vfpa = _mm_add_ps(vf, va);
            __m128 vcond = _mm_cmpgt_ps(vfpa, _mm_set1_ps(1e-3f));
            __m128 vfma = _mm_sub_ps(vf, va);
            vf = _mm_blendv_ps(vf, _mm_mul_ps(vfma, vfpa), vcond);

            // Store data to memory
            _MM_EXTRACT_FLOAT(a[i].f, vf, 0);
            _MM_EXTRACT_FLOAT(a[i+1].f, vf, 1);
            _MM_EXTRACT_FLOAT(a[i+2].f, vf, 2);

            // Useful for the next iteration not to reload values from memory
            vdm = vd;
            vfm = vf;
            bm1 = b2;
        }
    }

    // Remaining part
    for (; i<SIZE; ++i)
    {
        a[i].a = (a[i-1].b * 0.42f + a[i-3].d * 0.32f);
        a[i].b = a[i].c * 3.56f - a[i-3].f;
        a[i].c = a[i].d + a[i].g*a[i-3].d;
        a[i].d = 0.3334f / sqrtf(a[i].f*a[i].f + a[i].c*a[i].c);

        if (a[i].f + a[i].a > 1e-3f)
            a[i].f = (a[i].f - a[i].a) * (a[i].f + a[i].a);
    }
}

请注意,此代码尝试将数据尽可能多地保存在寄存器中(快)而不是从内存中重新加载它们(慢)。尽管如此,在我的机器上,这段代码仍然花费了很大一部分时间从内存读取数据/向内存写入数据。这主要是由于内存布局效率低下,但也因为一个核心很难完全饱和内存。


结果

以下是在我的 i5-9600KF 处理器上使用带有标志 -O3 -march=native -ffast-math 的 GCC 10.3 的实验结果:

compute (no -ffast-math):  0.444 ms/it
compute:                   0.404 ms/it
compute_laci:              0.318 ms/it
compute_unroll:            0.317 ms/it
compute_sse:               0.254 ms/it
seq optimal lower-bound:   0.190 ms/it  (saturation of the RAM by 1 core)
par optimal lower-bound:   0.170 ms/it  (saturation of the RAM)

compute_sse主要受记忆限制在我的机器上,它达到了 23.5 GiB/s 的良好吞吐量,而最大值通常是 31-32 GiB/s(用于读/写)使用 1 个内核,并且永远不会超过 34-36 GiB/s(用于读/写)在实践中使用多个内核。更好的内存布局应该足以使执行时间非常接近使用 1 个内核的最佳执行时间。

请注意,由于架构的设计方式(它们旨在运行可扩展的并行代码),像英特尔至强这样的服务器处理器显然不会使 RAM 带宽饱和。事实上,一个核心可以达到的实际 RAM 吞吐量通常明显小于主流 PC 上的吞吐量。因此,这在此类服务器处理器上可能效率较低。有关详细信息,请参阅this answer

【讨论】:

    猜你喜欢
    • 2017-12-03
    • 2015-04-22
    • 2021-03-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多