【问题标题】:How to combine following Observables into one in Kotlin?如何在 Kotlin 中将以下 Observable 合并为一个?
【发布时间】:2018-09-27 05:39:22
【问题描述】:

我在 Kotlin 中有这两个 Observables,其中只是充当计时器,另一个是 HTTP 网络调用响应 Observer

timerDisposable = Observable.timer(daleyABCControlResetSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
        .doOnNext {
            if (getABCUpdate() != null) {
                Log.d("ABC", "Media status reset after 3 seconds: ")
                updateABCResponse(getABCUpdate())
            }
        }.subscribe()

disposable = audioApi.setABCUpdate(abcUpdate)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({
            timerDisposable.dispose()
            updateABCResponse(it)
            Log.d("ABC", "Media Status updated:")
        }, {
            Log.d("ABC", "Error updating Media Status: " + it.message)
            isABCControlChangeRequested = false
        })

我对这种方法不满意,谁能指导我正确的方向来充分利用rx 的潜力。提前致谢。

编辑

  Observable.combineLatest(Observable.timer(daleyABCControlResetSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
            .doOnNext {
                if (getABCUpdate() != null) {
                    Log.d("ABC", "Media status reset after 3 seconds: ")
                    updateABCResponse(getABCUpdate())
                }
            },

            audioApi.setABCUpdate(abcUpdate)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()),
            BiFunction<Long, ABCStatusUpdate, ABCStatusUpdate> { _, abcStatusUpdate ->
                abcStatusUpdate
            })
            .subscribe({
                timerDisposable.dispose()
                updateABCResponse(abcStatusUpdate)
                Log.d("ABC", "Media Status updated:")
            }, {
                Log.d("ABC", "Error updating Media Status: " + abcStatusUpdate.vol)
                isABCControlChangeRequested = false
            })

【问题讨论】:

  • 你能告诉updateABCResponse(getABCUpdate())方法在做什么
  • 它只是使用PublishSubjectpublish/listen 将数据发送到另一个类
  • 你能更清楚地告诉你你想做什么吗?你想在特定的delay 之后执行http 请求吗?例如,等待几秒钟 -> 执行请求 -> updateABCResponse?在那种情况下audioApi.setABCUpdate() 执行什么操作?

标签: android kotlin rx-java2 zipwith


【解决方案1】:

您可以使用 combinelatest、zip 或 merge 进行组合。我认为在你的情况下 combinelatest 是合适的

Observable.combineLatest(
            Observable.timer(daleyABCControlResetSeconds, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
                .doOnNext {
                    if (getABCUpdate() != null) {
                        Log.d("ABC", "Media status reset after 3 seconds: ")
                        updateABCResponse(getABCUpdate())
                    }
                },
            audioApi.setABCUpdate(abcUpdate)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread()),
            BiFunction<Long, YourApiResponseType, YourApiResponseType> { _, response ->
                response})
        .subscribe({
            timerDisposable.dispose()
            updateABCResponse(it)
            Log.d("ABC", "Media Status updated:")
        }, {
            Log.d("ABC", "Error updating Media Status: " + it.message)
            isABCControlChangeRequested = false
        })

统一更新: 您可以像这样更改代码:

Observable.timer(5, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).startWith(-1L)
        .doOnNext {
            if (it == -1L) return@doOnNext
            //your condition
        }

【讨论】:

  • combineLatest 给了我错误。 "以下函数均不能使用提供的参数调用"
  • @Ahmed S. Durrani,您能否将您的代码与 combineLatest 添加到您的帖子中?
  • 奇怪,它必须工作正常。检查 - 你使用 io.reactivex.functions.BiFunction 还是 autoimport set java.util.function.BiFunction
  • 但现在这个 if (getABCUpdate() != null) { Log.d("ABC", "Media status reset after 3 seconds: ") updateABCResponse(getABCUpdate()) } 根本不执行
  • 我不明白。如果您从服务器获得响应,您不想执行此条件,对吗?
猜你喜欢
  • 2017-12-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-04
  • 1970-01-01
  • 2011-10-29
  • 1970-01-01
  • 2023-01-27
相关资源
最近更新 更多