【问题标题】:Combining Rx Singles into Observables recursively递归地将 Rx Singles 组合成 Observables
【发布时间】:2017-06-14 04:59:22
【问题描述】:

假设我有一个名为s_0Single,它可以从T 类型发出一个元素t_0,或者失败(在某些语言中是Single<T>)。那就是:

s_0: -- t_0          // Success

OR

s_0: -- X            // Failure

T 类型的实例有一个next() 方法,该方法也从T 类型返回一个可选的SingleKotlin 中的Single<T>?)。这种行为导致Single 实例链能够发出T 实例链,其中每个s_i 可以发出一个元素t_i 能够返回下一个s_i+1,这将发出一个元素t_i+1 以此类推,直到最后一个元素 t_n-1 不返回单曲或任何单曲失败:

s_0: -- t_0
        ↓
        s_1: -- t_1
                ↓
                s_2: -- t_2

                        ...

                        ↓
                        s_n-1: -- t_n-1
                                  ↓
                                  null

OR

s_0: -- t_0
        ↓
        s_1: -- t_1
                ↓
                s_2: -- t_2

                        ...

                        ↓
                        s_i: -- X

我正在寻找一种优雅的方法来从T 类型中获取Observable o 能够发射由s_0 开始的链中的所有元素,当有链上不再有单打或如果任何单打失败则失败:

o: -- t_0 -- t_1 -- t_2 -- ... -- t_n-1 --o     // Success

OR

o: -- t_0 -- t_1 -- t_2 -- ... --X              // Failure

优雅,我的意思是像这样简单的东西(在 Kotlin 中):

// Get single somehow (out of the scope of this question)
val s0: Single<T> = provideSingle()

// Get observable
val o: Observable<T> = s0.chain()

// Define extension method
fun Single<T>.chain(): Observable<T> {
    /*
    Implement here
     */
}

// Element interface
interface T {
    fun next(): Single<T>?
}

这有什么适用性?

在使用带有分页功能的 REST API 时会发现这种情况,其中 Single 实例可用于检索单个页面,而这些页面又可以提供能够发出后续页面的 Single 实例。

【问题讨论】:

  • 为什么投反对票?

标签: rx-java kotlin reactive-programming


【解决方案1】:

我尚未对此进行测试,但基于我前段时间针对类似分页问题编写的解决方案,我将其翻译为 Kotlin

fun Single<T>.chain(): Observable<T> =
    toObservable()
    .concatWith {
        it.next()?.chain()
        ?: Observable.empty()
    }

获得“递归”链接的关键是 concatWith 运算符递归调用 chain 方法

【讨论】:

  • 酷! concatMap + 递归是要走的路。我只想指出,在这种特定情况下,给定的代码将不起作用,因为发出的项目永远不会是null,它应该与返回的值递归连接,例如:concatMap { Observable.just(it).concatWith(it.next()?.chain(next) ?: Observable.empty()) }。但这只是一个小细节,您的回答为我指明了正确的方向。谢谢!
  • 我刚刚更新了我的答案以反映这一点。请注意,您的 chain() 方法不接受任何 next arg ;)
【解决方案2】:
public class Q44535765 {
  public static void main(String[] args) {
    Maybe<Element> first = get();
    first.toObservable()
        .compose(o -> chain(o))
        .doOnError(e -> System.out.println(e))
        .subscribe(
            e -> System.out.println(e),
            e -> System.out.println("fail"),
            () -> System.out.println("complete"));
  }

  static Maybe<Element> get() {
    return Maybe.just(
        () -> If.<Maybe<Element>> that(Math.random() > 0.1)
            .tobe(() -> get())
            .orbe(() -> If.<Maybe<Element>> that(Math.random() > 0.5)
                .tobe(() -> Maybe.empty())
                .orbe(() -> null)
                .result())
            .result());
  }

  static Observable<Element> chain(Observable<Element> s) {
    return s.concatMap(
        e -> Observable.just(e)
            .concatWith(e.next()
                .toObservable()
                .compose(o -> chain(o))));
  }

  interface Element {
    Maybe<Element> next();
  }
}

虽然 If 是我的 util 类,但您可以改为 if...else...

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-06-15
    • 1970-01-01
    • 1970-01-01
    • 2015-04-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多