【问题标题】:Employing parallelism in pipelined execution在流水线执行中使用并行性
【发布时间】:2017-05-24 22:16:50
【问题描述】:

我正在尝试开发一个管道,其中首先读取和处理数据,操作一次,以不同的方式操作,然后显示。我想到了一个设计,其中数据 IO 输入到第一个操纵器读取的缓冲区中。随后,第一个操纵器写入另一个缓冲区,如果可能,第二个操纵器读取该缓冲区。最后,将第二个操纵器的输出写入显示缓冲区,由可视化器读取并使用 OpenGL 显示。

在我看来,这是一个相当简单的并行问题,其中每个任务都有自己的线程,并且它们通过数据缓冲区进行通信。但是,我遇到的所有线程程序教程似乎都表明,多线程是由决定如何划分工作负载的中间件(如 OpenMP)决定的。

我是开发多线程应用程序的新手,所以这可能是一个愚蠢的问题,但我所描述的是否可行,是否可以使用 OpenMP 之类的中间件来完成?我意识到显而易见的答案是“试试看”,我想这样做,但教程并没有说明*如何*尝试它。

【问题讨论】:

  • “缓冲块”是什么意思?函数调用块,而不是数据结构。
  • @KerrekSB:你是对的。我的意思是读取调用会阻塞,直到缓冲区中有数据
  • 我觉得你应该看看consumer/producer problem
  • @cbuchart:从理论上讲,我对此很熟悉。我的意思是,在学校里,我们复习了它并编写了原始代码来演示它。这就是我的设计的来源。这个问题是一个实际问题。对于实际的软件,我不知道这是否真的可行
  • 我只将 OpenMP 用于处理近乎微不足道的并行化问题(因此请谨慎理解),但您的用例似乎比仅仅解决问题更专业

标签: c++ multithreading openmp tbb tbb-flow-graph


【解决方案1】:

OpenMP 更适合轻松跨越多核 (SIMD) 的算法。其他情况也是可能的,但在您的情况下,我认为直接使用线程会更好,并且更容易编码和维护。

我将我的答案分为两部分:没有 OpenMP 的通用解决方案,以及使用 OpenMP 的一些特定更改。

正如评论中提到的,您面临生产者/消费者问题,但有两次:一个线程正在填充缓冲区(生产一个项目),然后必须由第二个(消耗)读取(和修改) .您的问题的特殊性在于,第二个线程也是生产者(要绘制的图像),而第三个线程是负责使用它的线程(可视化器)。

如您所知,P/C 问题是使用缓冲区(可能是循环缓冲区或生产项目队列)解决的,其中缓冲区的每个元素都标记为已生产或已消费,并且线程具有独占访问权限从中添加或获取项目时。


让我们在下面的示例程序中使用队列方法来解决您的问题。

  • 生产的项目将存储在队列中。队列的前端包含最旧的元素,即必须首先使用的元素。
  • 有两个队列:一个用于第一个操纵器生成的数据(并由第二个操纵器使用),另一个用于第二个操纵器生成的数据(将由另一个线程可视化)。
  • 生产阶段很简单:获得对相应队列的独占访问权并在末尾插入元素。
  • 消费类似,但必须等待队列至少有一个元素(不能为空)。
  • 我添加了一些睡眠来模拟其他操作。
  • 停止条件仅用于说明目的。

注意:为了简单起见,我假设您可以访问 C++11 编译器。使用其他 API 的实现相对类似。

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>

using namespace std::chrono_literals;

std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;

std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;

std::atomic<bool> stop = false;

void manipulator1_kernel()
{
  while (!stop) {
    // Producer 1: generate data
    {
      std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
      g_data_produced_by_m1.push_back(rand());
    }
    std::this_thread::sleep_for(100ms);
  }
}

void manipulator2_kernel()
{
  int data;

  while (!stop) {
    // Consumer 1
    while (!stop) { // wait until there is an item to be consumed
      {
        std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
        if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
          data = g_data_produced_by_m1.front(); // consume
          g_data_produced_by_m1.pop_front();
          break;
        }
      }
      std::this_thread::sleep_for(100ms);
    }

    // Producer 2: modify and send to the visualizer
    {
      std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
      g_data_produced_by_m2.push_back(5 * data);
    }

    std::this_thread::sleep_for(100ms);
  }
}

void visualizer_kernel()
{
  int data;

  while (!stop) {
    // Consumer 2
    while (!stop) { // wait until there is an item to be visualized
      {
        std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
        if (!g_data_produced_by_m2.empty()) {
          data = g_data_produced_by_m2.front();
          g_data_produced_by_m2.pop_front();
          break;
        }
      }
      std::this_thread::sleep_for(100ms);
    }

    std::cout << data << std::endl; // render to display
    std::this_thread::sleep_for(100ms);

    if (data % 8 == 0) stop = true; // some stop condition for the example
  }
}

