【问题标题】:unable to add elements to thread safe locking queue of shared pointers无法将元素添加到共享指针的线程安全锁定队列
【发布时间】:2015-06-02 03:43:04
【问题描述】:

我正在尝试使用 C++11 并发技术创建基于线程间消息的通信。 Anthony William 的“Concurrency in Action”一书描述了此实现所基于的线程安全锁定队列。书中描述的线程安全锁定队列与我想要实现的线程安全锁定队列之间的区别首先是我使用通用引用将队列元素转发到阻塞队列,其次(这就是问题所在)我需要能够存储一个 std::shared_ptr 指针队列,因为模板类型由一个带有抽象基类的简单消息类层次结构和带有实际专用消息的子类组成。我需要使用共享指针来避免数据切片。

编辑:我添加了一个 coliru 演示以更清楚地显示我的问题。 Live Demo

编辑 1:对 coliru 现场演示的更多更新,其中包含额外的编译器错误: Coliru Demo With Compiler Errors

编辑 2:感谢 Alejandro,我有一个可行的解决方案 Working Coliru Example

为此我改变了 Anthony William 对底层消息队列的实现:

std::queue<T> data_queue

到一个

std::queue<std::shared_ptr<T>> data_queue

但是当我尝试通过通用参考完美转发签名将消息指针推送到队列时,我得到了各种各样的错误。

我希望能够在此队列上添加消息的方式如下:

UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);

使用上面的代码,编译器会报错 以下行错误 C2664: 'void UtlThreadSafeQueue::push(T &&)' : 无法转换 参数 1 从 'std::shared_ptr' 到 'BaseMessageType &&' 与 T=BaseMessageType

我想我需要一些方法来专门化指针类型,但我不确定。

我的实现如下:

/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/

#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_

// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>

// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
    // Windows uses min/max macros
    #undef min
    #undef max
#endif

// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS

template<typename T>
class UtlThreadSafeQueue {
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;
    std::size_t capacity;
    std::atomic<bool> shutdownFlag;
public:
    explicit UtlThreadSafeQueue(const size_t& rCapacity =
        std::numeric_limits<std::size_t>::max())
        : mut()
        , data_queue()
        , data_cond()
        , capacity(rCapacity)
        , shutdownFlag(false)
    {}

    UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
        std::lock_guard<std::mutex> lock(rhs.mut);
        data_queue = rhs.data_queue;
    }

    virtual ~UtlThreadSafeQueue() = default;

    // move aware push
    inline void push(T&& value) {
        std::unique_lock<std::mutex> lock(mut);
        // only add the value on the stack if there is room
        data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
        data_queue.emplace(std::forward<T>(value));
        data_cond.notify_one();
    }

    // wait for non empty lambda condition before returning value
    inline void wait_and_pop(T& rValue) {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty();});
        // ideally should return an invalid value
        if (!shutdownFlag) {
            rValue = data_queue.front();
            data_queue.pop();
        }
    }

    // wait for non empty lambda condition before returning shared pointer to value
    inline std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
        if (shutdownFlag) {
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }
        return nullptr;
    }

    // return value in specified reference and flag indicating whether value
    // successfully returned or not
    inline bool try_pop(T& rValue) {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return false;
        }
        rValue = data_queue.front();
        data_queue.pop();
        return true;
    }

    // return shared pointer to value - which if set to nullptr,
    // indicates container was empty at the time of the call.
    inline std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    // thread safe method to check if the queue is empty
    // note that if it is empty
    inline bool empty() const {
        std::lock_guard<std::mutex> lock(mut);
        return data_queue.empty();
    }

    // shutdown support - wake up potentially sleeping queues
    inline void shutdown() {
        shutdownFlag = true;
        data_cond.notify_all();
    }
};

#endif // _utlThreadSafeQueue_h_

