【问题标题】:Combining Unknown number of Observables in RxJava在 RxJava 中组合未知数量的 Observable
【发布时间】:2016-12-21 23:19:33
【问题描述】:

我有一种情况,我必须根据请求创建 N Observable 对象。我知道我可以使用zip 来组合已知数量的 Observable。但是,我一直在试图了解如何组合未知数量的 Observable。

我不清楚的部分是传递给zip 的函数。根据 Observable 的数量,我必须创建一个带有 N 个参数的 lambda。

所有的 Observable 返回不同类型的对象。

【问题讨论】:

  • 请添加一些代码,这会稍微解释您的想法。目前还不清楚你想在这里实现什么。
  • 这些 observables 是什么,它们产生了多少项目?你看过zip(Observable<Observable<T>>, FuncN)重载了吗?
  • 谢谢@akarnokd。我看过拉链。它的问题是不确定有多少 observable 可用于 zip,因为请求决定了 observable 的数量。如果有 3 个 observables,我编写了一个接受 3 个参数的 lambda,如果有 N 个 observables,那么我编写了一个带有 N 个参数的 lambda.. 这是不可维护的,并且在运行时确定..
  • 由于您不知道要创建的可观察对象的数量,我建议使用PublishSubject。因此,每当您创建一个新的 observable 时,使用您自己的缓冲区或缓存机制发布它们并在那里执行操作..

标签: java jakarta-ee asynchronous concurrency rx-java


【解决方案1】:

更新:

我最终找到了解决并行请求列表的正常方法。 只需使用 flatMap、merge、zip、任何组合 rx 运算符。

我们唯一需要特别做的就是对每个请求使用 .subscribeOn(Schedulers.io()) 。所有其他事情,发送并行请求或同时发送将由 rxjava 完美安排。

如果你想看看效果,请尝试以下操作:

private void runMyTest() {
    List<Single<String>> singleObservableList = new ArrayList<>();
    singleObservableList.add(getSingleObservable(500, "AAA"));
    singleObservableList.add(getSingleObservable(300, "BBB"));
    singleObservableList.add(getSingleObservable(100, "CCC"));
    Single.merge(singleObservableList)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println); }

private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
    return Single
            .create((SingleOnSubscribe<String>) e -> {
                    try {
                        Thread.sleep(waitMilliSeconds);
                    } catch (InterruptedException exception) {
                        exception.printStackTrace();
                    }
                    System.out.println("name = " +name+ ", waitMilliSeconds = " +waitMilliSeconds+ ", thread name = "
+Thread.currentThread().getName()+ ", id =" +Thread.currentThread().getId());
                    if(!e.isDisposed()) e.onSuccess(name);
                })
            .subscribeOn(Schedulers.io()); }

输出:

System.out:名称 = CCC,waitMilliSeconds = 100,线程名称 = RxCachedThreadScheduler-4, id =463

System.out: CCC

System.out:名称 = BBB,waitMilliSeconds = 300,线程名称 = RxCachedThreadScheduler-3, id =462

System.out: BBB

System.out:名称 = AAA,waitMilliSeconds = 500,线程名称 = RxCachedThreadScheduler-2, id =461

System.out:AAA

// ******以前的答案但不准确************//

使用这个;解决了我的问题:

zip(java.lang.Iterable&lt;? extends Observable&lt;?&gt;&gt; ws,FuncN&lt;? extends R&gt; zipFunction) method.

一个样本:

public Observable<CombinedData> getCombinedObservables() {
        List<Observable> observableList = new ArrayList<>();

        observableList.add(observable1);
        observableList.add(observable2);
        observableList.add(observable3);
        observableList.add(observable4);

        return Observable.zip(observableList, new Function<Object[], CombinedData>() {
            @Override
            public CombinedData apply(Object[] objects) throws Exception {
                return new CombinedData(...);
            }
        });
    }

【讨论】:

    【解决方案2】:

    您可以让您的 Observables 返回常见类型的对象,将其合并并作为 List 处理:

    class Result1 implements Result
    class Result2 implements Result
    class REsult3 implements Result
    
    for(Observable o : yourObservableList)
        resultObservable.mergeWith(o) //use concat() if you need serial execution
    
    resultObservable
        .toList()
        .doOnNext(results -> {
            //process your results (List<Result>)
        }
        .subscribe(...)
    

    【讨论】:

    • 你甚至可以将 observable 列表传递给 merge 方法。
    • 对不起,我的错误。应该是mergeWith。我编辑了我的答案。
    【解决方案3】:

    当我不得不根据应用程序中的用户选择对不同数量的 EditTexts 进行表单验证时,我遇到了同样的问题。

    在我的具体情况下,所有添加的字段都必须包含内容。

    这最终对我有用。

            val ob1 = RxTextView.textChanges(field1).skip(1)
            val ob2 = RxTextView.textChanges(field2).skip(1)
            val ob3 = RxTextView.textChanges(field3).skip(1)
    
            val observableList = arrayListOf<Observable<CharSequence>>()
    
            observableList.add(ob1)
            observableList.add(ob3)
    
            val formValidator = Observable.combineLatest(observableList, {
                var isValid = true
                it.forEach {
                    val string = it.toString()
                    if (string.isEmpty()) {
                        isValid = false
                    }
                }
                return@combineLatest isValid
            })
    
            formValidator.subscribe { isValid ->
                if (isValid) {
                    //do something
                } else {
                    //do something
                }
            }
    

    【讨论】:

      猜你喜欢
      • 2018-08-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-06
      • 1970-01-01
      • 2021-12-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多