【问题标题】:Parallel request with Retrofit, Coroutines and Suspend functions具有改造、协程和挂起功能的并行请求
【发布时间】:2020-02-27 17:10:21
【问题描述】:

我正在使用 Retrofit 来发出一些网络请求。我还将协程与“挂起”功能结合使用。

我的问题是:有没有办法改进以下代码。这个想法是并行启动多个请求并等待它们全部完成,然后再继续该功能。

lifecycleScope.launch {
    try {
        itemIds.forEach { itemId ->
            withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
        }
    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

(注意“MyService.getItem()”是一个“挂起”函数。)

我想在这种情况下,有比 foreach 更好的东西。

有人有想法吗?

【问题讨论】:

标签: android kotlin coroutine kotlin-coroutines suspend


【解决方案1】:

我准备了三种方法来解决这个问题,从最简单到最正确的一种。为了简化方法的介绍,我提取了以下通用代码:

lifecycleScope.launch {
    val itemById = try {
        fetchItems(itemIds)
    } catch (exception: Exception) {
        exception.printStackTrace()
    }
    Log.i(TAG, "Fetched these items: $itemById")
}

在我继续之前,请注意:您的getItem() 函数是可暂停的,您无需将其提交给IO 调度程序。你所有的协程都可以在主线程上运行。

现在让我们看看如何实现fetchItems(itemIds)

1。简单的 forEach

这里我们利用了所有协程代码都可以在主线程上运行的优势:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
    val itemById = mutableMapOf<Long, Item>()
    coroutineScope {
        itemIds.forEach { itemId ->
            launch { itemById[itemId] = MyService.getItem(itemId) }
        }
    }
    return itemById
}

coroutineScope 会在里面等待你launch 的所有协程。即使它们彼此同时运行,启动的协程仍会分派到单个(主)线程,因此从它们中更新映射不存在并发问题。

2。线程安全的变体

它利用单线程上下文的属性这一事实可以被视为第一种方法的限制:它不能推广到基于线程池的上下文。我们可以通过async-await机制来避免这个限制:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
    itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
            .map { it.await() }
            .toMap()
}

这里我们依赖Collection.map()的两个不明显的属性:

  1. 它急切地执行所有转换,因此在进入第二阶段之前完全完成了对Deferred&lt;Pair&lt;Long, Item&gt;&gt; 集合的第一次转换,我们在这里等待所有这些。
  2. 它是一个内联函数,即使函数本身不是 suspend fun 并获得一个不可暂停的 lambda (Deferred&lt;T&gt;) -&gt; T,它也允许我们在其中编写可挂起的代码。

这意味着所有的获取都是同时完成的,但是地图被组装在一个协程中。

3。改进并发控制的基于流的方法

上面为我们解决了并发问题,但它没有任何背压。如果您的输入列表非常大,您需要限制同时发出的网络请求数。

您可以使用基于Flow 的成语来做到这一点:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
        .asFlow()
        .flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
            flow { emit(itemId to MyService.getItem(itemId)) }
        }
        .toMap()

这里的魔力在于.flatMapMerge 操作。你给它一个函数(T) -&gt; Flow&lt;R&gt;,它会在所有输入上按顺序执行它,然后它会同时收集它得到的所有流。请注意,我不能将 flow { emit(getItem()) } } 简化为 flowOf(getItem()),因为在收集流量时必须延迟调用 getItem()

Flow.toMap()目前没有在标准库中提供,所以这里是:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
    val result = mutableMapOf<K, V>()
    collect { (k, v) -> result[k] = v }
    return result
}

【讨论】:

  • 很棒的反应,我怎么能用你的第三种方法来上传多个文件?提前致谢!
【解决方案2】:

如果您正在寻找一种更好的方式来编写它并消除foreach

lifecycleScope.launch {
    try {

        itemIds.asFlow()
               .flowOn(Dispatchers.IO) 
               .collect{ itemId -> itemById[itemId] = MyService.getItem(itemId)}

    } catch (exception: Exception) {
        exception.printStackTrace()
    }

    Log.i(TAG, "All requests have been executed")
}

另外请查看lifecycleScope 我怀疑它正在使用Dispatchers.Main。如果是这种情况,您可以删除此 .flowOn(Dispatchers.IO) 额外调度程序声明。

欲了解更多信息:Kotlin Asynchronous Flow

【讨论】:

  • 不幸的是,这并没有实现并发。此外,在任何情况下您都不需要Dispatchers.IOIO 调度程序是在大型线程池中调用遗留阻塞函数的拐杖。
  • 我一直在使用调度程序,因为使用的是原始代码,这就是为什么我添加了关于额外调度程序的评论:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-05-08
  • 2020-01-21
  • 1970-01-01
  • 2021-08-16
  • 1970-01-01
  • 2021-05-21
  • 1970-01-01
相关资源
最近更新 更多