【问题标题】:FIFO/Queue buffer specialising in byte streams专门用于字节流的 FIFO/队列缓冲区
【发布时间】:2020-07-11 07:43:04
【问题描述】:

是否有任何 .NET 数据结构/类组合允许将字节数据附加到缓冲区的末尾,但所有的偷看和读取都是从头开始的,当我阅读时会缩短缓冲区?

MemoryStream 类似乎起到了部分作用,但我需要为读写维护单独的位置,并且它不会在读取数据后在开始时自动丢弃数据。

已在回复this question 时发布了一个答案,这基本上是我想要做的,但我更喜欢我可以在同一进程的不同组件中执行异步 I/O 的操作,就像正常的一样管道甚至网络流(我需要先过滤/处理数据)。

【问题讨论】:

  • 在读缓冲区内来回跳转有什么问题吗?
  • 只有我所说的并且必须跟踪它,而不是 NetworkStream 风格的阅读、阅读、阅读等。
  • 是否需要读写不同大小的数组? byte[] 的队列对你来说不够好吗?
  • 读取将是单个字节,直到我找到某些数据然后是各种长度的块。写入将是任意长度,具体取决于我收到的数据。

标签: c# .net .net-2.0 data-stream


【解决方案1】:

我将发布我曾经为工作中的一个项目编写的一些逻辑的精简副本。此版本的优点是它可以处理缓冲数据的链接列表,因此您不必在读取时缓存大量内存和/或复制内存。此外,它的线程安全且行为类似于网络流,即:当没有可用数据时读取时:等待直到有可用数据或超时。此外,当读取 x 个字节并且只有 y 个字节时,在读取所有字节后返回。我希望这会有所帮助!

    public class SlidingStream : Stream
{
    #region Other stream member implementations

    ...

    #endregion Other stream member implementations

    public SlidingStream()
    {
        ReadTimeout = -1;
    }

    private readonly object _writeSyncRoot = new object();
    private readonly object _readSyncRoot = new object();
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim();

    public int ReadTimeout { get; set; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_dataAvailableResetEvent.Wait(ReadTimeout))
            throw new TimeoutException("No data available");

        lock (_readSyncRoot)
        {
            int currentCount = 0;
            int currentOffset = 0;

            while (currentCount != count)
            {
                ArraySegment<byte> segment = _pendingSegments.First.Value;
                _pendingSegments.RemoveFirst();

                int index = segment.Offset;
                for (; index < segment.Count; index++)
                {
                    if (currentOffset < offset)
                    {
                        currentOffset++;
                    }
                    else
                    {
                        buffer[currentCount] = segment.Array[index];
                        currentCount++;
                    }
                }

                if (currentCount == count)
                {
                    if (index < segment.Offset + segment.Count)
                    {
                        _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index));
                    }
                }

                if (_pendingSegments.Count == 0)
                {
                    _dataAvailableResetEvent.Reset();

                    return currentCount;
                }
            }

            return currentCount;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_writeSyncRoot)
        {
            byte[] copy = new byte[count];
            Array.Copy(buffer, offset, copy, 0, count);

            _pendingSegments.AddLast(new ArraySegment<byte>(copy));

            _dataAvailableResetEvent.Set();
        }   
    }
}

【讨论】:

  • 看起来不错,和我的方向一样。我今晚试试。
  • 在我看来,如果您在没有可用数据的情况下尝试读取数据,这将崩溃。
  • @svick - 完全正确,它只是一个草稿,没有参数验证等。manualResetEvent 的唯一原因是,我只是忘记在读取方法开始时等待它。现在修好了。感谢提醒
  • 有什么想法可以提示 EOS 优雅地将 0 返回给读者(而不是讨厌的超时异常)?
  • 这看起来很不错。我打算做这样的事情,但我先用谷歌搜索了它。 @Deanna你有没有在某个地方发表过你的衍生物?你能做到吗?很高兴看到你用它做了什么。异常处理等
【解决方案2】:

代码可能比接受的答案更简单。无需使用for 循环。:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}

【讨论】:

  • 这与链接问题基本相同,不满足异步或阻塞的需求。不过还是谢谢。
  • 好的,但是该代码不是那么优雅并且它不是线程安全的。你可以用 6 行而不是 16 行来完成。
【解决方案3】:

我尝试完善 Polity 的代码。这远非优化,但也许可以正常工作。

public class SlidingStream : Stream {
  public SlidingStream() {
    ReadTimeout = -1;
  }

