【问题标题】:Fast byte-wise replace if快速逐字节替换 if
【发布时间】:2016-06-05 08:20:39
【问题描述】:

我有一个函数可以将二进制数据从一个区域复制到另一个区域,但前提是字节与特定值不同。这是一个代码示例:

void copy_if(char* src, char* dest, size_t size, char ignore)
{
  for (size_t i = 0; i < size; ++i)
  {
    if (src[i] != ignore)
      dest[i] = src[i];
  }
}

问题是这对于我目前的需要来说太慢了。有没有办法以更快的方式获得相同的结果?

更新: 根据答案,我尝试了两种新的实现:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        char temps = src[i];
        char tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const __m128i vignore = _mm_set1_epi8(ignore);

    size_t i;

    for (i = 0; i + 16 <= size; i += 16)
    {
        __m128i v = _mm_loadu_si128((__m128i *)&src[i]);
        __m128i vmask = _mm_cmpeq_epi8(v, vignore);
        vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
        _mm_maskmoveu_si128(v, vmask, (char *)&dest[i]);
    }
    for (; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }

}

我得到了以下结果:

Naive: 
Duration: 2.04844s
Vectorized: 
Pass: PASS
Duration: 3.18553s
SIMD: 
Pass: PASS
Duration: 0.481888s

我猜我的编译器未能矢量化(最后一个 MSVC),但 SIMD 解决方案已经足够好了,谢谢!

更新(之二) 我设法使用一些编译指示对我的编译(MSVC)进行了矢量化,实际上它实际上比 SIMD 更快,这是最终代码:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{

#pragma loop(hint_parallel(0))
#pragma loop(ivdep)

for (int i = 0; i < size; ++i) // Sadly no parallelization if i is unsigned, but more than 2Go of data is very unlikely
{
    char temps = src[i];
    char tempd = dest[i];
    dest[i] = temps == ignore ? tempd : temps;
}
}

【问题讨论】:

  • dest 中未分配的字节会怎样?
  • 并行化?几个线程各自处理一部分复制?
  • 它们已经被分配了一些默认值。
  • 如果你被绑定到一个特定的架构,例如x86,那么这将非常适合 SIMD 实现(例如 SSE - 请参阅 maskmovdqu/_mm_maskmoveu_si128)。
  • 请使用restrict关键字。如果你不能保证指针没有别名,那么编译器就不会向量化。

标签: c optimization x86 sse simd


【解决方案1】:

gcc 向量化以下代码:

#include <stddef.h>
void copy_if(char* src, char* dest, size_t size, char ignore)
{
  for (size_t i = 0; i < size; ++i)
  {
    char temps = src[i];
    char tempd = dest[i];
    dest[i] = temps == ignore ? tempd : temps;
  }
}

请注意,从- 加载和对dest[i] 的赋值都是无条件的,因此编译器不受禁止在多线程程序中发明存储的限制。

编辑一个不太古老的编译器和处理器,以及神螺栓链接:

x86-64 gcc 11.1 compiles 将以下代码与-O3 -mavx512f -mavx512bw 一起生成一个对齐的循环,一次处理 64 个字节:

.L5:
        vmovdqu8        (%rdi,%rax), %zmm2
        vpcmpb  $4, %zmm0, %zmm2, %k1
        vmovdqu8        %zmm2, (%rsi,%rax){%k1}
        addq    $64, %rax
        cmpq    %rax, %r8
        jne     .L5

此编译器还为gcc -std=gnu11 -O3 -mavx2 执行well,一次处理32 个字节:

.L5:
        vpcmpeqb        (%rdi,%rax), %ymm1, %ymm0
        vmovdqu (%rdi,%rax), %ymm2
        vpblendvb       %ymm0, (%rsi,%rax), %ymm2, %ymm0
        vmovdqu %ymm0, (%rsi,%rax)
        addq    $32, %rax
        cmpq    %rax, %r8
        jne     .L5

In general, modern compilers do well 适用于任何带有矢量单元的处理器架构。

