【发布时间】:2016-10-14 11:20:36
【问题描述】:
我有一个文件列表。我要:
- 将所有这些作为单一来源读取。
- 文件应按顺序读取。 (没有循环)
- 任何时候都不应要求任何文件完全在内存中。
- 从文件中读取错误应该折叠流。
感觉这应该可行:(Scala,akka-streams v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但这会导致编译错误,因为FileIO 具有与之关联的具体化值,而Source.combine 不支持。
映射物化值让我想知道如何处理文件读取错误,但确实可以编译:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
但在运行时抛出 IllegalArgumentException:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
【问题讨论】:
标签: scala akka akka-stream