【问题标题】:Stream records from DataBase using Akka Stream使用 Akka Stream 从数据库流式传输记录
【发布时间】:2017-12-13 10:53:59
【问题描述】:

我有一个使用 Akka 的系统,该系统目前通过消息队列处理传入的流数据。当一条记录到达然后处理它时,mq 被确认并传递记录以在系统内进一步处理。

现在我想添加对使用 DB 作为输入的支持。
输入源能够处理 DB 的方法是什么(应该以接收器可以处理的速度输入 > 100M 记录 - 所以我假设反应/akka-streams?)?

【问题讨论】:

    标签: akka akka-stream reactive-streams


    【解决方案1】:

    Slick 库

    Slick streaming 通常是这样做的。

    稍微扩展 slick 文档以包含 akka 流:

    //SELECT Name from Coffees
    val q = for (c <- coffees) yield c.name
    
    val action = q.result
    
    type Name = String
    
    val databasePublisher : DatabasePublisher[Name] = db stream action
    
    import akka.stream.scaladsl.Source
    
    val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher
    

    现在 akkaSourceFromSlick 就像任何其他 akka 流 Source

    “老派”结果集

    也可以使用简单的ResultSet,而不是光滑,作为 akka 流的“引擎”。我们将利用流Source 可以从Iterator 实例化这一事实。

    首先使用标准 jdbc 技术创建 ResultSet:

    import java.sql._
    
    val resultSetGenerator : () => Try[ResultSet] = Try {
      val statement : Statement = ???
      statement executeQuery "SELECT Name from Coffees"
    }
    

    当然所有 ResultSet 实例都必须将光标移到第一行之前:

    val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] = 
      (resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)
    

    一旦我们开始遍历行,我们就必须从正确的列中提取值:

    val getNameFromResultSet : ResultSet => Name = _ getString "Name"
    

    现在我们可以实现Iterator 接口,从结果集中创建Iterator[Name]

    val convertResultSetToNameIterator : ResultSet => Iterator[Name] = 
      (resultSet) => new Iterator[Try[Name]] {
        override def hasNext : Boolean  = resultSet.next
        override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
       } flatMap (_.toOption)
    

    最后,将所有部分粘合在一起,创建我们需要传递给Source.fromIterator的函数:

    val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] = 
      (_ : () => Try[ResultSet])
        .andThen(_ flatMap adjustResultSetBeforeFirst) 
        .andThen(_ map convertResultSetToNameIterator) 
        .andThen(_ getOrElse Iterator.empty)
    

    这个迭代器现在可以提供一个源:

    val akkaSourceFromResultSet : Source[Name, _] = 
      Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
    

    这个实现一直到数据库都是反应式的。由于 ResultSet 一次预取有限数量的行,因此数据只会在流Sink 发出需求信号时通过数据库从硬盘驱动器中取出。

    【讨论】:

    • 我们应该如何做一些清理动作,例如如果我们要关闭ResultSetStatement
    • @user6502167 您可以在resultSetGenerator 之前预先构建ResultSet,而不是在其中。然后在 Source 调用 close 的 close 钩子上。
    • 谢谢,@Ramon J Romero y Vigil。所以我需要进行 SQL 查询以在Source 之外创建ResultSet,并将其转换为Source,对吗? Source 的关闭钩在哪里?我想你不是指FutureonComplete 钩子。
    • @user6502167 通常,一旦实现了Source,它就会返回一个钩子。例如:如果您执行val hook : Future[Done] = mySource.runForeach(println) 之类的操作,则可以执行hook.foreach(_ =&gt; resultSet.close())。这确保了 ResultSet 仅在流完成处理所有行后才关闭。
    【解决方案2】:

    我发现 Alpakka 文档非常棒,并且比 Java Publisher 接口更容易使用反应流。

    Alpakka 项目是一个开源计划,旨在为 Java 和 Scala 实现流感知、反应式集成管道。它建立在 Akka Streams 之上,从一开始就设计用于理解原生流,并为响应式和面向流的编程提供 DSL,并内置对背压的支持

    使用 Slick 的 Alpakka 文档:https://doc.akka.io/docs/alpakka/current/slick.html

    Alpakka Github:https://github.com/akka/alpakka

    【讨论】:

    • 现在,如果 Akka 流 Web 应用程序宕机 30 分钟并重新启动会发生什么情况,它会从 30 分钟的间隙窗口流式传输所有新数据更改吗?另外我还有一个问题,这个库是在幕后使用变更数据捕获技术还是在时间戳列或主键上使用带有 where 条件的 sql 查询并继续池化数据库?
    • 这两个问题的答案是这个库只是通过 Slick 与数据库的交互。可以看源码,真的不多:github.com/akka/alpakka/blob/v2.0.2/slick/src/main/scala/akka/…Slick,本身就是一个方便的数据库访问。因为您想要实现的目标取决于您的数据库。您需要类似数据库触发器的东西(可能比轮询数据库更好)。对于 MySQL 示例,请查看:hevodata.com/learn/mysql-cdc
    猜你喜欢
    • 2017-11-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-15
    • 2016-01-14
    • 1970-01-01
    • 2014-05-08
    • 2019-05-23
    相关资源
    最近更新 更多