【问题标题】:Parallel writer and reader of std::vectorstd::vector 的并行写入器和读取器
【发布时间】:2013-03-05 19:41:03
【问题描述】:

我有一个由 2 个线程同时使用的类:一个线程将结果(一个接一个)添加到任务的 results,第二个线程处理已经存在的 results

// all members are copy-able
struct task {
    command cmd;
    vector<result> results;
};

class generator {
    public:
        generator(executor* e); // store the ptr
        void run();
        ...
};

class executor {
    public:
        void run();
        void add_result(int command_id, result r);
        task& find_task(int command_id);
            ...
    private:
        vector<task> tasks_;
        condition_variable_any update_condition_;
};

启动

// In main, we have instances of generator and executor,
// we launch 2 threads and wait for them.
std::thread gen_th( std::bind( &generator::run, gen_instance_) );
std::thread exe_th( std::bind( &executor::run,  exe_instance_) );

生成器线程

void generator::run() {
    while(is_running) {
        sleep_for_random_seconds(); 
        executor_->add_result( SOME_ID, new_result() );
    }
}

执行线程

void executor::add_result( int command_id, result r ) {
    std::unique_lock<std::recursive_mutex> l(mutex_);
    task& t = this->find_task(command_id);
    t.results.push_back(r);
    update_condition_.notify_all();
}

void executor::run() { 
  while(is_running) {
     update_condition_.wait(...);
     task& t = this->find_task(SOME_ID);        
     for(result r: t.results) {
        // no live updates are visible here
     }
   }
}
  1. 生成器线程每隔几秒添加一个结果。
  2. Executor 线程本身就是一个executor。它通过 run 方法运行,该方法等待更新,当更新发生时,它会处理结果。

注意事项:

  1. 任务向量可能很大;结果永远不会被处理;
  2. 执行程序中的for-each 循环获取它正在处理的任务,然后迭代结果,检查其中哪些是新的并处理它们。一旦处理,它们将被标记并且不会再次被处理。此处理可能需要一些时间。

执行线程 在添加另一个结果之前没有完成 for 循环时会出现问题 - 结果对象在 for 循环中不可见。由于 Executor Thread 正在工作,它不会注意到更新条件的更新,也不会刷新向量等。当它完成时(处理 已经不是实际的视图 of tasks_) 它再次挂在刚刚触发的 update_condition_.. 上。

我需要让代码知道,它应该在完成后再次运行循环for-each循环中可见的任务进行更改。这个问题的最佳解决方案是什么?

【问题讨论】:

  • 看起来像是信号量的工作,不是吗?
  • 我认为这是我使用vector的方式的问题。代码非常同步(信号量上的互斥体)。
  • 我不明白的是:执行者是怎么知道123的?它在线程 2 的代码中被硬编码。那里一定有其他循环,对吧?另外,如果正如您的评论所说,那在run() 内部,那么executor_ 变量指向的是什么?不应该是this吗?
  • 这是一个经典的生产者/消费者组合。一个线程产生结果,另一个线程消耗它们。这就是发明计数信号量的问题。它对已生产但尚未消费的事物进行计数。我没有看到可以在您的代码中计数的同步对象。如果你没有这样的对象,你最终会实现一个。
  • 你不需要使用std::bind来启动一个线程,std::thread gen_th( &amp;generator::run, gen_instance_ );也可以,复制更少

标签: c++ multithreading concurrency vector c++11


【解决方案1】:

你只需要检查你的向量是否为空之前阻塞在CV上。类似的东西:

while (running) {
    std::unique_lock<std::mutex> lock(mutex);
    while (tasks_.empty()) // <-- this is important
        update_condition_.wait(lock);
    // handle tasks_
}

如果您的架构允许(即,如果您在处理任务时不需要持有锁),您可能还希望在处理任务之前尽快解锁互斥锁,以便生产者可以推送更多任务而无需阻塞。也许用一个临时向量交换你的tasks_ 向量,然后解锁互斥锁,然后才开始处理临时向量中的任务:

while (running) {
    std::unique_lock<std::mutex> lock(mutex);
    while (tasks_.empty())
        update_condition_.wait(lock);
    std::vector<task> localTasks;
    localTasks.swap(tasks_);
    lock.unlock(); // <-- release the lock early
    // handle localTasks
}

编辑:啊,现在我意识到这并不适合您的情况,因为您的消息不是直接在tasks_ 中,而是在tasks_.results 中。虽然你明白我的一般想法,但使用它需要在你的代码中进行结构更改(例如,扁平化你的任务/结果,并且总是有一个与单个结果相关联的 cmd)。

【讨论】:

  • if 应该是 while 来处理虚假唤醒
【解决方案2】:

我在同样的情况下采取以下方式

std::vector< ... > temp;
mutex.lock();
temp.swap( results );
mutex.unlock();
for(result r: temp ){
    ...
}

会有一点开销,但总的来说,整个代码更易读,如果计算量很大,那么复制的时间就会变为零(对不起,英语不是我的本机)))

【讨论】:

  • 你的代码的问题是,如果你的任务向量(你在这里称为results)是空的,那么你基本上是在忙循环。这就是 condition_variables 非常有用的原因。 ;)
  • 我只展示了其余的代码。当然线程应该等待一些条件然后处理数据
猜你喜欢
  • 1970-01-01
  • 2012-09-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多