【问题标题】:Missed optimization opportunity or required behavior due to acquire-release memory ordering?由于获取-释放内存排序而错过优化机会或所需行为?
【发布时间】:2017-07-30 19:15:12
【问题描述】:

我目前正在尝试提高自定义“伪”堆栈的性能,它的使用方式如下(完整代码在本文末尾提供):

void test() {
  theStack.stackFrames[1] = StackFrame{ "someFunction", 30 };      // A
  theStack.stackTop.store(1, std::memory_order_seq_cst);           // B
  someFunction();                                                  // C
  theStack.stackTop.store(0, std::memory_order_seq_cst);           // D

  theStack.stackFrames[1] = StackFrame{ "someOtherFunction", 35 }; // E
  theStack.stackTop.store(1, std::memory_order_seq_cst);           // F
  someOtherFunction();                                             // G
  theStack.stackTop.store(0, std::memory_order_seq_cst);           // H
}

采样器线程定期挂起目标线程并读取stackTopstackFrames 数组。

我最大的性能问题是stackTop 的顺序一致的存储,所以我试图找出是否可以将它们更改为发布存储。

中心要求是:当采样器线程挂起目标线程并读取stackTop == 1时,那么stackFrames[1]中的信息需要完全存在且一致。这意味着:

  1. 当观察到 B 时,也必须观察到 A。 (“在将堆栈帧放置到位之前,不要增加 stackTop。”)
  2. 观察到 E 时,也必须观察到 D。 (“当放置下一帧的信息时,前一个堆栈帧必须已经退出。”)

我的理解是,对stackTop 使用释放-获取内存排序可以保证第一个要求,但不能保证第二个。更具体地说:

  • 程序顺序中stackTop release-store 之前的任何写入都不能重新排序以发生在它之后。

但是,对于在发布存储之后按程序顺序写入stackTop,没有任何声明。因此,我的理解是可以在观察到 D 之前观察到 E。这是正确的吗?

但如果是这样的话,那么编译器就不能像这样重新排序我的程序:

void test() {
  theStack.stackFrames[1] = StackFrame{ "someFunction", 30 };      // A
  theStack.stackTop.store(1, std::memory_order_release);           // B
  someFunction();                                                  // C

  // switched D and E:
  theStack.stackFrames[1] = StackFrame{ "someOtherFunction", 35 }; // E
  theStack.stackTop.store(0, std::memory_order_release);           // D

  theStack.stackTop.store(1, std::memory_order_release);           // F
  someOtherFunction();                                             // G
  theStack.stackTop.store(0, std::memory_order_release);           // H
}

...然后结合 D 和 F,优化掉零存储?

因为如果我在 macOS 上使用系统 clang 编译上述程序,这不是我所看到的:

$ clang++ -c main.cpp -std=c++11 -O3 && objdump -d main.o

main.o: file format Mach-O 64-bit x86-64

Disassembly of section __TEXT,__text:
__Z4testv:
       0:   55  pushq   %rbp
       1:   48 89 e5    movq    %rsp, %rbp
       4:   48 8d 05 5d 00 00 00    leaq    93(%rip), %rax
       b:   48 89 05 10 00 00 00    movq    %rax, 16(%rip)
      12:   c7 05 14 00 00 00 1e 00 00 00   movl    $30, 20(%rip)
      1c:   c7 05 1c 00 00 00 01 00 00 00   movl    $1, 28(%rip)
      26:   e8 00 00 00 00  callq   0 <__Z4testv+0x2B>
      2b:   c7 05 1c 00 00 00 00 00 00 00   movl    $0, 28(%rip)
      35:   48 8d 05 39 00 00 00    leaq    57(%rip), %rax
      3c:   48 89 05 10 00 00 00    movq    %rax, 16(%rip)
      43:   c7 05 14 00 00 00 23 00 00 00   movl    $35, 20(%rip)
      4d:   c7 05 1c 00 00 00 01 00 00 00   movl    $1, 28(%rip)
      57:   e8 00 00 00 00  callq   0 <__Z4testv+0x5C>
      5c:   c7 05 1c 00 00 00 00 00 00 00   movl    $0, 28(%rip)
      66:   5d  popq    %rbp
      67:   c3  retq

