【问题标题】:C++ Equivalent to Java's BlockingQueueC++ 等价于 Java 的 BlockingQueue
【发布时间】:2012-10-09 17:13:50
【问题描述】:

我正在将一些 Java 代码移植到 C++,其中一个特定部分使用 BlockingQueue 将消息从多个生产者传递到单个消费者。

如果您不熟悉 Java BlockingQueue 是什么,它只是一个具有硬容量的队列,它向队列中的 put() 和 take() 公开线程安全方法。如果队列已满,put() 阻塞,如果队列为空,则 take() 阻塞。此外,还提供了这些方法的超时敏感版本。

超时与我的用例相关,因此提供这些超时的建议是理想的。如果没有,我可以自己编写一些代码。

我用谷歌搜索并快速浏览了 Boost 库,但没有找到类似的东西。也许我在这里是盲人......但是有人知道一个好的推荐吗?

谢谢!

【问题讨论】:

  • 手工制作的类有一个数组(可能是双端队列而不是数组,以便于 pop_front push_back)和互斥锁?
  • 硬容量真的有要求吗?
  • 就我而言,是的。生产者很可能超过消费者,我需要在生产者端阻塞线程,或者拒绝他们的输入,以免内存不足!

标签: c++ multithreading


【解决方案1】:

它不是固定大小,也不支持超时,但这是我最近使用 C++ 2011 构造发布的队列的简单实现:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

扩展和使用定时等待弹出应该是微不足道的。我没有这样做的主要原因是我对到目前为止所想到的界面选择不满意。

【讨论】:

  • 推送中的范围是否必要?我猜您正在尝试解锁互斥锁...但我不确定 notify_one 的先决条件。
  • push() 中的范围不是必需的,但没有它,条件变量会在锁定仍被持有时发出信号。在发出信号之前释放锁可以使锁随时可用。
  • @javapowered: 鉴于正在使用锁,它并不在乎!可以等待锁定的线程可能存在限制,但我无法想象这会相关。
  • 使用 d_m_ 作为成员变量的前缀是风格问题还是这种选择有一些语义?
  • @Isaac:这是风格问题。在不同的组织中,使用不同的成员变量指标。在不同的组织中,我什么都没使用,m_-prefix、d_-prefix、_-suffix 和 [不明智的]_-prefix。目前,我在一个使用d_-前缀的组织工作。
【解决方案2】:

这是blocking queue with shutdown request 功能的示例:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};

【讨论】:

    【解决方案3】:

    好的,我参加聚会有点晚了,但我认为这更适合 Java 的 BlockingQueue 实现。在这里我也使用一个互斥锁和两个条件来照顾不满不空。 IMO BlockingQueue 在容量有限的情况下更有意义,我在其他答案中没有看到。我也包含了一个简单的测试场景:

    #include <iostream>
    #include <algorithm>
    #include <queue>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    
    template<typename T>
    class blocking_queue {
    private:
        size_t _capacity;
        std::queue<T> _queue;
        std::mutex _mutex;
        std::condition_variable _not_full;
        std::condition_variable _not_empty;
    
    public:
        inline blocking_queue(size_t capacity) : _capacity(capacity) {
            // empty
        }
    
        inline size_t size() const {
            std::unique_lock<std::mutex> lock(_mutex);
            return _queue.size();
        }
    
        inline bool empty() const {
            std::unique_lock<std::mutex> lock(_mutex);
            return _queue.empty();
        }
    
        inline void push(const T& elem) {
            {
                std::unique_lock<std::mutex> lock(_mutex);
    
                // wait while the queue is full
                while (_queue.size() >= _capacity) {
                    _not_full.wait(lock);
                }
                std::cout << "pushing element " << elem << std::endl;
                _queue.push(elem);
            }
            _not_empty.notify_all();
        }
    
        inline void pop() {
            {
                std::unique_lock<std::mutex> lock(_mutex);
    
                // wait while the queue is empty
                while (_queue.size() == 0) {
                    _not_empty.wait(lock);
                }
                std::cout << "popping element " << _queue.front() << std::endl;
                _queue.pop();
            }
            _not_full.notify_one();
        }
    
        inline const T& front() {
            std::unique_lock<std::mutex> lock(_mutex);
    
            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            return _queue.front();
        }
    };
    
    int main() {
        blocking_queue<int> queue(5);
    
        // create producers
        std::vector<std::thread> producers;
        for (int i = 0; i < 10; i++) {
            producers.push_back(std::thread([&queue, i]() {
                queue.push(i);
                // produces too fast
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }));
        }
    
        // create consumers
        std::vector<std::thread> consumers;
        for (int i = 0; i < 10; i++) {
            producers.push_back(std::thread([&queue, i]() {
                queue.pop();
                // consumes too slowly
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            }));
        }
    
        std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
            thread.join();
        });
    
        std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
            thread.join();
        });
    
        return EXIT_SUCCESS;
    }
    

    【讨论】:

    • 尽管 STL 队列也是这样工作的(你必须使用 front() 来访问第一个元素,然后再使用 pop() 来访问它),我不认为这种接口可以工作在多线程场景中很好,因为您无法弹出第一个元素并在一次操作中获取其值。如果你先做front() 然后pop(),你可能会删除一个与你刚刚获得的元素不同的元素。还是我在这里遗漏了什么?
    • 非常正确,它可以很容易地扩展为执行 pop2frontAndPop 将它们融合为一个,然后涵盖需要原子线程安全访问而不是两步法。
    【解决方案4】:

    你应该先写信号量的类

    #ifndef SEMEPHORE_H
    #define SEMEPHORE_H
    #include <mutex>
    #include <condition_variable>
    
    class semephore {
    public:
        semephore(int count = 0)
            : count(count),
              m(),
              cv()
        {
    
        }
    
        void await() {
            std::unique_lock<std::mutex> lk(m);
            --count;
            if (count < 0) {
                cv.wait(lk);
            }
        }
    
        void post() {
            std::unique_lock<std::mutex> lk(m);
            ++count;
            if (count <= 0) {
                cv.notify_all();
            }
        }
        
    private:
        int count;
        std::mutex m;
        std::condition_variable cv;
    };
    
    #endif // SEMEPHORE_H
    
    

    blocked_queue 现在可以使用信号量来处理它了

    #ifndef BLOCKED_QUEUE_H
    #define BLOCKED_QUEUE_H
    #include <list>
    #include "semephore.h"
    
    template <typename T>
    class blocked_queue {
    public:
        blocked_queue(int count) 
            : s_products(),
              s_free_space(count),
              li()
        {
    
        }
    
        void put(const T &t) {
            s_free_space.await();
            li.push_back(t);
            s_products.post();
        }
    
        T take() {
            s_products.await();
            T res = li.front();
            li.pop_front();
            s_free_space.post();
            return res;
        }
    private:
        semephore s_products;
        semephore s_free_space;
        std::list<T> li;
    };
    
    #endif // BLOCKED_QUEUE_H
    
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2010-10-01
      • 2013-05-26
      • 2010-10-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多