【问题标题】:Asynchronous IO (socket) in ScalaScala 中的异步 IO(套接字)
【发布时间】:2017-03-04 23:46:18
【问题描述】:
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}
import java.net.InetSocketAddress
import scala.concurrent.{Future, blocking}

class Master {
  val server: AsynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()
  server.bind(new InetSocketAddress("localhost", port))

  val client: Future[AsynchronousSocketChannel] = Future { blocking { server.accept().get() } }
}

这是我正在尝试的伪代码。

在问这个问题之前,我搜索了一下,发现了一个相关的answer。 在她的回答中,我想知道这是什么意思:"If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library."

由于我不想遭受running out of memory(使用blocking时的情况)或thread pool hell(相反情况)的困扰,所以她的回答正是我一直期待的。但是,正如您在我的伪代码中看到的那样,由于blocking,将为每个客户端创建一个新线程(为了简单起见,我只创建了一个客户端),即使我按照她说的那样使用 NIO。

  1. 请用一个简单的例子解释她的建议。
  2. 在 Scala 中尝试异步 io 时,我的伪代码是一种合适的方法还是有更好的替代方法?

【问题讨论】:

    标签: scala future


    【解决方案1】:

    问题 1 的答案

    她提出了两件事

    a) 如果您使用阻塞呼叫的未来,请使用scala.concurrent.blockingblocking 告诉默认执行上下文生成临时线程以停止饥饿。

    让我们说blockingServe() 会阻塞。要执行多个blockingServes,我们使用 Futures。

    Future {
      blockingServe() //blockingServe() could be serverSocket.accept()
    }
    

    但上面的代码导致事件模型中的饥饿。为了应对饥饿。我们必须询问执行上下文来创建新的临时线程来服务额外的请求。这是使用scala.concurrent.blocking 传达给执行上下文的

    Future {
      blocking {
        blockingServe() //blockingServe() could be serverSocket.accept()
      }
    }
    

    b)

    我们还没有实现非阻塞。代码仍在阻塞,但在不同线程中阻塞(异步)

    我们如何才能实现真正的非阻塞?

    我们可以使用non-blocking api 实现真正的非阻塞。

    因此,在您的情况下,您必须使用 java.nio.channels.ServerSocketChannel 来选择真正的非阻塞模型。

    请注意,在您的代码 sn-p 中,您混合了 a) 和 b),这不是必需的

    问题 2 的答案

     val selector = Selector.open()
     val serverChannel = ServerSocketChannel.open()
     serverChannel.configureBlocking(false)
     serverChannel.socket().bind(new InetSocketAddress("192.168.2.1", 5000))
     serverChannel.register(selector, SelectionKey.OP_ACCEPT)
     def assignWork[A](serverChannel: ServerSocketChannel, selector: Selector, work:  => Future[A]) = {
    
       work
       //recurse
     }
     assignWork[Unit](serverChannel, selector, Future(()))
    }
    

    【讨论】:

    • 现在我明白了。我感到困惑的是non-blockingasynchronous 之间的区别。但是,在阅读了一些轮询代码之后,我改变了主意。我会为简单代码选择异步方式,即使它消耗更多线程。在这种情况下,我的伪代码是最好的吗?
    • @illuxic 是的。你可以去nio库的阻塞版本
    猜你喜欢
    • 1970-01-01
    • 2013-04-09
    • 2012-10-17
    • 1970-01-01
    • 2023-03-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多