【问题标题】:How to write the contents of a Scala stream to a file?如何将 Scala 流的内容写入文件?
【发布时间】:2015-07-10 18:39:14
【问题描述】:

我有一个想要写入文件的 Scala 字节流。流中的数据过多,无法缓冲所有内存。

作为第一次尝试,我创建了一个类似于此的InputStream

class MyInputStream(data: Stream[Byte]) extends InputStream {
  private val iterator = data.iterator
  override def read(): Int = if (iterator.hasNext) iterator.next else -1
}

然后我使用 Apache Commons 编写文件:

val source = new MyInputStream(dataStream)
val target = new FileOutputStream(file)
try {
  IOUtils.copy(source, target)
} finally {
  target.close
}

这可行,但我对性能不太满意。我猜想为每个字节调用MyInputStream.read 会带来很多开销。有没有更好的办法?

【问题讨论】:

    标签: scala io


    【解决方案1】:

    您可能(也可能不会!)误认为读取端是性能问题的根源。这可能是因为您使用的是无缓冲的 FileOutputStream(...),从而强制对每个写入的字节进行单独的系统调用。

    这是我的看法,快速而简单:

    def writeBytes( data : Stream[Byte], file : File ) = {
      val target = new BufferedOutputStream( new FileOutputStream(file) );
      try data.foreach( target.write(_) ) finally target.close;
    }
    

    【讨论】:

    • 好收获!我定义了一个 100 MB 的测试流:(1 to 100 * 1024 * 1024).toStream.map(_.toByte)。缓冲输出流将平均运行时间从 16.5 秒提高到 14.7 秒。它没有得到我想要的性能,但它有点帮助。我猜如果我有一个机械硬盘驱动器而不是 SSD 会更有帮助。
    • 我会接受这个作为获胜者,因为它在我的测试中产生了最好的结果。它只比其他大多数技术略胜一筹,但差异足以清楚地表明。我了解到Stream[Byte] 是一种非常低效的数据处理方式。流本身是主要瓶颈。 Stream[String]Stream[Array[Byte]] 是快速移动数据的更好选择。
    【解决方案2】:

    我推荐java.nio.file 包。使用Files.write,您可以将Bytes 的Arrays 写入从文件名构造的Path

    如何提供Bytes 由您决定。您可以使用.toArrayStream 转换为Array,或者您可以一次将一个(或几个)流中的take 字节转换为数组。

    这是一个演示.toArray 方法的简单代码块。

    import java.nio.file.{Files, Paths}
    
    val filename: String = "output.bin"
    val bytes: Stream[Byte] = ...
    Files.write(Paths.get(filename), bytes.toArray)
    

    【讨论】:

    • 我认为@mrog 面临的问题是.toArray 强制将流具体化到内存中,并且它太大了。
    • 我尝试了 NIO 方法,递归地拆分流以获取字节。写入 100 MB 需要 16 秒,这在性能上与最初的 MyInputStream 方法几乎相同。我使用的来源是(1 to 100 * 1024 * 1024).toStream.map(_.toByte),应该很快。相比之下,使用Files.copy(Paths.get("/tmp/input"), Paths.get("/tmp/output"), StandardCopyOption.REPLACE_EXISTING) 将相同数据从一个文件复制到另一个文件需要 0.19 秒。我觉得我错过了两个数量级的性能。
    • 我对@9​​87654337@ 进行了性能测试,只是迭代数据,但没有将其写入任何地方。 100 MB 需要 0.002 秒。所以,我使用的测试流显然不是瓶颈。并且使用 IOUtils.copy 将相同的数据从一个文件复制到另一个文件需要 0.12 秒(甚至比使用 NIO 还要快),因此文件系统不是瓶颈。
    • 我在测试流的性能时搞砸了。迭代流的内容(不对数据做任何事情)实际上需要 13.8 秒,而不是 0.002 秒。所以,流是瓶颈。
    【解决方案3】:

    您应该在 InputStream 实现中实现批量读取覆盖:

    override def read(b: Array[Byte], off: Int, len: Int)
    

    IOUtils.copy 使用该签名读取/写入 4K 块。

    【讨论】:

    • 我试过了,并没有注意到性能上有任何差异。也许那是因为我仍然依赖流迭代器一次读取一个字节。您能否建议一个不依赖流迭代器的 read(b: Array[Byte], off: Int, len: Int) 的良好实现?
    【解决方案4】:

    鉴于StreamIterator 一次读取一个字节可能是瓶颈,我设计了一种将流写入OutputStream 的方法,该方法不依赖于它并且希望更高效:

    object StreamCopier {
      def copy(data: Stream[Byte], output: OutputStream) = {
        def write(d: Stream[Byte]): Unit = if (d.nonEmpty) {
          val (head, tail) = d.splitAt(4 * 1024)
          val bytes = head.toArray
          output.write(bytes, 0, bytes.length)
          write(tail)
        }
        write(data)
      }
    }
    

    编辑:通过在尾递归 write 函数中将 data 替换为 d 修复了一个错误。

    此方法通过splitAt 使用递归方法将流拆分为第一个~4K 和其余部分,将其头部写入OutputStream 并在流的尾部递归,直到splitAt 返回一个空流。

    既然您已经制定了性能基准,我会留给您判断这是否更有效。

    【讨论】:

    • 我喜欢这种方法,因为它非常简单。我不得不稍微重构它,因为外部方法在内部方法运行时保持对流的引用,因此消耗了大量内存。 object StreamCopier2 { @tailrec def copy(data: Stream[Byte], output: OutputStream): Unit = { if (data.nonEmpty) { val (head, tail) = data.splitAt(4 * 1024) val bytes = head. toArray output.write(bytes, 0, bytes.length) 复制(tail, output) } } }
    • 有趣的是,新版本处理 100 MB 所需的时间与我原来的 MyInputStream 方法完全相同。主要区别在于MyInputStream 会从添加BufferedOutputStream 中略微受益,但StreamCopier2 运行时间相同,无论是否存在BufferedOutputStream。这使得MyInputStream 加上BufferedOutputStream 以微弱优势成为赢家。
    • 对不起..那个功能完全坏了。正如您指出的那样,data 不应该在write 内部使用,然后我什至从未启动尾递归。我的解决方案与您的建议有点不同,所以看看这是否对您有影响。
    • 编辑后的版本有效,但对于大流来说仍然很慢。我相信datawrite 完成之前一直在范围内,导致JVM 频繁进行垃圾收集。一个 1 MB 的流需要 0.4 秒,一个 10 MB 的流需要 5.1 秒,这比我预期的要多一点。我尝试了一个 100 MB 的流,但在等待它完成几分钟后我放弃了。如果没有嵌套方法,它会快得多。
    • 我重新启动了 IDE 并再次尝试。一个 100 MB 的流需要 110 秒,几乎是没有嵌套方法所用时间的 7 倍。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-10
    • 2011-03-05
    相关资源
    最近更新 更多