旧编译器(gcc 4.8.4),旧处理器(无 AVX512),旧答案:

对于-march=core-avx2,生成的程序集包含这个矢量化循环,一次处理 32 个字节:

.L9:
    vmovdqu (%rdi,%rcx), %ymm1
    addq    $1, %r10
    vmovdqu (%rsi,%rcx), %ymm2
    vpcmpeqb    %ymm0, %ymm1, %ymm3
    vpblendvb   %ymm3, %ymm2, %ymm1, %ymm1
    vmovdqu %ymm1, (%rsi,%rcx)
    addq    $32, %rcx
    cmpq    %r10, %r8
    ja  .L9

对于通用 x86-64,生成的程序集包含这个矢量化循环,一次处理 16 个字节:

.L9:
    movdqu  (%rdi,%r8), %xmm3
    addq    $1, %r10
    movdqa  %xmm3, %xmm1
    movdqu  (%rsi,%r8), %xmm2
    pcmpeqb %xmm0, %xmm1
    pand    %xmm1, %xmm2
    pandn   %xmm3, %xmm1
    por %xmm2, %xmm1
    movdqu  %xmm1, (%rsi,%r8)
    addq    $16, %r8
    cmpq    %r9, %r10
    jb  .L9

对于 armv7l-neon,clang-3.7 生成以下循环,一次处理 16 个字节:

.LBB0_9:                                @ %vector.body
                                        @ =>This Inner Loop Header: Depth=1
        vld1.8  {d18, d19}, [r5]!
        subs.w  lr, lr, #16
        vceq.i8 q10, q9, q8
        vld1.8  {d22, d23}, [r4]
        vbsl    q10, q11, q9
        vst1.8  {d20, d21}, [r4]!
        bne     .LBB0_9

因此,代码不仅比汇编或内在函数更具可读性,而且还可移植到多种体系结构和编译器。新架构和指令集扩展可以通过重新编译轻松使用。

【讨论】:

  • 使用英特尔编译器 13.1.3.198,我必须标记指针 restrict 才能获得矢量化。否则它会抱怨“向量依赖”,我认为是正确的(考虑dst == src+1)。所以我的建议是在指针中添加restrict 修饰符,以增加跨平台和编译器进行矢量化的机会。
  • 如果指定了“假定无别名”标志(在命令行上显式或作为特定优化级别的一部分隐式),某些编译器可能会在不使用 restrict 的情况下对其进行矢量化。
  • @njuffa:嗯,gcc 只是在运行时检查重叠。它需要大约 8 条简单指令(leaorsetcc)。您可以通过创建指针restrict 来摆脱它们,但何必呢?
  • 这是一个很好的观点,尽管我熟悉这项技术,但我忽略了这一点。
【解决方案2】:

这是一个使用 SSE2 内部函数来利用 maskmovdqu 指令的示例。 SIMD 版本在 Haswell CPU 上的运行速度似乎是原始版本的 2 倍左右(使用 clang 编译的代码):

    #include <stdio.h>
    #include <string.h>
    #include <emmintrin.h>  // SSE2
    #include <sys/time.h>   // gettimeofday

    void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
    {
        for (size_t i = 0; i < size; ++i)
        {
            if (src[i] != ignore)
                dest[i] = src[i];
        }
    }

    void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
    {
        const __m128i vignore = _mm_set1_epi8(ignore);

        size_t i;

        for (i = 0; i + 16 <= size; i += 16)
        {
            __m128i v = _mm_loadu_si128((__m128i *)&src[i]);
            __m128i vmask = _mm_cmpeq_epi8(v, vignore);
            vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
            _mm_maskmoveu_si128 (v, vmask, (char *)&dest[i]);
        }
        for ( ; i < size; ++i)
        {
            if (src[i] != ignore)
                dest[i] = src[i];
        }
    }

    #define TIME_IT(init, copy_if, src, dest, size, ignore) \
    do { \
        const int kLoops = 1000; \
        struct timeval t0, t1; \
        double t_ms = 0.0; \
     \
        for (int i = 0; i < kLoops; ++i) \
        { \
            init; \
            gettimeofday(&t0, NULL); \
            copy_if(src, dest, size, ignore); \
            gettimeofday(&t1, NULL); \
            t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
        } \
        printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
    } while (0)

    int main()
    {
        const size_t N = 10000000;

        uint8_t *src = malloc(N);
        uint8_t *dest_ref = malloc(N);
        uint8_t *dest_init = malloc(N);
        uint8_t *dest_test = malloc(N);

        for (size_t i = 0; i < N; ++i)
        {
            src[i] = (uint8_t)rand();
            dest_init[i] = (uint8_t)rand();
        }

        memcpy(dest_ref, dest_init, N);
        copy_if_ref(src, dest_ref, N, 0x42);

        memcpy(dest_test, dest_init, N);
        copy_if_SSE(src, dest_test, N, 0x42);
        printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

        TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
        TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);

        return 0;
    }

