【问题标题】:How to pipe what is written to Stream 1 into Stream 2?如何将写入 Stream 1 的内容通过管道传输到 Stream 2?
【发布时间】:2015-11-04 13:38:30
【问题描述】:

这是我的场景:

producer.WriteStream(stream);
consumer.ReadStream(stream);

我想要一些允许producer 生成的字节逐渐传输到consumer 的东西。

我可以将所有内容写入MemoryStream,然后倒带并在consumer 上读取,但这会导致大量内存消耗。

我怎样才能做到这一点?

【问题讨论】:

  • 使用PipeStream的2个实例,1个读取(客户端)和1个写入(服务器)。
  • 感谢@Amit,您能否详细说明如何将这些流“绑定”在一起.. 我不清楚。
  • 如果您需要将数据从一个流传输到另一个流,您通常通过从源读取块(例如 1K 或 4K)并将它们放入目标,直到源流为空。

标签: c# .net stream


【解决方案1】:

使用管道作为数据的底层传输,您可以拥有允许这种通信机制的“写入流”(服务器)和“读取流”(客户端)。

使用匿名管道或命名管道(如果您需要进程间通信)非常简单。要创建您的管道流:

AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
AnonymousPipeClientStream pipeClient =
  new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString());

现在你可以用这些来写和读了:

producer.WriteStream(pipeServer);
// somewhere else...
consumer.ReadStream(pipeClient);

【讨论】:

  • 这比我的解决方案容易得多。
【解决方案2】:

我只是为了好玩把它放在一起,它未经测试,可能有一些错误。您只需将ReaderStream 传递给读者,将WriterStream 传递给作者。

public class LoopbackStream
{
    public Stream ReaderStream { get; }
    public Stream WriterStream { get;}

    private readonly BlockingCollection<byte[]> _buffer;

    public LoopbackStream()
    {
        _buffer = new BlockingCollection<byte[]>();
        ReaderStream = new ReaderStreamInternal(_buffer);
        WriterStream = new WriterStreamInternal(_buffer);
    }

    private class WriterStreamInternal : Stream
    {
        private readonly BlockingCollection<byte[]> _buffer;

        public WriterStreamInternal(BlockingCollection<byte[]> buffer)
        {
            _buffer = buffer;
            CanRead = false;
            CanWrite = false;
            CanSeek = false;
        }

        public override void Close()
        {
            _buffer.CompleteAdding();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            var newData = new byte[count];
            Array.Copy(buffer, offset, newData, 0, count);
            _buffer.Add(newData);
        }

        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override bool CanRead { get; }
        public override bool CanSeek { get; }
        public override bool CanWrite { get; }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
    }
    private class ReaderStreamInternal : Stream
    {
        private readonly BlockingCollection<byte[]> _buffer;
        private readonly IEnumerator<byte[]> _readerEnumerator;
        private byte[] _currentBuffer;
        private int _currentBufferIndex = 0;

        public ReaderStreamInternal(BlockingCollection<byte[]> buffer)
        {
            _buffer = buffer;
            CanRead = true;
            CanWrite = false;
            CanSeek = false;
            _readerEnumerator = _buffer.GetConsumingEnumerable().GetEnumerator();
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _readerEnumerator.Dispose();
            }
            base.Dispose(disposing);
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (_currentBuffer == null)
            {
                bool read = _readerEnumerator.MoveNext();
                if (!read)
                    return 0;
                _currentBuffer = _readerEnumerator.Current;
            }

            var remainingBytes = _currentBuffer.Length - _currentBufferIndex;
            var readBytes = Math.Min(remainingBytes, count);
            Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes);
            _currentBufferIndex += readBytes;

            if (_currentBufferIndex == _currentBuffer.Length)
                _currentBuffer = null;

            return readBytes;
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override bool CanRead { get; }
        public override bool CanSeek { get; }
        public override bool CanWrite { get; }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-02-17
    • 2018-06-04
    • 1970-01-01
    • 2021-10-15
    • 2021-05-11
    • 1970-01-01
    • 2011-01-15
    • 2014-02-03
    相关资源
    最近更新 更多