【发布时间】:2015-04-01 01:25:12
【问题描述】:
这里是 C++ 新手。我正在尝试在 unordered_map 中同时写入不同的存储桶。从我通过搜索可以看出,我的理解是这应该是一个线程安全的操作。我(可能不正确)的理解是基于答案here 和here,以及C++11 标准的引用部分(特别是第2 项——强调我的):
23.2.2 容器数据竞赛 [container.requirements.dataraces]
1 为避免数据竞争 (17.6.5.9),实现应将以下函数视为 const:begin、end、rbegin、rend、front、back、data、find、lower_bound、upper_bound、equal_range、at 和, 除了在关联或无序关联容器中,operator[].
2 尽管有 (17.6.5.9),当同时修改同一序列中不同元素中包含的对象的内容(
vector<bool>除外)时,需要实现以避免数据争用。3 [ 注意:对于大小大于一的向量 x,x[1] = 5 和 *x.begin() = 10 可以同时执行而不会发生数据争用,但 x[0] = 5 和*x.begin() = 10 同时执行可能会导致数据竞争。作为一般规则的一个例外,对于向量 y,y[0] = true 可能与 y[1] = true 竞争。 ——尾注]
无论如何,使用标准容器写入不同的存储桶似乎不是线程安全的,如下面的代码所示。您会看到我在写入之前启用了与正在修改的存储桶相对应的锁,但有时对没有正确记录。对于它的价值,如果我使用单个锁 - 例如,只需将 auto bkt = mm->bucket(key); 更改为 auto bkt=0;,有效地锁定整个 unordered_map 容器 - 一切都按预期工作。
#include <iostream>
#include <unordered_map>
#include <atomic>
#include <vector>
#include <thread>
#define NUM_LOCKS 409
#define N 100
#define NUM_THREADS 2
using namespace std;
class SpinLock
{
public:
void lock()
{
while(lck.test_and_set(memory_order_acquire)){}
}
void unlock()
{
lck.clear(memory_order_release);
}
private:
atomic_flag lck = ATOMIC_FLAG_INIT;
};
vector<SpinLock> spinLocks(NUM_LOCKS);
void add_to_map(unordered_map<int,int> * mm, const int keyStart, const int keyEnd, const int tid){
for(int key=keyStart;key<keyEnd;++key){
auto bkt = mm->bucket(key);
//lock bucket
spinLocks[bkt].lock();
//insert pair
mm->insert({key,tid});
//unlock bucket
spinLocks[bkt].unlock();
}
}
int main() {
int Nbefore, Nafter;
thread *t = new thread[NUM_THREADS];
//create an unordered map, and reserve enough space to avoid a rehash
unordered_map<int,int> my_map;
my_map.reserve(2*NUM_THREADS*N);
//count number of buckets to make sure that a rehash didn't occur
Nbefore=my_map.bucket_count();
// Launch NUM_THREADS threads. Thread k adds keys k*N through (k+1)*N-1 to the hash table, all with associated value = k.
for(int threadID=0;threadID<NUM_THREADS;++threadID){
t[threadID]=thread(add_to_map,&my_map,threadID*N,(threadID+1)*N,threadID);
}
// Wait for the threads to finish
for(int threadID=0;threadID<NUM_THREADS;++threadID){
t[threadID].join();
}
//count number of buckets to make sure that a rehash didn't occur
Nafter=my_map.bucket_count();
cout << "Number of buckets before adding elements: " << Nbefore <<endl;
cout << "Number of buckets after adding elements: " << Nafter << " <--- same as above, so rehash didn't occur" <<endl;
//see if any keys are missing
for(int key=0;key<NUM_THREADS*N;++key){
if(!my_map.count(key)){
cout << "key " << key << " not found!" << endl;
}
}
return 0;
}
当错误地没有输入密钥时,程序将退出。示例输出为:
Number of buckets before adding elements: 401
Number of buckets after adding elements: 401 <--- same as above, so rehash didn't occur
key 0 not found!
key 91 not found!
key 96 not found!
key 97 not found!
key 101 not found!
key 192 not found!
key 193 not found!
key 195 not found!
所以,我的问题有两个:
- 我在锁定存储桶的方式上是否做错了什么?
- 如果是这样,是否有更好的方法来逐个存储桶锁定映射以实现对不同存储桶的并发写入?
最后,我要提一下,我已经尝试过 TBB 的 concurrent_unordered_map,但它在我的应用程序中比简单地串行执行要慢得多。撇开杂散错误不谈,我使用 std::unordered_map 的存储桶锁定方法表现得更好。
【问题讨论】:
-
这是提问的正确方式!研究、代码、错误以及您如何尝试修复它!大多数新用户应该阅读这篇文章。 +1
-
您的代码当然有 UB,正如您可能已经发现的那样,因为您对
insert的调用会导致数据争用。实际实现中的一个简单故障模式是例如在 libc++ 中,insert会增加大小计数,因此您在大小变量上存在竞争。但只要看看你自己的库实现,并注意它不是线程安全的方式。 -
我同意@KerrekSB 的评论,即
insert在这种情况下会导致UB(激发原始问题)。但是,我不同意 Ron 的过度简化,即写作不是线程安全的。 Ron 忽略了引用标准的 2. 和 3.,该标准要求当容器的不同元素同时修改时避免竞争。因此,只要线程同步以不同时在同一位置写入,写入是安全的。因此,似乎逐个存储桶的锁定就足够了,但显然不是。 @KerrekSB,我如何看待 libc++ 实现? -
添加元素与修改元素不同。桶不是无序集合/映射的元素——元素是键/值对。您正在添加元素,但我看不出它是安全的吗?
-
@Tom:我认为您误解了标准中的声明。正如 Yakk 所解释的,意思是您可以同时修改不同的容器 元素。您永远不允许同时修改容器本身,“修改”意味着调用任何非常量成员函数(请参阅 Ron 的评论)。您可以整天对实施做出假设,但这并不能为您带来任何标准保证。
标签: c++ multithreading c++11 concurrency