【问题标题】:Why are these Rx sequences out of order?为什么这些 Rx 序列乱序?
【发布时间】:2017-03-10 09:52:13
【问题描述】:

假设我写

var gen = Observable.Range(1, 3)
  .SelectMany(x => Observable.Range(1, x));

生成的序列是1 1 2 1 2 3,正如预期的那样。但是现在如果我写

var gen = Observable.Range(1, 4)
  .SelectMany(x => Observable.Range(1, x));

现在产生的序列是1 1 2 1 2 1 3 2 3 4,而不是预期的1 1 2 1 2 3 1 2 3 4。这是为什么? SelectMany() 会做某种多线程合并吗?

【问题讨论】:

  • 也许你应该使用不同的调度器:Observable.Range(1, 4, ImmediateScheduler.Instance).SelectMany(x => Observable.Range(1, x, ImmediateScheduler.Instance)) 按顺序创建序列
  • SelectMany 对自己的调度没有任何作用。默认情况下,.Range 使用 CurrentThreadScheduler,它将操作排队,但不是多线程的。一般来说,跨序列的 Rx 不会是“有序的”,除非你通过操作使它们如此——这正是它们是可观察的而不是可枚举的原因。
  • 那么Observable.Range()不能保证按顺序生成事件吗?
  • @JeroenMostert 不,Rx 中的序列肯定会按顺序排列,除非您对它们进行dis排序。可观察对象和可枚举对象之间的区别在于推与拉。可枚举对象在客户端请求时从源中获取数据,而可观察对象则在准备好时将数据推送给客户端。
  • @BrandonKramer:单个序列是有序的。一旦您开始组合单独的序列,您就需要密切关注正在发生的事情——虽然有顺序,但它不需要是直观的,甚至不需要确定性(如果确实涉及到线程调度程序) )。我就是这个意思。

标签: c# .net system.reactive reactivex rx.net


【解决方案1】:

Observable.Range() 本身将始终按顺序生成其事件。但是SelectMany() 不会等待前一个可观察对象完成后再开始下一个观察对象。这意味着随着序列变长,重叠会越来越多,因为下一个序列将在前一个序列完成之前开始,以此类推。

如果您试图让输出是连续的,那么您将需要使用不同的方法来展平序列,例如Concat()

例如:

var gen = Observable.Range(1, 4)
  .Select(x => Observable.Range(1, x)).Concat();

输出:1,1,2,1,2,3,1,2,3,4

SelectMany() 不同,Concat() 等待每个序列完成后再开始下一个序列。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-20
    • 2018-06-04
    • 2014-08-23
    • 2012-07-31
    相关资源
    最近更新 更多