【问题标题】:Lock-free SPSC queue implementation on ARMARM 上的无锁 SPSC 队列实现
【发布时间】:2020-05-29 00:08:50
【问题描述】:

我正在尝试为 ARM 编写一个单一的生产者单一消费者队列,我想我已经接近了 DMB,但需要一些检查(我更熟悉 std::atomic。)

这是我所在的位置:

bool push(const_reference value)
{
    // Check for room
    const size_type currentTail = tail;
    const size_type nextTail = increment(currentTail);
    if (nextTail == head)
        return false;

    // Write the value
    valueArr[currentTail] = value;

    // Prevent the consumer from seeing the incremented tail before the
    // value is written.
    __DMB();

    // Increment tail
    tail = nextTail;

    return true;
}

bool pop(reference valueLocation)
{
    // Check for data
    const size_type currentHead = head;
    if (currentHead == tail)
        return false;

    // Write the value.
    valueLocation = valueArr[currentHead];

    // Prevent the producer from seeing the incremented head before the
    // value is written.
    __DMB();

    // Increment the head
    head = increment(head);

    return true;
}

我的问题是:我的 DMB 位置和理由是否准确?还是仍然理解我失踪了?在处理由其他线程(或中断)更新的变量时,我特别不确定条件是否需要一些保护。

【问题讨论】:

  • 是的,屏障然后存储 = tail 的释放存储,然后加载屏障 = 获取负载。但是您正在使用加载的值 before 您对其设置屏障,以便根据来自另一个线程的值进行比较和分支。在使用currentHead / Tail 之前,最好将DMB 放在分支之后,因为您没有mo_consume 样式依赖排序的数据依赖。我不确定这是否有问题。
  • 通常最好让 std::atomic 为您发出 dmb,而不是使用编译时 + 运行时内存屏障,但对于 ARM,您最终可能会浪费屏障;如果您只需要获取订单,您可能只需要一个总数吗?不,您可能需要在tail 上获取和在head 上发布。请注意 head = increment(head) 可以使用 increment(currentHead) ;无需重新加载共享变量,因为该线程是唯一修改它的线程(SPSC)。
  • 我是这么认为的。可能的故障模式是push 直到存储到valueArr[currentTail] 之后才检查nextTail == head(当乱序推测执行开始加载tail 并验证预测路径时)。所以它可能会踩到pop 没有读完的值。如果您在 asm 中编写了整个函数,如果您使用了基于比较条件而不是分支的谓词加载或存储,那么您可能可以通过 consume 样式的排序来避免这种情况。但是在 C 中,你无法控制它是数据依赖还是控制依赖。
  • 当然,如果这是单核设备,那么您不需要任何运行时障碍,只需要编译器障碍(atomic_signal_fence()asm("" ::: "memory");)。上下文切换和中断将在同一个内核上运行,从而保留所有指令按程序顺序运行的错觉。
  • 编译器屏障编译为零 asm 指令。您不需要 ARM 指令(如果这就是您所说的“命令”),只需一个 C 函数/内置函数,如 GNU C asm("" ::: "memory")。我建议将它隐藏在像 barrier() 这样的宏后面,这样如果你将来需要,你可以重新定义它以包含运行时屏障,就像 Linux 内核一样:非 SMP 构建可以定义屏障宏来阻止编译 -时间重新排序。

标签: c++ arm synchronization lock-free memory-barriers


【解决方案1】:
  • 屏障是必要的,但还不够,您还需要“获取”语义来加载由其他线程修改的 var。 (或者至少consume,但是没有障碍需要 asm 来创建数据依赖项。编译器在已经拥有控制依赖项后不会这样做。)
  • 单核系统只能使用编译器屏障,例如 GNU C asm("":::"memory")std::atomic_signal_fence(std::memory_order_release),而不是 dmb。制作一个宏,以便您可以在 SMP 安全屏障或 UP(单处理器)屏障之间进行选择。
  • head = increment(head);head 的无意义重载,请使用本地副本。
  • 使用std::atomic 可移植地获取必要的代码生成。

您通常不需要滚动自己的原子; ARM 的现代编译器确实实现了std::atomic<T>。但是 AFAIK,没有std::atomic<> 实现意识到单核系统以避免实际障碍并且只是安全的。可能导致上下文切换的中断。

