【发布时间】:2015-10-15 02:52:04
【问题描述】:
我有一个多线程应用程序,它通过在 boost::asio 中的集成使用 boost::asio 和 boost::coroutine。每个线程都有自己的 io_service 对象。线程之间唯一共享的状态是连接池,当连接从/返回到连接池时,这些连接池被 mutex 锁定。当池中没有足够的连接时,我将无限 asio::steady_tiemer 推送到池的内部结构中并异步等待它,然后我从 couroutine 函数中让步。当其他线程返回到池的连接时,它检查是否有等待计时器,它从内部结构中获取等待计时器,它获取它的 io_service 对象并发布一个 lambda 唤醒计时器以恢复暂停协程。我在应用程序中有随机崩溃。我尝试用 valgrind 调查问题。它发现了一些问题,但我无法理解它们,因为它们发生在 boost::coroutine 和 boost::asio 内部。以下是我的代码和 valgrind 输出的片段。有人可以看到并解释问题吗?
这里是调用代码:
template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
AvlRequestDataList allRequests;
for(auto& requestContext : avlRequestContexts)
{
if(!requestContext.pullProvider || !requestContext.toAskGDS())
continue;
auto& requests = requestContext.pullProvider->getRequestsData();
copy(requests.begin(), requests.end(), back_inserter(allRequests));
}
if(allRequests.size() == 0)
return;
boost::asio::io_service ioService;
curl::AsioMultiplexer multiplexer(ioService);
for(auto& request : allRequests)
{
using namespace boost::asio;
spawn(ioService, [&multiplexer, &request](yield_context yield)
{
request->prepare(multiplexer, yield);
});
}
while(true)
{
try
{
VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
ioService.run();
VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
break;
}
catch(const std::exception& e)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
}
catch(...)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
}
}
}
这是在衍生的 lambda 中调用的 prepare 函数实现:
void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
auto& ioService = multiplexer.getIoService();
_connection = _pool.getConnection(ioService, yield);
_connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);
multiplexer.addEasyHandle(_connection->getHandle(),
[this](const curl::EasyHandleResult& result)
{
if(0 == result.responseCode)
returnQuota();
VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
_pool.addConnection(std::move(_connection));
});
}
void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
try
{
prepareImpl(multiplexer, yield);
}
catch(const std::exception& e)
{
VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
returnQuota();
}
catch(...)
{
VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
returnQuota();
}
}
returnQuota 函数是AvlRequestData 类的纯虚方法,它在我所有测试中使用的TravelportRequestData 类的实现如下:
void returnQuota() const override
{
auto& avlQuotaManager = AvlQuotaManager::getInstance();
avlQuotaManager.consumeQuotaTravelport(-1);
}
这里是连接池的push和pop方法。
auto AvlConnectionPool::getConnection(
TimerPtr timer,
asio::yield_context yield) -> ConnectionPtr
{
lock_guard<mutex> lock(_mutex);
while(_connections.empty())
{
_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());
_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();
}
ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();
VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));
++_connectionsGiven;
return connection;
}
void AvlConnectionPool::addConnection(ConnectionPtr connection,
Side side /* = Back */)
{
lock_guard<mutex> lock(_mutex);
if(Front == side)
_connections.emplace_front(std::move(connection));
else
_connections.emplace_back(std::move(connection));
VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));
if(_timers.empty())
return;
auto timer = _timers.back();
_timers.pop_back();
auto& ioService = timer->get_io_service();
ioService.post([timer](){ timer->cancel(); });
VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
% _connectionPoolName));
}
这是 coroutineAsyncWait 的实现。
inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
boost::asio::yield_context yield)
{
boost::system::error_code ec;
timer.async_wait(yield[ec]);
if(ec && ec != boost::asio::error::operation_aborted)
throw std::runtime_error(ec.message());
}
最后是 valgrind 输出的第一部分:
==8189== 线程 41:
==8189== 大小为 8 的读取无效
==8189== 在 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (匿名命名空间)::executeRequests > >(std::vector >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(长) (trampoline_push.hpp:65)
==8189== 地址 0x2e3b5528 没有被堆栈、malloc 或(最近)释放
当我使用带有调试器的 valgrind 时,它会在 boost::coroutine 库中 trampoline_push.hpp 中的以下函数中停止。 p>
53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│ typedef typename Coro::param_type param_type;
57│
58│ BOOST_ASSERT( vp);
59│
60│ param_type * param(
61│ reinterpret_cast< param_type * >( vp) );
62│ BOOST_ASSERT( 0 != param);
63│
64│ Coro * coro(
65├> reinterpret_cast< Coro * >( param->coro) );
66│ BOOST_ASSERT( 0 != coro);
67│
68│ coro->run();
69│ }
【问题讨论】:
-
请发布您的
returnQuota方法正文。 -
粗略一看,
AvlRequestData::prepare()中的全面抑制是可疑的,并且违反了 Boost.Coroutine 要求(请参阅here)。如果你抓住const boost::coroutines::detail::forced_unwind&并重新抛出它,问题是否仍然存在? -
@Tanner Sansbury = 10 倍发现这一点。我添加了
forced_unwind异常的重新抛出,但问题仍然存在。 -
@PSIAlt - 我在问题中添加了
returnQuota函数。 -
我发布了另一个带有概念证明的question,以单独模拟问题。
标签: c++ multithreading boost-asio valgrind boost-coroutine