【问题标题】:How do I handle fork() correctly with boost::asio in a multithreaded program?如何在多线程程序中使用 boost::asio 正确处理 fork()?
【发布时间】:2014-02-03 14:12:04
【问题描述】:

我在掌握如何正确处理从以多线程方式使用 Boost Asio 的多线程程序创建子进程时遇到了一些麻烦。

如果我理解正确,在 Unix 世界中启动子进程的方法是调用fork(),然后调用exec*()。另外,如果我理解正确,调用fork() 将复制所有文件描述符等等,这些需要在子进程中关闭,除非标记为FD_CLOEXEC(从而在调用时自动关闭exec*())。

Boost Asio 需要在调用fork() 时得到通知,以便通过调用notify_fork() 正确操作。但是,在多线程程序中,这会产生几个问题:

  1. 如果我理解正确,套接字默认由子进程继承。它们可以设置为SOCK_CLOEXEC - 但不是直接在创建时*,因此如果从另一个线程创建子进程,则会导致一个计时窗口。

  2. notify_fork() 要求没有其他线程调用任何其他io_service 函数,也没有io_service 关联的任何其他 I/O 对象上的任何函数时间>。这似乎并不可行——毕竟程序是多线程的。

  3. 如果我理解正确,在fork()exec*() 之间进行的任何函数调用都必须是异步信号安全的(请参阅fork() documentation)。没有关于 notify_fork() 调用是异步信号安全的文档。事实上,如果我查看 Boost Asio 的源代码(至少在 1.54 版中),可能会调用 pthread_mutex_lock,如果我理解正确的话,这是 not 异步信号安全的(参见 @ 987654324@,还有其他不在白名单上的呼叫)。

问题 #1 我可以通过分离子进程和套接字 + 文件的创建来解决,这样我可以确保在创建套接字和设置 SOCK_CLOEXEC 之间的窗口中没有创建子进程。问题 #2 更棘手,我可能需要确保 所有 asio 处理程序线程已停止,执行分叉,然后再次重新创建它们,这充其量是潮,最坏的情况是真的很糟糕(我的待定计时器呢??)。问题 #3 似乎完全不可能正确使用它。

如何在多线程程序中正确使用 Boost Asio 以及 fork() + exec*() ...还是我“分叉”了?

如果我误解了任何基本概念,请告诉我(我是在 Windows 编程中长大的,而不是 *nix...)。

编辑: * - 实际上,可以在 Linux 上直接设置 SOCK_CLOEXEC 创建套接字,从 2.6.27 开始可用(参见 socket() documentation)。在 Windows 上,对应的标志 WSA_FLAG_NO_HANDLE_INHERIT 自 Windows 7 SP 1 / Windows Server 2008 R2 SP 1 起可用(请参阅WSASocket() documentation)。 OS X 似乎不支持这一点。

【问题讨论】:

  • 不清楚您是否只想fork 克隆您的父进程,或者fork + exec 另一个可执行文件。
  • @MaximYegorushkin:我想做fork + exec。试图在所述问题中使其更清楚。

标签: c++ linux macos fork boost-asio


【解决方案1】:

在多线程程序中,io_service::notify_fork() 在子进程中调用是不安全的。然而,Boost.Asio 期望它基于fork() support 被调用,因为这是当子进程关闭父进程之前的内部文件描述符并创建新的时。虽然 Boost.Asio 明确列出了调用 io_service::notify_fork() 的先决条件,保证其内部组件在 fork() 期间的状态,但简要查看 implementation 表明 std::vector::push_back() 可以从空闲存储中分配内存,并且不保证分配是异步信号安全的。

话虽如此,一个可能值得考虑的解决方案是fork() 仍然是单线程的进程。当父进程通过进程间通信告知子进程这样做时,子进程将保持单线程并执行fork()exec()。这种分离通过消除在执行fork()exec() 时管理多个线程状态的需要而简化了问题。


这里有一个完整的例子来演示这种方法,其中多线程服务器将通过 UDP 接收文件名,子进程将执行 fork()exec() 以在文件名上运行 /usr/bin/touch。为了使示例更易读,我选择使用stackful coroutines

#include <unistd.h> // execl, fork
#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

