【问题标题】:When one worker thread fails, how to abort remaining workers?当一个工作线程失败时,如何中止剩余的工作人员?
【发布时间】:2015-11-21 16:13:19
【问题描述】:

我有一个程序,它产生多个线程,每个线程都执行一个长时间运行的任务。然后主线程等待所有工作线程加入,收集结果,然后退出。

如果其中一个工作人员发生错误,我希望其余工作人员优雅地停止,以便主线程可以稍后退出。

我的问题是,当长期运行的任务的实现由我无法修改其代码的库提供时,如何最好地做到这一点。

这是一个简单的系统草图,没有错误处理:

void threadFunc()
{
    // Do long-running stuff
}

void mainFunc()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 3; ++i) {
        threads.push_back(std::thread(&threadFunc));
    }

    for (auto &t : threads) {
        t.join();
    }
}

如果长时间运行的函数执行一个循环并且我可以访问代码,那么 只需在每次迭代顶部检查共享的“继续运行”标志即可中止执行。

std::mutex mutex;
bool error;

void threadFunc()
{
    try {
        for (...) {
            {
                std::unique_lock<std::mutex> lock(mutex);
                if (error) {
                    break;
                }
            }
        }
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

现在考虑由库提供长时间运行的操作的情况:

std::mutex mutex;
bool error;

class Task
{
public:
    // Blocks until completion, error, or stop() is called
    void run();

    void stop();
};

void threadFunc(Task &task)
{
    try {
        task.run();
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

在这种情况下,主线程必须处理错误,并调用stop() on 仍在运行的任务。因此,它不能简单地等待每个工人 join() 与原始实现相同。

到目前为止我使用的方法是在 主线程和每个worker:

struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;
}

当工作人员成功完成时,它会减少 running 计数。如果 捕获到异常时,worker 设置error 标志。在这两种情况下,它 然后调用condVar.notify_one()

然后主线程等待条件变量,如果有则唤醒 error 已设置或 running 达到零。醒来时,主线程 如果设置了error,则在所有任务上调用stop()

这种方法有效,但我觉得应该有一个更清洁的解决方案,使用一些 标准并发库中的高级原语。能 有人建议改进的实现吗?

这是我当前解决方案的完整代码:

// main.cpp

#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

#include "utils.h"

// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
    Task(int tidx, bool fail)
    :   tidx(tidx)
    ,   fail(fail)
    ,   m_run(true)
    {

    }

    void run()
    {
        static const int NUM_ITERATIONS = 10;

        for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                if (!m_run) {
                    out() << "thread " << tidx << " aborting";
                    break;
                }
            }

            out() << "thread " << tidx << " iter " << iter;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            if (fail) {
                throw std::exception();
            }
        }
    }

    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_run = false;
    }

    const int tidx;
    const bool fail;

private:
    std::mutex m_mutex;
    bool m_run;
};

// Data shared between all threads
struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;

    SharedData(int count)
    :   error(false)
    ,   running(count)
    {

    }
};

void threadFunc(Task &task, SharedData &shared)
{
    try {
        out() << "thread " << task.tidx << " starting";

        task.run(); // Blocks until task completes or is aborted by main thread

        out() << "thread " << task.tidx << " ended";
    } catch (std::exception &) {
        out() << "thread " << task.tidx << " failed";

        std::unique_lock<std::mutex> lock(shared.mutex);
        shared.error = true;
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);
        --shared.running;
    }

    shared.condVar.notify_one();
}

int main(int argc, char **argv)
{
    static const int NUM_THREADS = 3;

    std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
    std::vector<std::thread> threads(NUM_THREADS);

    SharedData shared(NUM_THREADS);

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        const bool fail = (tidx == 1);
        tasks[tidx] = std::make_unique<Task>(tidx, fail);
        threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);

        // Wake up when either all tasks have completed, or any one has failed
        shared.condVar.wait(lock, [&shared](){
            return shared.error || !shared.running;
        });

        if (shared.error) {
            out() << "error occurred - terminating remaining tasks";
            for (auto &t : tasks) {
                t->stop();
            }
        }
    }

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        out() << "waiting for thread " << tidx << " to join";
        threads[tidx].join();
        out() << "thread " << tidx << " joined";
    }

    out() << "program complete";

    return 0;
}

这里定义了一些实用函数:

// utils.h

#include <iostream>
#include <mutex>
#include <thread>

#ifndef UTILS_H
#define UTILS_H

#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {

template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
            Args&& ...args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace std
#endif // __cplusplus <= 201103L

// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
    ThreadSafeStdOut()
    :   m_lock(m_mutex)
    {

    }

    ~ThreadSafeStdOut()
    {
        std::cout << std::endl;
    }

    template <typename T>
    ThreadSafeStdOut &operator<<(const T &obj)
    {
        std::cout << obj;
        return *this;
    }

private:
    static std::mutex m_mutex;
    std::unique_lock<std::mutex> m_lock;
};

std::mutex ThreadSafeStdOut::m_mutex;

// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
    return ThreadSafeStdOut();
}

#endif // UTILS_H

