【问题标题】:How to implement semaphore? Is this implementation correct or faulty?如何实现信号量?这个实现是正确的还是错误的?
【发布时间】:2019-10-29 02:44:29
【问题描述】:

我想实现信号量类。 stackoverflow 上的用户注意到我的实现不正确。

一开始我是这样做的:

class sem_t {
    int count;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        this->count++;
    }
    void down() {
        while (this->count == 0)
            std::this_thread::yield();
        this->count--;
    }
};

然后stackoverflow上的一个用户注意到这个实现是错误的,因为我从任何同步原语中读取和写入变量count,并且在某些时候该值可能会变得不正确,并且在编译器优化的情况下编译器可以假设变量@ 987654327@ 不能被其他线程修改。所以,我尝试将互斥锁添加到这个结构中,我已经这样做了:

class sem_t {
    int count;
    std::mutex mutualExclusion;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        this->mutualExclusion.lock();
        this->count++;
        this->mutualExclusion.unlock();
    }
    void down() {
                this->mutualExclusion.lock();
        while (this->count == 0)
            std::this_thread::yield();
        this->count--;
        this->mutualExclusion.unlock();
    }
};

但是如果在我尝试分离线程时使用这种方法,我会收到一个错误,说互斥锁在忙碌时已被破坏,导致一个线程可以持有互斥锁,然后屈服,之后线程被分离并发生错误(这是解决方案好吗?)。 然后我尝试修改这段代码,并停止了以下构造:

class sem_t {
    int count;
    std::mutex mutualExclusion;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        this->mutualExclusion.lock();
        this->count++;
        this->mutualExclusion.unlock();
    }
    void down() {
        while (this->count == 0)
            std::this_thread::yield();
        this->mutualExclusion.lock();
        this->count--;
        this->mutualExclusion.unlock();
    }
};

但我认为这个解决方案也有问题,因为它可能会导致与第一个解决方案相同的问题。

那么,正确的实现是什么? 我想说明我尝试使用 条件变量 来实现,但我正在尝试在没有 条件变量 的情况下实现信号量,如果您想建议使用 的解决方案条件变量请描述条件变量的等待方法是如何工作的。

[编辑]

我的完整代码,使用自行实现的信号量:

#include "pch.h"
#include <iostream>
#include <vector>
#include <mutex>
#include <thread>
#include <chrono>

class sem_t {
    int count;
    std::mutex mutualExc;
public:
    sem_t(int _count = 0) : count(_count) {};
    void up() {
        mutualExc.lock();
        this->count++;
        mutualExc.unlock();
    }
    void down() {
        mutualExc.lock();
        while (this->count == 0) {
            mutualExc.unlock();
            std::this_thread::yield();
            mutualExc.lock();
        }
        this->count--;
        mutualExc.unlock();
    }
};

#define N 5
#define THINKING 0
#define HUNGRY 1
#define EATING 2

std::mutex mx;
std::mutex coutMX;
char philosopherState[N] = { THINKING };
sem_t philosopherSemaphores[N] = { 0 };

void testSetState(short i) {
    if (philosopherState[i] == HUNGRY && philosopherState[(i + 1) % N] != EATING && philosopherState[(i + N - 1) % N] != EATING) {
        philosopherState[i] = EATING;
        philosopherSemaphores[i].up();
    }
}

void take_forks(short i) {
    ::mx.lock();
    philosopherState[i] = HUNGRY;
    
    testSetState(i);
    ::mx.unlock();
    philosopherSemaphores[i].down();
}
void put_forks(short i) {
    ::mx.lock();
    philosopherState[i] = THINKING;

    testSetState((i + 1) % N);
    testSetState((i + N - 1) % N);
    ::mx.unlock();
}

