【问题标题】:Akka Streams: How to group a list of files in a source by size?Akka Streams:如何按大小对源中的文件列表进行分组?
【发布时间】:2018-01-13 03:27:01
【问题描述】:

所以我目前有一个 akka 流来读取文件列表,还有一个接收器来连接它们,而且效果很好:

val files = List("a.txt", "b.txt", "c.txt") // and so on;
val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))

val sink = Sink.fold[ByteString, ByteString](ByteString(""))(_ ++ ByteString("\n" ++ _) // Concatenate

source.toMat(sink)(Keep.right).run().flatMap(concatByteStr => writeByteStrToFile(concatByteStr, "an-output-file.txt"))

虽然这对于简单的情况来说很好,但文件相当大(大约 GB,并且无法放入我正在运行此应用程序的机器的内存中。所以我想分块在字节字符串达到一定大小后进行。一个选项是使用Source.grouped(N),但是文件的大小差异很大(从1 KB到2 GB),因此不能保证文件大小的规范化。

我的问题是是否有办法通过字节串的大小来分块写入文件。 akka 流的文档非常庞大,我无法弄清楚该库。任何帮助将不胜感激。谢谢!

【问题讨论】:

    标签: scala akka-stream


    【解决方案1】:

    来自 Akka Streams 的 FileIO 模块为您提供了一个流式 IO Sink 来写入文件,以及用于分块 ByteString 流的实用方法。您的示例将成为类似于

    val files = List("a.txt", "b.txt", "c.txt") // and so on;
    
    val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))
    val chunking = Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)
    val sink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(Paths.get("an-output-file.txt"))
    
    source.via(chunking).runWith(sink)
    

    使用FileIO.toPath sink 可以避免将整个折叠的ByteString 存储到内存中(因此允许正确的流式传输)。

    有关此 Akka 模块的更多详细信息,请参阅 docs

    【讨论】:

    • 所以这个问题是EOF没有新行,并且在输出文件中,文件末尾和文件开头位于同一行。我最初有这样的解决方案,但我很难在文件之间拆分。您对此有什么建议吗?
    • 您可以在将文件源进行平面映射之前将它们与“\n”交错。您应该可以通过如下调整源来实现此目的:val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)).concat(Source.single(ByteString("\n"))))
    【解决方案2】:

    我认为@Stefano Bonetti 已经提供了一个很好的解决方案。只是想补充一点,还可以考虑构建自定义 GraphStage 来解决特定的分块需求。本质上,为 Akka Stream link 中描述的 In/Out 处理程序创建一个如下所示的块发射方法:

    private def emitChunk(): Unit = {
      if (buffer.isEmpty) {
        if (isClosed(in)) completeStage()
        else pull(in)
      } else {
        val (chunk, nextBuffer) = buffer.splitAt(chunkSize)
        buffer = nextBuffer
        push(out, chunk)
      }
    }
    

    【讨论】:

    • 这更接近我想要的,因为我不仅要连接,还要在写入之前压缩文件。我正在考虑结合使用您的解决方案和 Stefano 的解决方案,以确保我可以充分阅读行,并充分分组。
    【解决方案3】:

    在对 Akka Streams 库进行了一周的修改后,我最终得到的解决方案是 Stefano 的答案以及提供的解决方案 here 的组合。我通过Framing.delimiter函数逐行读取文件源,然后简单地使用Alpakka提供的LogRotatorSink。确定日志轮换的内容在这里:

    val fileSizeRotationFunction = () => {
      val max = 10 * 1024 * 1024 // 10 MB, but whatever you really want; I had it at our HDFS block size
      var size: Long = max
      (element: ByteString) =>
        {
          if (size + element.size > max) {
            val path = Files.createTempFile("out-", ".log")
            size = element.size
            Some(path)
          } else {
            size += element.size
            None
          }
        }
    }
    
    val sizeRotatorSink: Sink[ByteString, Future[Done]] =
      LogRotatorSink(fileSizeRotationFunction)
    
    val source = Source(files).flatMapConcat(f => FileIO.fromPath(Paths.get(f)))
    val chunking = Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)
    
    source.via(chunking).runWith(sizeRotatorSink)
    

    就是这样。希望这对其他人有帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-20
      • 2016-02-02
      • 1970-01-01
      • 1970-01-01
      • 2019-01-20
      • 2019-07-01
      相关资源
      最近更新 更多