【问题讨论】:

  • 如果库有某种清理函数,您可以将其注册为回调,因此当主线程强制库线程停止时,将调用清理函数。希望清理工作应该关闭文件、套接字和其他描述符并释放内存和其他分配的资源。
  • 我可能会尝试使用std::condition_variable 的方法。
  • πάντα ῥεῖ - 这就是我所做的(参见问题末尾的实现),但它看起来有点难看。感觉应该有更好的解决方案使用std::future,但我不确定如何实现。
  • 我真的很喜欢你目前的解决方案,很清楚发生了什么。 futures 似乎可以在这里应用,但我不相信它会更干净
  • @GarethStockwell 您是否需要停止线程,或者如果它们继续运行直到程序退出就可以了?

标签: c++ multithreading


【解决方案1】:

The implementation of the long-running task is provided by a library whose code I cannot modify.

这意味着你无法同步工作线程完成的工作

If an error occurs in one of the workers,

让我们假设你真的可以检测工人错误;如果使用的库报告了其中一些可以很容易地检测到,而其他库则不能,即

  1. 库代码循环。
  2. 库代码因未捕获的异常而提前退出。

I want the remaining workers to stop **gracefully**

这是不可能的

您能做的最好的事情是编写一个线程管理器来检查工作线程状态,如果检测到错误情况,它只会(不优雅地)“杀死”所有工作线程并退出。

您还应该考虑检测循环的工作线程(通过超时)并向用户提供终止或继续等待进程完成的选项。

【讨论】:

  • 有可能,OP已经显示了。 stop() 有一个 API,并且工作正常。错误是抛出的异常,因此您可以在包装器中捕获它们,正如 OP 已经显示的那样。 OP 要求(并且可以做到)的唯一区别是在线程包装器和 main() 之间使用某种不同类型的同步。
【解决方案2】:

你的问题是长时间运行的函数不是你的代码,你说你不能修改它。因此,除非库开发人员为您这样做,否则您不能让它关注任何类型的外部同步原语(条件变量、信号量、互斥体、管道等)。

因此,您唯一的选择是做一些事情,不管它在做什么,都可以从任何代码中摆脱控制。这就是信号的作用。为此,您将不得不使用pthread_kill(),或者这些天的任何等价物。

模式是这样的

  1. 检测到错误的线程需要以某种方式将该错误传达回主线程。
  2. 然后,主线程需要为所有其他剩余线程调用 pthread_kill()。不要被名称所迷惑——pthread_kill() 只是一种向线程传递任意信号的方法。请注意,STOP、CONTINUE 和 TERMINATE 之类的信号是进程范围的,即使是使用 pthread_kill() 引发的,而不是特定于线程的,所以不要使用它们。
  3. 在每个线程中,您都需要一个信号处理程序。在将信号传递给线程时,该线程中的执行路径将跳转到处理程序,无论长时间运行的函数在做什么。
  4. 您现在回到(有限的)控制中,并且可以(可能,好吧,也许)进行一些有限的清理并终止线程。
  5. 与此同时,主线程将在所有收到信号的线程上调用 pthread_join(),现在这些线程将返回。

我的想法:

  • 这是一种非常丑陋的做法(众所周知,信号/pthread 很难正确处理,而且我不是专家),但我真的不知道您还有什么其他选择。
  • 要在源代码中看起来“优雅”还有很长的路要走,尽管最终用户体验会很好。
  • 您将在运行该库函数的过程中中止执行,因此,如果有任何清理工作通常会执行(例如,释放已分配的内存),但不会完成并且您将发生内存泄漏。如果发生这种情况,在 valgrind 之类的东西下运行是一种解决方法。
  • 清理库函数(如果需要)的唯一方法是让您的信号处理程序将控制权返回给函数并让它运行完成,这正是您不想做的事情。
  • 当然,这在 Windows 上不起作用(没有 pthread,至少没有值得一提的,尽管可能存在等效机制)。

最好的方法是重新实现(如果可能的话)该库函数。

