【问题标题】:Consume TCP stream and redirect it to another Sink (with Akka Streams)使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)
【发布时间】:2017-01-08 13:31:13
【问题描述】:

我尝试使用 Akka 2.4.3 将 TCP 流重定向/转发到另一个 Sink。 该程序应该打开一个服务器套接字,监听传入的连接,然后使用 tcp 流。我们的发件人不期望/接受我们的回复,所以我们从不发回任何东西——我们只是消费流。在构建 tcp 流之后,我们需要将字节转换为更有用的东西并将其发送到 Sink。

到目前为止,我尝试了以下方法,但尤其是在如何不将 tcp 数据包发送回发送者并正确连接接收器的部分。

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

更新:将此添加到您的 build.sbt 文件以获得必要的 deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

【问题讨论】:

  • 可能有助于显示您的 build.sbt 文件或至少显示依赖项,以便其他人可以重新创建它。
  • 在问题中添加了上面的部门

标签: scala tcp akka akka-stream


【解决方案1】:

handleWith 需要 Flow,即一个带有未连接入口和未连接出口的盒子。您有效地提供了Source,因为您使用to 操作将FlowSink 连接起来。

我认为您可以执行以下操作:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)

【讨论】:

    猜你喜欢
    • 2011-09-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-07-14
    • 1970-01-01
    • 2018-02-22
    • 1970-01-01
    相关资源
    最近更新 更多