生产者-消费者问题(Producer-consumer problem),也称作有限缓冲问题(Bounded-buffer problem),是多线程领域的一个经典问题,可以描述为:两个或者更多个线程共享同一个缓冲区,其中一个或多个作为“生产者”会不断地向缓冲区中添加数据,另外的一个或者多个作为“消费者”从缓冲区中取走数据。这个问题的关键在于:要保证缓冲区满了之后“生产者”不能再继续添加数据,而缓冲区空了之后“消费者”不能再取走数据了。
这个问题在多个“生产者”和“消费者”的情况下肯定要麻烦一点,所以先看一下只有一个“生产者”和一个“消费者”以及一个元素缓冲区的情况。这时候情况可以简化为:
- 从缓冲区中取走数据和向缓冲区中添加数据需要互斥操作保持同步,所以,这里需要用临界区或者互斥量来实现;
- 生产者要等待缓冲区“有空间”(由于这里缓冲区只有一个元素,所以等价于要等待缓冲区为空)才能添加数据,同样消费者也不能在缓冲区为空的时候取数据。这两个过程都需要事件或者信号量来通知进行。
考虑好了上述两点要求,就可以设计出如下思路算法:
/* 针对生产者的算法1: */
{ WaitForSingleObject(hEmpty, INFINITE); WaitForSingleObject(hMutex, INIFINITE); /* 生产者的活动 */ ReleaseMutex(hMutex); ReleaseSemaphore(hFull, 1, NULL);
} /* 针对消费者的算法1: */
{ WaitForSingleObject(hFull, INFINITE); WaitForSingleObject(hMutex, INIFINITE); /* 消费者的活动 */ ReleaseMutex(hMutex); ReleaseSemaphore(hEmpty, 1, NULL);
}
当然,生产者和消费者的互斥操作不用hMutex而改用EnterCriticalSection临界区也是一样的。举一个实例看看如何应用生产者-消费者算法:
typedef struct _MESSAGE_QUEUE /* 消息队列的数据结构 */ { int threadId; int msgType[MAX_NUMBER]; int count; HANDLE hFull; HANDLE hEmpty; HANDLE hMutex; }MESSAGE_QUEUE; /* 发送消息,类似于“生产者” */ void send_mseesge(int threadId, MESSAGE_QUEUE* pQueue, int msg) { assert(NULL != pQueue); if(threadId != pQueue->threadId) return; WaitForSingleObject(pQueue->hEmpty, INFINITE); WaitForSingleObject(pQueue->hMutex, INFINITE); pQueue->msgType[pQueue->count ++] = msg; ReleaseMutex(pQueue->hMutex); ReleaseSemaphore(pQueue->hFull, 1, NULL); } /* 接收消息,类似于“消费者” */ void get_message(MESSAGE_QUEUE* pQueue, int* msg) { assert(NULL != pQueue && NULL != msg); WaitForSingleObject(pQueue->hFull, INFINITE); WaitForSingleObject(pQueue->hMutex, INFINITE); *msg = pQueue->msgType[pQueue->count --]; ReleaseMutex(pQueue->hMutex); ReleaseSemaphore(pQueue->hEmpty, 1, NULL); }