【发布时间】:2015-06-02 03:43:04
【问题描述】:
我正在尝试使用 C++11 并发技术创建基于线程间消息的通信。 Anthony William 的“Concurrency in Action”一书描述了此实现所基于的线程安全锁定队列。书中描述的线程安全锁定队列与我想要实现的线程安全锁定队列之间的区别首先是我使用通用引用将队列元素转发到阻塞队列,其次(这就是问题所在)我需要能够存储一个 std::shared_ptr 指针队列,因为模板类型由一个带有抽象基类的简单消息类层次结构和带有实际专用消息的子类组成。我需要使用共享指针来避免数据切片。
编辑:我添加了一个 coliru 演示以更清楚地显示我的问题。 Live Demo
编辑 1:对 coliru 现场演示的更多更新,其中包含额外的编译器错误: Coliru Demo With Compiler Errors
编辑 2:感谢 Alejandro,我有一个可行的解决方案 Working Coliru Example
为此我改变了 Anthony William 对底层消息队列的实现:
std::queue<T> data_queue
到一个
std::queue<std::shared_ptr<T>> data_queue
但是当我尝试通过通用参考完美转发签名将消息指针推送到队列时,我得到了各种各样的错误。
我希望能够在此队列上添加消息的方式如下:
UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);
使用上面的代码,编译器会报错 以下行错误 C2664: 'void UtlThreadSafeQueue::push(T &&)' : 无法转换 参数 1 从 'std::shared_ptr' 到 'BaseMessageType &&' 与 T=BaseMessageType
我想我需要一些方法来专门化指针类型,但我不确定。
我的实现如下:
/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/
#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_
// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>
// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
// Windows uses min/max macros
#undef min
#undef max
#endif
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
template<typename T>
class UtlThreadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
std::size_t capacity;
std::atomic<bool> shutdownFlag;
public:
explicit UtlThreadSafeQueue(const size_t& rCapacity =
std::numeric_limits<std::size_t>::max())
: mut()
, data_queue()
, data_cond()
, capacity(rCapacity)
, shutdownFlag(false)
{}
UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
std::lock_guard<std::mutex> lock(rhs.mut);
data_queue = rhs.data_queue;
}
virtual ~UtlThreadSafeQueue() = default;
// move aware push
inline void push(T&& value) {
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::forward<T>(value));
data_cond.notify_one();
}
// wait for non empty lambda condition before returning value
inline void wait_and_pop(T& rValue) {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty();});
// ideally should return an invalid value
if (!shutdownFlag) {
rValue = data_queue.front();
data_queue.pop();
}
}
// wait for non empty lambda condition before returning shared pointer to value
inline std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
if (shutdownFlag) {
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
return nullptr;
}
// return value in specified reference and flag indicating whether value
// successfully returned or not
inline bool try_pop(T& rValue) {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return false;
}
rValue = data_queue.front();
data_queue.pop();
return true;
}
// return shared pointer to value - which if set to nullptr,
// indicates container was empty at the time of the call.
inline std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// thread safe method to check if the queue is empty
// note that if it is empty
inline bool empty() const {
std::lock_guard<std::mutex> lock(mut);
return data_queue.empty();
}
// shutdown support - wake up potentially sleeping queues
inline void shutdown() {
shutdownFlag = true;
data_cond.notify_all();
}
};
#endif // _utlThreadSafeQueue_h_
【问题讨论】:
-
mDataLoadSessionQ.push(BaseType);这里的BaseType是什么,如果有的话,它与message有什么关系?反正push需要带shared_ptr<T>,同理,各种口味的pop返回shared_ptr<T>。 -
coliru 演示确实阐明了这一点。您尝试使用
std::shared_ptr调用push,但push期待BaseMessage&&。shared_ptr不能转换成这样的类型 -
@johnco3 ,您放置的两个链接都已失效(尽管其中 1 个已修复)
-
在
wait_and_pop()中,您试图通过调用make_shared用另一个shared_ptr构造它来创建shared_ptr<BaseMessage>。这有几件事不对劲。 1)如果您已经将shared_ptrs 存储在data_queue中,为什么要从前一个构造shared_ptr? 2) BaseMessage 没有构造函数采用std::shared_ptr<BaseMessage>&,那么您希望std::make_shared<T>(data_queue.front())做什么? -
是的,我编辑了问题以修复链接 =)
标签: c++ multithreading c++11 ipc shared-ptr