我准备了三种方法来解决这个问题,从最简单到最正确的一种。为了简化方法的介绍,我提取了以下通用代码:
lifecycleScope.launch {
val itemById = try {
fetchItems(itemIds)
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "Fetched these items: $itemById")
}
在我继续之前,请注意:您的getItem() 函数是可暂停的,您无需将其提交给IO 调度程序。你所有的协程都可以在主线程上运行。
现在让我们看看如何实现fetchItems(itemIds)。
1。简单的 forEach
这里我们利用了所有协程代码都可以在主线程上运行的优势:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
val itemById = mutableMapOf<Long, Item>()
coroutineScope {
itemIds.forEach { itemId ->
launch { itemById[itemId] = MyService.getItem(itemId) }
}
}
return itemById
}
coroutineScope 会在里面等待你launch 的所有协程。即使它们彼此同时运行,启动的协程仍会分派到单个(主)线程,因此从它们中更新映射不存在并发问题。
2。线程安全的变体
它利用单线程上下文的属性这一事实可以被视为第一种方法的限制:它不能推广到基于线程池的上下文。我们可以通过async-await机制来避免这个限制:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
.map { it.await() }
.toMap()
}
这里我们依赖Collection.map()的两个不明显的属性:
- 它急切地执行所有转换,因此在进入第二阶段之前完全完成了对
Deferred<Pair<Long, Item>> 集合的第一次转换,我们在这里等待所有这些。
- 它是一个内联函数,即使函数本身不是
suspend fun 并获得一个不可暂停的 lambda (Deferred<T>) -> T,它也允许我们在其中编写可挂起的代码。
这意味着所有的获取都是同时完成的,但是地图被组装在一个协程中。
3。改进并发控制的基于流的方法
上面为我们解决了并发问题,但它没有任何背压。如果您的输入列表非常大,您需要限制同时发出的网络请求数。
您可以使用基于Flow 的成语来做到这一点:
suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
.asFlow()
.flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
flow { emit(itemId to MyService.getItem(itemId)) }
}
.toMap()
这里的魔力在于.flatMapMerge 操作。你给它一个函数(T) -> Flow<R>,它会在所有输入上按顺序执行它,然后它会同时收集它得到的所有流。请注意,我不能将 flow { emit(getItem()) } } 简化为 flowOf(getItem()),因为在收集流量时必须延迟调用 getItem()。
Flow.toMap()目前没有在标准库中提供,所以这里是:
suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
val result = mutableMapOf<K, V>()
collect { (k, v) -> result[k] = v }
return result
}