考虑您需要从集合中获得什么,然后从那里开始。
当您需要某种集合时,您应该考虑以下几个问题:
您需要随机访问集合中的项目吗?
集合会被多个线程访问吗?
集合中的数据读取后是否需要保留?
订购重要吗?如果是这样,什么顺序 - 添加顺序,反向添加顺序,通过一些比较来排序?
对于这种情况下的输出缓冲区,答案是否、是、否和是:添加顺序。这几乎挑出了ConcurrentQueue 类。这允许您从一个或多个源添加对象,这些源不需要与将它们读回的代码位于同一线程中。它不会让您随意索引您似乎不需要的集合(好吧,反正不是直接索引)。
我会为输入缓冲区使用相同的类型,使用“当前块”缓冲区来保存最近读取的缓冲区,并使用一些简单的对象锁定语义来处理任何线程问题。
输出部分如下所示:
// Output buffer
private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>();
public override void Write(byte[] buffer, int offset, int count)
{
// Copy written data to new buffer and add to output queue
byte[] data = new byte[count];
Buffer.BlockCopy(buffer, offset, data, 0, count);
_outputBuffer.Enqueue(data);
}
public override void Flush()
{
// pull everything out of the queue and send to wherever it is going
byte[] curr;
while (_outputBuffer.TryDequeue(out curr))
internalSendData(curr);
}
internalSendData 方法是数据随后发送到网络的地方。
读缓冲有点复杂:
// collection to hold unread input data
private readonly ConcurrentQueue<byte[]> _inputBuffer = new ConcurrentQueue<byte[]>();
// current data block being read from
private byte[] _inputCurrent = null;
// read offset in current block
private short _inputPos = 0;
// object for locking access to the above.
private readonly object _inputLock = new object();
public override int Read(byte[] buffer, int offset, int count)
{
int readCount = 0;
lock(_inputLock)
{
while (count > 0)
{
if (_inputCurrent == null || _inputCurrent.Length <= _inputPos)
{
// read next block from input buffer
if (!_inputBuffer.TryDequeue(out _inputCurrent))
break;
_inputPos = 0;
}
// copy bytes to destination
int nBytes = Math.Min(count, _inputCurrent.Length - _inputPos);
Buffer.BlockCopy(_inputCurrent, _inputPos, buffer, offset, nBytes);
// adjust all the offsets and counters
readCount += nBytes;
offset += nBytes;
count -= nBytes;
_inputPos += (short)nBytes;
}
}
return readCount;
}
希望这是有道理的。
将队列用于这种软缓冲意味着数据仅在延迟发送或读取时才会保存在内存中。一旦你调用Flush,输出缓冲区的内存就会被释放用于垃圾收集,所以你不必担心内存爆裂,除非你试图发送比实际传输机制可以处理的快得多的速度。但是,如果您每秒排队数兆字节的数据通过 ADSL 连接发送出去,那么没有什么可以拯救您:P
我会在上面添加一些改进,例如一些检查以确保一旦缓冲区处于合理水平时自动调用 Flush。