【发布时间】:2012-12-18 02:32:05
【问题描述】:
我正在使用各种 Boost 函数开发一个事件驱动的应用程序。我一直在计划将我的多线程架构基于 ASIO 的 io_service 对象,其中每个工作原子将由一个由 1 个或多个线程组成的线程组分派,每个线程都调用 run()。
此引擎的早期“概念验证”版本使用单个 io_service 对象来调度许多不同的任务,包括截止时间计时器、网络 i/o 和发布的操作。由于这些早期版本有时可能是这样,因此这个早期版本一次调度的事件不超过几个。在确信自己走在正确的轨道上之后,我重构了引擎的一部分以支持更精细的粒度和更高的可扩展性。但是这个新版本正在崩溃,因为io_service 对象中的指针似乎是错误的。
我将尝试针对我遇到的问题开发一个简化且可重现的测试用例。但在此之前,我想确认一下我的架构所基于的假设……
单个 io_service 对象可以在多个网络对象之间共享——tcp 和 udp 解析、套接字、计时器和任何其他需要 io_service 对象引用的野兽。
我问的原因是我无法在文档或在线讨论中找到明确说明。另一个提示我的 io_service 有问题的提示是,我在调用 tcp::socket 的 async_connect() 的下游某处遇到了崩溃,该调用具有有效的端点和处理程序。 async_connect() 实现的最后一行调用this->get_service()。 stream_socket_service get_service() 应该返回的指针最终是 0x2,自 ENIAC 以来,这一直不是一个很好的指针值。
我的环境……
我已尝试使用 Boost 版本 48 到 52 调试此问题。
我正在 OSX 上进行开发,并尝试了从 4.2 到 4.7.3 的各种 gcc 4.x 编译器版本。
在此损坏问题之前的会话中,我之前执行的异步操作包括一些计时器、udp 解析和 tcp 解析。
我正在执行
async_connect()的套接字在调用之前已分配到堆中,并在其构造函数中传递给 io_service。我有一个
io_service::work对象。我还没有使用 strands。
这些信息是否足以让任何人提供帮助,还是我需要提交一段可编译的代码?我也很想了解io_service 服务是什么的入门读物,我相信其他 SO 读者也会。
更新 #1:这是我遇到的问题的最小特征,我已确认仍然崩溃。我在最新的 osx ML 上使用 Boost 1.52.0、gcc 4.6.3 构建它。
#include <stdlib.h>
#include <string>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/asio.hpp>
namespace foo {
namespace asio = boost::asio;
class ios_threads : private boost::noncopyable
{
public:
ios_threads(bool strt = false)
: work_(new asio::io_service::work(ios_))
{
if (strt)
start();
}
static asio::io_service &ios()
{
return ios_;
}
void start()
{
threads_.create_thread(boost::bind(&ios_threads::run, this));
}
void wait()
{
threads_.join_all();
}
private:
void run()
{
while (true) {
try {
ios_.run();
}
catch (std::exception &e) {
delete work_;
break;
}
}
printf("Shutting down.\n");
}
static asio::io_service ios_;
asio::io_service::work *work_;
boost::thread_group threads_;
};
asio::io_service ios_threads::ios_;
struct op;
typedef op * opPtr;
struct op
{
typedef boost::recursive_mutex mutex_type;
op(op *del)
: delegate_(del)
{
}
virtual ~op()
{
}
bool start_async()
{
boost::unique_lock< mutex_type > lock(mutex_);
return start_it();
}
protected:
virtual bool start_it()
{
return false;
}
virtual void did_it(const boost::system::error_code& error)
{
}
void completion_handler(const boost::system::error_code& error)
{
boost::unique_lock< mutex_type > lock(mutex_);
did_it(error);
}
opPtr delegate_;
mutable mutex_type mutex_;
};
struct interface_search : public op
{
typedef op super;
interface_search(op *del)
: super(del),
udp_resolver_(ios_threads::ios())
{
it_ = NULL;
}
bool start_it()
{
try {
std::string hostname = boost::asio::ip::host_name();
asio::ip::udp::resolver::query query(hostname, "", asio::ip::resolver_query_base::numeric_service | boost::asio::ip::resolver_query_base::passive);
udp_resolver_.async_resolve(query, boost::bind(&interface_search::udp_handler, this, asio::placeholders::error, asio::placeholders::iterator));
}
catch (std::exception& e) {
printf("UDP resolve operation failed. Exception: %s", e.what());
}
return super::start_it();
}
protected:
void udp_handler(const boost::system::error_code& error, asio::ip::udp::resolver::iterator it)
{
it_ = ⁢
completion_handler(error);
}
void did_it(const boost::system::error_code& error)
{
if (error == asio::error::operation_aborted)
return;
op *del = delegate_;
if (del)
del->start_async();
}
asio::ip::udp::resolver udp_resolver_;
asio::ip::udp::resolver::iterator *it_;
};
struct google_connect : public op
{
typedef op super;
google_connect()
: super(NULL),
socket_(ios_threads::ios())
{
}
void endpoint(asio::ip::tcp::endpoint &endpoint)
{
endpoint_ = endpoint;
}
bool start_it()
{
try {
// Crashes in the following call!
socket_.async_connect(endpoint_, boost::bind(&google_connect::connect_handler, this, asio::placeholders::error));
}
catch (std::exception& e) {
printf(e.what());
}
return super::start_it();
}
void connect_handler(const boost::system::error_code& error)
{
completion_handler(error);
}
void did_it(const boost::system::error_code& error)
{
if (error == asio::error::operation_aborted)
return;
boost::asio::ip::address addr = socket_.local_endpoint().address();
printf(addr.to_string().c_str());
}
asio::ip::tcp::socket socket_;
asio::ip::tcp::endpoint endpoint_;
};
struct google_resolve : public op
{
typedef op super;
google_resolve()
: super(new google_connect()),
resolver_(ios_threads::ios())
{
it_ = NULL;
}
bool start_it()
{
try {
asio::ip::tcp::resolver::query query(asio::ip::tcp::v4(), "google.com", "http");
resolver_.async_resolve(query, boost::bind(&google_resolve::tcp_handler, this, asio::placeholders::error, asio::placeholders::iterator));
}
catch (std::exception& e) {
printf(e.what());
}
return super::start_it();
}
protected:
void tcp_handler(const boost::system::error_code& error, asio::ip::tcp::resolver::iterator it)
{
it_ = ⁢
completion_handler(error);
}
void did_it(const boost::system::error_code& error)
{
if (error == asio::error::operation_aborted)
return;
asio::ip::tcp::resolver::iterator last;
if (*it_ != last) {
google_connect *gc = static_cast< google_connect * >(delegate_);
if (gc) {
asio::ip::tcp::endpoint ep = **it_;
gc->endpoint(ep);
gc->start_async();
super::did_it(error);
}
}
}
asio::ip::tcp::resolver resolver_;
asio::ip::tcp::resolver::iterator *it_;
};
} // namespace foo
int main(int argc, const char * argv[])
{
try {
foo::ios_threads threads(false);
foo::opPtr ops_;
ops_ = new foo::interface_search(
new foo::google_resolve()
);
ops_->start_async();
threads.start();
threads.wait();
}
catch (std::exception& e) {
printf(e.what());
}
return 0;
}
【问题讨论】:
-
代码可以更好地解释将要发生的事情
-
你的假设是正确的。我猜,你试图访问一个已经被破坏的对象。使用 shared_from_this 成语。
-
如果您无法发布演示问题的代码,请使用valgrind 之类的工具来查找代码中的错误。
-
Igor,我已经说服自己 io_service 和任何其他对象并没有从我手下释放出来,正如我认为我添加的示例代码所示。
-
据我了解 valgrind 在 osx 10.8 下不可靠。
标签: c++ multithreading boost boost-asio