int main()
{
  std::thread manipulator1(manipulator1_kernel);
  std::thread manipulator2(manipulator2_kernel);
  std::thread visualizer(visualizer_kernel);

  visualizer.join();
  manipulator2.join();
  manipulator1.join();

  return 0;
}

如果您仍想使用 OpenMP,您可以找到的最接近的可能是 tasks(我认为是从 OpenMP 3.0 开始)。我没怎么用过,不过上面的程序可以改写成这样:

int main()
{
  #pragma omp parallel
  {
    #pragma omp task
    manipulator1_kernel();
    #pragma omp task
    manipulator2_kernel();
    #pragma omp task
    visualizer_kernel();

    #pragma omp taskwait
  }    

  return 0;
}

其余代码也可以更改为使用 OpenMP 功能,但我认为这回答了您的问题。

这种方法的主要问题是您必须为任务创建一个代码块,以便在 OpenMP parallel 中运行,这很容易使您的应用程序逻辑和结构的其余部分复杂化。

【讨论】:

    【解决方案2】:

    为了解决这个特殊问题,英特尔® 线程构建模块库包含特殊结构。 Intel® TBB 是跨平台库,有助于多线程编程。 我们可以将您的应用程序中涉及的实体视为四个不同的任务提供程序。一种类型的任务是输入任务 - 提供输入数据的任务,另一种类型的任务由第一个操作例程提供,依此类推。

    因此,用户唯一需要做的就是为这些任务提供主体。库中有几个 API 用于指定要处理的主体以及如何并行处理。其他一切(这里我的意思是线程创建、任务执行之间的同步、工作平衡等)都由库完成。

    我想到的最简单的解决方案变体是使用parallel_pipeline 函数。这是原型:

    #include "tbb/pipeline.h"
    using namespace tbb;
    
    int main() {
        parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
            make_filter<void, input_data_type>(
                filter::serial_in_order, // read data sequentially
                [](flow_control& fc) -> input_data_type {
                    if ( /*check some stop condition: EOF, etc.*/ ) {
                        fc.stop();
                        return input_data_type(); // return dummy value
                    }
                    auto input_data = read_data();
                    return input_data;
                }
            ) &
            make_filter<input_data_type, manipulator1_output_type>(
                filter::parallel, // process data in parallel by the first manipulator
                [](input_data_type elem) -> manipulator1_output_type {
                    auto processed_elem = manipulator1::process(elem);
                    return processed_elem;
                }
            ) &
            make_filter<manipulator1_output_type, manipulator2_output_type>(
                filter::parallel, // process data in parallel by the second manipulator
                [](manipulator1_output_type elem) -> manipulator2_output_type {
                    auto processed_elem = manipulator2::process(elem);
                    return processed_elem;
                }
            ) &
            make_filter<manipulator2_output_type, void>(
                filter::serial_in_order, // visualize frame by frame
                [](manipulator2_output_type elem) {
                    visualize(elem);
                }
            )
        );
        return 0;
    }
    

    前提是实现了必要的功能(read_data、visualize)。这里input_data_typemanipulator1_output_type等是流水线阶段之间传递的类型,操纵器的process函数对传递的参数进行必要的计算。

    顺便说一句,为了避免使用锁和其他同步原语,您可以使用库中的 concurrent_bounded_queue 并将您的输入数据放入此队列,可能通过不同的线程(例如专用于 IO 操作),就像 concurrent_bounded_queue_instance.push(elem) 一样简单,然后通过input_data_type elem; concurrent_bounded_queue_instance.pop(elem)阅读。请注意,弹出一个项目在这里是一个阻塞操作。 concurrent_queue 提供非阻塞 try_pop 替代方案。

    另一种可能性是使用tbb::flow_graph 及其节点来组织相同的流水线方案。看一下描述dependencydata 流程图的两个示例。您可能需要使用 sequencer_node 来正确排序项目的执行(如有必要)。

    值得阅读标记的SO问题,看看其他人如何使用这个库。

    【讨论】:

      【解决方案3】:

      你实现了单线程版本吗?异形?

      它们是关键步骤,如果没有它们,您可以获得高度并行设计的最佳实现,只是要意识到瓶颈是缓冲区的 I/O 和/或线程同步和/或错误共享和/或缓存未命中或类似问题。

      我会首先尝试一个简单的线程池,其中包含按顺序执行所有步骤的任务。然后在分析了它的工作原理、CPU 消耗量等之后。我会尝试使用更复杂的工具总是将它们的性能与第一个简单版本进行比较

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2011-08-10
        • 2013-07-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-09-16
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多