【问题标题】:This C++ class for storing set of shared_ptr: is it thread safe?这个用于存储 shared_ptr 集的 C++ 类:它是线程安全的吗?
【发布时间】:2021-04-25 06:43:31
【问题描述】:

假设 C++17(其中一些在 C++20 中已弃用)

我写了一个 EventDB 类,它存储了一组固定的 shared_ptr。

class EventDB
{
public:
    EventDB() = delete;
    
    EventDB(const std::vector<std::shared_ptr<const EventInfo>>& init_events):
        PointEvents(init_events.begin(),init_events.end())
    {}
    
    std::shared_ptr<const EventInfo> Swap(std::shared_ptr<const EventInfo> event)
    {
        auto old_it = PointEvents.find(event);
    
        if(old_it == PointEvents.end())
            return nullptr;
    
        //cast away the constness of the iterator element
        //is this OK, because we know we're not changing its hash/equality?
        auto old_evt_addr = &const_cast<std::shared_ptr<const EventInfo>&>(*old_it);
    
        return std::atomic_exchange(old_evt_addr,event);
    }   

private:
    std::unordered_set<std::shared_ptr<const EventInfo>,EventPointHash,EventPointEq> PointEvents;
};

它提供了一种使用 std::atomic_exchange 交换集合元素的方法。 “换出”集合中的一个元素似乎毫无意义,但我为集合提供了自定义哈希和相等性,因此即使从集合的角度来看它们是等价的,交换的元素实际上也包含不同的数据。这一切的正确性是次要问题的主题,因为如果需要,我可以用地图替换它。

我的主要问题是关于线程安全 - EventDB 线程安全吗?如果不是,为什么不呢?

上面提到的第二个问题是抛弃 set 迭代器的常量性有多糟糕,这样我就可以(原子地)修改元素。我是否违反了语言规则并依赖于实现特定的行为?或者这在技术上是否允许?

对于额外的荣誉,我将在 C++20 中用什么替换 std::atomic_exchange。我知道有适当的原子智能指针,但是在这个例子中我可以在普通 shared_ptr 之间转换吗?

这里有一些独立的代码,可以编译并使用 g++ 9.3.0 GLIBCXX_3.4.28



#include <vector>
#include <string>
#include <iostream>
#include <thread>
#include <memory>
#include <limits>
#include <unordered_set>

enum class EventType : uint8_t
{
    RED = 0,
    BLUE = 1
};

class EventInfo
{
public:
    EventInfo() = delete;
    EventInfo(const EventType t, const size_t i, const std::string& p):
        Type(t),Index(i),Payload(p)
    {}
    size_t GetIndex() const
    {
        return Index;
    }
    EventType GetEventType() const
    {
        return Type;
    }
    const std::string& GetPayload() const
    {
        return Payload;
    }
private:
    EventType Type;
    size_t Index;
    std::string Payload;
};

struct EventPointHash
{
    size_t operator()(const std::shared_ptr<const EventInfo>& evt) const
    {
        if(!evt)
            return std::numeric_limits<size_t>::max();
        return (evt->GetIndex() << (sizeof(EventType)<<3)) + static_cast<size_t>(evt->GetEventType());
    }
};

struct EventPointEq
{
    bool operator()(const std::shared_ptr<const EventInfo>& lhs,
        const std::shared_ptr<const EventInfo>& rhs) const
    {
        if(!lhs && !rhs) return true;
        if(!lhs || !rhs) return false;
        return (lhs->GetIndex() == rhs->GetIndex() && lhs->GetEventType() == rhs->GetEventType());
    }
};

class EventDB
{
public:
    EventDB() = delete;
    
    EventDB(const std::vector<std::shared_ptr<const EventInfo>>& init_events):
        PointEvents(init_events.begin(),init_events.end())
    {}
    
    std::shared_ptr<const EventInfo> Swap(std::shared_ptr<const EventInfo> event)
    {
        auto old_it = PointEvents.find(event);
    
        if(old_it == PointEvents.end())
            return nullptr;
    
        //cast away the constness of the iterator element
        //is this OK, because we know we're not changing its hash/equality?
        auto old_evt_addr = &const_cast<std::shared_ptr<const EventInfo>&>(*old_it);
    
        return std::atomic_exchange(old_evt_addr,event);
    }   

private:
    std::unordered_set<std::shared_ptr<const EventInfo>,EventPointHash,EventPointEq> PointEvents;
};

int main()
{
    //create a database to hold 100 events
    std::vector<std::shared_ptr<const EventInfo>> init_events;
    for(int i=0;i<100;i++)
    {
        init_events.emplace_back(std::make_shared<const EventInfo>(EventType::RED,i,"-1"));
    }
    EventDB DB(init_events);
    
    //Access the element concurrently
    std::vector<std::thread> threads;
    for(int i = 0;i<5;i++)
    {
        threads.emplace_back([&]()
        {
            for(int j = 0;j<1000000;j++)
            {
                //replace a random element
                auto event = std::make_shared<const EventInfo>(EventType::RED,rand()%100,std::to_string(j));
                auto old_evt = DB.Swap(event);
                //access the data - randomly print
                if(old_evt && std::stoi(old_evt->GetPayload())%2000 == 0 && old_evt->GetIndex() == 66)
                    std::cout<<"Replaced "<<old_evt->GetPayload()<<" with "<<event->GetPayload()<<std::endl;
            }
        });
    }
    init_events.clear();
    
    for(auto& t : threads)
        t.join();
        
    return 0;
}

典型输出:

Replaced 20000 with 20033
Replaced 134000 with 134002
Replaced 144000 with 143694
Replaced 144000 with 144435
Replaced 172000 with 174980
Replaced 252000 with 255578
Replaced 258000 with 252434
Replaced 368000 with 367261
Replaced 498000 with 497470
Replaced 584000 with 583205
Replaced 628000 with 619809
Replaced 722000 with 722603
Replaced 730000 with 722302
Replaced 780000 with 768508
Replaced 784000 with 784036
Replaced 816000 with 821799
Replaced 842000 with 844719
Replaced 970000 with 950851

编辑:

Igor 的answer 向我指出了数据竞赛。然后我能够轻松地修改代码以在实践中证明它。

如果使用了被破坏的元素,则添加一个破坏哈希的析构函数,然后在查找失败时打印一条消息:

    ~EventInfo()
    {
        //these aren't used in the example
        // - so they will mess up find when the race is lost
        Index = 200;
        Type = EventType::BLUE;
    }
    auto old_evt = DB.Swap(event);
    if(!old_evt)
        std::cout<<"BOOM"<<std::endl;

果然:

BOOM
BOOM
BOOM
BOOM

固定代码(除非有人发现别的东西!)##

这是我尝试实施 Igor 的 answer 中建议的修复程序

#include <vector>
#include <string>
#include <iostream>
#include <thread>
#include <memory>
#include <limits>
#include <unordered_map>

enum class EventType : uint8_t
{
    RED = 0,
    BLUE = 1
};

class EventInfo
{
public:
    EventInfo() = delete;
    EventInfo(const EventType t, const size_t i, const std::string& p):
        Type(t),Index(i),Payload(p)
    {}
    size_t GetIndex() const
    {
        return Index;
    }
    EventType GetEventType() const
    {
        return Type;
    }
    const std::string& GetPayload() const
    {
        return Payload;
    }
private:
    EventType Type;
    size_t Index;
    std::string Payload;
};

struct EventPointHash
{
    size_t operator()(const std::pair<EventType,size_t>& point) const
    {
        return (point.second << (sizeof(EventType)<<3)) + static_cast<size_t>(point.first);
    }
};

class EventDB
{
public:
    EventDB() = delete;
    
    EventDB(const std::vector<std::shared_ptr<const EventInfo>>& init_events)
    {
        for(const auto& event : init_events)
            PointEvents[{event->GetEventType(),event->GetIndex()}] = event;
    }
    
    std::shared_ptr<const EventInfo> Swap(const std::shared_ptr<const EventInfo> event)
    {
        auto old_it = PointEvents.find({event->GetEventType(),event->GetIndex()});

        if(old_it == PointEvents.end())
            return nullptr;

        auto old_evt_addr = &(old_it->second);

        return std::atomic_exchange(old_evt_addr,event);
    }   

private:
    std::unordered_map<std::pair<EventType,size_t>,std::shared_ptr<const EventInfo>,EventPointHash> PointEvents;
};

int main()
{
    //create a database to hold 100 events
    std::vector<std::shared_ptr<const EventInfo>> init_events;
    for(int i=0;i<100;i++)
    {
        init_events.emplace_back(std::make_shared<const EventInfo>(EventType::RED,i,"-1"));
    }
    EventDB DB(init_events);
    init_events.clear();
    
    //Access the element concurrently
    std::vector<std::thread> threads;
    for(int i = 0;i<5;i++)
    {
        threads.emplace_back([&]()
        {
            for(int j = 0;j<1000000;j++)
            {
                //replace a random element
                auto event = std::make_shared<const EventInfo>(EventType::RED,rand()%100,std::to_string(j));
                auto old_evt = DB.Swap(event);
                
                if(!old_evt)
                {
                    std::cout<<"BOOM"<<std::endl;
                    continue;
                }
                
                //access the data - randomly print
                if(std::stoi(old_evt->GetPayload())%2000 == 0 && old_evt->GetIndex() == 66)
                    std::cout<<"Replaced "<<old_evt->GetPayload()<<" with "<<event->GetPayload()<<std::endl;
            }
        });
    }
    
    for(auto& t : threads)
        t.join();
        
    return 0;
}

【问题讨论】:

    标签: c++ c++17 c++20


    【解决方案1】:

    有一场数据竞赛。虽然通过std::atomic_exchange 写入集合的元素是原子的,但在find 中读取它们却不是。 find 如果另一个线程从它下面交换一个元素,可能会看到一个撕裂的读取。

    还有一个更微妙的场景:一个线程调用了PointEvents.find(event),而find 当前正在读取集合中一些EventInfo 实例的内容(我们称之为X),以计算其哈希值或比较它到event。同时,另一个线程对同一元素执行Swap,并将持有X 的共享指针返回给调用者。调用者可能会简短地查看X,然后允许共享指针被销毁,X 与它一起被销毁。 find 然后与 X 的析构函数竞争。


    考虑将有助于散列和相等性的EventInfo 的固定部分与可能变化的有效负载部分分开。将它们存储在std::unordered_map 中,固定部分作为键,有效负载作为值。然后你可以在不影响find的情况下交换payload。

    【讨论】:

    • 优秀。查看我的编辑 - 我添加了一些代码来演示比赛。我知道我在通过滥用 std::unordered_set 玩火,而您建议的修复绝对是一个明智的方法。但只是为了一个实验,我想知道是否可以通过在散列和相等函数中使用按值传递来修复初始数据竞争。 NO是答案!它在很大程度上失败了(段错误和纯虚拟方法调用)。对此有什么想法吗?我认为复制 shared_ptrs 将是线程安全的。我想使用这些签名是完全错误的,但没有编译器错误......?
    • 复制共享指针无济于事 - 无论是通过原始指针还是通过其副本,您仍在读取被销毁的同一 EventInfo 实例。
    • 另外,shared_ptr 不是线程安全的。可以同时访问或修改shared_ptr 的两个单独实例,即使它们碰巧指向同一个对象。但是对同一个shared_ptr 实例执行不同步的非原子访问是不行的,比如说不超过std::string。通过制作更多集合元素的副本,我怀疑您更可能使我在回答中提到的第一次数据竞赛。
    • 复制 shared_ptr 是为了避免 EventInfo 被删除,但就像您在下一条评论中所说的那样,真正的问题是复制是从同一个实例上的多个线程发生的(在放)。我将发布我的代码以及您建议的更改以真正修复它。谢谢
    猜你喜欢
    • 2011-08-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-19
    相关资源
    最近更新 更多