【问题标题】:Building a list from two observable sources in RxKotlin/RxJava using collectInto使用 collectInto 从 RxKotlin/RxJava 中的两个可观察源构建列表
【发布时间】:2019-07-16 02:32:23
【问题描述】:

我有一个Category 数据类和一个Plan 数据类。 每个 Category 都有一个计划 ID 列表。通过 Room 存储了类别和计划。我正在尝试构建一个本地List<Any>,我将每个类别添加到一个列表中,然后添加它的每个计划。

因此,对于每个类别,将类别添加到列表中,然后添加属于该类别的每个计划。

最终的结果应该是这样的......

0 -> a Category
1 -> a Plan
2 -> a Plan
3 -> a Plan
4 -> a Category
5 -> a Plan

等等

以下调用成功返回Observable<List<Category>>Observable<Plan>

AppDatabase
   .getDatabase(context)
   .categoryDao()
   .getAll()

AppDatabase.getDatabase(context).planDao().getPlan(planId)

在这里我正在尝试构建我的列表,但是当我订阅它时它实际上从未发出。没有完成,或者错误。流中的其他所有内容都会受到打击。为什么我不能得到最终结果?

    fun fetchCategoriesAndPlans() {
    val items = mutableListOf<Any>()
    AppDatabase
        .getDatabase(context)
        .categoryDao()
        .getAll()
        .concatMap { listOfCategories ->
            listOfCategories.toObservable()
        }
        .doOnNext { category ->
            items.add(category)
        }
        .concatMap { category ->
            category.getPlanIds()!!.toObservable()
        }
        .flatMap { planId ->
            AppDatabase.getDatabase(context).planDao().getPlan(planId)
        }.collectInto(items, BiConsumer{ list, i ->
            Log.d(TAG, "Collect into")
            list.add(i)
        })
        .subscribeBy(
            onSuccess = {
                Log.d(TAG, "Got the list")
            },
            onError = {
                Log.e(TAG, "Couldn't build list ${it.message}", it)
            })
}

【问题讨论】:

  • 代码不起作用的原因是collectInto()collectInto() 在源流完成之前不会发出任何内容。 .categoryDao().getAll() on Room 返回一个永远不会完成的无限流。给定的答案会起作用,因为答案使用Single,而不是Observable
  • val items = mutableListOf&lt;Any&gt;(); doOnNext { items.add(it) } 是行不通的。

标签: android rx-java2 android-room collect rx-kotlin


【解决方案1】:

我根据您的案例制作了一个演示,这有助于发出 CategoryPlan

override fun onCreate(savedInstanceState: Bundle?) {
    ...

    getCategories()
        .flattenAsObservable { it }
        .flatMap { getPlanWithCategory(it) }
        .toList()
        .subscribe({
            for (item in it) {
                Log.i("TAG", " " + item.javaClass.canonicalName)
            }
        }, {

        })
}

fun getPlanWithCategory(category: Category): Observable<Any> {
    val getPlansObservable = Observable.fromArray(category.planIds).flatMapIterable {
        it
    }.flatMap {
        getPlan(it).toObservable()
    }
    return Observable.concat(Observable.just(category), getPlansObservable)
}


fun getPlan(planId: String): Single<Plan> {
    return Single.just(Plan())
}

fun getCategories(): Single<List<Category>> {
    val categories = arrayListOf<Category>()
    categories.add(Category(arrayListOf("1", "2", "3")))
    categories.add(Category(arrayListOf("1", "2")))
    return Single.just(categories)
}

class Category(val planIds: List<String>)

class Plan

输出

 I/TAG:  Category
 I/TAG:  Plan
 I/TAG:  Plan
 I/TAG:  Category
 I/TAG:  Plan
 I/TAG:  Plan

希望对你有帮助

【讨论】:

  • 效果很好,知道为什么我的不工作吗?
  • 对不起。我不知道,我试着拿你的代码和测试来找到问题,但它总是有组合错误(我无法修复它),然后我根据我的知识做出新的方法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-05-13
  • 1970-01-01
  • 1970-01-01
  • 2022-12-15
相关资源
最近更新 更多