【问题标题】:Kotlin Coroutines - How to block to await/join all jobs?Kotlin Coroutines - 如何阻止等待/加入所有工作?
【发布时间】:2019-07-10 05:15:14
【问题描述】:

我是 Kotlin/Coroutines 的新手,所以希望我只是遗漏了一些东西/不完全理解如何为我要解决的问题构建我的代码。

本质上,我正在获取一个字符串列表,对于列表中的每个项目,我想将它发送到另一个方法来完成工作(进行网络调用并根据响应返回数据)。 (编辑:)我希望所有调用同时启动,并阻塞直到所有调用完成/响应被执行,然后返回一个包含每个响应信息的新列表。

我可能还不完全了解何时使用启动/异步,但我尝试同时使用启动(joinAll)和异步(await)。

fun processData(lstInputs: List<String>): List<response> {

    val lstOfReturnData = mutableListOf<response>()

    runBlocking {
        withContext(Dispatchers.IO) {
            val jobs = List(lstInputs.size) {
                launch {
                    lstOfReturnData.add(networkCallToGetData(lstInputs[it]))
                }
            }
            jobs.joinAll()
        }
    }

    return lstofReturnData

我期望发生的是,如果我的 lstInputs 的大小为 120,当所有作业都加入时,我的 lstOfReturnData 的大小也应该为 120。

实际发生的是不一致的结果。我会运行一次,我在最终列表中得到 118,再次运行它,它是 120,再次运行它,它是 117,等等。在 networkCallToGetData() 方法中,我正在处理任何异常,至少返回一些东西对于每个请求,无论网络调用是否失败。

任何人都可以帮助解释为什么我得到不一致的结果,以及我需要做些什么来确保我适当地阻止并且在继续之前加入所有作业?

【问题讨论】:

    标签: kotlin kotlin-coroutines jobs


    【解决方案1】:

    mutableListOf() 创建一个ArrayList,它不是线程安全的。
    尝试改用ConcurrentLinkedQueue

    另外,你使用的是 Kotlin/Kotlinx.coroutine 的稳定版本(不是旧的实验版本)吗?在稳定版中,引入了结构化并发,就不用写jobs.joinAll anymore了。 launchrunBlocking 的扩展函数,它将在runBlocking 范围内启动新的协程,runBlocking 范围将自动等待所有启动的作业完成。所以上面的代码可以缩短为

    val lstOfReturnData = ConcurrentLinkedQueue<response>()
    runBlocking {
            lstInputs.forEach {
                launch(Dispatches.IO) {
                    lstOfReturnData.add(networkCallToGetData(it))
                }
            }
    }
    return lstOfReturnData
    

    【讨论】:

    • 非常感谢您对使用 ConcurrentLinkedQueue 的回复和指点,以及关于稳定 API 的更新。是的,我使用的是新版本,但是我团队的很多代码都在旧版本上,所以我发现我有点混淆了。我刚刚做了一些初步测试,我引入了delay(2000),它看起来可以按我的意愿工作。所有项目都会启动(不要等待每个项目都通过延迟),完成后全部加入,之后的列表大小就是我所期待的。这个周末我会做一些更深入的测试,如果它看起来还不错,就接受你的回答。
    • 非常感谢您提供此解决方案!请改正Dispatches 中的错字('r' 被遗漏)。
    【解决方案2】:

    runBlocking 中断当前线程,直到其完成。我想这不是你想要的。如果我想错了并且你想阻塞当前线程,那么你可以摆脱协程并在当前线程中进行网络调用:

    val lstOfReturnData = mutableListOf<response>()
    lstInputs.forEach {
        lstOfReturnData.add(networkCallToGetData(it))
    } 
    

    但如果这不是您的意图,您可以执行以下操作:

    class Presenter(private val uiContext: CoroutineContext = Dispatchers.Main) 
        : CoroutineScope {
    
        // creating local scope for coroutines
        private var job: Job = Job()
        override val coroutineContext: CoroutineContext
            get() = uiContext + job
    
        // call this to cancel job when you don't need it anymore
        fun detach() {
            job.cancel()
        }
    
        fun processData(lstInputs: List<String>) {
    
            launch {
                val deferredList = lstInputs.map { 
                    async(Dispatchers.IO) { networkCallToGetData(it) } // runs in parallel in background thread
                }
                val lstOfReturnData = deferredList.awaitAll() // waiting while all requests are finished without blocking the current thread
    
                // use lstOfReturnData in Main Thread, e.g. update UI
            }
        }
    }
    

    【讨论】:

    • 这种方法似乎更有效,如果我错了,您可以纠正我,但由于所有作业将同时运行,因此与接受的答案相比,执行时间会更少。
    【解决方案3】:

    Runblocking 应该意味着您不必调用 join。 从 runblocking 范围内启动协程应该为您执行此操作。 您是否尝试过:

    fun processData(lstInputs: List<String>): List<response> {
    
    val lstOfReturnData = mutableListOf<response>()
    
    runBlocking {
        lstInputs.forEach {
                launch(Dispatchers.IO) {
                    lstOfReturnData.add(networkCallToGetData(it))
                }
       } 
    }
    
    return lstofReturnData
    

    【讨论】:

      猜你喜欢
      • 2020-09-02
      • 2021-10-15
      • 1970-01-01
      • 1970-01-01
      • 2019-07-18
      • 1970-01-01
      • 1970-01-01
      • 2021-10-27
      • 2012-01-24
      相关资源
      最近更新 更多