【问题标题】:How to implement blocking read using POSIX threads如何使用 POSIX 线程实现阻塞读取
【发布时间】:2010-09-17 10:29:39
【问题描述】:

我想实现一个生产者/消费者场景,该场景遵循大致如下的接口:

class Consumer {
private:
    vector<char> read(size_t n) {
        // If the internal buffer has `n` elements, then dequeue them
        // Otherwise wait for more data and try again
    }
public:
    void run() {
        read(10);
        read(4839);
        // etc
    }
    void feed(const vector<char> &more) {
        // Safely queue the data
        // Notify `read` that there is now more data
    }
};

在这种情况下,feedrun 将在不同的线程上运行,read 应该是阻塞读取(如 recvfread)。显然,我的双端队列需要某种互斥机制,我需要某种通知系统来通知read 重试。

我听说 条件变量 是要走的路,但我所有的多线程经验都依赖于 Windows,我很难完全理解它们。

感谢您的帮助!

(是的,我知道返回向量是低效的。我们不要讨论这个。)

【问题讨论】:

    标签: c++ multithreading pthreads producer-consumer


    【解决方案1】:

    此代码尚未准备好生产。 不对任何库调用的结果进行错误检查。

    我已将互斥锁的锁定/解锁包装在 LockThread 中,因此它是异常安全的。但仅此而已。

    此外,如果我认真地这样做,我会将互斥锁和条件变量包装在对象中,这样它们就可以在 Consumer 的其他方法中被滥用。但是只要你注意必须在使用条件变量之前(以任何方式)获取锁,那么这种简单的情况就可以保持原样。

    出于兴趣,您检查过 boost 线程库吗?

    #include <iostream>
    #include <vector>
    #include <pthread.h>
    
    class LockThread
    {
        public:
        LockThread(pthread_mutex_t& m)
            :mutex(m)
        {
            pthread_mutex_lock(&mutex);
        }
        ~LockThread()
        {
            pthread_mutex_unlock(&mutex);
        }
        private:
            pthread_mutex_t& mutex;
    };
    class Consumer
    {
        pthread_mutex_t     lock;
        pthread_cond_t      cond;
        std::vector<char>   unreadData;
        public:
        Consumer()
        {
            pthread_mutex_init(&lock,NULL);
            pthread_cond_init(&cond,NULL);
        }
        ~Consumer()
        {
            pthread_cond_destroy(&cond);
            pthread_mutex_destroy(&lock);
        }
    
        private:
            std::vector<char> read(size_t n)
            {
                LockThread  locker(lock);
                while (unreadData.size() < n)
                {
                    // Must wait until we have n char.
                    // This is a while loop because feed may not put enough in.
    
                    // pthread_cond() releases the lock.
                    // Thread will not be allowed to continue until
                    // signal is called and this thread reacquires the lock.
    
                    pthread_cond_wait(&cond,&lock);
    
                    // Once released from the condition you will have re-aquired the lock.
                    // Thus feed() must have exited and released the lock first.
                }
    
                /*
                 * Not sure if this is exactly what you wanted.
                 * But the data is copied out of the thread safe buffer
                 * into something that can be returned.
                 */
                std::vector<char>   result(n); // init result with size n
                std::copy(&unreadData[0],
                          &unreadData[n],
                          &result[0]);
    
                unreadData.erase(unreadData.begin(),
                                 unreadData.begin() + n);
                return (result);
            }
    public:
        void run()
        {
            read(10);
            read(4839);
            // etc
        }
        void feed(const std::vector<char> &more)
        {
            LockThread  locker(lock);
    
            // Once we acquire the lock we can safely modify the buffer.
            std::copy(more.begin(),more.end(),std::back_inserter(unreadData));
    
            // Only signal the thread if you have the lock
            // Otherwise race conditions happen.
            pthread_cond_signal(&cond);
    
            // destructor releases the lock and thus allows read thread to continue.
        }
    };
    
    
    int main()
    {
        Consumer    c;
    }
    

    【讨论】:

    • 这看起来很不错。一个注释(只是一种改进),但大多数网站都说您需要使用互斥锁保护条件变量本身,以防止竞争条件。多线程很有趣,不是吗?
    • 条件变量受互斥体保护。在 read() 和 feed() 这两种情况下,您都必须先获取锁,然后才能对条件变量进行任何操作。
    • 回答你的问题:我没有使用 boost 因为它的大小。每次我想到使用它时,下载大小/我无法阅读 10 层宏/以及带有所有编译器开关的构建系统都会让我感到恶心。太丑了。
    • 如果条件变量没有发生在互斥体内部,则发出信号是可以的,但通常建议不要这样做。只有叫醒订单服务员才很重要(例如,防止饥饿)。通常不是。
    • 你不必构建boost来使用线程库,它都在头文件中
    【解决方案2】:

    我倾向于使用我所说的“同步队列”。我包装普通队列并使用 Semaphore 类来锁定和读取块,就像你想要的那样:

    #ifndef SYNCQUEUE_20061005_H_
    #define SYNCQUEUE_20061005_H_
    
    #include <queue>
    #include "Semaphore.h"
    
    // similar, but slightly simpler interface to std::queue
    // this queue implementation will serialize pushes and pops
    // and block on a pop while empty (as apposed to throwing an exception)
    // it also locks as neccessary on insertion and removal to avoid race 
    // conditions
    
    template <class T, class C = std::deque<T> > class SyncQueue {
    protected:
        std::queue<T, C>    m_Queue;
        Semaphore           m_Semaphore;
        Mutex               m_Mutex;
    
    public:
        typedef typename std::queue<T, C>::value_type value_type;
        typedef typename std::queue<T, C>::size_type size_type;
    
        explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {}
    
        bool empty() const              { return m_Queue.empty(); }
        size_type size() const          { return m_Queue.size(); }
    
        void push(const value_type& x);
        value_type pop();
    };
    
    template <class T, class C>
    void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) {
        // atomically push item
        m_Mutex.lock(); 
        m_Queue.push(x); 
        m_Mutex.unlock(); 
    
        // let blocking semaphore know another item has arrived
        m_Semaphore.v();
    }
    
    template <class T, class C>
    typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() {
        // block until we have at least one item
        m_Semaphore.p();
    
        // atomically read and pop front item
        m_Mutex.lock();
        value_type ret = m_Queue.front();
        m_Queue.pop();
        m_Mutex.unlock();
    
        return ret;
    }
    
    #endif
    

    您可以在线程实现中使用适当的原语来实现信号量和互斥锁。

    注意:此实现是队列中单个元素的示例,但您可以轻松地将其包装为缓冲结果直到提供 N 的函数。如果它是一个字符队列,则类似这样:

    std::vector<char> func(int size) {
        std::vector<char> result;
        while(result.size() != size) {
            result.push_back(my_sync_queue.pop());
        }
        return result;
    }
    

    【讨论】:

      【解决方案3】:

      我会扔掉一些半伪代码。这是我的cmets:

      1)这里有非常大的锁定颗粒。如果您需要更快的访问速度,您将需要重新考虑您的数据结构。 STL 不是线程安全的。

      2) 锁定将阻塞,直到互斥锁允许它通过。互斥体结构是它使用锁定/解锁机制一次允许 1 个线程通过它。不需要轮询或某种异常式结构。

      3) 这是一个在句法上相当老套的问题。我对 API 和 C++ 语法并不精确,但我相信它提供了语义上正确的解决方案。

      4)根据评论进行编辑。

      class piper
      {
      pthread_mutex queuemutex;
      pthread_mutex readymutex;
      bool isReady; //init to false by constructor
      
      //whatever else
      };
      
      piper::read()
      {//whatever
      pthread_mutex_lock(&queuemutex)
      if(myqueue.size() >= n)
      { 
         return_queue_vector.push_back(/* you know what to do here */)
      
          pthread_mutex_lock(&readymutex)
          isReady = false;
          pthread_mutex_unlock(&readymutex)
      }
      pthread_mutex_unlock(&queuemutex)
      }
      
      piper::push_em_in()
      {
      //more whatever
      pthread_mutex_lock(&queuemutex)
      //push push push
      if(myqueue.size() >= n)
      {
          pthread_mutex_lock(&readymutex)
          isReady = true;
          pthread_mutex_unlock(&readymutex)
      }
      pthread_mutex_unlock(&queuemutex)
      }
      

      【讨论】:

      • 好的开始,但请记住,我希望我的阅读成功。不能保证push_em_in 会转储足够的数据以实现这一目标。所以阅读需要等到有足够的时候。这是我要确保有效(非旋转)的循环。
      • 你也可以使用 RAII 来确保你的 lock() unlock() 是异常安全的。
      • @Frank,对这个概念进行了另一次修改。您现在是否在关注如何更好地使用 pthread 互斥锁?
      • 肯定会关注互斥锁的东西。但同样,我需要read 才能成功——它应该是一个阻塞读取。你的仍然可能失败。不过,我感谢您为此付出的努力!希望我能第二次投票给你!
      【解决方案4】:

      只是为了好玩,这是一个使用 Boost 的快速而肮脏的实现。它在支持它的平台上使用 pthreads,在 windows 上使用 windows 操作。

      boost::mutex access;
      boost::condition cond;
      
      // consumer
      data read()
      {
        boost::mutex::scoped_lock lock(access);
        // this blocks until the data is ready
        cond.wait(lock);
      
        // queue is ready
        return data_from_queue();
      }
      
      // producer
      void push(data)
      {
        boost::mutex::scoped_lock lock(access);
        // add data to queue
      
        if (queue_has_enough_data())
          cond.notify_one();  
      }
      

      【讨论】:

      • 只有在有足够数据时才会通知条件,因此不需要循环 - 您应该阅读 boost 线程和条件变量,代码正确且没有死锁
      • 也就是说条件表现不错,在阻塞之前释放锁
      【解决方案5】:

      为了更有趣,这是我的最终版本。没有充分理由的 STL 化。 :-)

      #include <algorithm>
      #include <deque>
      #include <pthread.h>
      
      template<typename T>
      class MultithreadedReader {
          std::deque<T>   buffer;
          pthread_mutex_t moreDataMutex;
          pthread_cond_t  moreDataCond;
      
      protected:
          template<typename OutputIterator>
          void read(size_t count, OutputIterator result) {
              pthread_mutex_lock(&moreDataMutex);
      
              while (buffer.size() < count) {
                  pthread_cond_wait(&moreDataCond, &moreDataMutex);
              }
              std::copy(buffer.begin(), buffer.begin() + count, result);
              buffer.erase(buffer.begin(), buffer.begin() + count);
      
              pthread_mutex_unlock(&moreDataMutex);
          }
      
      public:
          MultithreadedReader() {
              pthread_mutex_init(&moreDataMutex, 0);
              pthread_cond_init(&moreDataCond, 0);
          }
      
          ~MultithreadedReader() {
              pthread_cond_destroy(&moreDataCond);
              pthread_mutex_destroy(&moreDataMutex);
          }
      
          template<typename InputIterator>
          void feed(InputIterator first, InputIterator last) {
              pthread_mutex_lock(&moreDataMutex);
      
              buffer.insert(buffer.end(), first, last);
              pthread_cond_signal(&moreDataCond);
      
              pthread_mutex_unlock(&moreDataMutex);
          }
      };
      

      【讨论】:

      • 这个类被设计为一个基类,它的子类型自己做阅读,只是想被喂食。这是一个流协议,其中的类就像一个小 unix 应用程序。
      【解决方案6】:

      Glib 异步队列在读取您正在寻找的空队列时提供锁定和睡眠。请参阅http://library.gnome.org/devel/glib/2.20/glib-Asynchronous-Queues.html 您可以将它们与 gthread 或 gthread 池结合使用。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-12-17
        • 2017-11-22
        • 2019-05-31
        相关资源
        最近更新 更多