  private readonly object ReadSync = new object();
  private readonly object WriteSync = new object();
  private readonly ConcurrentQueue<ArraySegment<byte>> PendingSegments
    = new ConcurrentQueue<ArraySegment<byte>>();
  private readonly ManualResetEventSlim DataAvailable = new ManualResetEventSlim(false);
  private ArraySegment<byte>? PartialSegment;

  public new int ReadTimeout;

  public override bool CanRead => true;

  public override bool CanSeek => false;

  public override bool CanWrite => true;

  public override long Length => throw new NotImplementedException();

  public override long Position {
    get => throw new NotImplementedException();
    set => throw new NotImplementedException();
  }

  private bool Closed;

  public override void Close() {
    Closed = true;
    DataAvailable.Set();
    base.Close();
  }

  public override int Read(byte[] buffer, int offset, int count) {
    int msStart = Environment.TickCount;

    lock (ReadSync) {
      int read = 0;

      while (read < count) {
        ArraySegment<byte>? seg = TryDequeue(msStart);
        if (seg == null) {
          return read;
        }

        ArraySegment<byte> segment = seg.GetValueOrDefault();
        int bite = Math.Min(count - read, segment.Count);
        if (bite < segment.Count) {
          PartialSegment = new ArraySegment<byte>(
            segment.Array,
            segment.Offset + bite,
            segment.Count - bite
          );
        }

        Array.Copy(segment.Array, segment.Offset, buffer, offset + read, bite);
        read += bite;
      }

      return read;
    }
  }

  private ArraySegment<byte>? TryDequeue(int msStart) {
    ArraySegment<byte>? ps = PartialSegment;
    if (ps.HasValue) {
      PartialSegment = null;
      return ps;
    }

    DataAvailable.Reset();

    ArraySegment<byte> segment;
    while (!PendingSegments.TryDequeue(out segment)) {
      if (Closed) {
        return null;
      }
      WaitDataOrTimeout(msStart);
    }

    return segment;
  }

  private void WaitDataOrTimeout(int msStart) {
    int timeout;
    if (ReadTimeout == -1) {
      timeout = -1;
    }
    else {
      timeout = msStart + ReadTimeout - Environment.TickCount;
    }

    if (!DataAvailable.Wait(timeout)) {
      throw new TimeoutException("No data available");
    }
  }

  public override void Write(byte[] buffer, int offset, int count) {
    lock (WriteSync) {
      byte[] copy = new byte[count];
      Array.Copy(buffer, offset, copy, 0, count);

      PendingSegments.Enqueue(new ArraySegment<byte>(copy));

      DataAvailable.Set();
    }
  }

  public override void Flush() {
    throw new NotImplementedException();
  }

  public override long Seek(long offset, SeekOrigin origin) {
    throw new NotImplementedException();
  }

  public override void SetLength(long value) {
    throw new NotImplementedException();
  }
}

