【问题标题】:Creating a lock that preserves the order of locking attempts in C++11在 C++11 中创建一个保留锁定尝试顺序的锁
【发布时间】:2013-01-25 08:48:09
【问题描述】:

有没有办法确保被阻塞的线程按照被阻塞的顺序唤醒?我在某处读到这将被称为“强锁”,但我没有找到任何资源。

在 Mac OS X 上,可以设计一个 FIFO 队列来存储阻塞线程的所有线程 ID,然后使用漂亮的函数 pthread_cond_signal_thread_np() 来唤醒一个特定的线程——这显然是非标准且不可移植的。

我能想到的一种方法是使用类似的队列,并在 unlock() 点向所有线程发送 broadcast() 并让它们检查哪一个是下一个队列。
但这会导致大量开销。

解决该问题的一种方法是将 packaged_task 发送到队列并让它按顺序处理它们。但这对我来说更像是一种解决方法而不是解决方案。

编辑:
正如 cmets 所指出的,这个问题听起来可能无关紧要,因为原则上没有保证锁定尝试的顺序。
作为澄清:

我有一个我称之为 ConditionLockQueue 的东西,它与 Cocoa 库中的 NSConditionLock 类非常相似,但它维护一个阻塞线程的 FIFO 队列,而不是一个或多或少的随机池。

基本上任何线程都可以“排队”(有或没有特定“条件”的要求 - 一个简单的整数值 - 要满足)。然后线程被放置在队列中并阻塞,直到它是队列中满足条件的最前面的元素。

这提供了一种非常灵活的同步方式,我发现它在我的程序中非常有用。
现在我真正需要的是一种唤醒具有特定 ID 的特定线程的方法。
但这些问题几乎都是一样的。

【问题讨论】:

  • 请解释为什么需要保持顺序...以及如何保证线程将按照您希望它们到达的顺序到达锁。
  • 嗯,首先它很有趣!然后它正好适合我的程序;)我可以将它设计为更加异步,但如果有另一种方式会更简单。
  • 我的观点是,即使你得到了这个解决方案,它也是没有意义的,因为你需要保证线程以某种顺序到达锁,而如果没有先前的同步,你就无法做到这一点。我可能是错的,所以我要求解释你的设计。
  • 好吧,如果你真的想这样做,我认为最好的办法是在锁定机制中内置一个“谁是下一个”队列。但我仍然认为寻找新问题似乎是一个问题,所以现在你有两个问题...... ;)
  • 所以在某些时候线程必须再次同步并“单线程”执行?为什么不在你现在要锁定的地方,将std::functions(或类似的)放在一个FIFO队列上,然后只处理那个单线程,同时在线程中等待,直到那个函数完成?它会产生相同的效果,您只需要锁定 FIFO 队列,并且可能需要一些条件/唤醒/未来机制来继续线程。

标签: c++ multithreading c++11 locking


【解决方案1】:

构建一个使用编号票证的锁对象非常容易,以确保其完全公平(按照线程首先尝试获取锁的顺序授予锁):

#include <mutex>
#include <condition_variable>

class ordered_lock {
    std::condition_variable  cvar;
    std::mutex               cvar_lock;
    unsigned int             next_ticket, counter;
public:
    ordered_lock() : next_ticket(0), counter(0) {}
    void lock() {
        std::unique_lock<std::mutex> acquire(cvar_lock);
        unsigned int ticket = next_ticket++;
        while (ticket != counter)
            cvar.wait(acquire);
    }
    void unlock() {
        std::unique_lock<std::mutex> acquire(cvar_lock);
        counter++;
        cvar.notify_all();
    }
};

编辑

修正奥拉夫的建议:

#include <mutex>
#include <condition_variable>
#include <queue>

class ordered_lock {
    std::queue<std::condition_variable *> cvar;
    std::mutex                            cvar_lock;
    bool                                  locked;
public:
    ordered_lock() : locked(false) {};
    void lock() {
        std::unique_lock<std::mutex> acquire(cvar_lock);
        if (locked) {
            std::condition_variable signal;
            cvar.emplace(&signal);
            signal.wait(acquire);
        } else {
            locked = true;
        }
    }
    void unlock() {
        std::unique_lock<std::mutex> acquire(cvar_lock);
        if (cvar.empty()) {
            locked = false;
        } else {
            cvar.front()->notify_one();
            cvar.pop();
        }
    }
};

【讨论】:

  • @lolo 然后使用每个线程一个条件变量的队列。
  • @Quirliom:对于较新版本的 C++,您需要 emplace_back 而不是 push_back 来避免移动/复制,但旧版本没有 emplace_back
  • 第二个版本在虚假唤醒的情况下不起作用(等待有时可能会继续)。另外,我想知道在另一个等待线程继续之前调用 cvar.pop() 是否有问题。然后条件变量被销毁,而其他线程仍在等待该变量触发...
  • 为了避免虚假唤醒和破坏条件变量: 1. 将cvar.pop() 移动到lock 函数。 2. 不再可能使用计数器变量,但仍然可以存储指向最近唤醒的条件变量的指针,并与“锁定”函数进行比较。
  • @OrHirshfeld:cvar_lock 由wait 调用释放,因此它可以扩展到任意数量的线程,并且它们将按FIFO 顺序获取有序锁。是的,获得有序锁的线程需要(简单地)在收到信号时重新获取 cvar_lock,但这不应该是瓶颈。
【解决方案2】:

我尝试了 Chris Dodd 解决方案 https://stackoverflow.com/a/14792685/4834897

但是编译器返回了错误,因为队列只允许有能力的标准容器。 正如您在 Akira Takahashi 的以下回答中所见,引用 (&) 不可复制: https://stackoverflow.com/a/10475855/4834897

所以我使用允许可复制引用的 reference_wrapper 更正了解决方案。

编辑: @Parvez Shaikh 建议通过在 lock() 函数中的 signal.wait() 之后移动 cvar.pop() 来进行小的改动,以使代码更具可读性

#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <vector>

#include <functional> // std::reference_wrapper, std::ref

using namespace std;

class ordered_lock {
    queue<reference_wrapper<condition_variable>> cvar;
    mutex                                        cvar_lock;
    bool                                         locked;
public:
    ordered_lock() : locked(false) {}
    void lock() {
        unique_lock<mutex> acquire(cvar_lock);
        if (locked) {
            condition_variable signal;
            cvar.emplace(std::ref(signal));
            signal.wait(acquire);
            cvar.pop();
        } else {
            locked = true;
        }
    }
    void unlock() {
        unique_lock<mutex> acquire(cvar_lock);
        if (cvar.empty()) {
            locked = false;
        } else {
            cvar.front().get().notify_one();
        }
    }
};

另一种选择是使用指针代替引用,但似乎不太安全。

【讨论】:

    【解决方案3】:

    我们在这个线程上提出了正确的问题吗???如果是这样:他们的回答正确吗???

    或者换一种说法:

    我完全误解了这里的东西吗??

    编辑段落:似乎 StatementOnOrder(见下文)是错误的。请参阅link1(Linux 下的 C++ 线程等通常基于 pthreads)和link2(提到当前调度策略是决定因素)——感谢来自 cppreference 的 Cubbi (ref)。另见linklinklinklink。如果该语句为假,那么拉原子(!)票的方法,如下面的代码所示,可能是首选!!

    来了……

    StatementOnOrder:“多个线程遇到锁定的互斥体并因此按特定顺序“进入睡眠状态”,随后将获得互斥体的所有权并以相同的顺序继续运行。”

    问题:StatementOnOrder是真是假???

    void myfunction() {
        std::lock_guard<std::mutex> lock(mut);
    
        // do something
        // ...
        // mutex automatically unlocked when leaving funtion.
    }
    

    我之所以这么问,是因为迄今为止此页面上的所有代码示例似乎都是:

    a) 浪费(如果 StatementOnOrder 为真)

    b) 严重错误(如果 StatementOnOrder 为假)。

    如果 StatementOnOrder 为假,为什么还要说他们可能“严重错误”?
    原因是所有代码示例都认为他们通过使用std::condition_variable 来超级智能,但实际上在 before 之前使用了锁,这将(如果 StatementOnOrder 为 false ) 搞乱顺序!!!
    只需在此页面搜索std::unique_lock&lt;std::mutex&gt;,即可看到讽刺意味。

    所以如果 StatementOnOrder 真的是假的,你就不能遇到锁,然后处理票证和条件变量的东西之后。相反,您必须执行以下操作:遇到任何锁之前拉一张原子票!!!
    为什么要在遇到锁之前拉票?因为这里我们假设 StatementOnOrder 为假,所以任何排序都必须在“邪恶”锁之前完成。

    #include <mutex>
    #include <thread>
    #include <limits>
    #include <atomic>
    #include <cassert>
    #include <condition_variable>
    #include <map>
    
    std::mutex mut;
    std::atomic<unsigned> num_atomic{std::numeric_limits<decltype(num_atomic.load())>::max()};
    unsigned num_next{0};
    std::map<unsigned, std::condition_variable> mapp;
    
    void function() {
        unsigned next = ++num_atomic; // pull an atomic ticket
    
        decltype(mapp)::iterator it;
    
        std::unique_lock<std::mutex> lock(mut);
        if (next != num_next) {
            auto it = mapp.emplace(std::piecewise_construct,
                                   std::forward_as_tuple(next),
                                   std::forward_as_tuple()).first;
            it->second.wait(lock);
            mapp.erase(it);
        }
    
    
    
        // THE FUNCTION'S INTENDED WORK IS NOW DONE
        // ...
        // ...
        // THE FUNCTION'S INDENDED WORK IS NOW FINISHED
    
        ++num_next;
    
        it = mapp.find(num_next); // this is not necessarily mapp.begin(), since wrap_around occurs on the unsigned                                                                          
        if (it != mapp.end()) {
            lock.unlock();
            it->second.notify_one();
        }
    }
    

    以上函数保证订单按照拉取的原子票执行。 (编辑:使用 boost 的侵入式映射,将 condition_variable 保存在堆栈上(作为局部变量),将是一个很好的优化,可以在这里使用,以减少免费存储的使用!) p>

    但主要问题是: StatementOnOrder 是真是假???
    (如果是真的,那么我上面的代码示例也是浪费,我们可以使用互斥体并完成它。)
    我希望像Anthony Williams 这样的人会查看此页面... ;)

    【讨论】:

    • 为什么你的回答中有这么多问题?现在我无法真正确定它是否真的一个答案。请正确格式化您的答案并删除空虚线和大量问题。 或者只是问一个问题。
    • 我的“文本”中只有一个基于谓词(即真/假)的问题。取决于该谓词问题的答案是真还是假;迄今为止,此页面上的其他答案非常值得怀疑。如果文本被视为垃圾邮件,我很抱歉(这当然不是本意),但这是一个复杂的主题;而且我不愿意妥协,但打算正确回答这个问题。
    • 鉴于操作系统可以在任何时候无限期地挂起你的线程,当多个线程竞争互斥锁时,你真的不需要担心哪个线程首先获得互斥锁。操作系统会选择“最佳”的那个,大多数操作系统也会尝试确保线程不会饿太久。
    • 感谢您的参与。因此,如果我正确阅读了您的评论...故事的结尾将是:如果您需要确保订单,请自己确保订单。线程调用任何东西时都不能保证顺序。 (确保休眠线程以与运行到互斥锁相同的顺序继续,这根本不是处理事情的好方法)。
    • 确保休眠线程以它们遇到互斥体的相同顺序继续(这通常是任意的!!![从用户的角度来看]),这根本不是处理事情的好方法. INSTEAD 在调用代码中使用您自己的锁,或“序列化”需要顺序的调用,方法是在单个线程中按顺序执行这些调用。
    猜你喜欢
    • 1970-01-01
    • 2016-01-04
    • 2013-07-08
    • 2023-03-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多