【发布时间】:2015-11-21 16:13:19
【问题描述】:
我有一个程序,它产生多个线程,每个线程都执行一个长时间运行的任务。然后主线程等待所有工作线程加入,收集结果,然后退出。
如果其中一个工作人员发生错误,我希望其余工作人员优雅地停止,以便主线程可以稍后退出。
我的问题是,当长期运行的任务的实现由我无法修改其代码的库提供时,如何最好地做到这一点。
这是一个简单的系统草图,没有错误处理:
void threadFunc()
{
// Do long-running stuff
}
void mainFunc()
{
std::vector<std::thread> threads;
for (int i = 0; i < 3; ++i) {
threads.push_back(std::thread(&threadFunc));
}
for (auto &t : threads) {
t.join();
}
}
如果长时间运行的函数执行一个循环并且我可以访问代码,那么 只需在每次迭代顶部检查共享的“继续运行”标志即可中止执行。
std::mutex mutex;
bool error;
void threadFunc()
{
try {
for (...) {
{
std::unique_lock<std::mutex> lock(mutex);
if (error) {
break;
}
}
}
} catch (std::exception &) {
std::unique_lock<std::mutex> lock(mutex);
error = true;
}
}
现在考虑由库提供长时间运行的操作的情况:
std::mutex mutex;
bool error;
class Task
{
public:
// Blocks until completion, error, or stop() is called
void run();
void stop();
};
void threadFunc(Task &task)
{
try {
task.run();
} catch (std::exception &) {
std::unique_lock<std::mutex> lock(mutex);
error = true;
}
}
在这种情况下,主线程必须处理错误,并调用stop() on
仍在运行的任务。因此,它不能简单地等待每个工人
join() 与原始实现相同。
到目前为止我使用的方法是在 主线程和每个worker:
struct SharedData
{
std::mutex mutex;
std::condition_variable condVar;
bool error;
int running;
}
当工作人员成功完成时,它会减少 running 计数。如果
捕获到异常时,worker 设置error 标志。在这两种情况下,它
然后调用condVar.notify_one()。
然后主线程等待条件变量,如果有则唤醒
error 已设置或 running 达到零。醒来时,主线程
如果设置了error,则在所有任务上调用stop()。
这种方法有效,但我觉得应该有一个更清洁的解决方案,使用一些 标准并发库中的高级原语。能 有人建议改进的实现吗?
这是我当前解决方案的完整代码:
// main.cpp
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>
#include "utils.h"
// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
Task(int tidx, bool fail)
: tidx(tidx)
, fail(fail)
, m_run(true)
{
}
void run()
{
static const int NUM_ITERATIONS = 10;
for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
{
std::unique_lock<std::mutex> lock(m_mutex);
if (!m_run) {
out() << "thread " << tidx << " aborting";
break;
}
}
out() << "thread " << tidx << " iter " << iter;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (fail) {
throw std::exception();
}
}
}
void stop()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_run = false;
}
const int tidx;
const bool fail;
private:
std::mutex m_mutex;
bool m_run;
};
// Data shared between all threads
struct SharedData
{
std::mutex mutex;
std::condition_variable condVar;
bool error;
int running;
SharedData(int count)
: error(false)
, running(count)
{
}
};
void threadFunc(Task &task, SharedData &shared)
{
try {
out() << "thread " << task.tidx << " starting";
task.run(); // Blocks until task completes or is aborted by main thread
out() << "thread " << task.tidx << " ended";
} catch (std::exception &) {
out() << "thread " << task.tidx << " failed";
std::unique_lock<std::mutex> lock(shared.mutex);
shared.error = true;
}
{
std::unique_lock<std::mutex> lock(shared.mutex);
--shared.running;
}
shared.condVar.notify_one();
}
int main(int argc, char **argv)
{
static const int NUM_THREADS = 3;
std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
std::vector<std::thread> threads(NUM_THREADS);
SharedData shared(NUM_THREADS);
for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
const bool fail = (tidx == 1);
tasks[tidx] = std::make_unique<Task>(tidx, fail);
threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
}
{
std::unique_lock<std::mutex> lock(shared.mutex);
// Wake up when either all tasks have completed, or any one has failed
shared.condVar.wait(lock, [&shared](){
return shared.error || !shared.running;
});
if (shared.error) {
out() << "error occurred - terminating remaining tasks";
for (auto &t : tasks) {
t->stop();
}
}
}
for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
out() << "waiting for thread " << tidx << " to join";
threads[tidx].join();
out() << "thread " << tidx << " joined";
}
out() << "program complete";
return 0;
}
这里定义了一些实用函数:
// utils.h
#include <iostream>
#include <mutex>
#include <thread>
#ifndef UTILS_H
#define UTILS_H
#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {
template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
Args&& ...args)
{
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
} // namespace std
#endif // __cplusplus <= 201103L
// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
ThreadSafeStdOut()
: m_lock(m_mutex)
{
}
~ThreadSafeStdOut()
{
std::cout << std::endl;
}
template <typename T>
ThreadSafeStdOut &operator<<(const T &obj)
{
std::cout << obj;
return *this;
}
private:
static std::mutex m_mutex;
std::unique_lock<std::mutex> m_lock;
};
std::mutex ThreadSafeStdOut::m_mutex;
// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
return ThreadSafeStdOut();
}
#endif // UTILS_H
【问题讨论】:
-
如果库有某种清理函数,您可以将其注册为回调,因此当主线程强制库线程停止时,将调用清理函数。希望清理工作应该关闭文件、套接字和其他描述符并释放内存和其他分配的资源。
-
我可能会尝试使用
std::condition_variable的方法。 -
πάντα ῥεῖ - 这就是我所做的(参见问题末尾的实现),但它看起来有点难看。感觉应该有更好的解决方案使用
std::future,但我不确定如何实现。 -
我真的很喜欢你目前的解决方案,很清楚发生了什么。
futures 似乎可以在这里应用,但我不相信它会更干净 -
@GarethStockwell 您是否需要停止线程,或者如果它们继续运行直到程序退出就可以了?
标签: c++ multithreading