【Question title】: Can I report progress for OpenMP tasks?
【Posted】: 2015-03-18 23:53:40
【Question】:

Imagine a classic OMP task:

  • Summing a large vector of doubles in the range [0.0, 1.0)


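#include <vector>
#include <random>
#include <algorithm>
#include <iterator>
#include <functional>
#include <iostream>

using namespace std;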

int main() {
    vector<double> v;

    // generate some data
    generate_n(back_inserter(v), 1ul << 18, 
       bind(uniform_real_distribution<double>(0,1.0), default_random_engine { random_device {}() }));

    long double sum = 0;

    {
#pragma omp parallel for reduction(+:sum)
        for(size_t i = 0; i < v.size(); i++)
        {
            sum += v[i];
        }
    }
    std::cout << "Done: sum = " << sum << "\n";
}

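I can't figure out how to report progress. After all, OMP handles all the coordination between the team threads for me, and I have no global state.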

I could probably use a regular std::thread and watch some shared variable from there, but isn't there a more "omp-ish" way to do this?
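For comparison, the std::thread approach mentioned above could look roughly like the sketch below. It is an illustration under assumptions: sum_with_watcher and its report callback are hypothetical names, and the 1024-iteration batch and 100 ms polling interval are arbitrary choices. Sharing a std::atomic between OpenMP threads and a std::thread is not something older OpenMP specifications describe, although it is commonly done with GCC and Clang.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Sums v with an OpenMP loop while a separate std::thread polls a shared
// atomic counter and passes each snapshot (done, total) to `report`.
long double sum_with_watcher(const std::vector<double>& v,
                             const std::function<void(std::size_t, std::size_t)>& report) {
    const std::size_t batch = 1024;        // iterations per counter update
    std::atomic<std::size_t> done{0};      // approximate iterations completed
    std::atomic<bool> finished{false};

    std::thread watcher([&] {
        while (!finished.load()) {
            report(done.load(), v.size());
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        report(done.load(), v.size());    // final snapshot after the loop ends
    });

    long double sum = 0;
    const long n = static_cast<long>(v.size());
#pragma omp parallel for reduction(+:sum)
    for (long i = 0; i < n; ++i) {
        sum += v[i];
        // One atomic operation per batch of indices, not per element.
        if (static_cast<std::size_t>(i) % batch == batch - 1)
            done.fetch_add(batch);
    }

    done.store(v.size());                 // account for the partial last batch
    finished.store(true);
    watcher.join();
    return sum;
}
```

Calling it as sum_with_watcher(v, [](std::size_t d, std::size_t t) { std::cerr << d << '/' << t << '\n'; }) would print a snapshot roughly every 100 ms. The OpenMP threads never wait on the watcher; the price is that the reported value can lag behind the real progress by up to one batch per thread.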

【Question comments】:

    Tags: c++ multithreading parallel-processing openmp progress


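    【Answer 1】:

    Just have each thread in the team track its local progress and update a global counter atomically. You could still have another thread observe it or, as in the sample below, just do the terminal output inside an OMP critical section.

    The key here is to pick a step size that doesn't cause very frequent updates, because otherwise the locking for the critical region (and, to a lesser degree, the atomic loads/stores) degrades performance.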


    #include <omp.h>
    #include <vector>
    #include <random>
    #include <algorithm>
    #include <iterator>
    #include <functional>
    #include <iostream>
    #include <iomanip>
    
    using namespace std;
    
    int main() {
        vector<double> v;
        // generate some data
        generate_n(back_inserter(v), 1ul << 18, bind(uniform_real_distribution<double>(0,1.0), default_random_engine { random_device {}() }));
    
        auto step_size   = 100ul;
        auto total_steps = v.size() / step_size + 1;
    
        size_t steps_completed = 0;
        long double sum = 0;
    
    #pragma omp parallel 
        {
            size_t local_count = 0;
    
    
    #pragma omp for reduction(+:sum)
            for(size_t i = 0; i < v.size(); i++)
            {
                sum += v[i];
    
                if (local_count++ % step_size == step_size-1)
                {
    #pragma omp atomic
                    ++steps_completed;
    
                    if (steps_completed % 100 == 1)
                    {
    #pragma omp critical
                        std::cout << "Progress: " << steps_completed << " of " << total_steps << " (" << std::fixed << std::setprecision(1) << (100.0*steps_completed/total_steps) << "%)\n";
                    }
                }
            }
        }
        std::cout << "Done: sum = " << sum << "\n";
    }
    

    Sample output:

    Progress: 1 of 2622 (0.0%)
    Progress: 191 of 2622 (7.3%)
    Progress: 214 of 2622 (8.2%)
    Progress: 301 of 2622 (11.5%)
    Progress: 401 of 2622 (15.3%)
    Progress: 501 of 2622 (19.1%)
    Progress: 601 of 2622 (22.9%)
    Progress: 701 of 2622 (26.7%)
    Progress: 804 of 2622 (30.7%)
    Progress: 901 of 2622 (34.4%)
    Progress: 1003 of 2622 (38.3%)
    Progress: 1101 of 2622 (42.0%)
    Progress: 1201 of 2622 (45.8%)
    Progress: 1301 of 2622 (49.6%)
    Progress: 1402 of 2622 (53.5%)
    Progress: 1501 of 2622 (57.2%)
    Progress: 1601 of 2622 (61.1%)
    Progress: 1701 of 2622 (64.9%)
    Progress: 1801 of 2622 (68.7%)
    Progress: 1901 of 2622 (72.5%)
    Progress: 2001 of 2622 (76.3%)
    Progress: 2101 of 2622 (80.1%)
    Progress: 2203 of 2622 (84.0%)
    Progress: 2301 of 2622 (87.8%)
    Progress: 2402 of 2622 (91.6%)
    Progress: 2501 of 2622 (95.4%)
    Progress: 2601 of 2622 (99.2%)
    Done: sum = 130943.8
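One thing to be aware of in the loop above: after the #pragma omp atomic increment, steps_completed is read again in the if and inside the critical section without any synchronization, which is a data race. That is likely also why the sample output shows values such as 191 and 214 rather than 101 and 201. Below is a minimal sketch of the same idea using OpenMP 3.1's atomic capture, so each thread tests and prints the value it produced itself. The function name sum_with_progress, the report_every parameter and the reports_printed out-parameter are hypothetical and introduced only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <vector>

// Sums v and prints a progress line every `report_every` completed steps,
// where one step is `step_size` iterations done by a single thread.
long double sum_with_progress(const std::vector<double>& v,
                              std::size_t step_size, std::size_t report_every,
                              std::size_t* reports_printed = nullptr) {
    assert(step_size > 0 && report_every > 0);
    const long n = static_cast<long>(v.size());
    const std::size_t total_steps = v.size() / step_size;  // approximate
    std::size_t steps_completed = 0;  // shared, only touched atomically
    std::size_t printed = 0;          // only touched inside the critical section
    long double sum = 0;

#pragma omp parallel
    {
        std::size_t local_count = 0;  // iterations since this thread's last step
#pragma omp for reduction(+:sum)
        for (long i = 0; i < n; ++i) {
            sum += v[i];
            if (++local_count == step_size) {
                local_count = 0;
                std::size_t now;  // private snapshot of the shared counter
#pragma omp atomic capture
                now = ++steps_completed;
                if (now % report_every == 0) {
#pragma omp critical(progress_output)
                    {
                        std::printf("Progress: %zu of %zu\n", now, total_steps);
                        ++printed;
                    }
                }
            }
        }
    }
    if (reports_printed) *reports_printed = printed;
    return sum;
}
```

Because each thread only counts full steps of its own chunk, the final count can fall slightly short of total_steps when the chunk sizes are not multiples of step_size; for a progress display that is usually acceptable.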
    

    【Comments】:

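    • Given that you are not using dynamic scheduling (the default loop schedule in almost all OpenMP implementations is static) and your problem is regular, each thread will run for roughly the same amount of time. Just track the progress of the master thread. Doing I/O there should slow it down enough that it finishes last, so the total execution time equals the master thread's execution time, which makes the progress indicator accurate. Your solution is more applicable to the dynamic case.
    • @HristoIliev Good observation. Indeed, I was hoping for something more general than the overly simple example.
    • @Zboson Not all accesses are in the critical section. Also, the critical section is really only there for the direct reporting; as I said, that's just to keep the example simple. You could, e.g., make the steps finer-grained than the report updates and put only the latter in the critsec.
    • @Zboson I've updated the example to show this idea. It can matter when there are many steps. When work is scheduled dynamically, finer granularity gives more regular output, but you don't want to take a lock at that frequency because it hurts throughput.
    • Is there a reason you use an expression like local_count++ % step_size == step_size-1 instead of the more common local_count++ % step_size == 0?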
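On the last question: with post-increment, local_count++ % step_size == 0 is already true on a thread's very first iteration (when local_count is still 0), so a step is counted before any work of that batch has been done. Comparing with step_size-1 counts a step only after each complete batch. The small, hypothetical helper firing_points below makes the difference visible by recording the 1-based iterations at which each condition fires.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Returns the 1-based iteration numbers at which the step condition is true.
// compare_with_zero selects `% step_size == 0` instead of `== step_size - 1`.
std::vector<std::size_t> firing_points(std::size_t iterations, std::size_t step_size,
                                       bool compare_with_zero) {
    assert(step_size > 0);
    std::vector<std::size_t> fired;
    std::size_t local_count = 0;
    for (std::size_t it = 1; it <= iterations; ++it) {
        const bool hit = compare_with_zero
                             ? (local_count++ % step_size == 0)
                             : (local_count++ % step_size == step_size - 1);
        if (hit) fired.push_back(it);
    }
    return fired;
}
```

With 10 iterations and a step size of 5, the == 0 form fires at iterations 1 and 6, while the == step_size-1 form fires at iterations 5 and 10, i.e. exactly when a batch has been finished.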
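    【Answer 2】:

    Using #pragma omp atomic, as the other answers here suggest, can slow your program down on processors that lack native atomic support (and even on ones that have it).

    The point of a progress indicator is to give the user an idea of when things will finish. If you are off by plus or minus a small fraction of the total run time, the user won't be too bothered. In other words, users would rather have things finish sooner, at the cost of knowing less precisely when they will finish.

    So I usually track the progress of just a single thread and use it to estimate the total progress. This works well when each thread has a similar workload. Since you are using #pragma omp parallel for, you are probably processing a series of similar elements without interdependencies, so this assumption is likely valid for your use case.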
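In formula terms the estimate is roughly 100 * d0 * T / N, where d0 is the work done by the tracked thread, T the number of threads and N the total amount of work. A sketch of just that arithmetic, assuming a static schedule that hands every thread an equal share; estimate_percent is a hypothetical helper, not part of the ProgressBar class. It clamps the result because unequal chunks can make the extrapolation overshoot.

```cpp
#include <cassert>
#include <cstdint>

// Estimates overall progress (0..100) from the work done by one thread,
// assuming every thread received an equal share of total_work.
std::uint32_t estimate_percent(std::uint32_t done_by_one_thread,
                               std::uint32_t num_threads,
                               std::uint32_t total_work) {
    assert(total_work > 0);
    // 64-bit intermediate so done * threads * 100 cannot overflow.
    const std::uint64_t raw =
        std::uint64_t{done_by_one_thread} * num_threads * 100 / total_work;
    return raw > 100 ? 100 : static_cast<std::uint32_t>(raw);
}
```

For example, if thread 0 of 4 has finished 24 of 100 work items, the estimate is 96%; if it has finished 30, the raw value of 120% is clamped to 100%.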

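    I've wrapped this logic in a class, ProgressBar, which I usually keep in a header file together with its helper class Timer. The class uses ANSI control sequences to keep things looking nice.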

    The output looks like this:

    [======                                            ] (12% - 22.0s - 4 threads)
    

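    It's also easy to have the compiler eliminate all of the progress bar's overhead by defining NOPROGRESS with the -DNOPROGRESS compile flag.

    The code and an example usage follow: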

    #include <iostream>
    #include <chrono>
    #include <thread>
    #include <iomanip>
    #include <stdexcept>
    #include <cstdint>
    #include <string>
    
    #ifdef _OPENMP
      ///Multi-threading - yay!
      #include <omp.h>
    #else
      ///Macros used to disguise the fact that we do not have multithreading enabled.
      #define omp_get_thread_num()  0
      #define omp_get_num_threads() 1
    #endif
    
    
    ///@brief Used to time intervals in code.
    ///
    ///Such as how long it takes a given function to run, or how long I/O has taken.
    class Timer{
     private:
      typedef std::chrono::high_resolution_clock clock;
      typedef std::chrono::duration<double, std::ratio<1> > second;
    
      std::chrono::time_point<clock> start_time; ///< Last time the timer was started
      double accumulated_time;                   ///< Accumulated running time since creation
      bool running;                              ///< True when the timer is running
    
     public:
      Timer(){
        accumulated_time = 0;
        running          = false;
      }
    
      ///Start the timer. Throws an exception if timer was already running.
      void start(){
        if(running)
          throw std::runtime_error("Timer was already started!");
        running=true;
        start_time = clock::now();
      }
    
      ///Stop the timer. Throws an exception if timer was already stopped.
      ///Calling this adds to the timer's accumulated time.
      ///@return The accumulated time in seconds.
      double stop(){
        if(!running)
          throw std::runtime_error("Timer was already stopped!");
    
        accumulated_time += lap();
        running           = false;
    
        return accumulated_time;
      }
    
      ///Returns the timer's accumulated time. Throws an exception if the timer is
      ///running.
      double accumulated(){
        if(running)
          throw std::runtime_error("Timer is still running!");
        return accumulated_time;
      }
    
      ///Returns the time between when the timer was started and the current
      ///moment. Throws an exception if the timer is not running.
      double lap(){
        if(!running)
          throw std::runtime_error("Timer was not started!");
        return std::chrono::duration_cast<second> (clock::now() - start_time).count();
      }
    
      ///Stops the timer and resets its accumulated time. No exceptions are thrown
      ///ever.
      void reset(){
        accumulated_time = 0;
        running          = false;
      }
    };
    
    
    ///@brief Manages a console-based progress bar to keep the user entertained.
    ///
    ///Defining the global `NOPROGRESS` will
    ///disable all progress operations, potentially speeding up a program. The look
    ///of the progress bar is shown in ProgressBar.hpp.
    class ProgressBar{
     private:
      uint32_t total_work;    ///< Total work to be accomplished
      uint32_t next_update;   ///< Next point to update the visible progress bar
      uint32_t call_diff;     ///< Interval between updates in work units
      uint32_t work_done;
      uint16_t old_percent;   ///< Old percentage value (aka: should we update the progress bar) TODO: Maybe that we do not need this
      Timer    timer;         ///< Used for generating ETA
    
      ///Clear current line on console so a new progress bar can be written
      void clearConsoleLine() const {
        std::cerr<<"\r\033[2K"<<std::flush;
      }
    
     public:
      ///@brief Start/reset the progress bar.
      ///@param total_work  The amount of work to be completed, usually specified in cells.
      void start(uint32_t total_work){
        timer = Timer();
        timer.start();
        this->total_work = total_work;
        next_update      = 0;
        call_diff        = total_work/200;
        old_percent      = 0;
        work_done        = 0;
        clearConsoleLine();
      }
    
      ///@brief Update the visible progress bar, but only if enough work has been done.
      ///
      ///Define the global `NOPROGRESS` flag to prevent this from having an
      ///effect. Doing so may speed up the program's execution.
      void update(uint32_t work_done0){
        //Provide simple way of optimizing out progress updates
        #ifdef NOPROGRESS
          return;
        #endif
    
        //Quick return if this isn't the main thread
        if(omp_get_thread_num()!=0)
          return;
    
        //Update the amount of work done
        work_done = work_done0;
    
        //Quick return if insufficient progress has occurred
        if(work_done<next_update)
          return;
    
        //Update the next time at which we'll do the expensive update stuff
        next_update += call_diff;
    
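        //Compute in 32-bit arithmetic and clamp before narrowing: casting to
        //uint8_t first would wrap values above 255 and defeat the overflow
        //check. uint16_t is used because a uint8_t would print as a character
        //instead of a number.
        uint32_t raw_percent = work_done*omp_get_num_threads()*100/total_work;
        uint16_t percent     = static_cast<uint16_t>(raw_percent>100 ? 100 : raw_percent);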
    
        //In the case that there has been no update (which should never be the case,
        //actually), skip the expensive screen print
        if(percent==old_percent)
          return;
    
        //Update old_percent accordingly
        old_percent=percent;
    
        //Print an update string which looks like this:
        //  [================================================  ] (96% - 1.0s - 4 threads)
        std::cerr<<"\r\033[2K["
                 <<std::string(percent/2, '=')<<std::string(50-percent/2, ' ')
                 <<"] ("
                 <<percent<<"% - "
                 <<std::fixed<<std::setprecision(1)<<timer.lap()/percent*(100-percent)
                 <<"s - "
                 <<omp_get_num_threads()<< " threads)"<<std::flush;
      }
    
      ///Increment by one the work done and update the progress bar
      ProgressBar& operator++(){
        //Quick return if this isn't the main thread
        if(omp_get_thread_num()!=0)
          return *this;
    
        work_done++;
        update(work_done);
        return *this;
      }
    
      ///Stop the progress bar. Throws an exception if it wasn't started.
      ///@return The number of seconds the progress bar was running.
      double stop(){
        clearConsoleLine();
    
        timer.stop();
        return timer.accumulated();
      }
    
      ///@return Return the time the progress bar ran for.
      double time_it_took(){
        return timer.accumulated();
      }
    
      uint32_t cellsProcessed() const {
        return work_done;
      }
    };
    
    int main(){
      ProgressBar pg;
      pg.start(100);
      //You should use 'default(none)' by default: be specific about what you're
      //sharing
      #pragma omp parallel for default(none) schedule(static) shared(pg)
      for(int i=0;i<100;i++){
        pg.update(i);
        std::this_thread::sleep_for(std::chrono::seconds(1));
      }
    }
    

    【Comments】:

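    • Is it possible to make it update the progress every 1%, so that it isn't specific to the data?
    • @Richard Great answer, thanks. I'm wondering why you use std::cerr instead of std::cout? And what does the [2K part do?
    • @Person: I like cerr because progress isn't really output we'd want to save. Also, on some systems cerr has a zero-size buffer, so anything written to it is flushed immediately, whereas a buffered cout can delay the display.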
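    【Answer 3】:

    My code below is similar to sehe's, but with a few differences that let me avoid reporting points being skipped because of exact-equality tests, including modulo checks. In addition, the global counter accumulates the loop iterations actually executed by all threads, though it may be imprecise, which is acceptable for this particular problem. I use only the master thread for reporting.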

    #include <omp.h>

    const size_t size = ...
    const size_t step_size = size / 100;
    const size_t nThreads = ...
    const size_t local_count_max = step_size / nThreads;
    size_t count = 0;
    #pragma omp parallel num_threads(nThreads)
    {
      size_t reported_count = 0;
      size_t local_count = 0;
      #pragma omp for
      for (size_t i = 0; i < size; ++i)
      {
        // ... do some useful work ...
        // -------------------------- update local and global progress counters
        if (++local_count >= local_count_max)
        {
          #pragma omp atomic
          count += local_count;
          local_count = 0;
        }
        // ------------------------------ report progress (in master thread only)
        // 'omp master' may not be closely nested inside an 'omp for' loop,
        // so test the thread number instead (thread 0 is the master thread).
        if (omp_get_thread_num() == 0)
        {
          size_t snapshot;
          #pragma omp atomic read
          snapshot = count;
          if (snapshot - reported_count >= step_size)
          {
            // ... report the progress ...
            reported_count = snapshot;
          }
        }
      }
    }
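For reference, a self-contained variant of the counter scheme above, written as a sketch under assumptions: the "useful work" is replaced by a hypothetical sum over a vector, sum_and_report and thread_id are illustrative names, and the report is a plain printf. Thread 0 does the reporting, and the code also compiles and runs serially without OpenMP.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <vector>
#ifdef _OPENMP
#include <omp.h>
#endif

// Thread number, or 0 when compiled without OpenMP.
static int thread_id() {
#ifdef _OPENMP
    return omp_get_thread_num();
#else
    return 0;
#endif
}

// Sums v (stand-in for "useful work") and reports roughly every 1% from
// thread 0 only. Returns how many reports were printed; the sum goes to sum_out.
std::size_t sum_and_report(const std::vector<double>& v, long double& sum_out) {
    const std::size_t size = v.size();
    const std::size_t step_size = size / 100 > 0 ? size / 100 : 1;
#ifdef _OPENMP
    const std::size_t n_threads = static_cast<std::size_t>(omp_get_max_threads());
#else
    const std::size_t n_threads = 1;
#endif
    // Each thread adds to the shared counter once per local_count_max iterations.
    const std::size_t local_count_max = step_size / n_threads > 0 ? step_size / n_threads : 1;
    std::size_t count = 0;    // shared; only updated/read atomically in the loop
    std::size_t reports = 0;  // touched by thread 0 only
    long double sum = 0;
    const long n = static_cast<long>(size);

#pragma omp parallel reduction(+:sum)
    {
        std::size_t reported_count = 0;  // private to each thread
        std::size_t local_count = 0;
#pragma omp for
        for (long i = 0; i < n; ++i) {
            sum += v[i];
            if (++local_count >= local_count_max) {
#pragma omp atomic
                count += local_count;
                local_count = 0;
            }
            if (thread_id() == 0) {
                std::size_t snapshot;
#pragma omp atomic read
                snapshot = count;
                if (snapshot - reported_count >= step_size) {
                    std::printf("Progress: %zu%%\n", 100 * snapshot / size);
                    reported_count = snapshot;
                    ++reports;
                }
            }
        }
    }
    sum_out = sum;
    return reports;
}
```

Thread 0 reads the counter with atomic read on every iteration, which is cheap compared with a critical section. Iterations still sitting in other threads' local_count are not visible yet, which is the imprecision the answer accepts.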
    

    【Comments】: