【问题标题】:How can I make an IObservable from a queue, so that the sequence doesn't end when the queue is empty?如何从队列中创建 IObservable,以便队列为空时序列不会结束?
【发布时间】:2023-03-06 22:36:01
【问题描述】:

我正在研究使用 Reactive Extensions for .NET (Rx) 的东西,我想要一个从队列(或类似队列)获取输入的序列。

我试过这样做:

    static readonly Queue<DeviceTransaction> TransactionQueue = new Queue<DeviceTransaction>();
    //...
    var observableTransactionSource = TransactionQueue.ToObservable();
    //...
    observableTransactionSource.Subscribe(transactionObserver);

它工作到一定程度,但当队列为空时,序列完成。我不希望空队列结束序列。空并不意味着结束,它只是意味着'此刻没有更多'。

有没有办法在队列为空时停止序列完成,或者我应该以不同的方式考虑整个问题?

【问题讨论】:

  • 您需要使用与 Queue 不同的集合类型,它会在添加项目时触发事件,例如 ObservableCollection 或让您等到添加项目,例如BlockingCollection(尽管在您的情况下您希望它是异步的,而不是同步的,因此 BC 并不理想)。

标签: c# system.reactive reactive-programming


【解决方案1】:

调用ToObservable() 充满了问题,正如我解释的here 它只会使用IEnumerable&lt;T&gt; 并使用队列的快照。

在这种情况下,您最好使用Subject&lt;T&gt; 来支持您的活动。由于 Rx 语法规定您必须序列化事件的传递,它已经具有排队语义。只需在主题上致电OnNext&lt;T&gt; 即可发布活动。

如果您需要确保在事件发布后发生的订阅不会错过事件,请使用ReplaySubject&lt;T&gt;

如果您对使用某个主题感兴趣,那么您可能需要查看this blog post。总而言之,您对队列的使用表明在这里使用主题是可以的,但您可能需要考虑是否可以使用像 Observable.FromEvent 这样的转换方法。

【讨论】:

  • 谢谢詹姆斯,那里有很多好的线索可以阅读和尝试。实际上,我并不依赖于队列的概念,因为有时我的组件的用户会想要阻止事务(其他时候不会)。我开始想到我可能需要制作一个主题。我会阅读您提供的链接,然后回复我最终是如何做到的......
  • Dave Sexton 的优秀博文... 这说明了这一点:“我什么时候应该使用一个主题?在没有任何直接外部资源的情况下,命令式和有状态地生成一个热门的 observable。”我相信这可以很好地澄清我的情况。
  • 最后,我创建了一个类,每次创建新项目时都会引发一个事件,并使用Observable.FromEventPattern() 来观察这一点。它似乎有效,但我不确定这是否能保证序列化;如果多个线程正在生成新项目会发生什么(因此该事件可能会在多个线程上触发)?
  • 如果您的事件可以同时引发,那确实会导致问题。有一个深思熟虑的设计决定是为了提高性能而不是到处添加对OnNext 的并发调用检查。阅读Subject.Synchronize() - 在其他平台上也称为serialize - 了解在这种情况下如何安全地使用主题 - 或考虑在您的活动中使用其他一些线程安全机制。
  • 该评论似乎与您的原始答案不一致,即“Rx 语法强制事件的序列化传递”。我当然依赖这种行为,这确实是我使用 Rx 的主要动机。可能是我误会了?
猜你喜欢
  • 1970-01-01
  • 2021-11-20
  • 1970-01-01
  • 2018-04-09
  • 2011-06-09
  • 2011-04-06
  • 2021-02-21
  • 2021-04-09
  • 1970-01-01
相关资源
最近更新 更多