【问题标题】:Save Doobie stream from database to file将 Doobie 流从数据库保存到文件
【发布时间】:2020-03-06 18:18:32
【问题描述】:

Doobie select 返回一个fs2.Stream(doobie.ConnectionIO, String)。如果我们需要将其写入文件,显而易见的选择是调用stream.compile.toList.transact(transactor),然后将此列表保存到文件中。

有没有办法以流的方式保存结果而不将其转换为列表?

【问题讨论】:

    标签: scala scala-cats fs2 cats-effect doobie


    【解决方案1】:

    诀窍是将cats.IO 操作转换为doobie.ConnectionIOAsync[doobie.ConnectionIO].liftIO(IO(...))。这允许将文件操作与数据库操作很好地结合起来。这是一个将结果流式传输到文件的完整示例程序。

    package com.example
    
    import java.io.BufferedWriter
    
    import better.files.File
    import cats.effect._
    import cats.implicits._
    import doobie._
    import doobie.implicits._
    import fs2.Stream
    
    
    object Example extends IOApp {
      override def run(args: List[String]): IO[ExitCode] = {
        val xa = Transactor.fromDriverManager[IO](
          "org.postgresql.Driver",     // driver classname
          "jdbc:postgresql:example_db",     // connect URL (driver-specific)
          "postgres",                  // user
          ""                          // password
        )
    
        val drop = sql"drop table if exists example".update.run
        val create =
          sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
        val insert = Update[String]("insert into example (string_value) values (?)")
          .updateMany(List("one", "two", "three", "four", "five"))
    
        val setup = for {
          _ <- drop.transact(xa)
          _ <- create.transact(xa)
          _ <- insert.transact(xa)
        } yield ()
    
        val select: Stream[doobie.ConnectionIO, String] =
          sql"select string_value from example".query[String].stream
        val output = writeToFile(select).compile.drain.transact(xa)
    
        for {
          _ <- setup
          _ <- output
        } yield ExitCode.Success
      }
    
      private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
        Stream.resource(writer("./example.txt")).flatMap { writer =>
          result.intersperse("\n").chunks.evalMap { chunk =>
            Async[doobie.ConnectionIO].liftIO(IO(
              chunk.foreach(writer.write)
            ))
          }
        }
      }
    
      private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
        Resource.make {
          Async[doobie.ConnectionIO].liftIO(IO(
            File(path).newBufferedWriter
          ))
        } { outStream =>
          Async[doobie.ConnectionIO].liftIO(IO(
            outStream.close())
          )
        }
      }
    }
    

    【讨论】:

    • 最好使用fs2提供的原生witeAll
    • 我试过了,它不起作用,因为 writeAll 需要 Concurrent[doobie.ConnectionIO] 而我没有。如果您知道使其工作的方法,请发布答案,我真的很想看到。
    • 不,我的意思是,您必须将doobie.ConnectionI 交易为cats.effect.IO 或其他东西,这是正确的。但是,我的建议是在写作部分,而不是按照你展示的方式去做。
    • 这就是重点,如果您从ConnectionIO 创建一个IO,那么您将得到一个列表,而不是一个流。如果您知道如何做到这一点,请发布答案,您可以以我的代码为基础,只更改写作部分。
    • stream.transact(xa).through(fs2.io.writeAll(???)) - tpolecat.github.io/doobie/docs/…
    【解决方案2】:

    我想这就是你要找的东西:

    import cats.effect.IO
    import doobie.implicits._
    import doobie.util.transactor.Transactor
    import fs2.text
    import fs2.io.file.{Files, Path}
    
    object Example {
      def queryToFile: IO[Unit] =
        sql"select string_value from example"
          .query[String]
          .stream
          .transact(xa)
          .through(text.utf8.encode[IO])
          .through(Files[IO].writeAll(Path("path-to-file")))
          .compile
          .drain
    }
    

    【讨论】:

      猜你喜欢
      • 2014-12-22
      • 2011-02-14
      • 2013-07-01
      • 1970-01-01
      • 2021-06-05
      • 2017-10-25
      • 2012-03-27
      • 2020-12-17
      • 1970-01-01
      相关资源
      最近更新 更多