【问题标题】:Scala - TCP Packet frame using AkkaScala - 使用 Akka 的 TCP 数据包帧
【发布时间】:2015-08-20 08:40:44
【问题描述】:

Akka 有什么方法可以像 Erlang 中的 {packet,4} 一样实现数据包帧吗? 数据包看起来像这样:

4 bytes length in big endian | body...

例如:

00 00 00 05 H E L L O 0 0 0 5 W O R L D

将是两个数据包“HELLO”和“WORLD”,但它们作为一个接收。

或者

00 00 00 05 H E L L

现在 Akka 接收到这 8 个字节,但还缺少一个,它将在下一次调用“receive”时接收

问题是我的演员的接收总是通过部分或全部请求调用,但我只想在接收中获取“正文”部分,并且只有当它完全接收时。

所以所有需要的是它首先读取这 4 个字节,然后等待直到它读取其他 N 个字节(N = 4 字节标头中的长度),然后它向我的演员发送消息。有没有可能?

我的服务器代码:

import java.net.InetSocketAddress

import akka.actor.{Props, Actor}
import akka.io.Tcp.Bind
import akka.io.{IO, Tcp}

class Server extends Actor{
    import context.system
    import Tcp._
    IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 1234))

    def receive ={
        case bound @ Bound(localAddr) =>
            println("Server is bound to "+localAddr.toString())
        case failed @ CommandFailed(_ : Bind) =>
            context stop self
        case connected @ Connected(remote, local) =>
            val handler = context.actorOf(Props[ClientHandler])
            val connection = sender()
            println(remote.toString + "connected to "+local.toString())

            connection ! Register(handler)
    }
}

【问题讨论】:

    标签: scala tcp server akka


    【解决方案1】:

    据我所知,Akka 或 Scala 中没有用于此的库函数。 Akka 使用 ByteString 进行读写交易,所以我整理了一个 trait,它可以完全满足您的要求。您将发送给您的演员的 ByteString 传递给它。然后它根据标头中的数据包长度分解流。它是无状态的,因此它返回一个元组,其中包含提取的数据包列表以及来自 TCP 流的任何未使用数据作为 ByteString。您将新的 TCP 数据连接到此字节字符串中返回的流的未使用部分,如下例所示。

    trait Buffering {
    
      val MAX_PACKET_LEN: Short = 10000
    
      /**
       * Extracts complete packets of the specified length, preserving remainder
       * data. If there is no complete packet, then we return an empty list. If
       * there are multiple packets available, all packets are extracted, Any remaining data
       * is returned to the caller for later submission
       * @param data A list of the packets extracted from the raw data in order of receipt
       * @return A list of ByteStrings containing extracted packets as well as any remaining buffer data not consumed
       */
      def getPacket(data: ByteString): (List[ByteString], ByteString) = {
    
        val headerSize = 2
    
        @tailrec
        def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = {
          if (current.length < headerSize) {
            (packets.reverse, current)
          } else {
            val len = current.iterator.getShort
            if (len > MAX_PACKET_LEN || len < 0) throw new RuntimeException(s"Invalid packet length: $len")
            if (current.length < len + headerSize) {
              (packets.reverse, current)
            } else {
              val rem = current drop headerSize // Pop off header
              val (front, back) = rem.splitAt(len) // Front contains a completed packet, back contains the remaining data
              // Pull of the packet and recurse to see if there is another packet available
              multiPacket(front :: packets, back)
            }
          }
        }
        multiPacket(List[ByteString](), data)
      }
    

    演员的用法如下:

    def receive = buffer(CompactByteString())
    
    def buffer(buf: ByteString): Receive = {
      // Messages inbound from the network
      case Received(data) =>
        val (pkt, remainder) = getPacket(buf ++ data)
        // Do something with your packet
        context become buffer(remainder) 
      case Other Stuff => // Etc
    }
    

    【讨论】:

      猜你喜欢
      • 2014-11-06
      • 1970-01-01
      • 2015-12-13
      • 2014-09-29
      • 2015-01-10
      • 1970-01-01
      相关资源
      最近更新 更多