【问题标题】:How to use RxJava combineLatest operator with more than 9 observables如何将 RxJava combineLatest 运算符与超过 9 个 observable 一起使用
【发布时间】:2016-08-07 07:25:29
【问题描述】:

我正在使用 RxJava,我想使用运算符 combineLatest 组合 12 个不同的 observables。

我看到了一个函数原型,其中包含一个可观察对象列表和一个 FuncN 的实现,但我不知道该怎么做,我在实现 call 方法时遇到了麻烦。

谁能给我举个例子?

【问题讨论】:

  • 请使用``格式化您的代码和函数

标签: java rx-java


【解决方案1】:

有一个 combineLatest 接受 List 的 observables。这是一个如何使用它的示例:

List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
    @Override
    public String call(Object... args) {
        String concat = "";
        for (Object value : args) {
            if (value instanceof Integer) {
                concat += (Integer) value;
            } else if (value instanceof String) {
                concat += (String) value;
            }
        }
        return concat;
    }
});

【讨论】:

  • 如果我有不同的可观察类型怎么办? (例如 Observable、Obsevable 等)
  • @user3762200 args 中的值将具有与list 中相同的顺序。您可以依赖此顺序或检查每个值的类型。取决于您的用例。我已经编辑了我的答案。
  • 谢谢,对我很有帮助
  • FuncN&lt;String&gt;() 是如何定义的?我似乎无法调用它。
【解决方案2】:

你扩展那个答案,我用它来一次读取多个特征,可以这样做:

connectionObservable
                .flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
                    List<Observable<?>> list1 = Arrays.asList(
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...));
                    return Observable.combineLatest(list1, args -> {
                       Object o =  doSomethingWithResults(args);
                        return o;
                    });
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnUnsubscribe(this::clearConnectionSubscription)
                .subscribe(retVal -> {
                    Log.d(TAG, "result:" + retVal.toString());
                    Log.w(TAG, "SUCCESS");
                    triggerDisconnect();

                }, MyActivity.this::onReadFailure);
    }

如果您对如何改进此过程有任何建议,请发表评论。

【讨论】:

  • 当这些特征之一引发 onError 时会发生什么?
  • 它转到 onReadFailure,在我的情况下,如果一个失败,我想扔掉整个阅读,所以它对我有用。
  • 将每一行 rxBleConnection.readCharacteristic(UUID...), 替换为 rxBleConnection.readCharacteristic(UUID...).onErrorResumeNext { bytes -&gt; Observable.just(new byte[0])) }, 本质上,如果未找到某个特征,您将返回一个空字节数组。代码将继续
【解决方案3】:

RxKotlin 在 combineLatest() 方法的参数中最多支持 9 个运算符,但要使用超过 9 个参数 表示传递无限的动态自定义对象数组列表,您可以按如下方式使用它:

首先让我给你一个简单的例子,只有两个参数 自定义数据类型

val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
                .subscribe({
                    Log.d("combineLatest", "onNext - ${it}")
                })

现在如果我想在 combineLatest 中传递多个参数怎么办?然后 您的答案如下:(我使用了自定义数据类型,所以有人 自定义问题也可以在这里解决)

val myList = arrayOf(Observable.just("MyName"),
                Observable.just(2),
                Observable.just(3.55),
                Observable.just("My Another String"),
                Observable.just(5),
                Observable.just(6),
                Observable.just(7),
                Observable.just(8),
                Observable.just(9),
                Observable.just(10),
                Observable.just(11),
                Observable.just(12),
                Observable.just(13),
                Observable.just(14),
                Observable.just(15))

Observable.combineLatest(myList, {
    val a = it[0] as String
    val b = it[1] as Int
    val c = it[2] as Float
    val d = it[3] as String
    val e = it[4] as Int
    val f = it[5] as Int
    val g = it[6] as Int
    val h = it[7] as Int
    val i = it[8] as Int
    val j = it[9] as Int
    val k = it[10] as Int
    val l = it[11] as Int
    val m = it[12] as Int
    "$a - age:${b}" })
        .subscribe({
            Log.d("combineLatest", "onNext - ${it}")
        })

【讨论】:

  • 你如何处理未经检查的选角?如果我列表中的所有可观察对象都相同,我知道类型是什么,但它仍然会为未经检查的强制转换发出警告
【解决方案4】:

如果你有 10 个 combineLatest 的源,这里是一个简单的 RxKotlin 扩展函数。您可以轻松地为更多源创建类似的函数,或者对其进行调整以使用普通的 RxJava。


