【问题标题】:Communication between two pthreads两个 pthread 之间的通信
【发布时间】:2015-08-20 09:54:58
【问题描述】:

在 C 程序中,一些线程(pthread1pthread2、...)生成一条消息,并且生成的消息由另一个线程(pthreadprint)处理,该线程将它们打印出来。

消息在被处理之前可以“堆积”:根据pthreadprint 打印所需的时间,在某个时间可以堆叠多个消息作为pthreadprint 的输入。我显然不知道在最坏的情况下可以等待处理的最大消息数。

将这些消息(从“产生”它们的pthread1pthread2……)发送到打印它们的pthreadprint 的最佳方式是什么?它们不仅应该被传输,而且应该被“储存”。我知道互斥锁和条件变量,但它们不代表队列:是否可以使用所有线程都可以访问的 FIFO 队列?

【问题讨论】:

  • "是否可以使用所有线程都可以访问的 FIFO 队列" 是的。
  • "... mutexes [...] 变量,但它们在这里似乎没那么有用" 确定它们是,即保护打印机线程的 FIFO 缓冲区免受并发访问。
  • 请参阅man queue 和/或en.wikipedia.org/wiki/…

标签: c multithreading queue pthreads fifo


【解决方案1】:

对这类生产者-消费者问题的一个简单而可靠的解决方案是使用由互斥锁保护的消息的单链表。使用 C99 和 pthreads:

#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>

struct message {
    struct message *next;
    /* Payload is irrelevant, here just as an example: */
    size_t          size;
    char            data[];
};

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  more;
    struct message *newest;
    struct message *oldest;
} queue;
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }

/* Nonblocking variant */
struct message *try_dequeue(queue *const q);

/* Blocking variants */
int enqueue(queue *const q, struct message *const m);
struct message *dequeue(queue *const q);

/* Suggested interface for queueing a new message */
int queue_printf(queue *const q, const char *const format, ...);

实现很简单。

  • struct message 单链表首先包含最旧的消息,新消息附加到末尾。
  • 一个空队列同时具有newest == NULLoldest == NULL
  • 所有队列操作(enqueue()dequeue()try_dequeue())在检查指针之前先获取队列lock mutex。 (为了减少大量使用中的争用,保持锁定持续时间尽可能短;换句话说,在获取锁定之前先完全构造消息。)
  • 阻塞的出队调用可以通过等待more 条件变量来等待新消息(当队列为空时)。
  • 当第一条消息入队时,newestoldest 都指向它。
  • 在对第一条消息进行排队时,会发出条件变量more 的信号,以防出现等待新消息的阻塞出队。
  • 将更多消息排入队列首先将newest-&gt;next 设置为指向新消息,然后将newest 设置为指向新消息。
  • 出队将oldest 成员从列表中分离出来,将oldest 更新为指向oldest-&gt;next。如果oldest 变为NULL(那么newestoldest 都指向同一个消息,队列中唯一的消息),由于队列为空,newest 也被设置为NULL。李>
  • 只有在锁定 lock 互斥锁失败(通常只有在 C 库检测到死锁情况时才会失败),或者如果您检查发现队列结构处于不一致状态(例如,一个,但不是两者,newestoldestNULL,例如)。 上述原型中的逻辑是,如果成功则返回0,否则返回errno 错误代码(EINVALEDEADLK)。我也喜欢将 errno 设置为该错误代码,以便与出队对称。
  • 消息出列失败的原因与入队相同,而且当队列为空时 (EWOULDBLOCK/EAGAIN)。在这些情况下,该函数可以返回 NULL 并设置 errno

如您所见,入队和出队都是 O(1) 操作,即花费恒定时间;无需在任何时候遍历整个列表。

一个入队/出队操作可以一次入队/出队多个消息,只需重复上述操作即可。但是,在实践中很少需要这样做。 (对于出队,主要原因是,如果您一次抓取多条消息,并且一条消息出现故障或错误,则您还必须处理错误以及尚未处理但已出队的消息;错误-容易。更容易一件一件地做事。此外,如果消息顺序不重要,你总是可以让多个消费者并行工作,如果他们一个一个地出队消息。)


进阶笔记:

如果依赖 C99 标准,您可以对以 struct message *next; 开头的任何结构类型使用相同的代码。根据 C99 规则,这样的结构是兼容的(对于共享的初始部分),这是队列操作访问的唯一部分。

换句话说,如果您有多种消息类型,每个消息类型都存储在自己的队列中,那么对于所有不同的消息类型,您只需要一个enqueue()/dequeue()/try_dequeue() 实现,只要消息结构都以struct message *next;开头。

为了类型安全,您需要简单的包装函数:

static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m)
{
    return enqueue((queue *const)q, (struct message *const)m);
}

static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q)
{
    return dequeue((queue *const)q);
}

static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q)
{
    return try_dequeue((queue *const)q);
}

当在头文件中定义时,实际上不应该产生任何开销——事实上,不应该产生任何额外的代码,除非你出于某种原因获取一个地址(其中case 一个非内联版本必须在每个编译单元中发出一个地址)。但是,它们确实在编译时提供了类型检查,这通常很有用。

【讨论】:

    【解决方案2】:

    这似乎是关于进程间通信中“生产者-消费者”问题的典型第一年作业(可能在 OS 类中?)

    您需要一种将信息从一个进程传递到另一个进程的方法。有一些方法可以做到这一点,更简单的是“公共内存”,即所有进程都可以访问的一块内存。

    在我看来,我建议实现一个队列。然后,您的所有进程都应该有权访问此队列,但仅在允许时(即来自互斥锁的结果)。您还可以通过添加检查大小的功能来扩充队列(例如 int isQueueFull(void); ,因为每个队列节点也应该有一个索引,例如 struct{ queueNode* nextNode; unsigned int index; void* data;} )。

    如果您将所有这些都放在同一个初始进程中,然后从它启动线程,那么您的所有“子进程”都可以访问与父进程相同的内存空间。

    【讨论】:

      猜你喜欢
      • 2012-06-07
      • 2018-06-05
      • 2012-12-20
      • 1970-01-01
      • 1970-01-01
      • 2012-06-26
      • 2019-03-26
      • 2011-10-18
      • 1970-01-01
      相关资源
      最近更新 更多