【问题标题】:How do I parallelize a for loop through a C++ std::list using OpenMP?如何使用 OpenMP 通过 C++ std::list 并行化 for 循环?
【发布时间】:2012-01-31 06:20:49
【问题描述】:

我想使用 OpenMP 以并行方式遍历 std::list 中的所有元素。循环应该能够改变列表的元素。有一个简单的解决方案吗?当迭代器是随机访问迭代器时,OpenMP 3.0 似乎支持并行 for 循环,但否则不支持。无论如何,我更喜欢使用 OpenMP 2.0,因为我无法完全控制哪些编译器可供我使用。

如果我的容器是矢量,我可能会使用:

#pragma omp parallel for
for (auto it = v.begin(); it != v.end(); ++it) {
    it->process();
}

我知道我可以将列表复制到向量中,执行循环,然后将所有内容复制回来。但是,如果可能,我想避免这种复杂性和开销。

【问题讨论】:

    标签: c++ list parallel-processing openmp


    【解决方案1】:

    我怀疑这是可能的,因为你不能在不遍历列表的情况下跳到列表的中间。列表不存储在连续内存中,std::list 迭代器不是随机访问。它们只是双向的。

    【讨论】:

    • 好吧,如果每个元素的处理比迭代更昂贵,那么并行化可能仍然是可取的。
    • 如果有 2 个执行线程,您可以使用 it1 = v.begin()it2 = it1 + 1,然后使用 it1 += 2it2 += 2 进行迭代。
    • @KennyTM,谢谢。这符合我正在寻找的内容。
    • 但是 += 2 在非随机访问迭代器上仍然需要遍历迭代器一次迭代。我想如果列表在循环期间是 const 的话,这会起作用。但我猜你需要做很多设置才能让它工作。你不能说 it1 += 2 因为你仍然需要测试 it1 != list.end() 两个增量。
    【解决方案2】:

    如果您决定使用Openmp 3.0,您可以使用task 功能:

    #pragma omp parallel
    #pragma omp single
    {
      for(auto it = l.begin(); it != l.end(); ++it)
         #pragma omp task firstprivate(it)
           it->process();
      #pragma omp taskwait
    }
    

    这将在一个线程中执行循环,但将元素的处理委托给其他线程。

    如果没有OpenMP 3.0,最简单的方法是写入指向列表中元素的所有指针(或向量中的迭代器并迭代该元素。这样您就不必复制任何内容并避免复制元素本身,所以它不应该有太多的开销:

    std::vector<my_element*> elements; //my_element is whatever is in list
    for(auto it = list.begin(); it != list.end(); ++it)
      elements.push_back(&(*it));
    
    #pragma omp parallel shared(chunks)
    {
      #pragma omp for
      for(size_t i = 0; i < elements.size(); ++i) // or use iterators in newer OpenMP
          elements[i]->process();
    }
    

    如果你想避免复制指针,你总是可以手动创建一个并行化的 for 循环。您可以让线程访问列表的交错元素(如 KennyTM 建议的那样),或者在迭代和迭代之前将范围分成大致相等的连续部分。后者似乎更可取,因为线程避免访问当前由其他线程处理的列表节点(即使只有下一个指针),这可能导致错误共享。这大概是这样的:

    #pragma omp parallel
    {
      int thread_count = omp_get_num_threads();
      int thread_num   = omp_get_thread_num();
      size_t chunk_size= list.size() / thread_count;
      auto begin = list.begin();
      std::advance(begin, thread_num * chunk_size);
      auto end = begin;
      if(thread_num = thread_count - 1) // last thread iterates the remaining sequence
         end = list.end();
      else
         std::advance(end, chunk_size);
      #pragma omp barrier
      for(auto it = begin; it != end; ++it)
        it->process();
    }
    

    屏障并不是严格需要的,但是如果process 改变了已处理的元素(意味着它不是一个 const 方法),如果线程迭代一个已经存在的序列,则可能会出现某种错误共享。突变。这种方式将在序列上迭代 3*n 次(其中 n 是线程数),因此对于大量线程而言,缩放可能不是最佳的。

    为了减少开销,您可以将范围的生成放在#pragma omp parallel 之外,但是您需要知道有多少线程将形成并行部分。因此,您可能必须手动设置num_threads,或使用omp_get_max_threads() 并处理创建的线程数少于omp_get_max_threads()(这只是一个上限)的情况。最后一种方法可以通过在这种情况下为每个线程分配多个块来处理(使用#pragma omp for 应该这样做):

    int max_threads = omp_get_max_threads();
    std::vector<std::pair<std::list<...>::iterator, std::list<...>::iterator> > chunks;
    chunks.reserve(max_threads); 
    size_t chunk_size= list.size() / max_threads;
    auto cur_iter = list.begin();
    for(int i = 0; i < max_threads - 1; ++i)
    {
       auto last_iter = cur_iter;
       std::advance(cur_iter, chunk_size);
       chunks.push_back(std::make_pair(last_iter, cur_iter);
    }
    chunks.push_back(cur_iter, list.end();
    
    #pragma omp parallel shared(chunks)
    {
      #pragma omp for
      for(int i = 0; i < max_threads; ++i)
        for(auto it = chunks[i].first; it != chunks[i].second; ++it)
          it->process();
    }
    

    这将只需要对list 进行三次迭代(两次,如果您可以在不迭代的情况下获得列表的大小)。我认为这对于非随机访问迭代器来说是最好的,而无需使用tasks 或迭代一些不合适的数据结构(如指针向量)。

    【讨论】:

    • 感谢您的详细回答。
    • 我想遍历整个map。如何使用 OpenMp 迭代整个地图?
    【解决方案3】:

    http://openmp.org/forum/viewtopic.php?f=3&t=51

    #pragma omp parallel
    {
       for(it= list1.begin(); it!= list1.end(); it++)
       {
          #pragma omp single nowait
          {
             it->compute();
          }
       } // end for
    } // end ompparallel
    

    这可以理解为展开为:

    {
      it = listl.begin
      #pragma omp single nowait
      {
        it->compute();
      }
      it++;
      #pragma omp single nowait
      {
        it->compute();
      }
      it++;
    ...
    }
    

    给定这样的代码:

    int main()                                                                      
    {                                                                               
            std::vector<int> l(4,0);                                                
            #pragma omp parallel for                                                        
            for(int i=0; i<l.size(); ++i){                                          
                    printf("th %d = %d \n",omp_get_thread_num(),l[i]=i);            
            }                                                                       
            printf("\n");                                                           
           #pragma omp parallel                                                            
            {                                                                       
                    for (auto i = l.begin(); i != l.end(); ++i) {                   
                   #pragma omp single nowait                                                       
                    {                                                       
                            printf("th %d = %d \n",omp_get_thread_num(),*i);
                    }                                                       
                }                                                               
            }                                                                       
            return 0;                                                               
    } 
    

    导出OMP_NUM_THREADS=4,输出如下(注意第二部分,工作线程号可以重复):

    th 2 = 2 
    th 1 = 1 
    th 0 = 0 
    th 3 = 3 
    
    th 2 = 0 
    th 1 = 1 
    th 2 = 2 
    th 3 = 3
    

    【讨论】:

    • 应该是:#pragma omp parallel private(it) 这样每个线程都得到一个迭代器的副本
    【解决方案4】:

    不使用 OpenMP 3.0,您可以选择让所有线程遍历列表:

    std::list<T>::iterator it;
    #pragma omp parallel private(it)
    {
       for(it = list1.begin(); it!= list1.end(); it++)
       {
          #pragma omp single nowait
          {
             it->compute();
          }
       } 
    } 
    

    在这种情况下,每个线程都有自己的迭代器副本(private),但只有一个线程会访问特定元素(single),而其他线程会前进到下一个项目(nowait

    或者您可以循环一次以构建一个指针向量,然后在线程之间分配:

    std::vector< T*> items;
    
    items.reserve(list.size());
    //put the pointers in the vector
    std::transform(list.begin(), list.end(), std::back_inserter(items), 
                   [](T& n){ return &n; }
    );
    
    #pragma omp parallel for
    for (int i = 0; i < items.size(); i++)
    {
      items[i]->compute();
    }
    

    根据您的具体情况,一种或另一种可能更快。测试哪个更适合您很容易。

    【讨论】:

      【解决方案5】:

      这是一个允许并行插入/删除列表的新元素的解决方案。

      对于带有N 元素的列表,我们首先将列表切割成nthreads 列表 大约有N/nthreads 元素。在并行区域中,可以这样完成

      int ithread = omp_get_thread_num();
      int nthreads = omp_get_num_threads();
      int t0 = (ithread+0)*N/nthreads;
      int t1 = (ithread+1)*N/nthreads;
      
      std::list<int> l2;
      #pragma omp for ordered schedule(static)
      for(int i=0; i<nthreads; i++) {
          #pragma omp ordered
          {
              auto it0 = l.begin(), it1 = it0;
              std::advance(it1, t1-t0);       
              l2.splice(l2.begin(), l2, it0, it1);
          }
      }
      

      l2 是每个线程的切割清单。

      然后我们可以并行处理每个列表。例如,我们可以像这样在列表中的每个第一个位置插入 -1

      auto it = l2.begin();
      for(int i=(t0+4)/5; i<(t1+4)/5; i++) {
          std::advance(it, 5*i-t0);
          l2.insert(it, -1);
      }
      

      最后,在我们对列表进行并行操作之后,我们将每个线程的列表拼接回一个列表,如下所示:

      #pragma omp for ordered schedule(static)
      for(int i=0; i<nthreads; i++) {
          #pragma omp ordered
          l.splice(l.end(), l, l2.begin(), l2.end());
      }
      

      算法本质上是。

      1. 通过列表顺序快速制作剪切列表。
      2. 并行处理切割清单添加、修改或删除元素。
      3. 将修改后的切割清单按顺序重新拼接在一起。

      这是一个工作示例

      #include <algorithm>
      #include <iostream>
      #include <list>
      #include <omp.h>
      
      int main(void) {
        std::list<int> l;
        for(int i=0; i<22; i++) {
          l.push_back(i);
        }
        for (auto it = l.begin(); it != l.end(); ++it) {
          std::cout << *it << " ";
        } std::cout << std::endl;
      
        int N = l.size();
        #pragma omp parallel
        {
          int ithread = omp_get_thread_num();
          int nthreads = omp_get_num_threads();
          int t0 = (ithread+0)*N/nthreads;
          int t1 = (ithread+1)*N/nthreads;
      
          //cut list into nthreads lists with size=N/nthreads
          std::list<int> l2;
          #pragma omp for ordered schedule(static)
          for(int i=0; i<nthreads; i++) {
            #pragma omp ordered
            {
          auto it0 = l.begin(), it1 = it0;
          std::advance(it1, t1-t0);       
          l2.splice(l2.begin(), l2, it0, it1);
            }
          }
          //insert -1 every 5th postion
          auto it = l2.begin();
          for(int i=(t0+4)/5; i<(t1+4)/5; i++) {
            std::advance(it, 5*i-t0);
            l2.insert(it, -1);
          }
      
          //splice lists in order back together.
          #pragma omp for ordered schedule(static)
          for(int i=0; i<nthreads; i++) {
            #pragma omp ordered
            l.splice(l.end(), l, l2.begin(), l2.end());
          }  
        }
      
        for (auto it = l.begin(); it != l.end(); ++it) {
          std::cout << *it << " ";
        } std::cout << std::endl;  
      }
      

      结果

      0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 
      -1 0 1 2 3 4 -1 5 6 7 8 9 -1 10 11 12 13 14 -1 15 16 17 18 19 -1 20 21
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2016-07-23
        • 1970-01-01
        • 1970-01-01
        • 2013-09-11
        相关资源
        最近更新 更多