【问题标题】:How can I split and pipe multiple NAudio stream如何拆分和管道多个 NAudio 流
【发布时间】:2015-07-03 14:06:06
【问题描述】:

我有一个使用来自 Kinect 1、Kinect 2、麦克风或其他任何东西的输入音频流的 C# 项目。

waveIn.DataAvailable += (object sender, WaveInEventArgs e) => {
  lock(buffer){
    var pos = buffer.Position;
              buffer.Write(e.Buffer, 0, e.BytesRecorded);
              buffer.Position = pos;
  }
};

缓冲区变量是来自组件 A 的 Stream,它将由处理 Streams 的 SpeechRecognition 组件 B 处理。

我将添加新的组件 C、D、E,在 Streams 上工作以计算音高、检测声音、进行指纹识别或其他任何事情......

如何为组件 C、D、E 复制该 Stream?

  • 组件 A 发送事​​件“我有一个流做你想做的事”我不想通过事件“给我你的流”来反转逻辑

  • 我正在寻找一个“MultiStream”,它可以给我一个 Stream 实例并处理这项工作

组件 A

var MultiStream buffer = new MultiStream()
...
SendMyEventWith(buffer)

组件 B、C、D、E

public void HandleMyEvent(MultiStream buffer){
  var stream = buffer.GetNewStream();
  var engine = new EngineComponentB()
      engine.SetStream(stream);
}
  • MultiStream 必须是 Stream 才能包装 Write() 方法(因为 Stream 没有可用的数据机制)?
  • 如果 Stream 是组件 B 的 Dispose(),则 MultiStream 应将其从其数组中删除?
  • MultiStream 必须在 Read() 上引发异常以要求使用 GetNewStream()

编辑:Kinect 1 本身提供一个 Stream ... :-( 我应该使用 Thread 将其泵入 MultiStream 吗?

有人有那种 MultiStream 类吗?

谢谢

【问题讨论】:

    标签: c# stream naudio


    【解决方案1】:

    我不确定这是否是最好的方法,或者它是否比以前的答案更好,我不保证这段代码是完美的,但我编写的代码确实是你所要求的,因为这很有趣 - MultiStream 课程。

    您可以在此处找到该课程的代码:http://pastie.org/10289142

    用法示例:

    MultiStream ms = new MultiStream();
    
    Stream copy1 = ms.CloneStream();
    ms.Read( ... );
    
    Stream copy2 = ms.CloneStream();
    ms.Read( ... );
    

    copy1copy2 将在示例运行后包含相同的数据,并且在写入 MultiStream 时它们将继续更新。您可以单独读取、更新位置和处理克隆的流。如果处置,克隆的流将从MultiStream 中删除,处置Multistream 将关闭所有相关和克隆的流(如果不是您想要的行为,您可以更改它)。尝试写入克隆的流将引发不受支持的异常。

    【讨论】:

      【解决方案2】:

      不知何故,我认为流并不真正适合您尝试做的事情。您正在设置一种情况,即长期运行的程序将无缘无故地不断扩展数据需求。

      我建议使用 pub/sub 模型将接收到的音频数据发布给订阅者,最好使用多线程方法来最大限度地减少不良订阅者的影响。一些想法可以在here找到。

      我以前使用实现IObserver<byte[]> 并使用Queue<byte[]> 来存储样本块的处理器类完成此操作,直到进程线程为它们准备好。以下是基类:

      public abstract class BufferedObserver<T> : IObserver<T>, IDisposable
      {
          private object _lck = new object();
      
          private IDisposable _subscription = null;
          public bool Subscribed { get { return _subscription != null; } }
      
          private bool _completed = false;
          public bool Completed { get { return _completed; } }
      
          protected readonly Queue<T> _queue = new Queue<T>();
      
          protected bool DataAvailable { get { lock(_lck) { return _queue.Any(); } } }
          protected int AvailableCount { get { lock (_lck) { return _queue.Count; } } }
      
          protected BufferedObserver()
          {
          }
      
          protected BufferedObserver(IObservable<T> observable)
          {
              SubscribeTo(observable);
          }
      
          public virtual void Dispose()
          {
              if (_subscription != null)
              {
                  _subscription.Dispose();
                  _subscription = null;
              }
          }
      
          public void SubscribeTo(IObservable<T> observable)
          {
              if (_subscription != null)
                  _subscription.Dispose();
              _subscription = observable.Subscribe(this);
              _completed = false;
          }
      
          public virtual void OnCompleted()
          {
              _completed = true;
          }
      
          public virtual void OnError(Exception error)
          { }
      
          public virtual void OnNext(T value)
          {
              lock (_lck)
                  _queue.Enqueue(value);
          }
      
          protected bool GetNext(ref T buffer)
          {
              lock (_lck)
              {
                  if (!_queue.Any())
                      return false;
                  buffer = _queue.Dequeue();
                  return true;
              }
          }
      
          protected T NextOrDefault()
          {
              T buffer = default(T);
              GetNext(ref buffer);
              return buffer;
          }
      }
      
      public abstract class Processor<T> : BufferedObserver<T>
      {
          private object _lck = new object();
          private Thread _thread = null;
      
          private object _cancel_lck = new object();
          private bool _cancel_requested = false;
          private bool CancelRequested
          {
              get { lock(_cancel_lck) return _cancel_requested; }
              set { lock(_cancel_lck) _cancel_requested = value; }
          }
      
          public bool Running { get { return _thread == null ? false : _thread.IsAlive; } }
          public bool Finished { get { return _thread == null ? false : !_thread.IsAlive; } }
      
          protected Processor(IObservable<T> observable)
              : base(observable)
          { }
      
          public override void Dispose()
          {
              if (_thread != null && _thread.IsAlive)
              {
                  //CancelRequested = true;
                  _thread.Join(5000);
              }
              base.Dispose();
          }
      
          public bool Start()
          {
              if (_thread != null)
                  return false;
      
              _thread = new Thread(threadfunc);
              _thread.Start();
              return true;
          }
      
          private void threadfunc()
          {
              while (!CancelRequested && (!Completed || _queue.Any()))
              {
                  if (DataAvailable)
                  {
                      T data = NextOrDefault();
                      if (data != null && !data.Equals(default(T)))
                          ProcessData(data);
                  }
                  else
                      Thread.Sleep(10);
              }
          }
      
          // implement this in a sub-class to process the blocks
          protected abstract void ProcessData(T data);
      }
      

      这样,您只需在需要时保留数据,并且您可以将任意数量的进程线程附加到同一个可观察数据源。


      为了完整起见,这里有一个实现IObservable&lt;T&gt; 的通用类,因此您可以看到它是如何组合在一起的。这个甚至有 cmets:

      /// <summary>Generic IObservable implementation</summary>
      /// <typeparam name="T">Type of messages being observed</typeparam>
      public class Observable<T> : IObservable<T>
      {
          /// <summary>Subscription class to manage unsubscription of observers.</summary>
          private class Subscription : IDisposable
          {
              /// <summary>Observer list that this subscription relates to</summary>
              public readonly ConcurrentBag<IObserver<T>> _observers;
      
              /// <summary>Observer to manage</summary>
              public readonly IObserver<T> _observer;
      
              /// <summary>Initialize subscription</summary>
              /// <param name="observers">List of subscribed observers to unsubscribe from</param>
              /// <param name="observer">Observer to manage</param>
              public Subscription(ConcurrentBag<IObserver<T>> observers, IObserver<T> observer)
              {
                  _observers = observers;
                  _observer = observer;
              }
      
              /// <summary>On disposal remove the subscriber from the subscription list</summary>
              public void Dispose()
              {
                  IObserver<T> observer;
                  if (_observers != null && _observers.Contains(_observer))
                      _observers.TryTake(out observer);
              }
          }
      
          // list of subscribed observers
          private readonly ConcurrentBag<IObserver<T>> _observers = new ConcurrentBag<IObserver<T>>();
      
          /// <summary>Subscribe an observer to this observable</summary>
          /// <param name="observer">Observer instance to subscribe</param>
          /// <returns>A subscription object that unsubscribes on destruction</returns>
          /// <remarks>Always returns a subscription.  Ensure that previous subscriptions are disposed
          /// before re-subscribing.</remarks>
          public IDisposable Subscribe(IObserver<T> observer)
          {
              // only add observer if it doesn't already exist:
              if (!_observers.Contains(observer))
                  _observers.Add(observer);
      
              // ...but always return a new subscription.
              return new Subscription(_observers, observer);
          }
      
          // delegate type for threaded invocation of IObserver.OnNext method
          private delegate void delNext(T value);
      
          /// <summary>Send <paramref name="data"/> to the OnNext methods of each subscriber</summary>
          /// <param name="data">Data object to send to subscribers</param>
          /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
          public void Notify(T data)
          {
              foreach (var observer in _observers)
              {
                  delNext handler = observer.OnNext;
                  handler.BeginInvoke(data, null, null);
              }
          }
      
          // delegate type for asynchronous invocation of IObserver.OnComplete method
          private delegate void delComplete();
      
          /// <summary>Notify all subscribers that the observable has completed</summary>
          /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks>
          public void NotifyComplete()
          {
              foreach (var observer in _observers)
              {
                  delComplete handler = observer.OnCompleted;
                  handler.BeginInvoke(null, null);
              }
          }
      }
      

      现在您可以创建一个Observable&lt;byte[]&gt; 用作您感兴趣的Process&lt;byte[]&gt; 实例的发送器。从输入流、音频阅读器等中提取数据块并将它们传递给 Notify 方法。只要确保你事先克隆了数组......

      【讨论】:

      • 在 Pub/Sub 观察者中,我将使用线程将数据从流写入另一个流?这可能会解决 Kinect 1 的问题,即提供可以泵入内存流的 Stream。
      • 但它不能解决 MultiStream 问题。一方面,我在组件 A 中有一个 Stream,另一方面,我有 B、C、D、E 读者想要阅读完整的流。所以这不是锁定问题,而是我尝试解决的“读取”光标问题
      • IObservable&lt;T&gt;Notify 方法中,您将收到的信息发布给每个订阅者,因此每个订阅者都会获得要处理的数据的完整副本。如果您需要在每个观察者中保留数据的历史记录,那么内存使用量可能会有点过大……我想这是一种权衡。如果您真的想可以在不同订阅者之间共享一个流,每个订阅者都有一个流光标对象来处理他们需要的独立定位。写起来会很有趣,但是试试 Pub/Sub。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-07
      相关资源
      最近更新 更多