【问题标题】:Is there an easy way to get a Stream as output of a RowParser?有没有一种简单的方法可以将 Stream 作为 RowParser 的输出?
【发布时间】:2012-10-27 13:46:55
【问题描述】:

给定 RowParser[Photo] 类型的 rowParser,根据我目前看到的代码示例,这就是解析来自表 photo 的行列表的方式:

def getPhotos(album: Album): List[Photo] = DB.withConnection { implicit c =>
  SQL("select * from photo where album = {album}").on(
    'album -> album.id
  ).as(rowParser *)
}

* 运算符创建ResultSetParser[List[Photo]] 类型的解析器。现在,我想知道是否同样有可能获得一个产生Stream 的解析器(认为更懒惰总是更好),但我只是想出了这个:

def getPhotos(album: Album): Stream[Photo] = DB.withConnection { implicit c =>
  SQL("select * from photo where album = {album}").on(
    'album -> album.id
  )() collect (rowParser(_) match { case Success(photo) => photo })
}

它有效,但似乎过于复杂。我当然可以在从第一个函数获得的List 上调用toStream,但我的目标是仅在实际读取的行上应用rowParser。有没有更简单的方法来实现这一点?

编辑:如果事先知道感兴趣的行数,我知道应该在查询中使用limit。我也知道,在很多情况下,无论如何你都会使用整个结果,所以懒惰不会提高性能。但在某些情况下,您可能会节省几个周期,例如如果由于某种原因,您有无法或不想在 SQL 中表达的搜索条件。所以我觉得奇怪的是,鉴于 anorm 提供了一种获取SqlRowStream 的方法,我没有找到一种直接的方法来应用RowParser

【问题讨论】:

    标签: scala playframework-2.0 anorm


    【解决方案1】:

    我最终创建了自己的 stream 方法,它对应于 list 方法:

    def stream[A](p: RowParser[A]) = new ResultSetParser[Stream[A]]  {
          def apply(rows: SqlParser.ResultSet): SqlResult[Stream[A]] = rows.headOption.map(p(_)) match {
            case None => Success(Stream.empty[A])
            case Some(Success(a)) => {
              val s: Stream[A] = a #:: rows.tail.flatMap(r => p(r) match {
                case Success(r) => Some(r)
                case _ => None
              })  
    
              Success(s)
            }
            case Some(Error(msg)) => Error(msg)
          }
       } 
    

    请注意,播放SqlResult 只能是成功/错误,而每一行也可以是成功/错误。我只处理第一行,假设其余部分相同。这可能适合您,也可能不适合您。

    【讨论】:

    • 我在尝试使用它时收到此错误。 java.sql.SQLException: Operation not allowed after ResultSet closed
    【解决方案2】:

    您最好使用limitoffset 进行较小的(分页)查询。

    如果您要将(大)结果保存在内存中并从那里流式传输,则 Anorm 需要进行一些修改。然后另一个问题是 JVM 的新内存要求。您将如何处理服务级别的缓存?看,以前你可以轻松缓存photos?page=1&size=10 之类的东西,但现在你只有photos,缓存技术不知道如何处理流。

    更糟糕的是,可能在 JDBC 级别上,将 Stream 包装在 limited 和 offset-ed execute 语句周围,并且只是在幕后多次调用数据库,但这听起来需要将 Scala 生成的 Stream 代码移植到 Java 领域(与 Groovy、jRuby 等一起使用),然后将其用于 JDBC 5 或 6 路线图的批准,需要做相当多的工作。这个想法可能会因为太复杂而被回避,事实就是如此。

    您可以将 Stream 包裹在整个 DAO 周围(limitoffset 的诡计会发生),但这听起来麻烦多于其价值 :-)

    【讨论】:

    • 好的,抱歉,我可能不够具体,但我已经知道你在说什么。我添加了一段来澄清/激发问题。
    【解决方案3】:

    我遇到了类似的情况,但是当转换为 Streams 的内置 anorm 函数尝试解析结果集时遇到了调用堆栈溢出异常。

    为了解决这个问题,我选择放弃异常的 ResultSetParser 范式,并退回到 java.sql.ResultSet 对象。

    我想使用 anorm 的内部类来解析结果集行,但是,从 2.4 版开始,他们将所有相关的类和方法都设为其包的私有,并且弃用了其他一些本来会更多的方法直接使用。

    我结合使用 Promises 和 Futures 来解决 anorm 现在返回的 ManagedResource。我避免了所有已弃用的功能。

    import anorm._
    import java.sql.ResultSet
    import scala.concurrent._
    
    def SqlStream[T](sql:SqlQuery)(parse:ResultSet => T)(implicit ec:ExecutionContext):Future[Stream[T]] = {
      val conn = db.getConnection()
      val mr = sql.preparedStatement(conn, false)
      val p = Promise[Unit]()
      val p2 = Promise[ResultSet]()
      Future {
        mr.map({ stmt =>
          p2.success(stmt.executeQuery)
          Await.ready(p.future, duration.Duration.Inf)
        }).acquireAndGet(identity).andThen { case _ => conn.close() }
      }
      def _stream(rs:ResultSet):Stream[T] = {
        if (rs.next()) parse(rs) #:: _stream(rs)
        else {
          p.success(())
          Stream.empty
        }
      }
      p2.future.map { rs =>
        rs.beforeFirst()
        _stream(rs)
      }
    }
    

    这个函数的一个相当简单的用法是这样的:

    def getText(implicit ec:ExecutionContext):Future[Stream[String]] = {
      SqlStream(SQL("select FIELD from TABLE")) { rs => rs.getString("FIELD") }
    }
    

    当然,这种方法也有缺点,但是,这解决了我的问题,并且不需要包含任何其他库。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-09-24
      • 1970-01-01
      • 1970-01-01
      • 2010-09-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多