【发布时间】:2016-08-07 07:25:29
【问题描述】:
我正在使用 RxJava,我想使用运算符 combineLatest 组合 12 个不同的 observables。
我看到了一个函数原型,其中包含一个可观察对象列表和一个 FuncN 的实现,但我不知道该怎么做,我在实现 call 方法时遇到了麻烦。
谁能给我举个例子?
【问题讨论】:
-
请使用
``格式化您的代码和函数
我正在使用 RxJava,我想使用运算符 combineLatest 组合 12 个不同的 observables。
我看到了一个函数原型,其中包含一个可观察对象列表和一个 FuncN 的实现,但我不知道该怎么做,我在实现 call 方法时遇到了麻烦。
谁能给我举个例子?
【问题讨论】:
``格式化您的代码和函数
有一个 combineLatest 接受 List 的 observables。这是一个如何使用它的示例:
List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
@Override
public String call(Object... args) {
String concat = "";
for (Object value : args) {
if (value instanceof Integer) {
concat += (Integer) value;
} else if (value instanceof String) {
concat += (String) value;
}
}
return concat;
}
});
【讨论】:
args 中的值将具有与list 中相同的顺序。您可以依赖此顺序或检查每个值的类型。取决于您的用例。我已经编辑了我的答案。
FuncN<String>() 是如何定义的?我似乎无法调用它。
你扩展那个答案,我用它来一次读取多个特征,可以这样做:
connectionObservable
.flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
List<Observable<?>> list1 = Arrays.asList(
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...));
return Observable.combineLatest(list1, args -> {
Object o = doSomethingWithResults(args);
return o;
});
})
.observeOn(AndroidSchedulers.mainThread())
.doOnUnsubscribe(this::clearConnectionSubscription)
.subscribe(retVal -> {
Log.d(TAG, "result:" + retVal.toString());
Log.w(TAG, "SUCCESS");
triggerDisconnect();
}, MyActivity.this::onReadFailure);
}
如果您对如何改进此过程有任何建议,请发表评论。
【讨论】:
rxBleConnection.readCharacteristic(UUID...), 替换为 rxBleConnection.readCharacteristic(UUID...).onErrorResumeNext { bytes -> Observable.just(new byte[0])) }, 本质上,如果未找到某个特征,您将返回一个空字节数组。代码将继续
RxKotlin 在 combineLatest() 方法的参数中最多支持 9 个运算符,但要使用超过 9 个参数 表示传递无限的动态自定义对象数组列表,您可以按如下方式使用它:
首先让我给你一个简单的例子,只有两个参数 自定义数据类型
val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
.subscribe({
Log.d("combineLatest", "onNext - ${it}")
})
现在如果我想在 combineLatest 中传递多个参数怎么办?然后 您的答案如下:(我使用了自定义数据类型,所以有人 自定义问题也可以在这里解决)
val myList = arrayOf(Observable.just("MyName"),
Observable.just(2),
Observable.just(3.55),
Observable.just("My Another String"),
Observable.just(5),
Observable.just(6),
Observable.just(7),
Observable.just(8),
Observable.just(9),
Observable.just(10),
Observable.just(11),
Observable.just(12),
Observable.just(13),
Observable.just(14),
Observable.just(15))
Observable.combineLatest(myList, {
val a = it[0] as String
val b = it[1] as Int
val c = it[2] as Float
val d = it[3] as String
val e = it[4] as Int
val f = it[5] as Int
val g = it[6] as Int
val h = it[7] as Int
val i = it[8] as Int
val j = it[9] as Int
val k = it[10] as Int
val l = it[11] as Int
val m = it[12] as Int
"$a - age:${b}" })
.subscribe({
Log.d("combineLatest", "onNext - ${it}")
})
【讨论】:
如果你有 10 个 combineLatest 的源,这里是一个简单的 RxKotlin 扩展函数。您可以轻松地为更多源创建类似的函数,或者对其进行调整以使用普通的 RxJava。
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
source1: Observable<T1>, source2: Observable<T2>,
source3: Observable<T3>, source4: Observable<T4>,
source5: Observable<T5>, source6: Observable<T6>,
source7: Observable<T7>, source8: Observable<T8>,
source9: Observable<T9>, source10: Observable<T10>,
crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
combineFunction(
it[0] as T1,
it[1] as T2,
it[2] as T3,
it[3] as T4,
it[4] as T5,
it[5] as T6,
it[6] as T7,
it[7] as T8,
it[8] as T9,
it[9] as T10
)
}
注意:我将其创建为扩展函数,以与少于 10 个源 (Observables.combineLatest(...)) 的 combineLatest 函数调用的外观保持一致。这样我就不必考虑我需要哪个combineLatest 版本来获得多少参数。从技术上讲,没有必要将其作为扩展函数。
【讨论】:
为了扩展 Egor Neliuba 的答案,您可以在一个容器对象中聚合所有结果,然后在 subscribe 子句中使用它:
List<Observable<?>> list = new ArrayList<>();
list.add(mCreateMarkupFlowManager.getFlowState());
list.add(mCreateIssueFlowStateManager.getIssueFlowState());
list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
list.add(mViewerStateManager.getIssueLoadingProgressChanges());
list.add(mMeasurementFlowStateManager.getFlowState());
list.add(mViewerStateManager.isSheetLoaded());
list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
list.add(mViewerStateManager.getMarkupViewMode());
list.add(mViewerStateManager.isFirstPerson());
list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
list.add(mCreateRfiFlowStateManager.getRfiFlowState());
attachSubscription(Observable.combineLatest(list, args -> {
Holder holder = new Holder();
holder.setFirst((String) args[0]);
holder.setSecond((Integer) args[1]);
holder.setThird((Boolean) args[2]);
holder.setFourth((Boolean) args[3]);
holder.setFifth((String) args[4]);
holder.setSixth((Boolean) args[5]);
holder.setSeventh((Boolean) args[6]);
holder.setEighth((Boolean) args[7]);
holder.setNinth((Boolean) args[8]);
holder.setTenth((Boolean) args[9]);
holder.setEleventh((String) args[10]);
return holder;
})
.filter(holder -> Util.isTrue(holder.sixth))
.compose(Util.applySchedulers())
.subscribe(holder -> {
if (isViewAttached()) {
String createMarkupState = holder.first;
Integer createIssueState = holder.second;
boolean markupsLoadingFinished = holder.third;
boolean issuesLoadingFinished = holder.fourth;
boolean loadingFinished = markupsLoadingFinished && issuesLoadingFinished;
String measurementState = holder.fifth;
boolean isMarkupLockMode = holder.eighth;
boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
boolean showCreateMeasureButton = shouldShowMeasureButton();
boolean showCreateFieldIssueButton = holder.seventh;
boolean isFirstPersonEnabled = holder.ninth;
Boolean showCreateRfiButton = holder.tenth;
String rfiFlowState = holder.eleventh;
}
})
);
public class Holder {
public String first;
public Integer second;
public Boolean third;
public Boolean fourth;
public String fifth;
public Boolean sixth;
public Boolean seventh;
public Boolean eighth;
public Boolean ninth;
public Boolean tenth;
public String eleventh;
public void setEleventh(String eleventh) {
this.eleventh = eleventh;
}
public void setFirst(String first) {
this.first = first;
}
public void setSecond(Integer second) {
this.second = second;
}
public void setThird(Boolean third) {
this.third = third;
}
public void setFourth(Boolean fourth) {
this.fourth = fourth;
}
public void setFifth(String fifth) {
this.fifth = fifth;
}
public void setSixth(Boolean sixth) {
this.sixth = sixth;
}
public void setSeventh(Boolean seventh) {
this.seventh = seventh;
}
public void setEighth(Boolean eighth) {
this.eighth = eighth;
}
public void setNinth(Boolean ninth) {
this.ninth = ninth;
}
public void setTenth(Boolean tenth) {
this.tenth = tenth;
}
public Holder() {}
}
【讨论】: