【问题标题】:Atomic update in threaded lambda线程 lambda 中的原子更新
【发布时间】:2020-03-22 07:44:14
【问题描述】:

我认为在线程内以这种方式更新原子值并不好(总和有时看起来不太好)

    std::atomic<double> e(0);

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) {
      double ee = 0;
      for(auto k = begin; k != end; ++k) {
        ee += something[k];
      }
      acc.store( acc.load() + ee );
    };

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) {
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    }
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) {
      i.join();
    }

虽然使用锁保护似乎没问题

    std::atomic<double> e(0);
    std::mutex m;

    auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) {
      double ee = 0;
      for(auto k = begin; k != end; ++k) {
        ee += something[k];
      }
      {
          const std::lock_guard<std::mutex> lock(m);
          acc.store( acc.load() + ee );
      }
    };

    std::vector<std::thread> threads(nbThreads);
    const size_t grainsize = miniBatchSize / nbThreads;

    size_t work_iter = 0;
    for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) {
      *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
      work_iter += grainsize;
    }
    threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));

    for(auto&& i : threads) {
      i.join();
    }

我是对的,我在这里缺少什么?是 std::ref(e) 的问题吗?

【问题讨论】:

    标签: c++ c++11 lambda thread-safety


    【解决方案1】:

    您希望加载和存储都作为原子操作发生。目前您的代码可以:

    acc.store(acc.load() + ee);
    

    现在假设一个线程在load() 执行后立即被中断(我们将加载的值称为acc_old)。另一个线程做它的事情(并因此修改acc),然后第一个线程再次运行。它不会重新加载acc,因为它已经加载了它的值。所以这个线程现在将更新acc 以包含acc_old + ee。并且 bam,错误的结果。

    请改用fetch_addoperator+=。两者都保证整个加法操作的原子行为。即:

    acc += ee; // or
    acc.fetch_add(ee);
    

    编辑:请注意,这些函数仅支持从 C++20 开始的浮点原子。对于整数类型,它们从 C++11 开始受支持。因此,如果您需要浮点,您可能必须坚持使用互斥锁。在这种情况下,我建议将双精度值和互斥锁包装在一个类中,这样就不会意外地以错误的方式使用它。

    【讨论】:

      【解决方案2】:

      问题出在一行:

      acc.store( acc.load() + ee );

      有2个操作加载和存储,在它们之间的间隔内,另一个线程可以改变值。

      不幸的是 atomic 不支持 fetch_add。

      你可以试试这个:

          auto atomic_fetch_add = [](std::atomic<double>* obj, double arg)
          {
              auto expected = obj->load();
              while (!atomic_compare_exchange_weak(obj, &expected, expected + arg))
                  ;
              return expected;
          };
      
          std::atomic<double> e(0);
      
          auto worker = [&] (size_t begin, size_t end, std::atomic<double> & acc) {
            double ee = 0;
            for(auto k = begin; k != end; ++k) {
              ee += something[k];
            }
            // acc.store( acc.load() + ee );
            atomic_fetch_add(&acc, ee);
          };
      
          std::vector<std::thread> threads(nbThreads);
          const size_t grainsize = miniBatchSize / nbThreads;
      
          size_t work_iter = 0;
          for(auto it = std::begin(threads); it != std::end(threads) - 1; ++it) {
            *it = std::thread(worker, work_iter, work_iter + grainsize, std::ref(e));
            work_iter += grainsize;
          }
          threads.back() = std::thread(worker, work_iter, miniBatchSize, std::ref(e));
      
          for(auto&& i : threads) {
            i.join();
          }
      

      虽然不能保证 atomic 不使用互斥锁,但您必须检查您的实现。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-03-13
        • 2016-03-31
        • 1970-01-01
        • 2012-07-16
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-04-07
        相关资源
        最近更新 更多