介绍:当开发一个多线程程序时,同步是一个很大的问题。如果你的程序需要数据流包,那么用队列是个好办法。

你可以在 http://www.boost.org/ 发现 boost 库和文档,从它的网站可以看出用boost的优势:

In a word, Productivity. Use of high-quality libraries like Boost speeds initial development, results in fewer bugs, reduces reinvention-of-the-wheel, and cuts long-term maintenance costs. And since Boost libraries tend to become de facto or de jure standards, many programmers are already familiar with them.

下面介绍用boost synchronization class(boost 同步类)来实现。

代码实现:

在例子中,用了线程同步模型来说明producer-consumer(生产者--消费者模型),producer线程创建数据并插入到队列中,consumer线程使用数据并从队列中删除数据。使用了mutex对象来保持两个线程的同步。

用不同的解决方法来实现线程的同步队列,然后比较了它们的优势与不足。

  1. SynchronizedDequeue: is a double-ended queue, implemented with STL deque.
  2. SychronizedVector: is a ring or cycle queue, implemented with STL vector.
  3. SychronizedVectorNB: is the no-blocking version of SychronizedVector.

头文件和接口定义:

#include <iostream>
#include <deque>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

using namespace std;
 
#define default_packesize 1280  

class TPacket
{
    int my_size;
    unsigned char my_databuf[default_packesize];
    unsigned int ID;
public:
    TPacket() {std::memset(my_databuf,0,sizeof(my_databuf));my_size=0;}
    ~TPacket() {;}
    int GetSize() {return my_size;}
    void SetSize(int size) {my_size = size;}
    unsigned int GetID() {return ID;}
    void SetID(int id) {ID = id;}
    bool GetData(char* pbuf,int& size) 
    {
        if(my_size>size)
            return false;
        size = my_size;
        memcpy(pbuf,my_databuf,my_size);
        return true;
    }
    bool SetData(char* pbuf,int size) 
    {
        if(size>default_packesize)
            return false;
        memcpy(my_databuf,pbuf,size);
        my_size=size;
        return true;
    }
public:    
    virtual bool IsValid() {return false;}
    virtual bool Encode() {return false;}
    virtual bool Decode() {return false;}
};

//queue interface
template <class T>
class ISynchronizedQueue
{
public:
    virtual bool add(T pkt) = 0;
    virtual bool get(T& pkt) = 0;
    virtual bool read(T& pkt) = 0;
    virtual bool del(T& pkt) = 0;
    virtual bool clear() = 0;
};

接口实现:

SynchronizedDequeue有动态的队列大小,好处是如果producer比consumer快,没有数据会丢失,全部的数据将被consumer接收。不足是受内存更大的影响。当要插入队列时分配内存,当consumer线程接收到数据后释放内存。因为会出现内存分配和释放多次,降低了对在同一过程中更大的内存回收。

public ISynchronizedQueue<TPacket>
{
    boost::mutex m_mutex;
    deque<TPacket> m_queue;
    boost::condition_variable m_cond;

public:
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if(m_queue.size()>100)
            m_queue.clear();
        m_queue.push_back(pkt);
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (!m_queue.size())
        {
            return false;
        }
        pkt = m_queue.front();
        m_queue.pop_front();
        return true;
    }

    bool read(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if (!m_queue.size())
        {
            return false;
        }
        pkt = m_queue.front();
        return true;
    }

    bool del(TPacket& pkt)
    {
        return get(pkt);
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        m_queue.clear();
        return true;
    }
};

 

SychronizedVector使用了固定大小的队列来避免内存开销,但当有新数据来,它会覆盖旧数据,并从队列中刷新出去。

public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start,end;

