【问题标题】:Lock a mutex from mutex array with atomic index使用原子索引从互斥体数组中锁定互斥体
【发布时间】:2016-12-20 09:46:21
【问题描述】:

我正在尝试编写一个缓冲区,它可以将数据推送到缓冲区,检查是否已满,必要时交换缓冲区。另一个线程可以获取文件输出的缓冲区。

我已经成功实现了缓冲区,但我想添加一个 ForceSwapBuffer 方法,该方法将强制交换不完整的缓冲区并从不完整的缓冲区返回数据。为了做到这一点,我检查读取和写入缓冲区是否相同(尝试强制交换缓冲区写入文件是没有用的,而仍有其他完整的缓冲区可以写入)。 我希望这个方法能够与 GetBuffer 方法一起运行(不是真的必要,但我想尝试一下,偶然发现了这个问题)。

GetBuffer 会阻塞,当 ForceSwapBuffer 完成时它仍然会阻塞,直到新缓冲区完全填满,因为在 ForceSwapBuffer 中我更改了原子 _read_buffer_index。我想知道这是否会一直有效? GetBuffer 的阻塞锁是否会检测到原子 read_buffer_index 的更改并更改它试图锁定的互斥锁,还是会在锁开始时检查它必须锁定的互斥锁并继续尝试锁定同一个互斥锁,即使索引变化?

/* selection of member data */
unsigned int _size, _count;

std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;

std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr< std::mutex[] > _mutexes;

std::recursive_mutex _force_swap_buffer;

/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
    if (_index + length <= _size) {
        memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
        _index += length;
    } else {
        memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
        unsigned int t_index = _index;
        SwapBuffer();
        Push(&data[_size - t_index], length - (_size - t_index));
    }
}

template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
    std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
    std::unique_ptr<T[]> result(new T[_size]);
    memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
    _read_buffer_index = (_read_buffer_index + 1) % _count;
    return std::move(result);
}

template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time

    if (_write_buffer_index != _read_buffer_index)
        return nullptr;

    std::unique_ptr<T[]> result(new T[_index]);
    memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));

    unsigned int next = (_write_buffer_index + 1) % _count;

    _mutexes[next].lock();
    _read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
    _mutexes[_write_buffer_index].unlock();

    _write_buffer_index = next;
    _index = 0;

    return result;
}

【问题讨论】:

    标签: c++ arrays multithreading mutex atomic


    【解决方案1】:

    您的代码存在一些问题。首先,修改原子变量时要小心。只有一小部分操作是真正原子的(参见http://en.cppreference.com/w/cpp/atomic/atomic),并且原子操作的组合不是原子的。考虑:

    _read_buffer_index = (_read_buffer_index + 1) % _count;
    

    这里发生的是对变量的原子读取、增量、模运算和原子存储。但是,整个语句本身不是原子的!如果 _count 是 2 的幂,则可以使用 ++-运算符。如果不是,则必须将_read_buffer_index 读入一个临时变量,执行上述计算,然后使用compare_exchange 函数存储新值如果该变量同时没有更改时间>。显然,后者必须循环完成,直到成功。您还必须担心一个线程在第二个线程的 read 和 compare_exchange 之间增加变量 _count 次的可能性,在这种情况下,第二个线程错误地认为变量没有更改。

    第二个问题是缓存行弹跳。如果您在同一缓存行上有多个互斥锁,那么如果两个或多个线程尝试同时访问它们,性能将非常糟糕。缓存行的大小取决于您的平台。

    主要问题是虽然ForceSwapBuffer()Push() 都锁定了_force_swap_buffer 互斥锁,但GetBuffer() 没有。 GetBuffer() 然而确实改变了_read_buffer_index。所以在ForceSwapBuffer():

    std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
    
    if (_write_buffer_index != _read_buffer_index)
        return nullptr;
    
    // another thread can call GetBuffer() here and change _read_buffer_index
    
    // rest of the code here
    

    if-statement 后面的_write_buffer_index == _read_buffer_index 的假设实际上是无效的。

    【讨论】:

    • 我一定会查看您对 atomic 变量的评论,但在我的情况下只有 1 个线程会写入它,atomic 是为了确保另一个线程中的读取行为是正确的。跨度>
    • 我还忘了提到的是,当 _write_buffer_index == _read_buffer_index 时,GetBuffer 将始终锁定,因为如果另一个线程仍在写入缓冲区,它无法获取要写入的缓冲区,这就是 ForceSwapBuffer (它返回未完成的缓冲区)所以我不认为_write_buffer_index == _read_buffer_index 假设是无效的。
    • 因为我不能编辑评论:由Push 调用的普通SwapBuffer() 确保新缓冲区始终被锁定。因此,当调用GetBuffer() 并且没有可用的完整缓冲区(因此_write_buffer_index == _read_buffer_index)时,它将始终阻塞,直到新缓冲区可用。我只是希望它跳过一个缓冲区(通过同时增加write_buffer_indexread_buffer_index),但我不确定另一个线程中的lock(_mutexes[_read_buffer_index]) 是否会更改它阻塞的互斥锁,因为索引变量已更改。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-08
    • 2014-10-09
    相关资源
    最近更新 更多