【问题讨论】:

  • mDataLoadSessionQ.push(BaseType); 这里的BaseType 是什么,如果有的话,它与message 有什么关系?反正push需要带shared_ptr&lt;T&gt;,同理,各种口味的pop返回shared_ptr&lt;T&gt;
  • coliru 演示确实阐明了这一点。您尝试使用std::shared_ptr 调用push,但push 期待BaseMessage&amp;&amp;shared_ptr 不能转换成这样的类型
  • @johnco3 ,您放置的两个链接都已失效(尽管其中 1 个已修复)
  • wait_and_pop() 中,您试图通过调用make_shared 用另一个shared_ptr 构造它来创建shared_ptr&lt;BaseMessage&gt;。这有几件事不对劲。 1)如果您已经将shared_ptrs 存储在data_queue 中,为什么要从前一个构造shared_ptr? 2) BaseMessage 没有构造函数采用std::shared_ptr&lt;BaseMessage&gt;&amp;,那么您希望std::make_shared&lt;T&gt;(data_queue.front()) 做什么?
  • 是的,我编辑了问题以修复链接 =)

标签: c++ multithreading c++11 ipc shared-ptr


【解决方案1】:

根据 cmets 中的扩展讨论,并根据 Coliru 链接,我想我理解您最初尝试做的事情,并且我还想为您的数据结构提出一些建议。

  • 您提到您的push() 函数具有移动感知功能。优秀!但是,要小心。

    如果你看看你是如何定义 push 函数的,

    inline void push(T&& value)
    

    我想在这里指出一些事情。第一个是这只会绑定到右值引用和 not 通用引用(或者,forwarding references,因为它们很快就会被调用)。您在push 中使用std::forward 是不合适的做法(尽管在技术上是正确的)。原因是T 类型已经在类级别推导出来(当您实例化UtlThreadSafeQueue 时)。要获得完美转发语义,您需要push,如下所示:

    template<typename U>
    inline void push(U&& value) { ... }
    

    这个版本的push 接受任何类型的引用,正如您所期望的那样。但是,它的用途是将任何参数转发到适当的构造函数/函数/等。由于您希望维护一个内部 std::queue&lt;std::shared_ptr&lt;BaseMessage&gt;&gt; ,您可以有一个 push 接受对派生类型 BaseMessage 的引用(左值或右值),并将 std::shared_ptr&lt;DerivedType&gt; 放入队列中。这将建立一个指向基址的指针关系(std::shared_ptr&lt;BaseMessage&gt; base_ptr = derived_ptr,其中derived_ptr 的类型为std::shared_ptr&lt;DerivedMessage&gt;)。这可以通过以下方式完成:

    template<typename U>
    inline
    std::enable_if_t<std::is_base_of<T,std::decay_t<U>>::value> push(U&& value)   
    {
    
      std::unique_lock<std::mutex> lock(mut);
      // only add the value on the stack if there is room
      data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
      data_queue.emplace(std::make_shared<std::decay_t<U>> (std::forward<U>(value)));
      data_cond.notify_one();
    }
    

    std::enable_if_t 的使用确保只有派生自 BaseMessage 的类型才能传递给 push 函数。 std::make_shared&lt;std::decay_t&lt;U&gt;&gt; (std::forward&lt;U&gt;(value)) 的队列中的位置将调用 std::shared_ptr 的第 9 个构造函数(如列出的 Here )。

    这样做的好处在于,它允许您和您的用户编写如下代码:

    UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ(10);
    StringMessage sm("Hi there!");
    IntegerMessage im(4242);
    dataLoadSessionQ.push(sm);
    dataLoadSessionQ.push(im);
    

    它会按预期运行。每个 Message 都由 lvalue-ref 传入,并通过调用派生类型的 copy-ctor 来生成 shared_ptr

  • 您公开了一个push 接口,该接口不仅接受std::shared_ptr,还接受具有一些微妙之处的std::shared_ptr&amp;&amp;

    乍一看,我似乎无法做到这一点(从您的 Coliru 链接中借用 StringMessageBaseMessage 类型):

      UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
      auto my_message = std::make_shared<StringMessage>("Another message!");
      dataLoadSessionQ.push(my_message);
    

    尽管push 被定义为对shared_ptr 进行右值引用,但此代码通过传递my_message 进行编译(这不是右值引用!)。起初我并没有立即明白原因。但是,事实证明,类似于static_cast,有一个为shared_ptr 定义的static_pointer_cast,如下所示(借用自Here):

    template< class T, class U > 
    std::shared_ptr<T> static_pointer_cast( const std::shared_ptr<U>& r );
    

    如果转换成功,它将执行从shared_ptr&lt;U&gt;shared_ptr&lt;T&gt; 的转换。因为您的 Coliru 示例在内部使用了 std::queue&lt;std::shared_ptr&lt;BaseMessage&gt;&gt;,并且您正在尝试推送 shared_ptr&lt;StringMessage&gt;,所以由于 StringMessage 继承自 BaseMessage隐式 转换为 shared_ptr&lt;BaseMessage&gt; 成功。转换返回一个别名构造std::shared_ptr&lt;BaseMessage&gt;,它将愉快地绑定到右值引用。

    请注意,如果您改为尝试以下操作:

    UtlThreadSafeQueue<BaseMessage> dataLoadSessionQ;
    auto generic_message = std::make_shared<BaseMessage>(); 
    dataLoadSessionQ.push(generic_message);
    

    你得到了我们(或者可能只是我)最初期望的编译器错误

