【问题标题】:Rxjava: How to combine multiple observables without all observables completing?Rxjava:如何在没有完成所有可观察对象的情况下组合多个可观察对象?
【发布时间】:2018-01-10 22:55:58
【问题描述】:

我有多个 hot observables,它们可能会或可能不会发出项目。因此,我想组合 observables,然后在其中任何一个发出结果时处理结果,但如果其他 observables 在 item 处发出,它们应该一起处理。

例如。

observable1 = PublishSubject<>()  
observable2 = PublishSubject<>()

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)

应该发出:

(1, null) 
(2, null)
(2, "Test")
(3, "Test")

observable2 也有可能在 observable1 之前发出

CombineLatest 最接近我的需要,但只有在所有可观察对象都发出至少一项时才会发出结果。是否有一个反应式运算符?

【问题讨论】:

    标签: java rx-java reactive-programming rx-java2


    【解决方案1】:

    您可以将startWith 与每个源一起使用以提供初始值,或者将BehaviorSubject 与初始值一起使用,然后将combineLatest 应用于这些增强的Observables。但是,null 在 RxJava 2 中是不允许的,因此您必须在可观察元素类型中找到一个中性值。

    PublishSubject<Integer> observable1 = PublishSubject.create()  
    PublishSubject<String>  observable2 = PublishSubject.create()
    
    Observable.combineLates(
        observable1.startWith(-100000),
        observable2.startWith(""),
        (a, b) -> a + b
    )
    .subscribe(System.out::println)
    ;
    
    observable1.onNext(1)  
    observable1.onNext(2)  
    observable2.onNext("Test")  
    observable1.onNext(3)
    

    BehaviorSubject<Integer> observable1 = BehaviorSubject.createDefault(-10000)  
    BehaviorSubject<String>  observable2 = BehaviorSubject.createDefault("")
    
    Observable.combineLates(
        observable1,
        observable2,
        (a, b) -> a + b
    )
    .subscribe(System.out::println)
    ;
    
    observable1.onNext(1)  
    observable1.onNext(2)  
    observable2.onNext("Test")  
    observable1.onNext(3)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-04-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-09
      相关资源
      最近更新 更多