具体来说,2b 处的 movl $0, 28(%rip) 指令仍然存在。

巧合的是,这个输出正是我所需要的。但我不知道我是否可以依赖它,因为据我了解,我选择的内存顺序并不能保证它。

所以我的主要问题是:获取-释放内存顺序是否给了我另一个(幸运的)我不知道的保证?还是编译器只是偶然地做了我需要的事情/因为它没有尽可能地优化这个特殊情况?

完整代码如下:

// clang++ -c main.cpp -std=c++11 -O3 && objdump -d main.o

#include <atomic>
#include <cstdint>

struct StackFrame
{
  const char* functionName;
  uint32_t lineNumber;
};

struct Stack
{
  Stack()
    : stackFrames{ StackFrame{ nullptr, 0 }, StackFrame{ nullptr, 0 } }
    , stackTop{0}
  {
  }

  StackFrame stackFrames[2];
  std::atomic<uint32_t> stackTop;
};

Stack theStack;

void someFunction();
void someOtherFunction();

void test() {
  theStack.stackFrames[1] = StackFrame{ "someFunction", 30 };
  theStack.stackTop.store(1, std::memory_order_release);
  someFunction();
  theStack.stackTop.store(0, std::memory_order_release);

  theStack.stackFrames[1] = StackFrame{ "someOtherFunction", 35 };
  theStack.stackTop.store(1, std::memory_order_release);
  someOtherFunction();
  theStack.stackTop.store(0, std::memory_order_release);
}

/**
 * // Sampler thread:
 *
 * #include <chrono>
 * #include <iostream>
 * #include <thread>
 *
 * void suspendTargetThread();
 * void unsuspendTargetThread();
 * 
 * void samplerThread() {
 *   for (;;) {
 *     // Suspend the target thread. This uses a platform-specific
 *     // mechanism:
 *     //  - SuspendThread on Windows
 *     //  - thread_suspend on macOS
 *     //  - send a signal + grab a lock in the signal handler on Linux
 *     suspendTargetThread();
 * 
 *     // Now that the thread is paused, read the leaf stack frame.
 *     uint32_t stackTop =
 *       theStack.stackTop.load(std::memory_order_acquire);
 *     StackFrame& f = theStack.stackFrames[stackTop];
 *     std::cout << f.functionName << " at line "
 *               << f.lineNumber << std::endl;
 * 
 *     unsuspendTargetThread();
 * 
 *     std::this_thread::sleep_for(std::chrono::milliseconds(1));
 *   }
 * }
 */

而且,为了满足好奇心,如果我使用顺序一致的存储,这就是程序集:

$ clang++ -c main.cpp -std=c++11 -O3 && objdump -d main.o

main.o: file format Mach-O 64-bit x86-64

Disassembly of section __TEXT,__text:
__Z4testv:
       0:   55  pushq   %rbp
       1:   48 89 e5    movq    %rsp, %rbp
       4:   41 56   pushq   %r14
       6:   53  pushq   %rbx
       7:   48 8d 05 60 00 00 00    leaq    96(%rip), %rax
       e:   48 89 05 10 00 00 00    movq    %rax, 16(%rip)
      15:   c7 05 14 00 00 00 1e 00 00 00   movl    $30, 20(%rip)
      1f:   41 be 01 00 00 00   movl    $1, %r14d
      25:   b8 01 00 00 00  movl    $1, %eax
      2a:   87 05 20 00 00 00   xchgl   %eax, 32(%rip)
      30:   e8 00 00 00 00  callq   0 <__Z4testv+0x35>
      35:   31 db   xorl    %ebx, %ebx
      37:   31 c0   xorl    %eax, %eax
      39:   87 05 20 00 00 00   xchgl   %eax, 32(%rip)
      3f:   48 8d 05 35 00 00 00    leaq    53(%rip), %rax
      46:   48 89 05 10 00 00 00    movq    %rax, 16(%rip)
      4d:   c7 05 14 00 00 00 23 00 00 00   movl    $35, 20(%rip)
      57:   44 87 35 20 00 00 00    xchgl   %r14d, 32(%rip)
      5e:   e8 00 00 00 00  callq   0 <__Z4testv+0x63>
      63:   87 1d 20 00 00 00   xchgl   %ebx, 32(%rip)
      69:   5b  popq    %rbx
      6a:   41 5e   popq    %r14
      6c:   5d  popq    %rbp
      6d:   c3  retq

