【问题标题】:Is this a safe way to implement an asynchronous message queue without mutexes?这是实现没有互斥锁的异步消息队列的安全方法吗?
【发布时间】:2015-06-14 20:20:26
【问题描述】:

以下是我如何使用 while 循环通过简单检查实现异步消息队列。我认为这是比使用互斥锁更好的方法,如果它可以正常工作的话。通过在我的机器上运行一些测试似乎可以正常工作,但我真的不确定这是否安全,因为我在为多个线程/进程编写异步系统方面没有太多经验。我的工作是否安全地防止了比赛条件?或者它会在一些较重的负载或任何其他情况下崩溃?

typedef struct MessageQueueElement {
    Message message;
    struct MessageQueueElement *next;
} MessageQueueElement;

typedef struct MessageQueue { //singly-linked list as a queue
    MessageQueueElement *first;
    MessageQueueElement *last;
    bool sending;
} MessageQueue;

void createMessageQueue(MessageQueue *this) {
    this->first = malloc(sizeof(MessageQueueElement));
    this->last = this->first;
    this->sending = false;
}

void sendMessage(MessageQueue *this, Message *message) {
    while (this->sending);
    //do nothing while this function is called from another thread

    this->sending = true;
    this->last->message = *message;
    this->last = this->last->next = malloc(sizeof(MessageQueueElement));
    //add a message to the queue

    this->sending = false;
}

int waitMessage(MessageQueue *this, int (*readMessage)(unsigned, unsigned, void *)) {
    while (this->first == this->last);
    //do nothing while the queue is empty

    int n = readMessage(this->first->message.type, this->first->message.code, this->first->message.data);
    MessageQueueElement *temp = this->first;
    this->first = this->first->next;
    free(temp);
    return n;
}

请参阅下面的整个上下文和一些测试代码。

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>

#define EXIT_MESSAGE 0
#define THREAD_MESSAGE 1
#define EXIT 0
#define CONTINUE 1

int readMessage(size_t type, size_t code, void *data) {
    if (type == THREAD_MESSAGE) {
        printf("message from thread %d: %s\n", code, (char *)data);
        free(data);
    } else {
        return EXIT;
    }
    return CONTINUE;
}

MessageQueue mq;
int nThreads;
int counter = 0;

void *worker(void *p) {
    double pi = 0.0;
    for (int i = 0; i < 10; i += 1) {
        for (int j = 0; j < 100000; j += 1) {
            double n = i * 100000.0 + j;
            pi += (4.0 / (8.0 * n + 1.0) - 2.0 / (8.0 * n + 4.0) - 1.0 / (8.0 * n + 5.0) - 1.0 / (8.0 * n + 6.0)) / pow(16.0, n);
        }
        char *s = malloc(100);
        sprintf(s, "calculating pi... %d percent complete", (i + 1) * 10);
        sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)p, .data = s});
    }
    char *s = malloc(100);
    sprintf(s, "pi equals %.8f", pi);
    sendMessage(&mq, &(Message){.type = THREAD_MESSAGE, .code = (int)p, .data = s});
    counter += 1;
    if (counter == nThreads) {
        sendMessage(&mq, &(Message){.type = EXIT_MESSAGE});
    }
}


int main(int argc, char **argv) {
    nThreads = atoi(argv[1]);
    createMessageQueue(&mq);

    pthread_t threads[nThreads];
    for (int i = 0; i < nThreads; i += 1) {
        pthread_create(&threads[i], NULL, worker, (void *)i);
    }
    while (waitMessage(&mq, readMessage));
    for (int i = 0; i < nThreads; i += 1) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}

【问题讨论】:

  • 如果您对多线程编程完全不熟悉,请不要编写自己的消息队列。如果这是针对“现实世界”问题,请使用套接字或操作系统提供的消息队列机制。通过在自己的时间解决“玩具”问题来熟悉线程同步原语,然后再回来。

标签: c multithreading asynchronous thread-safety message-queue


【解决方案1】:

显然不行。对于初学者来说,如果两个线程都在等待队列变空,它们很可能会同时发现正在发送 == false,并且都会跳进去,然后事情就会以一种糟糕的方式出错。这正是互斥锁的用途。所以这行不通。

忙于等待变量也是非常糟糕的形式。如果您有一个四核 CPU,那么四核很有可能会花费 100% 的可用 CPU 来等待变量更改。不好。

而且由于发送不是易失性的,您的编译器将看不到任何生成代码以将其设置为 true 的理由,所以这又是行不通的。

要使其正常工作,您需要完成互斥锁所做的所有事情。而且您必须完全正确地做所有事情。这在很大程度上取决于您使用的确切处理器。你需要注意缓存一致性、读写操作的顺序、内存屏障等等,如果你甚至没有听说过这些,那么你就没有机会做对了。

【讨论】:

    猜你喜欢
    • 2010-12-10
    • 1970-01-01
    • 2012-06-05
    • 2023-02-07
    • 2013-01-03
    • 1970-01-01
    • 2012-07-06
    • 1970-01-01
    • 2019-08-11
    相关资源
    最近更新 更多