【讨论】:

    【解决方案3】:

    我一直在考虑您的情况,这可能对您有所帮助。您可能可以尝试使用几种不同的方法来实现您的目标。有 2-3 个选项可能有用,或者是所有三个选项的组合。我将至少展示第一个选项,因为我仍在学习并尝试掌握模板专业化的概念以及使用 Lambda。

    • 使用管理器类
    • 使用模板专业化封装
    • 使用 Lambda。

    Manager 类的伪代码如下所示:

    class ThreadManager {
    private:
        std::unique_ptr<MainThread> mainThread_;
        std::list<std::shared_ptr<WorkerThread> lWorkers_;  // List to hold finished workers
        std::queue<std::shared_ptr<WorkerThread> qWorkers_; // Queue to hold inactive and waiting threads.
        std::map<unsigned, std::shared_ptr<WorkerThread> mThreadIds_; // Map to associate a WorkerThread with an ID value.
        std::map<unsigned, bool> mFinishedThreads_; // A map to keep track of finished and unfinished threads.
    
        bool threadError_; // Not needed if using exception handling
    public:
        explicit ThreadManager( const MainThread& main_thread );
    
        void shutdownThread( const unsigned& threadId );
        void shutdownAllThreads();
    
        void addWorker( const WorkerThread& worker_thread );          
        bool isThreadDone( const unsigned& threadId );
    
        void spawnMainThread() const; // Method to start main thread's work.
    
        void spawnWorkerThread( unsigned threadId, bool& error );
    
        bool getThreadError( unsigned& threadID ); // Returns True If Thread Encountered An Error and passes the ID of that thread, 
    
    };
    

    仅出于演示目的,我使用 bool 值来确定线程是否失败以简化结构,当然,如果您更喜欢使用异常或无效的无符号值等,这可以替换为您的喜欢。

    现在使用这种类型的类将是这样的:另请注意,如果它是单例类型的对象,则认为这种类型的类会更好,因为您不希望超过 1 个 ManagerClass,因为您正在工作使用共享指针。

    SomeClass::SomeClass( ... ) {
        // This class could contain a private static smart pointer of this Manager Class
        // Initialize the smart pointer giving it new memory for the Manager Class and by passing it a pointer of the Main Thread object
    
       threadManager_ = new ThreadManager( main_thread ); // Wouldn't actually use raw pointers here unless if you had a need to, but just shown for simplicity       
    }
    
    SomeClass::addThreads( ... ) {
        for ( unsigned u = 1, u <= threadCount; u++ ) {
             threadManager_->addWorker( some_worker_thread );
        }
    }
    
    SomeClass::someFunctionThatSpawnsThreads( ... ) {
        threadManager_->spawnMainThread();
    
        bool error = false;       
        for ( unsigned u = 1; u <= threadCount; u++ ) {
            threadManager_->spawnWorkerThread( u, error );
    
            if ( error ) { // This Thread Failed To Start, Shutdown All Threads
                threadManager->shutdownAllThreads();
            }
        }
    
        // If all threads spawn successfully we can do a while loop here to listen if one fails.
        unsigned threadId;
        while ( threadManager_->getThreadError( threadId ) ) {
             // If the function passed to this while loop returns true and we end up here, it will pass the id value of the failed thread.
             // We can now go through a for loop and stop all active threads.
             for ( unsigned u = threadID + 1; u <= threadCount; u++ ) {
                 threadManager_->shutdownThread( u );
             }
    
             // We have successfully shutdown all threads
             break;
        }
    }
    

    我喜欢管理器类的设计,因为我在其他项目中使用过它们,并且它们经常派上用场,尤其是在使用包含许多和多种资源的代码库时,例如具有许多资产的工作游戏引擎,例如如精灵、纹理、音频文件、地图、游戏项目等。使用管理器类有助于跟踪和维护所有资产。同样的概念可以应用于“管理”活动、非活动、等待线程,并且知道如何直观地正确处理和关闭所有线程。如果您的代码库和库支持异常以及线程安全异常处理,而不是传递和使用布尔值来处理错误,我建议您使用 ExceptionHandler。还拥有一个 Logger 类可以很好地写入日志文件和/或控制台窗口,以提供有关异常被抛出的函数以及导致异常的原因的显式消息,其中日志消息可能如下所示:

    Exception Thrown: someFunctionNamedThis in ThisFile on Line# (x)
        threadID 021342 failed to execute.
    

    通过这种方式,您可以查看日志文件并很快找出导致异常的线程,而不是使用传递的 bool 变量。

    【讨论】:

    • 这是另一个没有提供的重要答案。您完全忽略了不受控制的库代码的意义,以及在某些错误情况下无法实现优雅线程控制的目标。
    • @Pat 我在这里的目的是展示一种可以管理多个线程的设计,它会关联一个主线程,该主线程是私有唯一智能指针,所有其他线程都是共享智能指针,它将是管理器类负责处理所有线程的启动、停止和错误检查。是的,我确实理解在某些错误情况下,您无能为力。但是由于类似的情况发生在 3D GameEngine 中,我拥有的 Logger 类和 ExceptionHandler 类;抛出了不同级别的异常,例如Error、Warning、Info...
    • @Pat (...continued) 严重等,如果是一般警告则这样;引擎仍将继续,但不会使用该资源,如果它是错误或严重的,它会在释放所有资源后关闭应用程序。信息消息类型或多或少适用于 Logger 本身。 GameEngine 是一个由许多部分组成的复杂野兽,它旨在与现代 OpenGL 配合使用,并且可以处理和支持多线程。
    • 我知道你做了什么;我的观点是,OP 要求 gracefully 控制无法真正满足该要求的失败线程。然后,除了我们可以在许多书籍中找到的“如何处理 C++ 中的线程”之外,讨论使 OP 的示例与任何线程书示例不同的约束(库代码)可能是一个好主意。
    • @Pat 够真实!这是我在编程和软件开发方面所享受的一件事。无论您认为自己知道多少以及使用它的工作越多;你真正知道的越少,因为你总是在学习,因为语言确实有随着时间发展的趋势。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-12
    相关资源
    最近更新 更多