基础
让我们从一个简化的示例开始,并检查相关的 Boost.Asio 部分:
void handle_async_receive(...) { ... }
void print() { ... }
...
boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);
...
io_service.post(&print); // 1
socket.connect(endpoint); // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print); // 4
io_service.run(); // 5
什么是处理程序?
handler 只不过是一个回调。在示例代码中,有 3 个处理程序:
-
print 处理程序 (1)。
-
handle_async_receive 处理程序 (3)。
-
print 处理程序 (4)。
即使相同的print() 函数被使用了两次,每次使用都被认为是创建自己的唯一可识别处理程序。处理程序可以有多种形状和大小,从上述基本函数到更复杂的构造,例如从boost::bind() 和 lambdas 生成的函子。不管复杂程度如何,处理程序仍然只是一个回调。
什么是工作?
工作是 Boost.Asio 被要求代表应用程序代码执行的一些处理。有时 Boost.Asio 可能会在被告知后立即开始一些工作,而有时它可能会等待稍后的时间点完成工作。完成工作后,Boost.Asio 将通过调用提供的 handler 来通知应用程序。
Boost.Asio 保证 handlers 只会在当前调用 run()、run_one()、poll() 或 poll_one() 的线程中运行。这些线程将工作并调用handlers。因此,在上面的示例中,print() 在发布到io_service (1) 时不会被调用。相反,它被添加到io_service 中,并将在稍后的时间点调用。在这种情况下,它在io_service.run() (5) 内。
什么是异步操作?
asynchronous operation 创建工作,Boost.Asio 将调用 处理程序 以在工作完成时通知应用程序。异步操作是通过调用一个名称带有前缀async_ 的函数来创建的。这些函数也称为启动函数。
异步操作可以分解为三个独特的步骤:
- 需要启动或通知相关联的
io_service。 async_receive 操作 (3) 通知 io_service 它需要从套接字异步读取数据,然后 async_receive 立即返回。
- 做实际工作。在这种情况下,当
socket 接收到数据时,会读取字节并将其复制到buffer。实际工作将在以下任一方面完成:
- 启动函数 (3),如果 Boost.Asio 可以确定它不会阻塞。
- 当应用程序显式运行
io_service (5)。
- 调用
handle_async_receiveReadHandler。再一次,处理程序 仅在运行io_service 的线程中调用。因此,无论何时完成工作(3 或 5),都可以保证 handle_async_receive() 只会在 io_service.run() (5) 内被调用。
这三个步骤在时间和空间上的分离称为控制流倒置。这是使异步编程变得困难的复杂性之一。但是,有一些技术可以帮助缓解这种情况,例如使用 coroutines。
io_service.run() 做什么?
当一个线程调用io_service.run(),work 和handlers 将从这个线程中被调用。在上面的例子中,io_service.run() (5) 将阻塞直到:
- 它已从两个
print 处理程序调用并返回,接收操作以成功或失败完成,其handle_async_receive 处理程序已被调用并返回。
-
io_service 已通过 io_service::stop() 显式停止。
- 从处理程序中引发异常。
一种潜在的伪流可以描述如下:
创建 io_service
创建套接字
将打印处理程序添加到 io_service (1)
等待套接字连接 (2)
向io_service添加异步读工作请求(3)
将打印处理程序添加到 io_service (4)
运行 io_service (5)
有工作或处理程序吗?
是的,有 1 个工作和 2 个处理程序
socket有数据吗?不,什么都不做
运行打印处理程序 (1)
有工作或处理程序吗?
是的,有 1 个工作和 1 个处理程序
socket有数据吗?不,什么都不做
运行打印处理程序 (4)
有工作或处理程序吗?
是的,有 1 件作品
socket有数据吗?不,继续等待
-- 套接字接收数据 --
套接字有数据,将其读入缓冲区
将 handle_async_receive 处理程序添加到 io_service
有工作或处理程序吗?
是的,有 1 个处理程序
运行 handle_async_receive 处理程序 (3)
有工作或处理程序吗?
不,将 io_service 设置为已停止并返回
注意当读取完成时,它向io_service 添加了另一个处理程序。这个微妙的细节是异步编程的一个重要特征。它允许将处理程序链接在一起。例如,如果handle_async_receive 没有得到它预期的所有数据,那么它的实现可能会发布另一个异步读取操作,导致io_service 有更多的工作,因此不会从io_service.run() 返回。
请注意,当io_service 已用完时,应用程序必须在再次运行之前reset() io_service。
示例问题和示例 3a 代码
现在,让我们检查问题中引用的两段代码。
问题代码
socket->async_receive 将工作添加到io_service。因此,io_service->run() 将阻塞,直到读取操作以成功或错误完成,并且ClientReceiveEvent 已完成运行或引发异常。
为了更容易理解,这里有一个较小的注释示例 3a:
void CalculateFib(std::size_t n);
int main()
{
boost::asio::io_service io_service;
boost::optional<boost::asio::io_service::work> work = // '. 1
boost::in_place(boost::ref(io_service)); // .'
boost::thread_group worker_threads; // -.
for(int x = 0; x < 2; ++x) // :
{ // '.
worker_threads.create_thread( // :- 2
boost::bind(&boost::asio::io_service::run, &io_service) // .'
); // :
} // -'
io_service.post(boost::bind(CalculateFib, 3)); // '.
io_service.post(boost::bind(CalculateFib, 4)); // :- 3
io_service.post(boost::bind(CalculateFib, 5)); // .'
work = boost::none; // 4
worker_threads.join_all(); // 5
}
在高层次上,程序将创建 2 个线程来处理 io_service 的事件循环 (2)。这会产生一个简单的线程池来计算斐波那契数 (3)。
问题代码与此代码之间的一个主要区别是此代码调用io_service::run() (2) 在 实际工作和处理程序添加到io_service (3)。为了防止 io_service::run() 立即返回,创建了一个 io_service::work 对象 (1)。这个对象可以防止io_service 用完工作;因此,io_service::run() 不会因为没有工作而返回。
整体流程如下:
- 创建
io_service::work 对象并将其添加到io_service。
- 创建的线程池调用
io_service::run()。由于io_service::work 对象,这些工作线程不会从io_service 返回。
- 将 3 个计算斐波那契数的处理程序添加到
io_service,并立即返回。工作线程,而不是主线程,可能会立即开始运行这些处理程序。
- 删除
io_service::work 对象。
- 等待工作线程完成运行。这只会在所有 3 个处理程序都完成执行后才会发生,因为
io_service 既没有处理程序也没有工作。
代码可以以与原始代码相同的方式编写不同的代码,其中将处理程序添加到io_service,然后处理io_service 事件循环。这消除了使用io_service::work 的需要,并产生以下代码:
int main()
{
boost::asio::io_service io_service;
io_service.post(boost::bind(CalculateFib, 3)); // '.
io_service.post(boost::bind(CalculateFib, 4)); // :- 3
io_service.post(boost::bind(CalculateFib, 5)); // .'
boost::thread_group worker_threads; // -.
for(int x = 0; x < 2; ++x) // :
{ // '.
worker_threads.create_thread( // :- 2
boost::bind(&boost::asio::io_service::run, &io_service) // .'
); // :
} // -'
worker_threads.join_all(); // 5
}
同步与异步
虽然问题中的代码使用的是异步操作,但它实际上是同步运行的,因为它正在等待异步操作完成:
socket.async_receive(buffer, handler)
io_service.run();
相当于:
boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);
作为一般经验法则,请尽量避免混合同步和异步操作。很多时候,它可以把一个复杂的系统变成一个复杂的系统。这个answer 突出了异步编程的优点,其中一些也包含在Boost.Asio documentation 中。