【问题标题】:How to achieve non blocking with Coroutines如何使用协程实现非阻塞
【发布时间】:2019-05-31 10:08:04
【问题描述】:

我正在尝试以一种比线程获得更多网络调用的方式实现非 IO 阻塞协程 (kotlin)。很清楚如何以不阻塞主线程的方式使用协程,但是最终每个协程都在一个线程内运行,并且网络调用阻塞该线程(为了比较,使用 NodeJS 允许重用工作线程,而其他请求是等待回复回来)。

我的用例是,对于每个传入的请求,我需要进行 2-3 次独立的外部调用,聚合响应并返回。按顺序执行此操作是浪费时间。并行执行此操作将需要我的服务运行大约 2-3 倍的传入线程数(使其大约 1K 的线程仅用于等待 IO)。

许多协同程序示例都使用delay,从而允许同时为多个协同程序重用线程。但是,通过实际使用网络调用的真实用例,我无法做到这一点。

我错过了什么?如何在外部服务响应之前暂停协程?

例如,此示例仅在 5 个线程上运行,但在 1000 次调用中重用线程,因此所有调用都在 ~100 毫秒内结束(每个处理时间为 100 毫秒)

val myPool = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
runBlocking {
  (1..1000).forEach {
    launch(myPool) {
      delay(100)
    }
  }
}

与此相反,实际上只在 5 个线程上运行 5 个并发调用,并且只在完成一次后继续下一个。 我希望所有调用“并行”执行,利用在等待响应时发送请求 - 就像在 NodeJS 中所做的那样

val restTemplate = RestTemplate()
val myPool = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
runBlocking {
  (1..1000).forEach {
    launch(myPool) {
      restTemplate.getForObject("http://myTest.com", String::class.java) // Say it takes 100ms to response
    }
  }
}

【问题讨论】:

  • 您是否尝试在restTemplate.getForObject(...) 函数中添加修饰符suspend
  • 如下所述 - 是的,我试过了,但没有 - 没有帮助。您使用的下面的示例是 delay,它适用于所有示例,但真正的 IO 等待代码的执行情况不同。
  • 看来你想要达到的目标在 Kotlin 中是不可能的。当您在工作线程中发出请求时,它会在等待响应时处于忙碌状态。例如,如果您有一个包含 5 个线程、6 个请求的线程池,并且您已经并行运行了 5 个请求,则第六个请求将等到之前的一些请求完成以释放其中一个线程。能否请您指向我可以阅读有关“NodeJS 允许在其他请求等待响应返回时重用工作线程”的文档。
  • Node.js 根据定义是一个单线程进程。所有代码都在单个线程中执行,事件循环管理任务的并发性。有许多应用程序运行 node.jss 一个同时服务多个请求的 Web 服务器。您可以在nodejs.org/en/docs/guides/event-loop-timers-and-nexttick 找到更多相关信息
  • 此外,我发现通过 Kotlin 无法实现这一点非常令人失望。如果是这种情况,那么所有delay 示例的用途是什么?所有示例(包括您演示的示例)显示了如何执行多于线程的任务,但是延迟不是现实世界的用例,一旦使用现实世界的用例,这将不起作用。

标签: kotlin coroutine kotlin-coroutines


【解决方案1】:

Kotlin 协程并不是将您的阻塞网络操作变成非阻塞的魔法。它们只允许您使用异步网络实现,而无需繁琐的期货和回调。

所以,为了继续,

  • 第 1 步。查找异步 REST 库
  • 第 2 步。编写一些 Kotlin 代码,将库的基于回调或基于未来的 API 与 Kotlin 协程桥接。

【讨论】:

  • 感谢您的澄清。对我来说仍然是个谜是delay 的所有示例都有哪些好处。演示了一个不是真实世界用例的概念。并且使用异步 REST 库可以实现与纯 Java 相同的效果
  • 它们非常适合演示您的代码的总体外观,而不涉及特定库的细节。在实际代码中,您将有一个网络调用来代替delay。网络调用将是非阻塞的,但您将拥有简单的顺序代码。纯 Java 无法达到同样的效果。
【解决方案2】:

“我希望所有调用“并行”执行”!!

为什么不删除 Executors.newFixedThreadPool(5) 并把 Dispatchers.IO 交给他自己运行的协程呢?

runBlocking {
  (1..1000).forEach {
    launch(Dispatchers.IO) {
      delay(100)
    }
  }
}

【讨论】:

  • Dispatcher.IO到底不是线程池吗?还受到线程数量的限制(甚至可能与其他工作流程共享)? (也就是说,自从提出问题以来,我已经有一段时间没有尝试使用 Kotlin 并远离它了)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-10-20
  • 1970-01-01
  • 1970-01-01
  • 2013-08-22
  • 1970-01-01
  • 1970-01-01
  • 2016-06-25
相关资源
最近更新 更多