在单核系统上,您不需要dsb,只需要一个编译器屏障。 CPU 将保留 asm 指令按程序顺序执行的错觉。您只需要确保编译器生成以正确顺序执行操作的 asm。您可以通过使用std::atomicstd::memory_order_relaxed 以及手动atomic_signal_fence(memory_order_acquire)release 屏障来做到这一点。 (不是atomic_thread_fence;它会发出asm 指令,通常是dsb)。


每个线程读取另一个线程修改的变量。通过确保它们仅在访问数组后可见,您正确地进行了修改发布存储。

但这些读取也需要acquire-loads 才能与那些发布存储同步。例如。确保pushpop 完成读取相同元素之前没有写入valueArr[currentTail] = value;。或者在完整写入之前阅读条目。

没有任何障碍,故障模式是if (currentHead == tail) return false; 直到之后才真正从内存中检查tail 的值 valueLocation = valueArr[currentHead]; 发生了。运行时负载重新排序可以在弱排序 ARM 上轻松完成。如果加载地址对tail 具有数据依赖关系,则可以避免在SMP 系统上需要屏障(ARM 保证asm 中的依赖关系排序;mo_consume 应该公开的特性)。但是如果编译器只是发出一个分支,那只是一个控制依赖,而不是数据。如果您在 asm 中手动编写,我认为在比较设置的标志上像 ldrne r0, [r1, r2] 这样的谓词加载会创建一个 data 依赖项。

编译时重新排序不太合理,但如果它只是阻止编译器执行它本来不打算执行的操作,那么仅编译器的屏障是免费的。


未经测试的实现,编译为asm,看起来不错,但没有其他测试

push 做类似的事情。我包含了用于加载获取/存储释放和 fullbarrier() 的包装函数。 (相当于 Linux 内核的 smp_mb() 宏,定义为编译时或编译+运行时屏障。)

#include <atomic>

#define UNIPROCESSOR


#ifdef UNIPROCESSOR
#define fullbarrier()  asm("":::"memory")   // GNU C compiler barrier
                          // atomic_signal_fence(std::memory_order_seq_cst)
#else
#define fullbarrier() __DMB()    // or atomic_thread_fence(std::memory_order_seq_cst)
#endif

template <class T>
T load_acquire(std::atomic<T> &x) {
#ifdef UNIPROCESSOR
    T tmp = x.load(std::memory_order_relaxed);
    std::atomic_signal_fence(std::memory_order_acquire);
    // or fullbarrier();  if you want to use that macro
    return tmp;
#else
    return x.load(std::memory_order_acquire);
    // fullbarrier() / __DMB();
#endif
}

template <class T>
void store_release(std::atomic<T> &x, T val) {
#ifdef UNIPROCESSOR
    std::atomic_signal_fence(std::memory_order_release);
    // or fullbarrier();
    x.store(val, std::memory_order_relaxed);
#else
    // fullbarrier() / __DMB(); before plain store
    return x.store(val, std::memory_order_release);
#endif
}

template <class T>
struct SPSC_queue {
  using size_type = unsigned;
  using value_type = T;
  static const size_type size = 1024;

  std::atomic<size_type> head;
  value_type valueArr[size];
  std::atomic<size_type> tail;  // in a separate cache-line from head to reduce contention

  bool push(const value_type &value)
  {
    // Check for room
    const size_type currentTail = tail.load(std::memory_order_relaxed);  // no other writers to tail, no ordering needed
    const size_type nextTail = currentTail + 1;    // modulo separately so empty and full are distinguishable.
    if (nextTail == load_acquire(head))
        return false;

    valueArr[currentTail % size] = value;
    store_release(tail, nextTail);
    return true;
  }
};

// instantiate the template for  int  so we can look at the asm
template bool SPSC_queue<int>::push(const value_type &value);

如果您使用-DUNIPROCESSORg++9.2 -O3 -mcpu=cortex-a15,则以零障碍干净地编译on the Godbolt compiler explorer(只是为了选择一个随机的现代 ARM 内核,以便 GCC 可以内联 std::atomic 加载/存储函数和障碍用于非单处理器案例。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-21
    • 2011-02-26
    • 2010-12-10
    相关资源
    最近更新 更多