/// @brief launcher receives a command from inter-process communication,
///        and will then fork, allowing the child process to return to
///        the caller.
class launcher
{
public:
  launcher(boost::asio::io_service& io_service,
           boost::asio::local::datagram_protocol::socket& socket,
           std::string& command)
    : io_service_(io_service),
      socket_(socket),
      command_(command)
  {}

  void operator()(boost::asio::yield_context yield)
  {
    std::vector<char> buffer;
    while (command_.empty())
    {
      // Wait for server to write data.
      std::cout << "launcher is waiting for data" << std::endl;
      socket_.async_receive(boost::asio::null_buffers(), yield);

      // Resize buffer and read all data.
      buffer.resize(socket_.available());
      socket_.receive(boost::asio::buffer(buffer));

      io_service_.notify_fork(boost::asio::io_service::fork_prepare);
      if (fork() == 0) // child
      {
        io_service_.notify_fork(boost::asio::io_service::fork_child);
        command_.assign(buffer.begin(), buffer.end());
      }
      else // parent
      {
        io_service_.notify_fork(boost::asio::io_service::fork_parent);
      }
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& socket_;
  std::string& command_;
};

using boost::asio::ip::udp;

/// @brief server reads filenames from UDP and then uses
///        inter-process communication to delegate forking and exec
///        to the child launcher process.
class server
{
public:
  server(boost::asio::io_service& io_service,
         boost::asio::local::datagram_protocol::socket& socket,
          short port)
    : io_service_(io_service),
      launcher_socket_(socket),
      socket_(boost::make_shared<udp::socket>(
        boost::ref(io_service), udp::endpoint(udp::v4(), port)))
  {}

  void operator()(boost::asio::yield_context yield)
  {
    udp::endpoint sender_endpoint;
    std::vector<char> buffer;
    for (;;)
    {
      std::cout << "server is waiting for data" << std::endl;
      // Wait for data to become available.
      socket_->async_receive_from(boost::asio::null_buffers(),
          sender_endpoint, yield);

      // Resize buffer and read all data.
      buffer.resize(socket_->available());
      socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint);
      std::cout << "server got data: ";
      std::cout.write(&buffer[0], buffer.size());
      std::cout << std::endl;

      // Write filename to launcher.
      launcher_socket_.async_send(boost::asio::buffer(buffer), yield);
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::local::datagram_protocol::socket& launcher_socket_;

  // To be used as a coroutine, server must be copyable, so make socket_
  // copyable.
  boost::shared_ptr<udp::socket> socket_;
};

int main(int argc, char* argv[])
{
  std::string filename;

  // Try/catch provides exception handling, but also allows for the lifetime
  // of the io_service and its IO objects to be controlled.
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: <port>\n";
      return 1;
    }

    boost::thread_group threads;
    boost::asio::io_service io_service;

    // Create two connected sockets for inter-process communication.
    boost::asio::local::datagram_protocol::socket parent_socket(io_service);
    boost::asio::local::datagram_protocol::socket child_socket(io_service);
    boost::asio::local::connect_pair(parent_socket, child_socket);

    io_service.notify_fork(boost::asio::io_service::fork_prepare);
    if (fork() == 0) // child
    {
      io_service.notify_fork(boost::asio::io_service::fork_child);
      parent_socket.close();
      boost::asio::spawn(io_service,
          launcher(io_service, child_socket, filename));
    }
    else // parent
    {
      io_service.notify_fork(boost::asio::io_service::fork_parent);
      child_socket.close();
      boost::asio::spawn(io_service, 
          server(io_service, parent_socket, std::atoi(argv[1])));

      // Spawn additional threads.
      for (std::size_t i = 0; i < 3; ++i)
      {
        threads.create_thread(
          boost::bind(&boost::asio::io_service::run, &io_service));
      }
    }

    io_service.run();
    threads.join_all();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  // Now that the io_service and IO objects have been destroyed, all internal
  // Boost.Asio file descriptors have been closed, so the execl should be
  // in a clean state.  If the filename has been set, then exec touch.
  if (!filename.empty())
  {
    std::cout << "creating file: " << filename << std::endl;
    execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0));
  }
}

1号航站楼:

$ ls
a.out 示例.cpp
$ ./a.out 12345
服务器正在等待数据
启动器正在等待数据
服务器得到数据:a
服务器正在等待数据
启动器正在等待数据
创建文件:a
服务器得到数据:b
服务器正在等待数据
启动器正在等待数据
创建文件:b
服务器得到数据:c
服务器正在等待数据
启动器正在等待数据
创建文件:c
ctrl + c
$ ls
a a.out b c example.cpp