【讨论】:

    【解决方案4】:

    这是一个使用SemaphoreSlim 通知的有希望的免费版本:

    public class SlidingStream : Stream
    {
        private readonly object _writeLock = new object();
        private readonly object _readLock = new object();
        private readonly ConcurrentQueue<byte[]> _pendingSegments = new ConcurrentQueue<byte[]>();
        private byte[] _extraSegment = null;
    
        private readonly SemaphoreSlim _smSegmentsAvailable = new SemaphoreSlim(0);
    
        public override void Write(byte[] buffer, int offset, int count)
        {
            lock (_writeLock)
            {
                var copy = new byte[count];
                Array.Copy(buffer, offset, copy, 0, count);
    
                _pendingSegments.Enqueue(copy);
                _smSegmentsAvailable.Release(1);
            }
        }
    
        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
        {
            Write(buffer, offset, count);
            return Task.CompletedTask;
        }
    
        public override int Read(byte[] buffer, int offset, int bytesToRead)
        {
            lock (_readLock)
            {
                var bytesRead = 0;
    
                while (bytesToRead > 0)
                {
                    byte[] segment;
    
                    if (_extraSegment != null)
                    {
                        segment = _extraSegment;
                        _extraSegment = null;
                    }
                    else
                    {
                        if (_smSegmentsAvailable.CurrentCount == 0 && bytesRead > 0)
                        {
                            return bytesRead;
                        }
    
                        _smSegmentsAvailable.Wait(_cancel);
    
                        if (!_pendingSegments.TryDequeue(out segment))
                        {
                            throw new InvalidOperationException("No segment found, despite semaphore");
                        }
                    }
    
                    var copyCount = Math.Min(bytesToRead, segment.Length);
                    Array.Copy(segment, 0, buffer, offset + bytesRead, copyCount);
                    bytesToRead -= copyCount;
                    bytesRead += copyCount;
    
                    var extraCount = segment.Length - copyCount;
                    if (extraCount > 0)
                    {
                        _extraSegment = new byte[extraCount];
                        Array.Copy(segment, copyCount, _extraSegment, 0, extraCount);
                    }
                }
    
                return bytesRead;
            }
        }
    
        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            //could be extended here with a proper async read
            var result = Read(buffer, offset, count);
            return Task.FromResult(result);
        }
    
        protected override void Dispose(bool disposing)
        {
            _smSegmentsAvailable.Dispose();
            base.Dispose(disposing);
        }
    
        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => true;
    
        public override long Seek(long offset, SeekOrigin origin)
            => throw new NotSupportedException();
    
        public override void SetLength(long value)
            => throw new NotSupportedException();
    
        public override void Flush() {}
    
        public override long Length => throw new NotSupportedException();
    
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }
    }
    

    【讨论】:

      【解决方案5】:

      一个较晚的答案,但自 2.0 版以来 .NET Framework 中就有 Queue 支持。使用ConcurrentQueue 进行线程安全操作。

      我创建了Stream 的以下实现,当字节可用时,它将ReadReadLines。不是最好的实现,但它应该可以完成这项工作。

      public class QueueStream : Stream
      {
          protected readonly ConcurrentQueue<byte> Queue = new ConcurrentQueue<byte>();
      
          public Task? DownloadTask { get; set; }
      
          public override bool CanRead => true;
      
          public override bool CanSeek => false;
      
          public override bool CanWrite => true;
      
          public override long Length => Queue.Count;
      
          public override long Position
          {
              get => 0;
              set => throw new NotImplementedException($"{nameof(QueueStream)} is not seekable");
          }
      
          public override void Flush()
          {
              Queue.Clear();
          }
      
          public override int Read(byte[] buffer, int offset, int count)
          {
              if (buffer == null)
              {
                  throw new ArgumentNullException(nameof(buffer));
              }
      
              if (buffer.Length < count)
              {
                  throw new Exception($"{nameof(buffer)} length is less that the specified {nameof(count)}");
              }
      
              var index = 0;
              var insertedCount = 0;
              while (Queue.TryDequeue(out var b) && insertedCount < count)
              {
                  if (index >= offset)
                  {
                      buffer[insertedCount++] = b;
                  }
      
                  index++;
              }
      
              return insertedCount;
          }
      
          public string ReadLines(int numberOfLines = 1)
          {
              var currentLine = 0;
              var stringBuilder = new StringBuilder();
      
              Func<bool> isFaulted = () => false;
              Func<bool> isCompleted = () => true;
      
              if (DownloadTask != null)
              {
                  isFaulted = () => DownloadTask.IsFaulted;
                  isCompleted = () => DownloadTask.IsCompleted;
              }
      
              while (!isFaulted() && !isCompleted() && currentLine < numberOfLines)
              {
                  if (!Queue.TryDequeue(out var byteResult))
                  {
                      continue;
                  }
      
                  if (byteResult == '\r' || byteResult == '\n')
                  {
                      if (byteResult == '\r')
                      {
                          byte peekResult = 0;
                          while (!isFaulted() && !isCompleted() && !Queue.TryPeek(out peekResult))
                          {
                          }
      
                          if (peekResult == '\n')
                          {
                              Queue.TryDequeue(out _);
                          }
                      }
      
                      stringBuilder.Append(Environment.NewLine);
                      currentLine++;
                      continue;
                  }
      
                  stringBuilder.Append((char)byteResult);
              }
      
              return stringBuilder.ToString();
          }
      
          public override long Seek(long offset, SeekOrigin origin)
          {
              throw new NotImplementedException();
          }
      
          public override void SetLength(long value)
          {
              throw new NotImplementedException();
          }
      
          public override void Write(byte[] buffer, int offset, int count)
          {
              var forEnd = offset + count;
              for (var index = offset; index < forEnd; index++)
              {
                  Queue.Enqueue(buffer[index]);
              }
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2012-09-04
        • 2017-04-08
        • 1970-01-01
        • 2021-03-22
        • 2017-02-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多