【问题标题】:Tcp request to the database with akka streams使用 akka 流对数据库的 TCP 请求
【发布时间】:2015-12-13 15:54:23
【问题描述】:

我正在尝试使用 akka-streams 的 Tcp 客户端向数据库发送查询,但我不明白我缺少什么。

所以我有两种类型 QueryResponse 可以完美地与 akka 的 ByteString 相互转换。所以我正在创建一个与val conn = Tcp().outgoingConnection("localhost", 28015) 的客户端连接,这给了我一个Flow[ByteString, ByteString, Future[OutgoingConnection]],到目前为止一切都很好。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像Source(Future.successful(query)) 那样构造它,并将它连接到流source.via(flow),这给出了我另一个Source[Response, Unit]。在这里我无法理解如何获得Future[Response],尝试了几个组合器,但它给了我Materialized 值,我不完全理解它与流中的值/类型的关系。

【问题讨论】:

    标签: scala tcp akka akka-stream


    【解决方案1】:

    首先:它是什么类型的数据库,为什么要直接通过 TCP 访问它?你确定这会像你缩进它那样工作吗?您是否能够处理传入响应的框架?

    关于从Source[Response, Unit] 中获取Future[Response] 的问题,就像使用Sink.head 运行Source 一样简单,即:val res: Future[Result] = source.runWith(Sink.head)(您当然需要implicit val mat = ActorMaterializer()) )。

    我强烈建议您在更深入地使用 Streams 之前花一些时间在 Akka Streams documentation 上。

    【讨论】:

    • 我正在 akka-streams tcp 之上为 RethinkDB 编写一个驱动程序以获取乐趣和利润。谢谢,我肯定需要花更多时间在 API 和概念上。
    【解决方案2】:

    您可以在Flow 上使用join 方法。来自文档:

    通过交叉连接输入和 输出,创建一个 RunnableGraph。

    +------+        +-------+
    |      | ~Out~> |       |
    | this |        | other |
    |      | <~In~  |       |
    +------+        +-------+
    

    这允许您将连接的输出连接到 Flow 的输入,并将 Flow 的输出连接到连接输入。

    具体来说,您可以使用从输出连接生成的Flow 并将其与您创建的流连接以响应查询:

    def queryDB(query : ByteString) : Future[ByteString] = ???
    
    val concurrentQueries = 10
    
    val queryResponder = 
      Flow[ByteString].mapAsync(concurrentQueries)(queryDB)
    
    val server : String = ???
    val port : Int = ???
    
    //from the diagram above:
    //this = connection
    //other = queryResponder
    Tcp().outgoingConnection(server, port).join(queryResponder).run()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-12-13
      相关资源
      最近更新 更多