【问题标题】:Equivalent of RxJava .toList() in Kotlin coroutines flowKotlin 协程流中 RxJava .toList() 的等价物
【发布时间】:2020-03-09 14:59:08
【问题描述】:

我有一种情况,我需要观察用户 ID,然后使用这些用户 ID 来观察用户。 userIds 或 users 可以随时更改,我希望让发出的用户保持最新。 这是我拥有的数据来源的示例:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

在这种情况下,我希望排放量为:

[User(abc_name), User(def_name)],然后

[User(123_name), User(234_name)],然后

[User(123_name_updated), User(234_name_updated)]

我想我可以像这样在 RxJava 中实现这一点:

observeBestUserIds.concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}

我会编写什么函数来生成一个发出它的流?

【问题讨论】:

  • 您想要列出的 3 个排放量的流,还是想要标题中的 toList?它们是完全不同的问题。
  • 我只想要在任何给定时间最新的最新用户,所以列出的排放量很好。
  • 您是否在排放 2 和 3 之间遗漏了 [User(abc_name_updated), User(def_name_updated)],或者不应该排放它们(如果是,为什么)?
  • 这也可能被发射,我不介意,但想象延迟反映了数据库的发射,我并不真正关心那个发射,因为 id 123 和 234 应该已经发射并且 abc和 def 已经过时了

标签: kotlin rx-java kotlin-coroutines kotlin-flow


【解决方案1】:

我相信您正在寻找combine,它为您提供了一个可以轻松调用toList() 的数组:

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.collect {
        println(it)
    } 
}

这是内部参数名称更明确的部分,因为您在 Stack Overflow 上看不到 IDE 的类型提示:

combine(
    ids.map { id -> observeUserForId(id) }
) { arrayOfUsers: Array<User> ->
    arrayOfUsers.toList()
}.collect { listOfUsers: List<User> ->
    println(listOfUsers)
}

输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

Live demo(请注意,在演示中,所有输出都会同时出现,但这是演示站点的限制 - 代码在本地运行时,这些行的出现时间是您所期望的)

这避免了原始问题中讨论的 (abc_name_updated, def_name_updated)。但是,123_name_updated234_name 仍然存在中间发射,因为首先发射 123_name_updated,它会立即发送组合版本,因为它们是每个流中的最新版本。

但是,这可以通过对排放进行去抖动来避免(在我的机器上,小至 1 毫秒的超时有效,但为了保守起见,我做了 20 毫秒):

observeBestUserIds().collectLatest { ids ->
    combine(
        ids.map { id -> observeUserForId(id) }
    ) {
        it.toList()
    }.debounce(timeoutMillis = 20).collect {
        println(it)
    }
}

它会得到你想要的确切输出:

[User(name=abc_name), User(name=def_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

Live demo

【讨论】:

  • 非常感谢。 Flow 似乎缺少像 combine 之类的运算符,它等待所有流至少发出一次,而不是使用 debounce 运算符,这有点 hacky IMO。 Zip 可用,但仅适用于 2 个流程,而不是流程列表。
  • 它实际上会等待所有流至少发出一次 - 这可以通过将if (userId == "def") delay(250) 添加到observeUserForId 来证明 - 但它不会等待正如您所注意到的,每次后续时间都像 zip 运算符一样匹配排放量。基于这个问题,我认为当前的行为可能是正确的:由于您正在观察用户的更改,因此您不希望在显示任何更新之前等待您正在观看的每个用户至少更新一次他们的信息,您希望在更新发生时立即显示更新。但 zip 在某些情况下也可能是正确的。
  • 啊,你是对的。没有去抖动是正确的。出色的工作,谢谢!
【解决方案2】:

不幸的是,在 kotlin Flow 的当前状态下,这不是微不足道的,似乎缺少重要的运算符。但请注意,您不是在寻找 rxJavas toList()。如果您尝试在 rxjava 中使用 toListconcatMap 来执行此操作,则必须等到所有 observabes 完成。 这不是你想要的。

不幸的是,我认为没有办法绕过自定义函数。

它必须汇总 observeUserForId 返回的所有结果,以获取您将传递给它的所有 id。它也不是一个简单的窗口函数,因为实际上可以想象一个observeUserForId 已经返回了两次,而另一个调用仍然没有完成。因此,检查您是否已经拥有与将 id 传递给聚合函数相同数量的用户是不够的,您还必须按用户 id 进行分组。

我会在今天晚些时候尝试添加代码。

编辑:正如这里所承诺的那样,我的解决方案是我冒昧地稍微增加了要求。因此,每次所有 userId 都具有值并且基础用户更改时,流程都会发出。我认为这更有可能是您想要的,因为用户可能不会同步更改属性。

如果这不是您想要的,请发表评论。

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

inline fun <reified K, V> buildMap(keys: Set<K>, crossinline valueFunc: (K) -> Flow<V>): Flow<Map<K, V>> = flow {
    val keysSize = keys.size
    val valuesMap = HashMap<K, V>(keys.size)
    flowOf(*keys.toTypedArray())
            .flatMapMerge { key -> valueFunc(key).map {v -> Pair(key, v)} }
            .collect { (key, value) ->
                valuesMap[key] = value
                if (valuesMap.keys.size == keysSize) {
                    emit(valuesMap.toMap())
                }
            }
}

fun observeUsersForIds(): Flow<List<User>> {
    return observeBestUserIds().flatMapLatest { ids -> buildMap(ids.toSet(), ::observeUserForId as (String) -> Flow<User>) }
            .map { m -> m.values.toList() }
}


fun main() = runBlocking {
    observeUsersForIds()
        .collect { user ->
            println(user)
        }
}

这将返回

[User(name=def_name), User(name=abc_name)]
[User(name=123_name), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name)]
[User(name=123_name_updated), User(name=234_name_updated)]

可以在线运行代码here

【讨论】:

    【解决方案3】:

    您可以使用flatMapConcat

    val users = observeBestUserIds()
            .flatMapConcat { ids ->
                flowOf(*ids.toTypedArray())
                    .map { id ->
                        observeUserForId(id)
                    }
            }
            .flattenConcat()
            .toList()
    

        observeBestUserIds()
            .flatMapConcat { ids ->
                flowOf(*ids.toTypedArray())
                    .map { id ->
                        observeUserForId(id)
                    }
            }
            .flattenConcat()
            .collect { user ->
    
            }
    

    【讨论】:

    • 这会导致类型不匹配。 id 是一个字符串列表。
    • 更新了回复
    • 感谢您的帮助。我想我们越来越近了。然而,这会返回一个Flow&lt;Flow&lt;User&gt;&gt;。我想要的是Flow&lt;List&lt;User&gt;&gt;,其中排放量如上。
    • 又更新了,拖了这么久。
    • 提供Flow&lt;User&gt;。我想要发布用户列表。看起来很简单,但我就是不明白。
    猜你喜欢
    • 2021-04-18
    • 2023-03-14
    • 1970-01-01
    • 2019-11-28
    • 2016-07-05
    • 2019-01-18
    • 1970-01-01
    • 2015-01-11
    • 1970-01-01
    相关资源
    最近更新 更多