import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
    source1: Observable<T1>, source2: Observable<T2>,
    source3: Observable<T3>, source4: Observable<T4>,
    source5: Observable<T5>, source6: Observable<T6>,
    source7: Observable<T7>, source8: Observable<T8>,
    source9: Observable<T9>, source10: Observable<T10>,
    crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
    Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
        combineFunction(
            it[0] as T1,
            it[1] as T2,
            it[2] as T3,
            it[3] as T4,
            it[4] as T5,
            it[5] as T6,
            it[6] as T7,
            it[7] as T8,
            it[8] as T9,
            it[9] as T10
        )
    }

注意:我将其创建为扩展函数,以与少于 10 个源 (Observables.combineLatest(...)) 的 combineLatest 函数调用的外观保持一致。这样我就不必考虑我需要哪个combineLatest 版本来获得多少参数。从技术上讲,没有必要将其作为扩展函数。

【讨论】:

    【解决方案5】:

    为了扩展 Egor Neliuba 的答案,您可以在一个容器对象中聚合所有结果,然后在 subscribe 子句中使用它:

     List<Observable<?>> list = new ArrayList<>();
        list.add(mCreateMarkupFlowManager.getFlowState());
        list.add(mCreateIssueFlowStateManager.getIssueFlowState());
        list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
        list.add(mViewerStateManager.getIssueLoadingProgressChanges());
        list.add(mMeasurementFlowStateManager.getFlowState());
        list.add(mViewerStateManager.isSheetLoaded());
        list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
        list.add(mViewerStateManager.getMarkupViewMode());
        list.add(mViewerStateManager.isFirstPerson());
        list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
        list.add(mCreateRfiFlowStateManager.getRfiFlowState());
    
        attachSubscription(Observable.combineLatest(list, args -> {
                    Holder holder = new Holder();
                    holder.setFirst((String) args[0]);
                    holder.setSecond((Integer) args[1]);
                    holder.setThird((Boolean) args[2]);
                    holder.setFourth((Boolean) args[3]);
                    holder.setFifth((String) args[4]);
                    holder.setSixth((Boolean) args[5]);
                    holder.setSeventh((Boolean) args[6]);
                    holder.setEighth((Boolean) args[7]);
                    holder.setNinth((Boolean) args[8]);
                    holder.setTenth((Boolean) args[9]);
                    holder.setEleventh((String) args[10]);
                    return holder;
                })
                        .filter(holder -> Util.isTrue(holder.sixth))
                        .compose(Util.applySchedulers())
                        .subscribe(holder -> {
                            if (isViewAttached()) {
                                String createMarkupState = holder.first;
                                Integer createIssueState = holder.second;
                                boolean markupsLoadingFinished = holder.third;
                                boolean issuesLoadingFinished = holder.fourth;
                                boolean loadingFinished = markupsLoadingFinished && issuesLoadingFinished;
                                String measurementState = holder.fifth;
                                boolean isMarkupLockMode = holder.eighth;
    
                                boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
                                boolean showCreateMeasureButton = shouldShowMeasureButton();
                                boolean showCreateFieldIssueButton = holder.seventh;
                                boolean isFirstPersonEnabled = holder.ninth;
                                Boolean showCreateRfiButton = holder.tenth;
                                String rfiFlowState = holder.eleventh;
    
    
                            }
                        })
        );
    
    
    public class Holder {
    
    
    public String first;
    public Integer second;
    public Boolean third;
    public Boolean fourth;
    public String fifth;
    public Boolean sixth;
    public Boolean seventh;
    public Boolean eighth;
    public Boolean ninth;
    public Boolean tenth;
    public String eleventh;
    
    public void setEleventh(String eleventh) {
        this.eleventh = eleventh;
    }
    
    
    public void setFirst(String first) {
        this.first = first;
    }
    
    
    public void setSecond(Integer second) {
        this.second = second;
    }
    
    
    public void setThird(Boolean third) {
        this.third = third;
    }
    
    
    public void setFourth(Boolean fourth) {
        this.fourth = fourth;
    }
    
    
    public void setFifth(String fifth) {
        this.fifth = fifth;
    }
    
    
    public void setSixth(Boolean sixth) {
        this.sixth = sixth;
    }
    
    
    public void setSeventh(Boolean seventh) {
        this.seventh = seventh;
    }
    
    
    public void setEighth(Boolean eighth) {
        this.eighth = eighth;
    }
    
    
    public void setNinth(Boolean ninth) {
        this.ninth = ninth;
    }
    
    
    public void setTenth(Boolean tenth) {
        this.tenth = tenth;
    }
    
    
    public Holder() {}
    
    }
    

    【讨论】:

      猜你喜欢
      • 2018-11-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-28
      相关资源
      最近更新 更多