【问题标题】:How to perform operation based on result of multiple RxJava Completable results如何根据多个 RxJava Completable 结果的结果执行操作
【发布时间】:2020-12-09 11:04:18
【问题描述】:

我已经为此苦恼了一段时间,我在管理一个必须在 Kotlin 中使用 Rx 的需求时迷失了方向。

让我解释一下。

有一组 ids 其等效项需要从服务器中删除,并根据服务器成功最终在本地。

基本上

  1. 进行网络调用以删除单个id(支持的网络调用返回Completable
  2. 如果收到complete(成功)回调,则将id 存储在list(内存)中
  3. 对所有 id 执行第一步和第二步删除
  4. 每次网络调用完成后,传递列表以从本地数据库中删除

所以这些功能是可用的,不能修改。

  1. fun deleteId(id: String): Completable { networkCall.deleteId(id) }
  2. fun deleteIds(ids: List<String>): Unit { localDb.deleteId(ids) }

这是我尝试过的,但显然不完整并且卡住了......

val deleted = CopyOnWriteArrayList<String>()
val error = CopyOnWriteArrayList<String>()
items?.filter { it.isChecked }
    ?.map { Pair(it.id, dataManager.deleteId(it.id)) }
    ?.forEach { (Id, deleteOp) ->
        deleteOp.subscribeOn(Schedulers.io())
                .subscribe(object: CompletableObserver {
                    override fun onComplete() { deleted.add(Id) }

                    override fun onSubscribe(d: Disposable) { disposableManager += d }

                    override fun onError(e: Throwable) { error.add(Id) }

                })
    }

所以现在这里有多个问题,其中一个是我无法找到一个地方知道所有请求都已完成以启动localDb删除的要求。

有没有一种方法可以让我使用Flowable.fromIterable()zipmerge 以某种方式遵循上述命令链来实现上述场景?

【问题讨论】:

    标签: java kotlin rx-java rx-kotlin


    【解决方案1】:

    如果我正确理解了您的用例,那么应该这样做:

    // ids of items to delete, for illustration lets have some temp set
    val ids = setOf<String>("1", "2", "3", "4")
    val deleteIdSingles = mutableListOf<Single<String>>()
    ids.forEach { id ->
        deleteIdSingles.add(
            api.deleteId(id)
                // when request successfully completes, return its id wrapped in a Single, instead of Completable
                .toSingle<String> { id }
                // return a flag when this request fails, so that the stream is not closed and other requests would still be executed
                .onErrorReturn { "FAILED" }
        )
    }
    
    Single.merge(deleteIdSingles)
        // collect the results of the singles (i.e. the ids of successful deletes), and emit a set of those ids once all the singles has completed
        .collect(
            { mutableListOf() },
            { deletedIds: MutableList<String>, id: String -> if (id != "FAILED") deletedIds.add(id) }
        )
        .observeOn(Schedulers.io())
        .subscribe(
            { deletedIds ->
                    db.deleteIds(deletedIds)
            }, { error ->
                // todo: onError
            })
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2010-12-02
      • 1970-01-01
      • 2016-06-14
      • 1970-01-01
      • 2018-09-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多