【问题标题】:How does I/O work in Akka?Akka 的 I/O 是如何工作的?
【发布时间】:2011-09-25 14:33:01
【问题描述】:

当您需要执行 I/O(即数据库操作)时,actor 模型(在 Akka 中)如何工作?

据我了解,阻塞操作会引发异常(并且由于 Akka 使用的 Netty 的事件性质,基本上会破坏所有并发性)。因此,我将不得不使用 Future 或类似的东西 - 但是我不了解并发模型。

  1. 可以 1 个参与者同时处理多条消息吗?
  2. 如果演员在future(即future.get())中进行阻塞调用,则只会阻止当前演员的执行;还是会在阻塞调用完成之前阻止所有参与者执行?
  3. 如果它阻塞了所有执行,那么如何使用未来辅助并发(即,在未来调用阻塞调用仍然等于创建一个参与者并执行阻塞调用)?
  4. 处理每一步都依赖于最后一步的多阶段流程(即,从数据库读取;调用阻塞 Web 服务;从数据库读取;写入数据库)的最佳方法是什么?李>

基本上下文是这样的:

  • 我正在使用一个 Websocket 服务器,它将维护数千个会话。
  • 每个会话都有一些状态(即身份验证详细信息等);
  • Javascript 客户端将向服务器发送 JSON-RPC 消息,服务器会将其传递给适当的会话参与者,后者将执行它并返回结果。
  • RPC 调用的执行将涉及一些 I/O 和阻塞调用。
  • 会有大量并发请求(每个用户都会通过 WebSocket 连接发出大量请求,并且会有很多用户)。

有没有更好的方法来实现这一点?

【问题讨论】:

  • 我很惊讶没有人提到异步 IO 使用类似于 Node.js/Twisted/gevent 等的方法。

标签: scala scalability websocket actor akka


【解决方案1】:

Raymond 和范式所说的,而且,如果你想避免让线程池挨饿,你应该将任何阻塞操作包装在 scala.concurrent.blocking 中。

当然最好避免阻塞操作,但有时您需要使用阻塞的库。如果您将上述代码包装在blocking 中,它将让执行上下文知道您可能阻塞了该线程,因此它可以在需要时分配另一个线程。

这个问题比典型描述的更糟糕,因为如果你有几个阻塞操作,你最终可能会阻塞线程池中的所有线程并且没有空闲线程。如果您的所有线程都被阻塞在另一个演员/未来被安排之前不会发生的事情上,那么您最终可能会陷入死锁。

这是一个例子:

导入 scala.concurrent.blocking ... 未来 { val image = 阻塞 { load_image_from_potentially_slow_media() } val 增强 = image.enhance() 阻止{ 如果(oracle.queryBetter(图像,增强)){ write_new_image(增强) } } 增强型 }

文档是here

【讨论】:

    【解决方案2】:

    阻塞操作通常不会抛出异常,但等待未来(例如通过使用!!!!! 发送方法)可能会抛出超时异常。这就是为什么您应该尽可能坚持使用即发即弃,使用有意义的超时值并尽可能使用回调。

    1. akka actor 无法显式连续处理多个消息,但您可以通过配置文件使用 throughput 值。如果消息队列不为空,actor 将处理多条消息(即其接收方法将按顺序调用多次):http://akka.io/docs/akka/1.1.3/scala/dispatchers.html#id5

    2. 在 Actor 内部阻塞操作不会“阻塞”所有 Actor,但如果您在 Actor 之间共享线程(推荐使用),调度程序的其中一个线程将被阻塞,直到操作恢复。所以尽量组合future,注意超时值)。

    3 和 4。我同意 Raymond 的回答。

    【讨论】:

      【解决方案3】:

      阻塞操作不会在 Akka 中引发异常。您可以阻止来自 Actor 的调用(您可能希望将其最小化,但那是另一回事了)。

      1. 不,1 个演员实例不能。
      2. 它不会阻止任何其他参与者。您可以通过使用特定的 Dispatcher 来影响这一点。 Futures 使用默认的调度程序(通常是全局事件驱动的),因此它在池中的线​​程上运行。您可以选择要为您的演员使用的调度程序(每个演员,或全部)。我想如果你真的想创建一个问题,你也许可以将完全相同的(基于线程的)调度程序传递给期货和演员,但这需要你的一些意图。我想如果你有大量的期货无限期地阻塞并且 executorservice 被配置为固定数量的线程,你可能会炸毁 executorservice。所以有很多“如果”。仅当 Future 尚未完成时 f.get 才会阻塞。它将阻塞您调用它的 Actor 的“当前线程”(如果您从 Actor 调用它,顺便说一句,这不是必需的)
      3. 您不一定要阻止。您可以使用回调而不是 f.get。你甚至可以在不阻塞的情况下编写 Futures。查看 Viktor 关于“akka 充满希望的未来”的演讲,了解更多详情:http://skillsmatter.com/podcast/scala/talk-by-viktor-klang
      4. 我会在步骤之间使用异步通信(如果这些步骤本身就是有意义的过程),因此每个步骤都使用一个参与者,其中每个参与者都向下一个参与者发送单向消息,也可能向其他参与者发送单向消息那不会阻塞可以监督进程的。这样你就可以创建一个actor链,你可以在其中创建许多actor链,在它前面你可以放置一个负载平衡actor,这样如果一个actor阻塞在一个链中,另一个相同类型的actor可能不会出现在另一个链中。这也适用于您的“上下文”问题,将工作负载传递给本地参与者,将它们链接到负载平衡参与者后面。

      至于 netty(我假设你的意思是远程 Actor,因为这是 netty 在 Akka 中唯一使用的东西),如果你的工作尽快传递给本地 Actor 或未来(带回调)你担心时机或阻止 netty 以某种方式完成它的工作。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-03-09
        • 2014-05-17
        • 2014-07-25
        • 1970-01-01
        • 2014-05-13
        • 2016-07-05
        相关资源
        最近更新 更多