对这类生产者-消费者问题的一个简单而可靠的解决方案是使用由互斥锁保护的消息的单链表。使用 C99 和 pthreads:
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
struct message {
struct message *next;
/* Payload is irrelevant, here just as an example: */
size_t size;
char data[];
};
typedef struct {
pthread_mutex_t lock;
pthread_cond_t more;
struct message *newest;
struct message *oldest;
} queue;
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }
/* Nonblocking variant */
struct message *try_dequeue(queue *const q);
/* Blocking variants */
int enqueue(queue *const q, struct message *const m);
struct message *dequeue(queue *const q);
/* Suggested interface for queueing a new message */
int queue_printf(queue *const q, const char *const format, ...);
实现很简单。
-
struct message 单链表首先包含最旧的消息,新消息附加到末尾。
- 一个空队列同时具有
newest == NULL 和oldest == NULL。
- 所有队列操作(
enqueue()、dequeue()、try_dequeue())在检查指针之前先获取队列lock mutex。 (为了减少大量使用中的争用,保持锁定持续时间尽可能短;换句话说,在获取锁定之前先完全构造消息。)
- 阻塞的出队调用可以通过等待
more 条件变量来等待新消息(当队列为空时)。
- 当第一条消息入队时,
newest 和 oldest 都指向它。
- 在对第一条消息进行排队时,会发出条件变量
more 的信号,以防出现等待新消息的阻塞出队。
- 将更多消息排入队列首先将
newest->next 设置为指向新消息,然后将newest 设置为指向新消息。
- 出队将
oldest 成员从列表中分离出来,将oldest 更新为指向oldest->next。如果oldest 变为NULL(那么newest 和oldest 都指向同一个消息,队列中唯一的消息),由于队列为空,newest 也被设置为NULL。李>
- 只有在锁定
lock 互斥锁失败(通常只有在 C 库检测到死锁情况时才会失败),或者如果您检查发现队列结构处于不一致状态(例如,一个,但不是两者,newest 和 oldest 是 NULL,例如)。
上述原型中的逻辑是,如果成功则返回0,否则返回errno 错误代码(EINVAL、EDEADLK)。我也喜欢将 errno 设置为该错误代码,以便与出队对称。
- 消息出列失败的原因与入队相同,而且当队列为空时 (
EWOULDBLOCK/EAGAIN)。在这些情况下,该函数可以返回 NULL 并设置 errno。
如您所见,入队和出队都是 O(1) 操作,即花费恒定时间;无需在任何时候遍历整个列表。
一个入队/出队操作可以一次入队/出队多个消息,只需重复上述操作即可。但是,在实践中很少需要这样做。 (对于出队,主要原因是,如果您一次抓取多条消息,并且一条消息出现故障或错误,则您还必须处理错误以及尚未处理但已出队的消息;错误-容易。更容易一件一件地做事。此外,如果消息顺序不重要,你总是可以让多个消费者并行工作,如果他们一个一个地出队消息。)
进阶笔记:
如果依赖 C99 标准,您可以对以 struct message *next; 开头的任何结构类型使用相同的代码。根据 C99 规则,这样的结构是兼容的(对于共享的初始部分),这是队列操作访问的唯一部分。
换句话说,如果您有多种消息类型,每个消息类型都存储在自己的队列中,那么对于所有不同的消息类型,您只需要一个enqueue()/dequeue()/try_dequeue() 实现,只要消息结构都以struct message *next;开头。
为了类型安全,您需要简单的包装函数:
static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m)
{
return enqueue((queue *const)q, (struct message *const)m);
}
static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q)
{
return dequeue((queue *const)q);
}
static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q)
{
return try_dequeue((queue *const)q);
}
当在头文件中定义时,实际上不应该产生任何开销——事实上,不应该产生任何额外的代码,除非你出于某种原因获取一个地址(其中case 一个非内联版本必须在每个编译单元中发出一个地址)。但是,它们确实在编译时提供了类型检查,这通常很有用。