【问题标题】:Operator on RxJava Observable/Flowable to delay emission by n itemsRxJava Observable/Flowable 上的操作员将发射延迟 n 个项目
【发布时间】:2019-12-19 21:47:56
【问题描述】:

我想转换 Flowable 以便它推迟发射项目,直到收集到指定数量的项目,然后以 FIFO 顺序将它们发射到下游,保持恒定的延迟项目计数。一旦上游发出 onComplete 信号,排队的项目应该在发出 onComplete 之前刷新到下游:

(在本例中,延迟项目编号为 3)

1 2 3 4 5 6 7 |
      1 2 3 4 5 6 7 |

我没有看到任何现有的运营商这样做或可以修改以获得该行为。 Observable.delay 似乎只支持基于时间的延迟,不支持基于计数的延迟。

实现这一点应该很容易实现自定义运算符,但也许现有运算符有更简单的方法?

【问题讨论】:

    标签: java rx-java reactive-programming rx-java2


    【解决方案1】:

    您可以发布一个序列,跳过最后一个 N,然后将最后一个 N 追加回来:

    Flowable.range(1, 7)
        .flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(v))
    // -------------------------------------------------------------------
        .publish(f -> 
            f.skipLast(3).mergeWith(f.takeLast(3))
        )
    // -------------------------------------------------------------------
        .blockingSubscribe(v -> System.out.println("<-- " + v));
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多