【问题标题】:Clean cancellation of an std::thread that is blocked on an input/output call清除在输入/输出调用中阻塞的 std::thread
【发布时间】:2017-04-23 22:19:15
【问题描述】:

我有一个std::thread 可能在文件描述符输入/输出调用中被阻止,我怎样才能彻底取消它?

考虑以下示例:

#include <unistd.h>
#include <thread>

void thread_routine(int fd)
{
    char buf;
    read(fd, &buf, 1);
}

int main()
{
    int pipefd[2];
    pipe(pipefd);

    std::thread thread(&thread_routine, pipefd[0]);

    thread.join();

    close(pipefd[0]);
    close(pipefd[1]);
}

join() 之前我可以做什么来确保它不会永远锁定? (管道只是获取文件描述符的一种快速示例方法,我有一个more exotic scenario,但我试图得到一个通用的答案。)

使用 pthreads 我可以调用 pthread_cancel(),因为 read()write() 是取消点,但是没有 C++ 方法可以取消 std::thread(我可以获取 thread::native_handle() 并将其传递给 pthread_cancel(),但我想要一种更清洁的方法)。

请注意:

  • 我无法在非阻塞模式下设置文件描述符
  • 我不能在文件描述符上使用 select()

【问题讨论】:

    标签: c++ multithreading c++11 posix file-descriptor


    【解决方案1】:

    我相信答案取决于系统。有一些方法可以做到这一点,它在不同的操作系统中可能会有所不同。

    1. 关闭文件描述符。 比操作不会阻塞,但总是会出现错误。 此外,阻塞的操作也会以错误告终。

    2. 另一种方法是发送将由该线程处理的信号并中断任何系统操作。

    3. (如果可以使用 select)在 read() 调用前加上 select() 并尝试方法 1。

    4. (如果可以使用 select)添加另一个信号套接字并在两者上进行选择。如果可以读取信号套接字,则退出。

    5. (如果您可以使用 select)可能将所有逻辑都放在一个线程中是有意义的。例如,与 GPS/Sockets/Stdio 一起使用的 NTPD 是一个单线程应用程序,只有一个选择!

    我认为方法 4 是最好的。但是如果你不能使用 select 你可以试试 2 和 1

    不要使用 pthread_cancel。这真是糟糕的做法。

    【讨论】:

    • 使用 select() 是未定义的行为:stackoverflow.com/questions/543541/… 使用 read() 可以吗?
    • 回复您的更新:我不能使用 select() - 我正在使用的文件描述符不支持它。
    • 好吧,它并没有说它是“未定义的”。它只是说不同的操作系统的行为不同。我们在 QNX 中使用了这种做法,并且完全没问题。我相信对于 Linux 来说也可以。另请注意,即使 Linux 也不总是 POSIX。
    • 可能对你来说发送一个信号(当然是正确处理)是一个好方法。
    • NTPD 是一个著名的用于系统时间同步的开源应用程序,它处理来自不同来源的数据。如果您有兴趣,可以查看他们的 repo 中的代码。一个线程一选,处理所有的IO操作,包括网络、串口、stdio等
    【解决方案2】:

    使用std::future,您可以在有限的时间间隔内阻塞并最终决定停止等待并分离线程。如果您要退出,泄漏正在运行的线程并不重要。如果不退出,则阻塞调用将自然返回并且线程将被销毁,或者它将在进程的剩余生命周期内被阻塞。

    在以下示例中,MAX_WAIT_TIME 用作分离和退出的条件。

    #include <chrono>
    #include <future>
    #include <thread>
    
    using namespace std::literals;
    
    const auto BLOCK_DURATION = 100ms;
    const auto MAX_WAIT_TIME = 3s;
    
    int main() {
        // Some blocking call.
        auto blocking_call = [] { std::this_thread::sleep_for(1min); };
    
        // Wrap the callable in a packaged task and get future.
        std::packaged_task<void()> task{blocking_call};
        auto future = task.get_future();
    
        auto start_time = std::chrono::steady_clock::now();
        std::thread t{std::move(task)};
    
        // Continually check if task is finished, i.e., if blocking call has returned.
        auto status = future.wait_for(0s);
        while (status != std::future_status::ready) {
            status = future.wait_for(BLOCK_DURATION);
    
            // Room for condition to stop blocking, e.g., check some signaling etc.
            auto duration = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time);
            if (duration > MAX_WAIT_TIME) {
                break;
            }
        }
    
        if (status == std::future_status::ready) {
            future.get(); // Optionally get return value.
            t.join();
        } else {
            t.detach();
        }
    
        /* And we're on our way... */
    }
    

    std::future::wait_for 将在调用返回时立即返回,即使整个等待时间尚未结束。

    如果 时间 是呼叫可能阻塞多长时间的限制,那么甚至还有简洁的 std::future::wait_until例如

    auto start_time = std::chrono::steady_clock::now();
    std::thread t{std::move(task)};
    
    auto status = future.wait_until(start_time + MAX_WAIT_TIME);
    
    if (status == std::future_status::ready) {
        t.join();
    } else {
        t.detach();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-01-18
      • 2017-01-01
      • 2013-05-11
      • 1970-01-01
      • 2021-10-21
      • 2011-09-22
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多