public:
    SynchronizedVector(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
    bool add(TPacket pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        my_vector[end++] = pkt;
        if(end>=queue_size)
            end = 0;
        if(end == start)
            start = end+1;
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        if(start==end)
            return false;
        pkt = my_vector[start++];
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool read(TPacket& pkt)  //not support
    {
        return false;
    }
    bool del(TPacket& pkt) //not support
    {
        return false;
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end =0;
        return true;
    }
};

 

SychronizedVectorNB不会被阻塞,无论是生产者还是消费者线程。优点在于,如果有一些其他activity需要被处理在队列访问线程的过程中,那么non-block将保证响应时间。当线程试图拥有该mutex对象,上述两个队列可以阻塞线程。如果一个线程拥有mutex,那么发生exception时,其他线程也将被阻塞。缺点是,当它不能拥有lock,添加数据到队列时可能失败,那么caller需要再次添加相同的数据。

public ISynchronizedQueue<TPacket>
{
    int queue_size;
    boost::mutex m_mutex;
    std::vector<TPacket> my_vector;
    int start,end;

public:
    SynchronizedVectorNB(int q_size=100) {queue_size = q_size; start=end=0; my_vector.assign(queue_size,TPacket());}
    bool add(TPacket pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex,boost::try_to_lock_t());
        if(!lock.owns_lock())
            return false;
        my_vector[end++] = pkt;
        if(end>=queue_size)
            end = 0;
        if(end == start)
            start = end+1;
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool get(TPacket& pkt)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex,boost::try_to_lock_t());
        if(!lock.owns_lock())
            return false;

        if(start==end)
            return false;
        pkt = my_vector[start++];
        if(start>=queue_size)
            start = 0;
        return true;
    }
    bool read(TPacket& pkt) //not support
    {
        return false;
    }
    bool del(TPacket& pkt) //not support
    {
        return false;
    }

    bool clear()
    {
        boost::lock_guard<boost::mutex> lock(m_mutex);
        start = end =0;
        return true;
    }
};

 

下面是producer线程代码:

DWORD WINAPI ProducerServerThread(LPVOID lpParam)
{
 int count=0;
 
 ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
 TPacket pkt; 
 LOG("\n-------------------------Producer thread begin-----------------------");
 while(1)
 {
  DWORD t1 = GetTickCount();
  Sleep(50);
 
  if(count++>=1000)
   break;
  
  //initialize packet data to zero.
  memset(&pkt,0,sizeof(pkt));

  //add content to packet, I only set the ID here, you can do something more.
  pkt.SetID(count);


  if(pQ->add(pkt))
   LOG("Add PACKET ID = %d ",pkt.GetID());
  else
   LOG("Add Packet Failed");
  DWORD t2 = GetTickCount();
 
  LOG("ONE-LOOP DURATION = %d",t2-t1);
 }
 LOG("\n-------------------------Producer thread end-----------------------");
 return 0;
}

下面是consumer线程代码:
DWORD WINAPI ConsumerServerThread(LPVOID lpParam)
{
 int count=0;
 ISynchronizedQueue<TPacket>* pQ = (ISynchronizedQueue<TPacket>*)lpParam;
 TPacket pkt;
 LOG("\n-------------------------Cosumer thread begin-----------------------");
 while(1)
 {
  Sleep(10);
 
  if(count++>=1200)
   break;
 
  if(pQ->get(pkt))
   LOG("Get Packet ID = %d",pkt.GetID());
  else
   LOG("Get Packet Failed");
 }
 LOG("\n-------------------------Cosumer thread end-----------------------");
 return 0;
}


下面是main线程代码:

SynchronizedDequeue m_q[5];
//SynchronizedVector m_q[5];
//SynchronizedVectorNB m_q[5]

int _tmain(int argc, _TCHAR* argv[])
{
   int thread_count =5;
   HANDLE server_threads[10]; 
 
   for (int i=0; i < thread_count ;i++)
    {
        server_threads[i] = CreateThread(
                                        NULL,
                                        0,
                                        ProducerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
    for (int i= 0; i < thread_count ;i++)
    {
        server_threads[i+thread_count] = CreateThread(
                                        NULL,
                                        0,
                                        ConsumerServerThread,
                                        &m_q[i],
                                        0,
                                        NULL
                                        );
        if (server_threads[i] == NULL)
        {
            LOG( "Create Thread failed: %d\n", GetLastError());
            return 0;
        }
    }
 
  // Wait until the threads exit, then cleanup
    int retval = WaitForMultipleObjects(
                                   2*thread_count,
                                   server_threads,
                                   TRUE,
                                   INFINITE
                                   );
    if ((retval == WAIT_FAILED) || (retval == WAIT_TIMEOUT))
    {
        LOG( "WaitForMultipleObjects failed: %d\n", GetLastError());
        return 0;
    }
}

在测试代码中,创建了五个producers,五个consumers和五个队列。每个producer都有其伙伴consumer通过使用相同的队列链接。可以验证,如果创建的每个数据包的数据,都是consumer线程通过它的数据包ID处理的。

原英文链接:http://www.codeproject.com/Articles/442452/Thread-Synchronization-Queue-with-Boost

相关文章: