【Question Title】: Sleeping Threadpool: worker thread awakens without notify() from main thread
【Posted】: 2019-10-22 07:54:24
【Question Description】:

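I'm implementing a thread pool in which the workers sleep when there is no work ready for them, and the main thread sleeps while the workers are busy. I noticed that a worker thread keeps going after calling wait() even though the main thread has not called notify_all().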

The output looks like this:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......

Worker function:

void TaskSystemParallelThreadPoolSleeping::waitFunc() {
    std::unique_lock<std::mutex> lock(*this->mutex_);
    while(true) {
        this->num_wait++;
        std::cout << "WORKER WAIT" << std::endl;
        this->cond_->wait(lock,
                        std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
        std::cout << "WORKER AWAKENS!" << std::endl;
        if (this->done_flag == true) {
            this->mutex_->unlock();
            break;
        }
        this->mutex_->unlock();

        std::cout << "WORKER START" << std::endl;
        while (true) {
            this->mutex_->lock();

            if (this->not_done == 0) {  // ALL work done
                if (this->total_work != 0) {  // 1st time seen by workers
                    this->total_work = 0;
                    this->num_wait = 0;
                    std::cout << "WORKER WAKE MAIN" << std::endl;
                    this->mutex_->unlock();
                    this->cond_->notify_all();
                }
                this->mutex_->unlock();
                break;
            }

            int total = this->total_work;
            int id = this->work_counter;
            if (id == total) {  // NO work initiated or NO work left
                this->mutex_->unlock();
                continue;
            }

            ++(this->work_counter);  // increment counter
            this->mutex_->unlock();  // Let others access counters to work

            this->runnable->runTask(id, total); // do work

            this->mutex_->lock();
            --(this->not_done); // decrement counter after work done
            this->mutex_->unlock();
        }
        std::cout << "WORKER DONE" << std::endl;
    }
    std::cout << "WORKER TERMINATE" << std::endl;
}

Main thread:

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    //
    // TODO: CS149 students will modify the implementation of this
    // method in Part A.  The implementation provided below runs all
    // tasks sequentially on the calling thread.

    // Set-up work
    this->mutex_->lock();
    std::cout << "MAIN SETUP" << std::endl;
    this->runnable = runnable;
    this->work_counter = 0;
    this->not_done = num_total_tasks;
    this->total_work = num_total_tasks;

    // Tell workers there is work
    std::cout << "MAIN POLLS READINESS" << std::endl;
    while (this->num_wait < this->num_T) {  // Check if all ready
        this->mutex_->unlock();
        this->mutex_->lock();
    }
    std::cout << "ALL WORKERS READY" << std::endl;
    this->mutex_->unlock();
    this->cond_->notify_all();

    // Wait for workers to complete work
    std::unique_lock<std::mutex> lock(*this->mutex_);
    this->cond_->wait(lock,
                    std::bind(&TaskSystemParallelThreadPoolSleeping::wakeMain, this));
    std::cout << "MAIN END" << std::endl;
}

Condition for waking the workers:

bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
    return (this->done_flag == true ||
                    (this->total_work != 0 && this->num_wait == this->num_T));
}

Condition for waking the main thread:

bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
    return this->total_work == 0;
}

Thread pool constructor:

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
    this->num_T = std::max(1, num_threads - 1);
    this->threads = new std::thread[this->num_T];
    this->mutex_ = new std::mutex();
    this->cond_ = new std::condition_variable();

    this->total_work = 0;
    this->not_done = 0;
    this->work_counter = 0;
    this->num_wait = 0;
    this->done_flag = {false};

    for (int i = 0; i < this->num_T; i++) {
        this->threads[i] = std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
    }
}

Thread pool destructor:

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    this->done_flag = true;
    this->cond_->notify_all();

    for (int i = 0; i < this->num_T; i++) {
        this->threads[i].join();
    }

    delete this->mutex_;
    delete[] this->threads;
    delete this->cond_;
}

I think the beginning should be:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......

That is, the workers should only wake up after main calls notify_all().
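A stripped-down reproduction may make the behavior easier to study outside the CS149 harness. This is a hypothetical sketch, not the original code: count_early_passes and main_notified are illustrative names, and it assumes the work is published before the last worker checks in. It keeps the same predicate shape as wakeWorker(), has the main thread poll readiness the way run() does, and counts how many workers get past wait() before main sets a flag and calls notify_all().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reproduction of the early wake-up. Returns how many workers
// got past wait() before the main thread set main_notified and called notify_all().
int count_early_passes(int num_workers) {
    std::mutex m;
    std::condition_variable cv;
    int total_work = 10;         // like this->total_work: work is already published
    int num_wait = 0;            // like this->num_wait
    bool main_notified = false;  // set by main just before notify_all()
    int early_passes = 0;

    std::vector<std::thread> workers;
    for (int i = 0; i < num_workers; ++i) {
        workers.emplace_back([&] {
            std::unique_lock<std::mutex> lk(m);
            ++num_wait;  // check in and wait inside the same critical section
            // Same shape as wakeWorker(): work exists and every worker has checked in.
            cv.wait(lk, [&] { return total_work != 0 && num_wait == num_workers; });
            if (!main_notified) ++early_passes;
        });
    }

    // Main polls readiness the way run() does, then notifies.
    std::unique_lock<std::mutex> lk(m);
    while (num_wait < num_workers) {
        lk.unlock();
        std::this_thread::yield();
        lk.lock();
    }
    main_notified = true;
    lk.unlock();
    cv.notify_all();

    for (auto& t : workers) t.join();
    assert(num_wait == num_workers);
    return early_passes;
}
```

Calling count_early_passes(1) returns 1, and count_early_passes(3) returns at least 1: the worker whose increment makes num_wait equal to the worker count always gets past wait() before main's notify_all().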

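Edit: Here is the full log. It seems that this self-awakening of a worker later leads to a deadlock: one worker wakes up on its own, finishes all the work, and sets this->num_wait=0 and this->total_work=0. As a result, all threads only ever see this->num_wait=1.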

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
MAIN ENDWORKER DONE
WORKER WAIT
WORKER DONE
WORKER WAIT

MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER AWAKENS!
WORKER START
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEMAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER DONE
WORKER WAIT

WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONEWORKER DONE
WORKER WAIT

WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEWORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS

WORKER WAIT
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
ALL WORKERS READY
MAIN END
MAIN SETUP
MAIN POLLS READINESS

【Question Discussion】:

    Tags: multithreading c++11 threadpool mutex


    【Solution 1】:

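    The reason a worker thread “wakes up without a notification from the main thread” is straightforward: the increment of this->num_wait and the call to this->cond_->wait() happen in the same critical section, so if the wait predicate is already true at that point, the worker never releases the lock and never waits for the main thread's notification. The last thread to check in while main is polling satisfies the condition defined in wakeWorker() (total_work != 0 and num_wait == num_T) by itself, so it passes straight through, which is what you observe.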

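    (I hope this code is just toy code for experimenting; it has too many problems. I wouldn't be surprised if it deadlocked because of the manual mutex lock()/unlock() calls.)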

            this->num_wait++;
            std::cout << "WORKER WAIT" << std::endl;
            this->cond_->wait(lock,
                            std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
            std::cout << "WORKER AWAKENS!" << std::endl;
    

    According to https://en.cppreference.com/w/cpp/thread/condition_variable/wait, the predicate overload of condition_variable::wait() is equivalent to

    while (!pred()) {
        wait(lock);
    }
    

    So if pred() already returns true, wait() returns immediately and never gives up the lock.
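To see this without any threads, here is a small hedged sketch. wait_like_std and predicate_checks_when_already_true are hypothetical helpers: the first spells out the loop quoted from cppreference, and the second calls both it and the standard predicate overload of std::condition_variable::wait on a single thread, with a predicate that is already true and nobody ever calling notify.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper spelling out the documented equivalence of
// cv.wait(lock, pred) with `while (!pred()) cv.wait(lock);`.
template <class Predicate>
void wait_like_std(std::condition_variable& cv,
                   std::unique_lock<std::mutex>& lock, Predicate pred) {
    while (!pred()) {   // the predicate is checked BEFORE any sleeping
        cv.wait(lock);  // only reached (and the lock only released) if pred() is false
    }
}

// Returns how many times the predicates were evaluated when the condition
// is already true. No other thread exists and nobody notifies, yet both
// waits return at once.
int predicate_checks_when_already_true() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = true;  // the thing we "wait" for has already happened
    int checks = 0;

    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&] { ++checks; return ready; });           // standard overload
    wait_like_std(cv, lock, [&] { ++checks; return ready; });  // spelled-out loop
    assert(lock.owns_lock());  // the lock was never given up
    return checks;
}
```

predicate_checks_when_already_true() returns 2: each wait evaluated its predicate once and returned while still holding the lock. That is the position of the last worker that increments num_wait while total_work != 0.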

    【Discussion】:

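    • Why does the last thread pass the check directly? Shouldn't it release the lock and then go to sleep because of wait()?
    • According to en.cppreference.com/w/cpp/thread/condition_variable/wait, condition_variable::wait() is equivalent to while (!pred()) { wait(lock); }. That is, it checks the condition first and releases the lock only if pred() is false.
    • @KcAble If the thing you're waiting for has already happened, you can't wait for it!
    • @DavidSchwartz Sorry, which part are you referring to?
    • @KcAble The last thread passes the check directly because it is waiting for something that has already happened. If the thing you ask it to wait for has already happened, the wait function won't wait, since that could mean waiting forever.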
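One way to make workers wait for something that has genuinely not happened yet is to have them wait for a new batch number ("generation") published by the main thread, rather than for a head count they update themselves, and to give the main thread its own condition variable. The following is a minimal sketch under that assumption; SleepingPool and its members are hypothetical and use a simplified interface, not CS149's ITaskSystem/IRunnable. All locking goes through std::unique_lock/std::lock_guard instead of manual lock()/unlock() calls.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sleeping pool (simplified; not the CS149 ITaskSystem interface).
// Workers sleep until a new batch "generation" is published, so checking in can
// never satisfy their own wake-up predicate, and run() sleeps on its own condvar.
class SleepingPool {
public:
    explicit SleepingPool(int num_threads) {
        assert(num_threads > 0);
        for (int i = 0; i < num_threads; ++i)
            threads_.emplace_back(&SleepingPool::worker_loop, this);
    }

    ~SleepingPool() {
        {
            std::lock_guard<std::mutex> lk(m_);
            done_ = true;  // set under the lock so a sleeping worker cannot miss it
        }
        work_cv_.notify_all();
        for (auto& t : threads_) t.join();
    }

    // Runs task(i, total) for every i in [0, total) and returns when all are done.
    void run(std::function<void(int, int)> task, int total) {
        std::unique_lock<std::mutex> lk(m_);
        task_ = std::move(task);
        total_ = total;
        next_ = 0;
        remaining_ = total;
        ++generation_;          // publish a new batch
        work_cv_.notify_all();  // the only thing that wakes sleeping workers
        done_cv_.wait(lk, [this] { return remaining_ == 0; });
    }

private:
    void worker_loop() {
        long seen = 0;  // last batch generation this worker has picked up
        std::unique_lock<std::mutex> lk(m_);
        while (true) {
            work_cv_.wait(lk, [&] { return done_ || generation_ != seen; });
            if (done_) return;
            seen = generation_;
            while (next_ < total_) {
                int id = next_++;  // claim one task while holding the lock
                int total = total_;
                lk.unlock();
                // task_ cannot change here: run() returns (so a new batch can
                // start) only after remaining_ reaches zero.
                task_(id, total);
                lk.lock();
                if (--remaining_ == 0) done_cv_.notify_one();
            }
        }
    }

    std::mutex m_;
    std::condition_variable work_cv_;  // workers sleep here between batches
    std::condition_variable done_cv_;  // run() sleeps here until the batch ends
    std::vector<std::thread> threads_;
    std::function<void(int, int)> task_;
    int total_ = 0, next_ = 0, remaining_ = 0;
    long generation_ = 0;
    bool done_ = false;
};
```

For example, with SleepingPool pool(3), repeatedly calling pool.run([&](int i, int) { ++hits[i]; }, 100) runs each index exactly once per call. Because the workers' predicate depends only on done_ and generation_, which only the main thread changes, no worker can release itself the way the last worker does in the original code.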