void think(short p) {
    for (short i = 0; i < 5; i++) {
        coutMX.lock();
        std::cout << "Philosopher N" << p << " is thinking!" << std::endl;
        coutMX.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}
void eat(short p) {
    for (short i = 0; i < 5; i++) {
        coutMX.lock();
        std::cout << "Philosopher N" << p << " is eating!" << std::endl;
        coutMX.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

void philosopher(short i) {
    while (1) {
        think(i);
        take_forks(i);
        eat(i);
        put_forks(i);
    }
}

int main()
{
    std::vector<std::thread*> threadsVector;
    for (int i = 0; i < N; i++) {
        threadsVector.push_back(new std::thread([i]() { philosopher(i); }));
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(15000));

    for (int i = 0; i < N; i++) {
        threadsVector[i]->detach();
    }

    return 0;
}

正在发生的错误(仅在 Visual Studio 中以发布或调试模式运行程序时发生)

【问题讨论】:

  • 在现实生活中的编程中,你想使用condition_variable,原子递增和递减,没有全局变量,停止线程的适当机制等。如果你对C++中的多线程很认真,你必须阅读 C++ Concurrency In Action 一书。
  • @Phil1970 好的,我会尽快阅读那本书,谢谢您的建议。
  • 由于餐饮哲学家问题主要是学术性的,通常会假设它永远存在。

标签: c++ synchronization semaphore


【解决方案1】:

最后一次尝试确实是不正确的,因为可能发生了多个线程同时调用down并且都成功通过

    while (this->count == 0)
        std::this_thread::yield();

行,然后它们都会将计数器递减到可能为负值:

    this->mutualExclusion.lock();
    this->count--;
    this->mutualExclusion.unlock();

因此,计数器值的检查和更新必须原子地执行。

如果你想保持忙碌循环,最简单的方法就是在 yield 之前调用unlock,之后调用lock,这样比较和递减将在同一个锁下执行:

void down() {
    this->mutualExclusion.lock();
    while (this->count == 0) {
        this->mutualExclusion.unlock();
        std::this_thread::yield();
        this->mutualExclusion.lock();
    }
    this->count--;
    this->mutualExclusion.unlock();
}

另外,您可以使用std::unique_lock 守卫,它在构造函数中锁定提供的互斥体并在析构函数中解锁,因此互斥体不会意外处于锁定状态:

void down() {
    std::unique_lock<std::mutex> lock(this->mutualExclusion);
    while (this->count == 0) {
        lock.unlock();
        std::this_thread::yield();
        lock.lock();
    }
    this->count--;
}

要处理“忙时多路复用破坏”错误,您需要有一个标志来停止后台线程并等待它们完成 join 而不是分离:

#include <atomic>

...

std::atomic<bool> stopped{ false };

void philosopher(short i) {
    while (!stopped) {
        ...
    }
}

...

int main()
{
    ...

    stopped = true;
    for (int i = 0; i < N; i++) {
        threadsVector[i]->join();
    }
    return 0;
}

或者如果你真的不想释放静态资源,你可以调用std::quick_exit而不是detachreturn

int main()
{
    ...
    std::this_thread::sleep_for(std::chrono::milliseconds(15000));
    std::quick_exit(0);
}

【讨论】:

  • 我尝试了那个解决方案,但是在某些情况下分离线程时,它也会导致“忙时破坏互斥锁”错误,这就是我感到困惑的地方。
  • 条件变量的工作原理是一样的吗?我不明白为什么条件变量解决方案不会导致相同的错误。
  • 似乎没有通知机制的解决方案遇到了this post中提到的问题。
  • @h4ckthepl4net 你确定你只是分离线程而不是破坏信号量实例,而它在其他线程中使用的阻塞状态?您可以添加最小的示例来重现“分离”时的问题吗?
  • @h4ckthepl4net 因此,您有全局互斥锁和信号量,并在其他线程使用它们时从 main 返回 - 这是不正确的。我在答案中添加了一些终止想法示例。希望这会有所帮助。
猜你喜欢
  • 2011-05-30
  • 2016-02-10
  • 2016-04-06
  • 1970-01-01
  • 2015-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多