【问题标题】:How to broadcast a cold observable: Replay with back-pressure?如何广播一个冷的 observable:用背压重播?
【发布时间】:2017-07-25 11:22:27
【问题描述】:

我实际上使用的是 Scala,但这个问题对所有 Rx 和流框架都是通用的。

我的用例是我有一个生成的 observable(因此很冷),我希望多个消费者并行使用完全相同的值,但我希望它们具有显着不同的吞吐量。

我需要的可以通过广播一个带有重播的 observable 来完成,但是我看到使用最大缓冲区大小进行重播的常见策略是在溢出时从缓冲区中删除元素(然后对于最慢的消费者来说这些元素会丢失)向生产者施压。如果您将所有广播的 observables 都视为热的,这是有道理的,但是,就我而言,我知道它实际上是冷的并且可以被背压。

在任何 JVM 响应式流兼容框架中是否有某种方法可以实现这一点?

非常感谢!

【问题讨论】:

    标签: scala reactive-programming akka-stream rx-java2 monix


    【解决方案1】:

    RxJava 通过publish 操作符支持这一点,该操作符协调来自单个消费者的请求,也就是说,它以与最慢消费者请求一样快的固定速率请求。不幸的是,目前没有 RxScala 2,只有 RxJava 2 支持 Reactive-Streams 规范,因此,将其转换为 Scala 可能会有些不便:

    Flowable.fromPublisher(Flowable.range(1, 1000))
    .publish(f -> 
        Flowable.mergeArray(
            f.observeOn(Schedulers.computation()).map(v -> v * v),
            f.observeOn(Schedulers.computation()).map(v -> v * v * v)
        )
     )
     .blockingSubscribe(System.out::println);
    

    另一种方法是使用ConnectableObservable,并在所有消费者都订阅后手动连接:

    ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
        .publish();
    
    co.observeOn(Schedulers.computation()).map(v -> v * v)
      .subscribe(System.out::println);
    
    co.connect();
    

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-08
    • 1970-01-01
    • 2021-07-09
    • 2013-07-14
    • 2012-08-28
    • 2019-11-25
    相关资源
    最近更新 更多