【问题标题】:Akka scala-io ask and wait for responeAkka scala-io 询问并等待响应
【发布时间】:2014-11-29 12:42:15
【问题描述】:

我在我的 akka 演员中使用 scala-io,在我的情况下,我需要在官方文档 (http://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html) 中发送请求并等待响应,我可以看到答案是异步的。

如何等待响应,我可以使用某种方式吗? (问)模式

class SocketClient(remoteAddress: InetSocketAddress, listener: ActorRef) extends Actor {

  import Tcp._
  import context.system

  IO(Tcp) ! Connect(remoteAddress)

  def receive = {

    case CommandFailed(_: Connect) =>
      listener ! ConnectFailure
      context stop self

    case Connected(remote, local) =>
      listener ! ConnectSuccess

      val connection = sender
      connection ! Register(self)

      context become {

        case data: ByteString =>
          connection ! Write(data)

        case CommandFailed(w: Write) =>
          Logger.error(s"Error during writing")

        case Received(data) =>
          listener ! data

        case Disconnect =>
          connection ! Close

        case _: ConnectionClosed =>
          Logger.error(s"Connection has been closed ${remoteAddress.getAddress}")
          context stop self
      }
  }
}

我可以使用类似的东西吗:

connection ? Write(data)

【问题讨论】:

    标签: scala sockets akka


    【解决方案1】:

    是的,但您应该考虑到这样一个事实,即 ask-pattern 只允许您接收来自 actor 的第一个回复。

    在您的情况下,它是connection,它可能会回复一些其他甚至不相关的对象(这取决于您选择的背压/确认模式。例如,如果您使用Write - 您可能会收到书面(到socket) 对象的确认而不是响应。

    您可以通过以下方式避免它:

    换句话说,ask 模式只是为每条消息创建自己的内部参与者并使其成为发送者,因此所有回复(针对此特定消息)都将发送给该​​微参与者。当它收到第一个回复时 - 未来(由? 返回)完成 - 内部参与者被销毁(因此其他回复将被忽略)。

    另外,connection 会自动回复注册的(通过Register 消息)侦听器而不是发送者 - 所以您应该创建中介参与者:

    class Asker(connection: ActorRef) extends Actor {
       import Tcp._
       connection ! Register(self); 
       def receive = {
            case x => 
              val parent = sender()
              connection ! x
              context become {case x => parent ! x; context.unbecome()} 
       }   
    }
    
    trait TcpAskSupport {
       self: Actor => 
    
       def asker(connection: ActorRef) = 
          context.child(connection.path.name + "Asker")
            .getOrElse(system.actorOf(Props(classOf[Asker], connection), 
               connection.path.name + "Asker"))
    }
    

    使用示例:

    class Client extends Actor with TcpAskSupport {
    
      import Tcp._
      import context.system
    
      IO(Tcp) ! Connect(new InetSocketAddress("61.91.16.168", 80))
      implicit val timeout = Timeout(new FiniteDuration(5, SECONDS))
    
      def receive = {
        case CommandFailed(_: Connect) =>
          println("connect failed")
          context stop self
    
        case c @ Connected(remote, local) =>
          println("Connected" + c)
          val connection = sender()
          asker(connection) ? Write(ByteString("GET /\n", "UTF-8"), NoAck) onComplete {
              case x => println("Response" + x)
          } 
        case x => println("[ERROR] Received" + x)    
      }
    }
    

    【讨论】:

    • 当我使用时:connection ? Write(data, NoAck) 它不会返回任何等待的内容,但我也会收到带有文本响应的Received(data)
    • 这不适用于多次快速写入,第二次写入将用作响应
    • 正如我在开始时所说的 - 您只能将询问模式用于原子请求/响应。如果你的意思是你想在收到回复后发送第二条消息 - 你可以在 onComplete 里面做。
    • 不,我有多个请求,例如单个请求数据->响应和后台 ping->pong 以保持连接
    • 一般情况下,如果您有一个复杂的协议 - 您应该关联每个子协议的消息并在 Asker 内部路由它们(并且每个子协议使用一个客户端参与者)。附言仅当所有子协议都应位于一个连接内时,您才需要它
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多