【发布时间】:2016-12-20 09:46:21
【问题描述】:
我正在尝试编写一个缓冲区,它可以将数据推送到缓冲区,检查是否已满,必要时交换缓冲区。另一个线程可以获取文件输出的缓冲区。
我已经成功实现了缓冲区,但我想添加一个 ForceSwapBuffer 方法,该方法将强制交换不完整的缓冲区并从不完整的缓冲区返回数据。为了做到这一点,我检查读取和写入缓冲区是否相同(尝试强制交换缓冲区写入文件是没有用的,而仍有其他完整的缓冲区可以写入)。 我希望这个方法能够与 GetBuffer 方法一起运行(不是真的必要,但我想尝试一下,偶然发现了这个问题)。
GetBuffer 会阻塞,当 ForceSwapBuffer 完成时它仍然会阻塞,直到新缓冲区完全填满,因为在 ForceSwapBuffer 中我更改了原子 _read_buffer_index。我想知道这是否会一直有效? GetBuffer 的阻塞锁是否会检测到原子 read_buffer_index 的更改并更改它试图锁定的互斥锁,还是会在锁开始时检查它必须锁定的互斥锁并继续尝试锁定同一个互斥锁,即使索引变化?
/* selection of member data */
unsigned int _size, _count;
std::atomic<unsigned int> _write_buffer_index, _read_buffer_index;
unsigned int _index;
std::unique_ptr< std::unique_ptr<T[]>[] > _buffers;
std::unique_ptr< std::mutex[] > _mutexes;
std::recursive_mutex _force_swap_buffer;
/* selection of implementation of member functions */
template<typename T> // included to show the use of the recursive_mutex
void Buffer<T>::Push(T *data, unsigned int length) {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer);
if (_index + length <= _size) {
memcpy(&_buffers[_write_buffer_index][_index], data, length*sizeof(T));
_index += length;
} else {
memcpy(&_buffers[_write_buffer_index][_index], data, (_size - _index)*sizeof(T));
unsigned int t_index = _index;
SwapBuffer();
Push(&data[_size - t_index], length - (_size - t_index));
}
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::GetBuffer() {
std::lock_guard<std::mutex> lock(_mutexes[_read_buffer_index]); // where the magic should happen
std::unique_ptr<T[]> result(new T[_size]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _size*sizeof(T));
_read_buffer_index = (_read_buffer_index + 1) % _count;
return std::move(result);
}
template<typename T>
std::unique_ptr<T[]> Buffer<T>::ForceSwapBuffer() {
std::lock_guard<std::recursive_mutex> lock(_force_swap_buffer); // lock that forbids pushing and force swapping at the same time
if (_write_buffer_index != _read_buffer_index)
return nullptr;
std::unique_ptr<T[]> result(new T[_index]);
memcpy(result.get(), _buffers[_read_buffer_index].get(), _index*sizeof(T));
unsigned int next = (_write_buffer_index + 1) % _count;
_mutexes[next].lock();
_read_buffer_index = next; // changing the read_index while the other thread it blocked, the new mutex is already locked so the other thread should remain locked
_mutexes[_write_buffer_index].unlock();
_write_buffer_index = next;
_index = 0;
return result;
}
【问题讨论】:
标签: c++ arrays multithreading mutex atomic