【问题标题】:Implementing sequence : List<Observable<T>> -> Observable<List<T>>实现顺序:List<Observable<T>> -> Observable<List<T>>
【发布时间】:2014-05-15 11:07:06
【问题描述】:

对于每个类型 T, 有一个包含 n 个 T 的 Observable 的列表,我想构建一个 Observable,每次原始 Observable 中的一个发出某些东西时发出一个 n T 的列表。 这在函数式文献中通常被称为“序列”运算符。

伪语法中的期望行为示例:

val o1 = BehaviourSubject.create(true)
val o2 = BehaviourSubject.create(false)
val listOfObservables = [o1,o2]

val observableOfList = sequence(listOfObservables)

observableOfList.subscribe(print)

o2.onNext(true)

// Expected output:
// [true, false]
// [true, true]

我在 java 中编写了以下幼稚的实现,但行为不端:

public static <T> Observable<List<T>> sequence(List<Observable<T>> from) {
  return fold(from, Observable.<List<T>>never().startWith(new ArrayList<T>()),
            (arrayListObservable, observable) -> {
    return Observable.combineLatest(arrayListObservable, observable, (ts, t) -> {
      ts.add(t);
      return ts;
    });
  });
}

public static <F, T> T fold(final Iterable<? extends F> elements, final T zero, final Func2<T, F, T> f) {
  T currentValue = zero;
  for (final F element : elements) {
    currentValue = f.call(currentValue, element);
  }
  return currentValue;
}

// Actual output
// [true, false]
// [true, false, true]

不知何故,我需要重建结果列表,而不是将新值附加到现有列表中。你们会怎么做?

感谢您的时间和未来的答案!

【问题讨论】:

    标签: functional-programming system.reactive reactive-programming rx-java


    【解决方案1】:

    Rxx 有一个 CombineLatest 的重载,它接受一个可观察对象的集合,并完全按照您的意愿行事。您应该能够相当轻松地将该方法的源代码移植到 Java。

    【讨论】:

    • 嗨,布兰登,感谢您的回答让我重新深入评估 rxjava 文档。你知道吗?最近的 0.18.x 版本确实提供了一个带数组的 CombineLatest!这是我如何使用它来实现序列 :: List> -> -> Observable> 等效:
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-24
    • 2011-10-09
    • 1970-01-01
    • 2020-03-07
    相关资源
    最近更新 更多