仪器将xchgl 指令识别为最昂贵的部分。

【问题讨论】:

  • 我计划通过制作functionNamelineNumber 原子来获得第二个保证,并为它们使用发布存储。这似乎根本不会影响生成的代码,所以我认为它会没事的。我只是想知道它是否真的有必要。
  • 在谈论二叉树时不是通常使用术语 leaf 吗?这里应该是什么意思?
  • 我将其重命名为“stackTop”以使其更清晰。
  • 您对memory_order 的含义有很大的误解。我会尝试给你一个答案,但由于我在这里没有太多经验,我会让其他人回答。你基本上需要一个互斥锁。

标签: c++ multithreading llvm atomic memory-barriers


【解决方案1】:

你可以这样写:

void test() {
  theStack.stackFrames[1] = StackFrame{ "someFunction", 30 };      // A
  theStack.stackTop.store(1, std::memory_order_release);           // B
  someFunction();                                                  // C
  theStack.stackTop.exchange(0, std::memory_order_acq_rel);        // D

  theStack.stackFrames[1] = StackFrame{ "someOtherFunction", 35 }; // E
  theStack.stackTop.store(1, std::memory_order_release);           // F
  someOtherFunction();                                             // G
  theStack.stackTop.exchange(0, std::memory_order_acq_rel);        // H
}

这应该提供您正在寻找的第二个保证,即在 D 之前可能不会观察到 E。否则我认为编译器将有权按照您的建议重新排序指令。

由于采样器线程“获取”stackTop 并在读取之前挂起目标线程,这应该提供额外的同步,所以当 stackTop 为 1 时它应该始终看到有效数据。

如果您的采样器没有挂起目标线程,或者如果挂起没有等待线程实际挂起(检查这个),我认为有必要使用互斥锁或等效物来防止采样器在读取后读取陈旧数据堆栈顶部作为一个(例如,如果它在错误的时刻被调度程序挂起)。

如果您可以依靠挂起来提供同步并且只需要通过编译器来约束重新排序,您应该看看std::atomic_signal_fence

【讨论】:

  • 使零存储也成为获取操作是个好主意,谢谢!至于线程挂起:Windows 和 Mac API 等待线程挂起,而在 Linux 实现中,我们在信号处理程序中锁定了一个互斥体,所以我认为我们都很好。为了完整起见,我应该提一下,我不相信将memory_order_acq_rel 传递给store 是有意义的;我想我可能需要打电话给exchange。 (我正在修改的代码使用的 API 甚至不允许我将 memory_order_acq_rel 传递给 store,因此调用 exchange 确实是我唯一的选择。)
  • 很高兴我的错误答案给了你一个有用的想法:-),编辑使用交换。但这是另一个迹象,表明这种内存排序的使用是不寻常的。
  • 啊,事实证明 x86 上的 exchange(0, std::memory_order_acq_rel) 编译为与 store(0, std::memory_order_seq_cst) 相同(慢)的 xchgl 指令。所以我还没有在这里完成。我猜我可以在存储之后执行获取加载并忽略加载的值?不确定这是否有帮助。
  • 既然你要停止线程,我提到的信号围栏应该足够了。这明确不会生成特殊指令,但禁止编译器重新排序。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多