【问题标题】:How to properly transfer control between two sets of threads back and forth如何在两组线程之间正确地来回转移控制
【发布时间】:2017-08-01 15:41:04
【问题描述】:

我想仅使用 C++ 11 的标准库来实现线程池。我想公开的接口是允许我的主线程一次提交许多作业,并等到所有线程完成后再继续。这是我第一次明确地处理线程,所以不可避免地会遇到一些死锁问题。这是我的代码:

class CrashQueue {
private:
    std::vector<std::thread> workers;
    std::queue<void*> payloads;
    std::function<void(void*)> function;
    std::mutex taskFetchingMutex;
    long aliveWorkers;

    std::condition_variable alarmClock;
    std::condition_variable sleepClock;
    std::mutex sleepClockMutex;
    bool running = true;

public:
    CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency()) {
        for (std::size_t i = 0; i < threadCount; i ++) {
            workers.emplace_back([this]() -> void {
                while (running) {
                    void* payload;
                    {
                        std::unique_lock<std::mutex> lock(taskFetchingMutex);
                        if (payloads.empty()) {
                            aliveWorkers --;
                            if (aliveWorkers == 0)
                                sleepClock.notify_one();
                            alarmClock.wait(lock);
                            continue;
                        }
                        payload = payloads.front();
                        payloads.pop();
                    }

                    function(payload);
                }
            });
        }
    }

    ~CrashQueue() {
        running = false;
        alarmClock.notify_all();
        for (auto& worker : workers)
            worker.join();
    }

    void run() {
        this->aliveWorkers = workers.size();
        alarmClock.notify_all();

        std::unique_lock<std::mutex> lock(sleepClockMutex);
        sleepClock.wait(lock);
    }

    void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
        this->function = std::move(function);
        this->payloads = std::move(payloads);
    }
};

我怀疑问题出在工作线程中执行的构造函数的 lambda 表达式中:

if (payloads.empty()) {
    aliveWorkers --;
    if (aliveWorkers == 0)
        sleepClock.notify_one();
    alarmClock.wait(lock);
    continue;
}

可能是最后一个工作线程唤醒主线程并在主线程唤醒所有其他线程之后进入睡眠。尽管如此,这似乎不太可能,但每次我不处于调试模式时都会发生死锁。有什么提示吗?

【问题讨论】:

  • aliveWorkers 必须是 std::atomic&lt;long&gt;

标签: c++ multithreading


【解决方案1】:

问题与我在这里使用两个互斥锁的事实有关。如下重写run 使其工作:

void run() {
    this->aliveWorkers = workers.size();
    alarmClock.notify_all();

    while (true) {
        std::unique_lock<std::mutex> lock(taskFetchingMutex);
        if (aliveWorkers.load() == 0)
            break;
        sleepClock.wait(lock);
    }
}

但是,我无法通过图片来了解原始代码为何会失败。任何解释仍然需要的答案。

编辑:我看似正确的代码的完整来源:

#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <functional>
#include <condition_variable>
#include <mutex>
#include <random>
#include <algorithm>
#include <tuple>
#include <queue>

class CrashQueue {
private:
    std::vector<std::thread> workers;
    std::queue<void*> payloads;
    std::function<void(void*)> function;
    std::mutex taskFetchingMutex;
    std::atomic<long> aliveWorkers;

    std::condition_variable alarmClock;
    std::condition_variable sleepClock;

    bool running = true;

public:
    CrashQueue(std::size_t threadCount = std::thread::hardware_concurrency())
    : aliveWorkers(threadCount) {
        for (std::size_t i = 0; i < threadCount; i ++) {
            workers.emplace_back([this]() -> void {
                while (running) {
                    void* payload;
                    {
                        std::unique_lock<std::mutex> lock(taskFetchingMutex);
                        if (payloads.empty()) {
                            aliveWorkers.fetch_sub(1);
                            sleepClock.notify_one();

                            alarmClock.wait(lock);
                            continue;
                        }
                        payload = payloads.front();
                        payloads.pop();
                    }

                    function(payload);
                }
            });
        }

        // Make sure all workers finished running.
        while (aliveWorkers.load() > 0);
        std::unique_lock<std::mutex> lock(taskFetchingMutex);
    }

    ~CrashQueue() {
        running = false;
        alarmClock.notify_all();
        for (auto& worker : workers)
            worker.join();
    }

    void run() {
        this->aliveWorkers = workers.size();
        alarmClock.notify_all();

        while (true) {
            std::unique_lock<std::mutex> lock(taskFetchingMutex);
            if (aliveWorkers.load() == 0)
                break;
            sleepClock.wait(lock);
        }
    }

    void commit(std::function<void(void*)>&& function, std::queue<void*>&& payloads) {
        this->function = std::move(function);
        this->payloads = std::move(payloads);
    }
};

【讨论】:

  • 你把aliveWorkers 原子化了吗?我注意到您使用了aliveWorkers.load(),但在问题中它只是long。这不是condition_variable 的完整答案。您应该使running 成为原子并将aliveWorkers 初始化为0。您也没有在commit() 中同步functionpayloads。请注意,如果在所有线程第一次到达if(payloads.empty()) 之前调用commit(),则线程可能会在您调用run 之前开始处理。作为一个关键点,我怀疑你还没有完全理解内存障碍。
  • @Persixty 没错,我还没有系统地看过任何关于并发的资料,而我使用原子变量的最初动机是避免锁。正如我所说,由于我缺乏知识,我认为这将是一件容易的事。我确实更改了aliveWorkers,只是将完整的源代码粘贴在 gist 上,并在答案中更新了链接。
  • @Persixty 我同意你关于同步functionpayloads 的观点,但我应该如何解决这个问题?我相信锁定一些mutex 不会有帮助。
  • @Persixty 一种可行的方法是在构造函数中调用run。似乎可以保证(或者至少我会保证)在run 之后我的所有工作人员都处于空闲状态,之后任何提交都是安全的。
  • 我会等到aliveWorkers 降为零。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-02
  • 2011-06-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多