错误:无法将 'std::shared_ptr' 左值绑定到 'std::shared_ptr&&' dataLoadSessionQ.push(generic_message);

老实说,我无法从性能或美学方面找到一个很好的理由必须将shared_ptr&lt;Derived&gt; 传递给UtlThreadSafeQueue&lt;Base&gt;。我希望能够传入 both 一个临时的 Derived 和一个左值,而不会过多地担心队列的内部结构。 您还可以通过std::movedata_queue.front() 中获取返回shared_ptrs 的wait_and_pop()/try_pop() 大写(因为它在下次调用data_queue.pop() 时无论如何都会被破坏)

在您的 UtlThreadSafeQueue 构造函数中,我还将考虑将 const std::size_t&amp; 更改为仅按值 std::size_t 以及(示例)IntegerMessage 类型。

考虑到这一点,如果您对我上面强调的更改提供任何反馈,我将不胜感激 - 坦率地说,直到很久以后当您发布更多示例并继续编辑问题时,我才能理解您的目标/实施。

【讨论】:

  • 我可以使用你上面写的代码,但是我想知道你是否会看一下这个更新的 coliru 并解释为什么我不能将 shared_ptr 转发到 push 方法。 coliru.stacked-crooked.com/a/6d695bf42e72b91c 我对元编程很陌生,我想你可能知道,我有很多代码已经在 UtlThreadSafeQueue 之外构建了这些 shared_ptr 对象,尽管我可以让它们通过在它们前面加上 *T 并使用您的方法来工作,我更愿意完善重载 shared_ptr
  • 嘿@johnco3,你不能将std::shared_ptr 推送到我写的push 是因为enable_if_t 只接受从BaseMessage 派生的参数-@ 987654393@ 显然不是从中派生出来的。我知道您可能有一些设计选择要使用shared_ptrs,所以请尝试使用this demo
  • 我很惊讶该类同时需要 const 和非 const(通用参考)变体。我将其修改为具有单个方法并将 std::move 更改为 std::forward ,我认为这是采用 (std::shared_ptr&& ptr) 参数的通用引用推送方法 - 我认为那会表明它是一个通用引用,这就是为什么我将移动更改为 std::forward 但是编译器很适合! coliru.stacked-crooked.com/a/fca9671bf98ab9f3 有机会看看吗?你能推荐一本关于这个元东西的好书吗,在线资源一直很弱
  • @johnco3 ,老实说,我认为您对通用(转发)引用感到困惑。 Scott Meyers 有一个出色的talk about Universal References in C++11,我认为你会从中受益匪浅。您在push 中写的内容不是 转发引用,它严格 是右值引用(尽管有template&lt;typename U&gt;)。我强烈建议您观看 Scott 的演讲 - 它可能会回答您的问题。
  • 我几天前看过那个视频,但是当 arg 变成指针时,何时可以完美转发简单类型&& 通用引用 args 并不太清楚——他还有另一个谈话涵盖了这一点——但是在他讨论这个问题的时候,音频就被破坏了。它就在facebook.com/Engineering/videos/10151094464083109 末尾的一个名为“专业转发模板”的区域中 感谢您对相对新手如此耐心
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-18
  • 2017-06-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多