2号航站楼:

$ nc -u 127.0.0.1 12345
actrl + dbctrl + dcctrl + d

【讨论】:

  • 这似乎是唯一合理的方法。
  • 您好,刚刚碰到这个:io_service.run();将在父子进程中执行,这是故意的吗?
【解决方案2】:

考虑以下几点:

  • fork() 在子进程中只创建一个线程。您需要重新创建其他线程。
  • 父进程中其他线程持有的互斥锁在子进程中永远锁定,因为拥有的线程无法生存fork()。使用pthread_atfork() 注册的回调可以释放互斥锁,但大多数库从不使用pthread_atfork()。换句话说,您的子进程在调用 malloc()new 时可能会永远挂起,因为标准堆分配器确实使用互斥锁。

鉴于上述情况,多线程进程中唯一可靠的选择是调用fork(),然后调用exec()

请注意,只要不使用pthread_atfork() 处理程序,您的父进程就不会受到fork() 的影响。


关于分叉和boost::asio,有一个io_service::notify_fork() 函数需要在父分叉之前和父子分叉之后调用。它的作用最终取决于所使用的反应器。对于 Linux/UNIX 反应器select_reactorepoll_reactordev_poll_reactorkqueue_reactor,此函数在 fork 之前或之后对父级没有任何作用,但在子级中它重新创建反应器状态并重新注册文件描述符.不过,我不确定它在 Windows 上的作用。

它的用法示例可以在process_per_connection.cpp中找到,你可以复制它:

void handle_accept(const boost::system::error_code& ec)
{
  if (!ec)
  {
    // Inform the io_service that we are about to fork. The io_service cleans
    // up any internal resources, such as threads, that may interfere with
    // forking.
    io_service_.notify_fork(boost::asio::io_service::fork_prepare);

    if (fork() == 0)
    {
      // Inform the io_service that the fork is finished and that this is the
      // child process. The io_service uses this opportunity to create any
      // internal file descriptors that must be private to the new process.
      io_service_.notify_fork(boost::asio::io_service::fork_child);

      // The child won't be accepting new connections, so we can close the
      // acceptor. It remains open in the parent.
      acceptor_.close();

      // The child process is not interested in processing the SIGCHLD signal.
      signal_.cancel();

      start_read();
    }
    else
    {
      // Inform the io_service that the fork is finished (or failed) and that
      // this is the parent process. The io_service uses this opportunity to
      // recreate any internal resources that were cleaned up during
      // preparation for the fork.
      io_service_.notify_fork(boost::asio::io_service::fork_parent);

      socket_.close();
      start_accept();
    }
  }
  else
  {
    std::cerr << "Accept error: " << ec.message() << std::endl;
    start_accept();
  }
}

【讨论】:

  • 是的,我读到fork 的唯一原因是调用exec,这就是我想要做的。但是,在多线程程序中使用 Boost Asio 时,我并不真正理解执行此操作的正确方法。如果我理解正确,子进程中互斥锁的状态并不重要,因为我将使用exec 替换程序。我更关心父进程的状态和那里的io_service + I/O 对象的状态。
  • 所以,也许你有更多关于这个主题的信息。就目前而言,答案并没有真正解决这个问题 - 如何我如何以正确的方式调用fork + exec
  • @villintehaspam 只要不使用pthread_atfork() 处理程序,您的父进程就不会受到影响。
  • 您是说完全避免调用notify_fork() 是安全的吗?由于Boost Asio在内部使用epoll/kqueue,难道这些文件描述符没有需要在子进程中关闭吗?否则,它们在子进程中的继续存在不会影响父进程吗?
  • @villintehaspam 只需关闭exec() 之前的所有文件描述符。即使它们没有关闭,也不会影响父进程,因为您的子进程不太可能检查和查看它可能从父进程派生的每个可能的文件描述符。
猜你喜欢
  • 1970-01-01
  • 2012-12-27
  • 1970-01-01
  • 1970-01-01
  • 2014-08-03
  • 1970-01-01
  • 1970-01-01
  • 2010-12-19
  • 1970-01-01
相关资源
最近更新 更多