【问题标题】:What is the correct way to implement thread barrier, and barrier resetting in C?在 C 中实现线程屏障和屏障重置的正确方法是什么?
【发布时间】:2014-12-01 15:03:36
【问题描述】:

我尝试在我的代码中实现一个简单的屏障,如下所示:

void waitOnBarrier(int* barrier, int numberOfThreads) {
    atomicIncrement(barrier); // atomic increment implemented in assembly
    while(*barrier < numberOfThreads);
}

然后代码中有一个barrier的用法:

int g_barrier = 0; // a global variable

waitOnBarrier(&g_barrier, someKnownNumberOfThreads);

到目前为止一切顺利,但我应该在哪里将我的 g_barrier 变量重置为零?如果我写类似

g_barrier = 0;

waitOnBarrier 调用之后,如果其中一个线程比其他线程更快地从屏障中释放并取消 g_barrier 而所有其他线程,我将遇到问题仍在执行循环指令,因此最终它们将永远卡在屏障上。

说明: waitOnBarrier 会编译成这样的东西(伪代码):

1: mov rax, numberOfThreads
2: mov rbx, [barrier]
3: cmp rax, rbx
4: jmp(if smaller) to 2

因此,如果我们有 2 个线程在屏障上同步,并且 thread_1 在指令 3 或 4 的某处很慢,而更快的 thread_2 到达屏障,通过它并继续 g_barrier 无效流程。这意味着在 thread_1 将到达指令 2 之后,它将在 [barrier] 处看到零值,并将永远卡在屏障上!

问题是,我应该如何取消 g_barrier,它在代码中的哪个位置“足够远”,我可以确定到那时所有线程都离开了屏障?还是有更正确的方法来实现屏障?

【问题讨论】:

  • 您是否有不想使用操作系统提供的线程同步库的原因?
  • 是的,我的代码运行在特定的环境中,而不是传统的操作系统。
  • 实现这种无锁有什么特别的原因吗?
  • 自 C11 起,C 将原子类型作为语言的一部分,编译器开始实现它。如果您的编译器没有,我认为仍然值得使用该标准带来的概念。特别是你应该使用类似于atomic_flag 的东西和测试和设置操作来实现忙等待。不要重新发明轮子,让您的代码面向未来。

标签: c multithreading x86 barrier


【解决方案1】:

尝试实施本书中解释的障碍解决方案:

The Little Book of Semaphores

【讨论】:

    【解决方案2】:

    不要将您的 barrier 变量重置为零。

    当任何线程即将退出时,自动将barrier 变量减一。

    您的障碍看起来您不希望产生的工作线程数低于numberOfThreads

    【讨论】:

    • 问题是试图实现一个屏障,每个线程都等待,直到 所有 线程到达屏障,然后它们都继续。您的原子递减想法对这种障碍没有帮助,因为线程仍然可能会错过唤醒。
    【解决方案3】:

    障碍实际上很难实现,主要原因是新服务员可以在所有老服务员有机会执行之前开始到达,这排除了任何基于计数的简单实现。我首选的解决方案是让屏障对象本身简单地指向一个“当前屏障实例”,该实例存在于到达屏障的第一个线程的堆栈上,并且这也是最后一个离开的线程(因为它不能离开而其他线程线程仍在引用其堆栈)。就 pthread 原语(可以适应 C11 锁定原语或您必须使用的任何东西)而言,一个非常好的示例实现包含在 Michael Burr 对我过去关于该主题的问题的回答中:

    https://stackoverflow.com/a/5902671/379897

    是的,看起来工作量很大,但是编写一个真正满足屏障契约的屏障实现并非易事。

    【讨论】:

      【解决方案4】:

      我在尝试做类似的事情时遇到了这个问题,所以我想我会分享我的解决方案,以防其他人发现它有用。它是在纯 C++11 中实现的(遗憾的是不是 C11,因为标准的多线程部分在 gcc 和 msvc 中尚不支持)。

      基本上,您维护两个计数器,它们的用法是交替的。下面是实现和使用示例:

          #include <cstdio>
          #include <thread>
          #include <condition_variable>
      
          // A barrier class; The barrier is initialized with the number of expected threads to synchronize
          class barrier_t{
              const size_t count;
              size_t counter[2], * currCounter;
              std::mutex mutex;
              std::condition_variable cv;
      
          public:
              barrier_t(size_t count) : count(count), currCounter(&counter[0]) {
                  counter[0] = count;
                  counter[1] = 0;
              }
              void wait(){
                  std::unique_lock<std::mutex> lock(mutex);
                  if (!--*currCounter){
                      currCounter += currCounter == counter ? 1 : -1;
                      *currCounter = count;
                      cv.notify_all();
                  }
                  else {
                      size_t * currCounter_local = currCounter;
                      cv.wait(lock, [currCounter_local]{return *currCounter_local == 0; });
                  }
              }
          };
      
          void testBarrier(size_t iters, size_t threadIdx, barrier_t *B){
              for(size_t i = 0; i < iters; i++){
                  printf("Hello from thread %i for the %ith time!\n", threadIdx, i);
                  B->wait();
              }
          }
      
          int main(void){
              const size_t threadCnt = 4, iters = 8;
              barrier_t B(threadCnt);
              std::thread t[threadCnt];   
              for(size_t i = 0; i < threadCnt; i++) t[i] = std::thread(testBarrier, iters, i, &B);
              for(size_t i = 0; i < threadCnt; i++) t[i].join();
              return 0;
          }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-12-24
        • 2013-08-31
        • 2011-12-28
        • 2011-10-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-10-19
        相关资源
        最近更新 更多