【问题标题】:Limiting the maximum number of coroutines that can run in a scope限制可以在范围内运行的最大协程数
【发布时间】:2019-10-17 08:52:06
【问题描述】:

我正在将我们当前的应用程序从 Java 转换为 Kotlin,我遇到了这个问题。

用于使用线程从服务器传输数据的 java 实现。

它将创建大约 100 个不同的线程来请求数据,但据我所知,一次运行的线程不超过 4 个,其他线程会等待线程完成后再开始。

在将它翻译成 Kotlin 时,我使用了协程

这会产生一个问题,因为服务器显然无法处理实际发送的 100 个请求。

所有协程都在同一个作用域内启动,所以是这样的:

//this is a custom scope that launches on Dispatchers.IO + a job that I can use to cancel everything
transferScope.launch {
     //loadData is a suspending function that returns true/false 
     val futures = mDownloadJobs.map{ async { it.loadData() } }
     val responses = futures.awaitAll()
     //check that everything in responses is true etc....
}

有没有办法让特定的 transferScope 一次只允许最多 5 个协程,然后当一个完成时让另一个协程运行? (我不关心顺序)

如果不能通过范围来完成,有没有其他方法可以实现?

【问题讨论】:

    标签: android kotlin coroutine


    【解决方案1】:

    要求每个协程在发出请求之前从总共 5 个许可中获取 Kotlin Semaphore 许可。

    类似这样的:

        import kotlinx.coroutines.sync.Semaphore
    
        val requestSemaphore = Semaphore(5)
    
        val futures = mDownloadJobs.map {
            async {
                // Will limit number of concurrent requests to 5
                requestSemaphore.withPermit {
                    it.loadData()
                }
            }
        }
    
        val responses = futures.awaitAll()
    

    【讨论】:

    • 这个好像可以,我试试谢谢
    • 这不会为 mDownloadJobs 中的每个项目创建一个协程吗?信号量一次只允许 5 个协程。
    • 与 StackOverflow 中的其他解决方案相比,包括对其他相同问题的回答,这是一个出色的解决方案。这应该有更多的赞成票。
    【解决方案2】:

    我认为你应该引导和限制你正在创建的协程的创建。

    val channel = Channel<Job>()
    
    transferScope.launch {
      mDownloadJobs.forEach { channel.send(it) }
      channel.close() // don't forget to close the channel
    }
    
    coroutineScope {
        val responses = mutableListOf<Any>()
        repeat(5).map { 
          launch {
            for (job in mDownloadJobsChannel) {
              responses.add(jobs.loadData())
            }
          }
        }
    }
    

    本例中的并行化是 5 个协程。

    我没有测试这段代码:D,我确信有更简洁的方法可以做到这一点。

    【讨论】:

    • 是的,这就是我最终所做的,而且效果非常好(尽管我尝试了信号量并且它们也有效),当我有时间时,我会发布我使用的代码以便人们如果他们提出这个答案,将会有一个实际的例子
    • 您正在写入多个协程和可能多个线程中的可变列表。除非外部作用域是单线程的,否则这不是线程安全的。
    【解决方案3】:

    你可以做这样的事情,将请求分成 4 个块,启动协程来处理它们,然后等到该组完成后再启动一个新的。

    requests.chunked(4).forEachIndexed { index, chunk ->
        coroutineScope {
            LOG("processing chunk $index")
            chunk.forEach {
                launch {
                    delay(100)
                }
            }
            LOG("done processing $index")
        }
    }
    

    【讨论】:

    • 是的,我已经想到了,这可能就是我最终会做的事情,但我希望避免它
    【解决方案4】:

    Dispatchers.IO 声称要创建一个线程池并将跨度限制在该池中。它的文档字符串告诉您如何更改池大小(系统属性)。

    【讨论】:

      猜你喜欢
      • 2016-08-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-22
      • 2023-03-26
      • 2019-05-02
      • 1970-01-01
      相关资源
      最近更新 更多