【问题标题】:Implementing a custom stream实现自定义流
【发布时间】:2015-10-18 22:06:28
【问题描述】:

首先我应该提到我没有可用的内部Stream 对象。而不是那个,我确实有这个对象:

public interface IChannel
{
    void Send(byte[] data);
    event EventHandler<byte[]> Receive;
}

我想实现一个 Stream 类,像这样:

public class ChannelStream : Stream
{
    private readonly IChannel _channel;

    public ChannelStream(IChannel channel)
    {
        this._channel = channel;
    }

    // TODO: Implement Stream class
}

我需要的功能与NetworkStream:
非常相似 将字节写入我的流应将这些字节添加到缓冲区并在调用 Flush() 后调用 _channel.Send
Stream 还将侦听_channel.Receive 事件并将字节添加到另一个内部缓冲区,直到从流中读取它们。如果 Stream 没有任何可用数据,它应该阻塞,直到有新数据可用。

然而,我正在努力实现。我在内部尝试过使用两个MemoryStreams,但这会导致缓冲区继续吃越来越多的内存。

我可以使用哪种集合/流来实现我的流?

【问题讨论】:

  • BlockingCollection 可能是一个有用的技巧。

标签: c# .net stream


【解决方案1】:

考虑您需要从集合中获得什么,然后从那里开始。

当您需要某种集合时,您应该考虑以下几个问题:

  1. 您需要随机访问集合中的项目吗?

  2. 集合会被多个线程访问吗?

  3. 集合中的数据读取后是否需要保留?

  4. 订购重要吗?如果是这样,什么顺序 - 添加顺序,反向添加顺序,通过一些比较来排序?

对于这种情况下的输出缓冲区,答案是否、是、否和是:添加顺序。这几乎挑出了ConcurrentQueue 类。这允许您从一个或多个源添加对象,这些源不需要与将它们读回的代码位于同一线程中。它不会让您随意索引您似乎不需要的集合(好吧,反正不是直接索引)。

我会为输入缓冲区使用相同的类型,使用“当前块”缓冲区来保存最近读取的缓冲区,并使用一些简单的对象锁定语义来处理任何线程问题。

输出部分如下所示:

// Output buffer
private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>();

public override void Write(byte[] buffer, int offset, int count)
{
    // Copy written data to new buffer and add to output queue
    byte[] data = new byte[count];
    Buffer.BlockCopy(buffer, offset, data, 0, count);
    _outputBuffer.Enqueue(data);
}

public override void Flush()
{
    // pull everything out of the queue and send to wherever it is going
    byte[] curr;
    while (_outputBuffer.TryDequeue(out curr))
        internalSendData(curr);
}

internalSendData 方法是数据随后发送到网络的地方。

读缓冲有点复杂:

// collection to hold unread input data
private readonly ConcurrentQueue<byte[]> _inputBuffer = new ConcurrentQueue<byte[]>();
// current data block being read from
private byte[] _inputCurrent = null;
// read offset in current block
private short _inputPos = 0;
// object for locking access to the above.
private readonly object _inputLock = new object();

public override int Read(byte[] buffer, int offset, int count)
{
    int readCount = 0;
    lock(_inputLock)
    {
        while (count > 0)
        {
            if (_inputCurrent == null || _inputCurrent.Length <= _inputPos)
            {
                // read next block from input buffer
                if (!_inputBuffer.TryDequeue(out _inputCurrent))
                    break;

                _inputPos = 0;
            }

            // copy bytes to destination
            int nBytes = Math.Min(count, _inputCurrent.Length - _inputPos);
            Buffer.BlockCopy(_inputCurrent, _inputPos, buffer, offset, nBytes);

            // adjust all the offsets and counters
            readCount += nBytes;
            offset += nBytes;
            count -= nBytes;
            _inputPos += (short)nBytes;
        }
    }
    return readCount;
}

希望这是有道理的。

将队列用于这种软缓冲意味着数据仅在延迟发送或读取时才会保存在内存中。一旦你调用Flush,输出缓冲区的内存就会被释放用于垃圾收集,所以你不必担心内存爆裂,除非你试图发送比实际传输机制可以处理的快得多的速度。但是,如果您每秒排队数兆字节的数据通过 ADSL 连接发送出去,那么没有什么可以拯救您:P

我会在上面添加一些改进,例如一些检查以确保一旦缓冲区处于合理水平时自动调用 Flush

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-10-23
    • 2014-02-01
    • 2014-10-14
    • 2011-06-04
    • 2010-10-29
    • 2011-01-10
    相关资源
    最近更新 更多