【问题标题】:How to emit final value when combining two Observable streams?组合两个 Observable 流时如何发出最终值?
【发布时间】:2018-04-08 23:01:14
【问题描述】:

使用 Jake Wharton 的 Managing State with RxJava 模式。

我将两个 api 调用组合在一起以并行执行。

当两者都完成时,我如何发出“成功”项目?

请参见下面代码中的 cmets。

谢谢!

主调用函数:

    Observable
            .just(UpdatePicEvent(userId, file))
            .compose(updatePic()) <-- Handles updating pic, emits models consumed by UI
            .mergeWith(
                    Observable
                        .just(UpdateProfileEvent(..params...))
                        .compose(updateProfile()) <-- Handles updating other settings, emits models consumed by UI
            )

            // TODO Need to add something to emit a Success() model item when both actions above have completed  

            .subscribe(...pass models to UI...)

updatePic()

fun updatePic(): ObservableTransformer<UpdatePicEvent, ProfileSettingsModel> {
        return ObservableTransformer {
            it.flatMap {
                api.uploadProfilePic(it.userId, it.pic)
                    .map { UpdatePicSuccessful(it) as ProfileSettingsModel }
                    .onErrorReturn { UpdatePicError(it) as ProfileSettingsModel }
                    .startWith(UpdatePicInProgress() as ProfileSettingsModel)
            }
        }
    }

updateProfile()

fun updateProfile(): ObservableTransformer<UpdateProfileEvent, ProfileSettingsModel> {
        return ObservableTransformer {
            it.flatMap {
                api
                    .updateUser(...params...)
                    .subscribeOn(Schedulers.io())
                    .map { UpdateProfileSuccessful(it) as ProfileSettingsModel }
                    .onErrorReturn { UpdateProfileError(it) as ProfileSettingsModel }
                    .observeOn(AndroidSchedulers.mainThread())
                    .startWith(UpdateProfileInProgress() as ProfileSettingsModel)
            }
        }
    }

【问题讨论】:

  • 你最终的成功是否需要从结果中得到什么?你想通过你发出的最后一件事来完成什么?另外,您是否需要按顺序调用updatePicupdateProfile(您似乎不需要,因为您合并了……但请确保给您一个好的答案)
  • @marianosimone “你最终的成功是否需要从结果中获得任何东西?”在这种情况下,没有。 “你想通过你发出的最后一件事来完成什么?”我正在使用密封类ProfileSettingsModel 来传达事务的状态。我要发出的最后一件事是表示两个 API 调用都已成功完成。谢谢!
  • 以上答案能解决你的问题吗?

标签: android kotlin rx-java


【解决方案1】:

您收到两个不同的ProfileSettingsModel,您需要以某种方式合并。如果您不想在订阅者中执行此操作,您也可以使用 zip operator 来实现您的链。

通过指定函数将多个 Observable 的发射组合在一起,并根据该函数的结果为每个组合发射单个项目

Observable.zip(
    Observable.just(UpdatePicEvent(userId, file)).compose(updatePic())
    Observable.just(UpdateProfileEvent(..params...)).compose(updateProfile())
    mergePicAndProfile)

其中mergePicAndProfileBiFunction 接收两个结果并发出单个实体。

【讨论】:

    【解决方案2】:

    鉴于您对原始问题的评论,concat 可以帮助您。文档的相关部分说:

    Concat 操作符将多个 Observable 的输出连接起来,使它们像单个 Observable 一样运行,第一个 Observable 发出的所有项目都在第二个 Observable 发出的任何项目之前发出(依此类推,如果有超过两个)。

    Concat 等待订阅您传递给它的每个其他 Observable,直到前一个 Observable 完成。

    实施:

    Observable.mergeDelayError(
        Observable.just(UpdatePicEvent(userId, file)).compose(updatePic()),
        Observable.just(UpdateProfileEvent(userId, params)).compose(updateProfile())    
    )
        .concatWith(Observable.just(AllSuccessful()))  // Or whatever
        .onErrorReturn { when (it) {
            is UpdatePicException -> UpdatePicError(it)
            is UpdateProfileException -> UpdateProfileError(it)
        }}
    

    要做到这一点,有两个关键点:

    1. 您需要替换updatePicupdateProfile 转换器中的onErrorReturn,这样它们在失败时实际上会发出错误(可能是onErrorResumeNext { Observable.error(MyTypeOfException()) } 之类的东西。如果你不这样做,就没有办法让流知道是否存在错误(因为将错误包装在项目中使其与成功无法区分)
    2. 使用mergeDelayError 让每个流独立于另一个继续。如果您只使用merge,第一个失败的将阻止另一个继续。

    (请注意,我稍微修改了您的原始代码以使其更具可读性,恕我直言)

    【讨论】:

    • 这在 2 次调用成功时有效(感谢改进的语法)。但是,当出现错误时,它仍然发出 `AllSuccessful'。
    • 哦,没有意识到您将错误包装在转换器内部的实际结果中。查看我的编辑,看看是否有帮助
    • 感谢您的帮助。但是,变压器在其他地方用作单独的调用,因此更改它们有点违背了目的。我一直在寻找一种无需编写新链即可组合它们的方法。似乎我应该只携带成功发射的状态并在subscribe 中检查它们?
    • 我想是的。如果您将错误封装在排放中,那么您将无法区分整体成功与失败,除非您以某种方式在订阅中检查它们。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-11-25
    • 2023-03-26
    • 1970-01-01
    • 2018-05-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多