编译测试:

$ gcc -Wall -msse2 -O3 copy_if.c && ./a.out 
copy_if_SSE: PASS
copy_if_ref: 0.416 ns / element
copy_if_SSE: 0.239 ns / element

(注意:此答案的早期版本在计时代码中的杂散因子为 16,因此早期的数字比应有的高 16 倍。)


更新

受@EOF 的解决方案和编译器生成的代码的启发,我尝试了一种使用 SSE4 的不同方法,并获得了更好的结果:

#include <stdio.h>
#include <string.h>
#include <smmintrin.h>  // SSE4
#include <sys/time.h>   // gettimeofday

void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }
}

void copy_if_EOF(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        char temps = src[i];
        char tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const __m128i vignore = _mm_set1_epi8(ignore);

    size_t i;

    for (i = 0; i + 16 <= size; i += 16)
    {
        __m128i vsrc = _mm_loadu_si128((__m128i *)&src[i]);
        __m128i vdest = _mm_loadu_si128((__m128i *)&dest[i]);
        __m128i vmask = _mm_cmpeq_epi8(vsrc, vignore);
        vdest = _mm_blendv_epi8(vsrc, vdest, vmask);
        _mm_storeu_si128 ((__m128i *)&dest[i], vdest);
    }
    for ( ; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }
}

#define TIME_IT(init, copy_if, src, dest, size, ignore) \
do { \
    const int kLoops = 1000; \
    struct timeval t0, t1; \
    double t_ms = 0.0; \
 \
    for (int i = 0; i < kLoops; ++i) \
    { \
        init; \
        gettimeofday(&t0, NULL); \
        copy_if(src, dest, size, ignore); \
        gettimeofday(&t1, NULL); \
        t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
    } \
    printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
} while (0)

int main()
{
    const size_t N = 10000000;

    uint8_t *src = malloc(N);
    uint8_t *dest_ref = malloc(N);
    uint8_t *dest_init = malloc(N);
    uint8_t *dest_test = malloc(N);

    for (size_t i = 0; i < N; ++i)
    {
        src[i] = (uint8_t)rand();
        dest_init[i] = (uint8_t)rand();
    }

    memcpy(dest_ref, dest_init, N);
    copy_if_ref(src, dest_ref, N, 0x42);

    memcpy(dest_test, dest_init, N);
    copy_if_EOF(src, dest_test, N, 0x42);
    printf("copy_if_EOF: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

    memcpy(dest_test, dest_init, N);
    copy_if_SSE(src, dest_test, N, 0x42);
    printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_EOF, src, dest_test, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);

    return 0;
}

编译测试:

$ gcc -Wall -msse4 -O3 copy_if_2.c && ./a.out 
copy_if_EOF: PASS
copy_if_SSE: PASS
copy_if_ref: 0.419 ns / element
copy_if_EOF: 0.114 ns / element
copy_if_SSE: 0.114 ns / element

结论:虽然从功能的角度来看,_mm_maskmoveu_si128 似乎是解决此问题的好方法,但它似乎不如使用显式加载、屏蔽和存储高效。此外,在这种情况下,编译器生成的代码(参见@EOF 的答案)似乎与显式编码的 SIMD 一样快。

【讨论】:

  • @EOF:我刚刚意识到我的计时代码偏离了 16 倍(我使用了一些旧代码并且之前没有发现它)。我现在更新了上面的原始 SSE2 实现,还添加了一个受编译器为您的解决方案生成的代码启发的 SSE4 版本。现在时间差不多了。我认为对于这种特殊情况,自动矢量化代码将很难被击败。
  • 别忘了提到maskmovdqu 有一个非临时提示,所以当它完成时它会从缓存中驱逐目的地。这是一条 10-uop 指令,在 Haswell 上的吞吐量为每 6 个周期一个。 vmaskmovps/pdvpmaskmovd/q 没有 NT 提示,而且速度更快(尤其是如果您要在 dest 仍在缓存中时再次读取它)。但它们仅以 32b 或 64b 粒度进行掩码。没有AVX2vmaskmovdqu ymm,只有128b版本还有NT提示。
  • SSE:总是提供几乎你想要的指令,但不完全!我发现这是特别的。如果您查看 asm,而不仅仅是内在函数,则为 true。避免额外的mov 指令很难,因为您想要的数据移动通常发生在目的地的原地。并且位/字节移位无缘无故地发生在原地。每个班次都有自己的操作码,他们只是不使用/r 字段来选择不同的目的地。它们都可以像pshufd,并且有一个单独的目标。 ://
  • @PeterCordes:好吧,幸运的是,AVX(2) 最终为 x86 向量指令提供了合理的编码。如果英特尔和 AMD 可以停止制造不支持它的新机器,我将不胜感激。
  • (更正我之前的评论;SIMD 立即移位如 psllw xmm0, 2 do 使用 /r 字段获取额外的操作码位,66 0F 71 /6 ib/4 用于 @ 987654339@。也许我正在查看 psllw xmm, xmm/m128 与另一个 reg 的移位计数,这当然需要两个 ModRM 字段)
【解决方案3】:

如果忽略的频率不是很高,下面的 memcpy 代码可能会有所帮助。

size_t copy_if(char* src, char* dest, size_t size, char ignore)
{
    size_t i=0, count =0 , res= 0;
    while (count < size)
    {
    while (*src != ignore){
        count++;
        if (count > size)
             break;
        src++;
        i++;
        res++;
    }
    count++;
    if (i> 0){
        memcpy(dest,src-i, i);
        dest += i;
    }
    i = 0;
    src++;
  }
return res;
}

【讨论】:

  • 对于 memcpy 的大多数(全部?)实现,您现在将传递数组两次。一次用于搜索忽略的出现,第二次(隐藏)在 memcpy() 中。很可能不会比以前更好;)
【解决方案4】:

以下将是一个改进,尽管编译器可以自己设计。

void copy_if(char* src, char* dest, size_t size, char ignore)
{
  while (size--)
  {
    if (*src != ignore)
      *dest = *src;
    src++; dest++;
  }
}

【讨论】:

  • 您的实现与原始实现不同 - 当您忽略副本中的字节时,您不会在目标数组中留下“洞”。 (while 循环中的 size_t 显然是一个错字)。- 我和你一样,大多数现代编译器应该自己解决这个问题。
  • @tofro 对不起,我是古玩。你能告诉我这与原来的问题有什么不同吗?对我来说,他们看起来都在做同样的事情。
  • 原始增加了 dest 指针,即使字符没有被复制,而是被忽略 - 因此在目标数组中创建了“洞”。在这种情况下,Paul 的版本不会增加它,因此使用任何跳过的副本来移动数组内容。我现在看到它显然已经被编辑了,所以不再是这种情况了。所以请忽略我的评论。
  • @tofro 和其他人,我看到并调整了答案。 Tofro,您可能指的是旧答案。很抱歉造成混乱。
猜你喜欢
  • 2013-09-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-04-29
  • 1970-01-01
  • 2013-12-14
  • 2011-03-12
  • 1970-01-01
相关资源
最近更新 更多