【问题标题】:Akka streams: Reading multiple filesAkka 流:读取多个文件
【发布时间】:2016-10-14 11:20:36
【问题描述】:

我有一个文件列表。我要:

  1. 将所有这些作为单一来源读取。
  2. 文件应按顺序读取。 (没有循环)
  3. 任何时候都不应要求任何文件完全在内存中。
  4. 从文件中读取错误应该折叠流。

感觉这应该可行:(Scala,akka-streams v2.4.7)

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines

但这会导致编译错误,因为FileIO 具有与之关联的具体化值,而Source.combine 不支持。

映射物化值让我想知道如何处理文件读取错误,但确实可以编译:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
    .mapMaterializedValue(f => NotUsed.getInstance())
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))  // counting lines

但在运行时抛出 IllegalArgumentException:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    为了清楚地模块化不同的关注点,下面的代码并没有尽可能的简洁。

    // Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings
    val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)
    
    // given as stream of Paths we read those files and count the number of lines
    val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0l)((count, line) => count + 1).toMat(Sink.head)(Keep.right)
    
    // Here's our test data source (replace paths with real paths)
    val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))
    
    // Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes
    testFiles runWith lineCounter foreach println
    

    【讨论】:

    • 我一直在寻找模块化,所以我很感激。我使用行数作为我可以对文件做的事情的一个例子,而lineCounter aswritten 将其与文件读取混为一谈。 (它是一个 Sink)但是如果我将折叠和它之后的所有内容移动到别处,我会留下一个 Flow[Path,String, NotUsed],这正是我正在寻找的部分。
    • 能否请您在示例中提供导入,它们是代码的重要组成部分。
    • @OsskarWerrewka 应该都在akka.stream.scaladsl和java IO/NIO中。你有问题吗?
    • 我终于做到了。谢谢。
    【解决方案2】:

    更新哦,我没有看到接受的答案,因为我没有刷新页面>_

    我相信以下程序可以满足您的要求:

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, IOResult}
    import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
    import akka.util.ByteString
    import scala.concurrent.{Await, Future}
    import scala.util.{Failure, Success}
    import scala.util.control.NonFatal
    import java.nio.file.Paths
    import scala.concurrent.duration._
    
    object TestMain extends App {
      implicit val actorSystem = ActorSystem("test")
      implicit val materializer = ActorMaterializer()
      implicit def ec = actorSystem.dispatcher
    
      val sources = Vector("build.sbt", ".gitignore")
        .map(Paths.get(_))
        .map(p =>
          FileIO.fromPath(p)
            .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
            .mapMaterializedValue { f =>
              f.onComplete {
                case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
                case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
                case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
              }
              NotUsed
            }
        )
      val finalSource = Source(sources).flatMapConcat(identity)
    
      val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
      result.onComplete {
        case Success(n) => println(s"Read $n lines total")
        case Failure(e) => println(s"Reading failed: $e")
      }
      Await.ready(result, 10.seconds)
    
      actorSystem.terminate()
    }
    

    这里的关键是flatMapConcat() 方法:它将流的每个元素转换为源,并返回由这些源产生的元素流(如果它们按顺序运行)。

    至于处理错误,您可以在mapMaterializedValue 参数中添加一个处理程序到未来,或者您可以通过在Sink.foreach 物化未来值上放置一个处理程序来处理正在运行的流的最终错误。我在上面的例子中都做了,如果你测试它,比如说,在一个不存在的文件上,你会看到同样的错误信息会被打印两次。不幸的是,flatMapConcat() 不收集具体化的值,坦率地说,我看不出它有什么方法可以做到这一点,因此如果需要,您必须单独处理它们。

    【讨论】:

      【解决方案3】:

      我确实有一个答案 - 不要使用akka.FileIO。这似乎工作正常,例如:

      val sources = Seq("sample.txt", "sample2.txt").map(io.Source.fromFile(_).getLines()).reduce(_ ++ _)
      val source = Source.fromIterator[String](() => sources)
      val lineCount = source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
      

      我还是想知道是否有更好的解决方案。

      【讨论】:

      • 通过使用io.Source,你会失去很多力量。对于小文件,这可能有效,但对于大文件,这不是一个选项。
      • @jarandaf 你能澄清一下吗?我的印象是 io.Source 只是在后台使用了 BufferedReader,而 getLines 迭代器不会一次加载整个文件或类似的东西。
      • 想一想,你可能是对的(尽管FileIO 处理ByteString 而不是String,这意味着更高的性能)。另一方面,对于io.Source,必须始终牢记关闭源(默认情况下不这样做)。
      • 上面的sources 名字是错误的,不是吗?在.reduce(_ ++ _) 之前是源,但之后(实际值)是单个Iterator[String],代表所有文件,已经合并。只是命名问题 - 喜欢代码!
      • 是的,我可以看到。我的意思是“包含许多源”的语义意义上的源,但你说得对,它是一个单一的迭代器变量,而不是一个列表,这可能会造成混淆。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-12-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多