【问题标题】:Waiting boost asio's future lasts forever after io_service.stop()在 io_service.stop() 之后等待 boost asio 的未来将永远持续下去
【发布时间】:2016-02-26 12:35:06
【问题描述】:

我尝试等待从任何boost::asio::async_ 函数返回的std::future 对象(使用use_future)。例如:

auto endpoint_future = resolver.async_resolve(query, boost::asio::use_future);
endpoint_future.wait(); // lasts forever after io_service.stop();

这是在一个线程中完成的。 我还有另一个线程在此async_resolve 调用之前启动:

runner_ = std::thread([this]() {
    boost::asio::io_service::work work(io_service_);
    io_service_.run();
});

一切正常,但后来我还添加了一个boost::asio::deadline_timer 以停止与io_service 的任何工作:

void deadlineTimeout() {
    deadline_.cancel();
    io_service_.stop();
    // closing socket here does not work too
}

但是,在deadlineTimeout() 中,当截止日期到达它的超时并且它执行io_service_.stop() 未来没有被释放所以endpointer_future.wait() 仍然阻塞。在这种情况下,我该如何停止等待未来?

【问题讨论】:

    标签: c++ multithreading c++11 boost boost-asio


    【解决方案1】:

    我自己找到了一个解决方案:我们不需要stop()io_service而是reset()它,在此之前我们需要关闭套接字,因此正确的超时回调将是:

    void deadlineTimeout() {
        deadline_.cancel();
        socket_.close(); // socket_ is a socket on io_service_
        io_service_.reset();
    }
    

    在此更改之后,所有期货都将被释放。

    【讨论】:

    • io_service::stop() 只是设置了一个标志,要求服务“尽快”停止。只要您在套接字上有未完成的读取,服务就无法停止,因此socket_.close() 是您答案的重要部分。
    • 你错了。关闭套接字并调用 io_service.stop() 后不起作用。这里重要的是调用 reset() 而不是 stop()。
    • 在仍在运行的 io_service 上调用 reset() 是错误的。你应该彻底关闭它。如果您计划在服务停止后重用该服务,则应使用 reset 。 boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/…
    • 您还需要在服务停止之前删除work。 (重置通过蛮力做到这一点(讨厌))
    • reset() 文档严格禁止在io_service 上调用reset(),而对poll()poll_one()run()run_one() 的调用未完成。
    【解决方案2】:

    io_sevice.stop() 的调用将导致所有run()run_one() 的调用尽快返回。从处理程序中调用时,调用者将从run() 返回,而不调用任何其他处理程序。在您的情况下,async_resolve 的完成处理程序将设置与endpoint_future 关联的promise;但是,通过停止io_service,将不会调用完成处理程序。考虑:

    • cancel()future 关联的 I/O 对象,然后继续运行 io_service 直到完成,以便设置 promise 的值
    • 销毁所有 I/O 对象,然后销毁 io_service,以便删除处理程序并通过 future 检测到损坏的承诺
    • 循环对future 执行定时等待,如果future 准备好或io_service 已停止则退出循环。例如,以下函数返回一个带有未来值的boost::optional,如果不设置未来,则返回boost::none

      template <typename T>
      boost::optional<T> get(
        boost::asio::io_service& io_service,
        std::future<T>& future)
      {
        for (;;)
        {
          // If the future is ready, get the value.
          if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready)
          {
            return {future.get()};
          }
          // Otherwise, if the future is never going to be set, return none.
          if (io_service.stopped())
          {
            return {boost::none};
          }
        }
      }
      
      ...
      if (auto endpoint_iterator = get(io_service, endpoint_future))
      {
        // use *endpoint_iterator...
      }
      

    下面是一个示例 demonstrating 如何在停止 io_service 的同时安全地等待未来:

    #include <chrono>
    #include <iostream>
    #include <thread>
    #include <boost/asio.hpp>
    #include <boost/asio/use_future.hpp>
    #include <boost/optional.hpp>
    
    template <typename T>
    boost::optional<T> get(
      boost::asio::io_service& io_service,
      std::future<T>& future)
    {
      for (;;)
      {
        // If the future is ready, get the value.
        if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready)
        {
          return {future.get()};
        }
        // Otherwise, if the future is never going to be set, return none.
        if (io_service.stopped())
        {
          std::cout << "io_service stopped, future will not be set" << std::endl;
          return {boost::none};
        }
        std::cout << "timeout waiting for future" << std::endl;
      }
    }
    
    int main()
    {
      boost::asio::io_service io_service;
    
      // Create I/O objects.
      boost::asio::ip::udp::socket socket(io_service,
        boost::asio::ip::udp::v4());
      boost::asio::deadline_timer timer(io_service);
    
      // Process the io_service in the runner thread.
      auto runner = std::thread([&]() {
        boost::asio::io_service::work work(io_service);
        io_service.run();
      });
    
      // Start an operation that will not complete.
      auto bytes_transferred_future = socket.async_receive(
        boost::asio::null_buffers(), boost::asio::use_future);
    
      // Arm the timer.
      timer.expires_from_now(boost::posix_time::seconds(2));
      timer.async_wait([&](const boost::system::error_code&) {
        timer.cancel();
        socket.close();
        io_service.stop();
      });
    
      // bytes_transferred's promise will never be set as the io_service
      // is not running.
      auto bytes_transferred = get(io_service, bytes_transferred_future);
      assert(!bytes_transferred);
    
      runner.join();
    }
    

    【讨论】:

    • 等待wait_for 并检查某些东西是我想避免的无用开销。我曾考虑过这个解决方案,但我试图找到更有效的方法,它可以释放我等待的所有未来,而无需手动工作。这是否可以正确执行(如果错误则不使用reset)?
    • @VictorPolevoy 据我所知,只有 3 个选项具有明确的行为:运行处理程序;销毁处理程序;或定期等待处理程序。在可能永远不会完成的异步操作上无限期地同步阻塞似乎很尴尬。如果使用wait_for()wait_until(),并且提供的持续时间或时间略大于deadline_timer的过期时间,则大大减少了未来多次等待的机会。
    【解决方案3】:

    使工作对截止时间超时可见,并允许截止时间超时将其删除。

    boost::scoped_ptr<boost::asio::io_service::work work;
    

    你仍然可以在这里创建工作,但堆分配它。

    runner_ = std::thread([this]() {
        work = new boost::asio::io_service::work(io_service_);
        io_service_.run();
    });
    

    然后像这样关闭:

    void deadlineTimeout() {
        deadline_.cancel();
        socket_.close(); // socket_ is a socket on io_service_
        work.reset();
    
        // The io_service::run() will exit when there are no active requests 
        // (i.e. no sockeet, no deadline, and no work)
        // so you do not need to call: io_service_.stop();
    }
    

    【讨论】:

    • stop() 的调用可能会产生不良影响。已取消的async_resolve 操作的内部完成处理程序可能永远不会被调用,从而导致std::promise 永远不会设置其值。
    猜你喜欢
    • 2019-03-23
    • 1970-01-01
    • 1970-01-01
    • 2020-07-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-02-19
    • 1970-01-01
    相关资源
    最近更新 更多