【问题标题】:How do I read a large CSV file with Scala Stream class?如何使用 Scala Stream 类读取大型 CSV 文件?
【发布时间】:2011-05-14 09:23:35
【问题描述】:

如何使用 Scala Stream 读取大型 CSV 文件 (> 1 Gb)?你有代码示例吗?或者您会使用其他方式读取大型 CSV 文件而不先将其加载到内存中?

【问题讨论】:

  • 您的意思是像惰性评估功能中的流吗?这大概是可能的,但不是必需的? - 逐行读取文件本质上已经是。我对 Scala io 的速度还不是很了解,但是 getLines(通过快速浏览源代码)也以一种懒惰的方式实现——它会将所有文件读入内存吗?
  • 我相信它确实读入了内存,因为我在使用 scala.Source.fromFile() 然后 getLines() 时得到了 OutOfMemoryException。所以使用 Stream 类听起来像是一个有效的替代方案,对吧?
  • 我强烈建议你使用一个维护良好的 RFC 驱动的原生 Scala 库,它可以最佳地处理这个问题,kantan.csv:nrinaudo.github.io/kantan.csv

标签: scala csv streaming large-files


【解决方案1】:

2020/08/30 更新:请使用 Scala 库 kantan.csv 来最准确和正确地实现 RFC 4180,它定义了 .csv MIME 类型。 p>

虽然我喜欢创建以下解决方案的学习过程,但请不要使用它,因为我发现它存在许多问题,尤其是在规模上。为了避免下面我的解决方案带来的明显技术债务,选择维护良好的 RFC 驱动的 Scala 原生解决方案应该是您照顾当前和未来客户的方式。


如果您希望逐行处理大文件,同时避免要求将整个文件的内容一次全部加载到内存中,那么您可以使用scala.io.Source 返回的Iterator

我有一个小函数,tryProcessSource,(包含两个子函数)我正好用于这些类型的用例。该函数最多需要四个参数,其中只有第一个是必需的。其他参数提供了合理的默认值。

这是函数简介(完整的函数实现在底部):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
  ???
}

第一个参数file: File 是必需的。它只是任何有效的 java.io.File 实例,它指向一个面向行的文本文件,如 CSV。

第二个参数parseLine: (Int, String) => Option[List[String]] 是可选的。如果提供,它必须是一个期望接收两个输入参数的函数; index: IntunparsedLine: String。然后返回一个Option[List[String]]。该函数可能返回由有效列值组成的Some 包装List[String]。或者它可能会返回一个None,这表明整个流处理过程提前中止。如果未提供此参数,则提供默认值(index, line) => Some(List(line))。此默认值会导致将整行作为单个 String 值返回。

第三个参数filterLine: (Int, List[String]) => Option[Boolean] 是可选的。如果提供,它必须是一个期望接收两个输入参数的函数; index: IntparsedValues: List[String]。然后返回一个Option[Boolean]。该函数可能会返回一个Some 包装的Boolean,指示是否应将这一特定行包含在输出中。或者它可能会返回一个None,这表明整个流处理过程提前中止。如果未提供此参数,则提供默认值(index, values) => Some(true)。此默认值会导致包含所有行。

第四个也是最后一个参数retainValues: (Int, List[String]) => Option[List[String]] 是可选的。如果提供,它必须是一个期望接收两个输入参数的函数; index: IntparsedValues: List[String]。然后返回Option[List[String]]。该函数可能会返回一个Some 包装的List[String],其中包含一些子集和/或现有列值的更改。或者它可能会返回一个None,这表明整个流处理过程提前中止。如果未提供此参数,则提供默认值(index, values) => Some(values)。此默认值会生成由第二个参数 parseLine 解析的值。

考虑一个包含以下内容(4 行)的文件:

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240

以下调用配置文件...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

...导致tryLinesDefaults(文件的未更改内容)的此输出:

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)

以下调用配置文件...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

...导致tryLinesParseOnly 的此输出(每行解析为单独的列值):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)

以下调用配置文件...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

...导致tryLinesIrvingTxNoHeader 的此输出(每一行都解析为单独的列值,没有标题,只有 Irving,Tx 中的两行):

Success(
  List(
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
  )
)

这是整个tryProcessSource函数实现:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

虽然这个解决方案相对简洁,但我花了相当长的时间和许多重构过程才最终到达这里。如果您发现任何可以改进的方法,请告诉我。


更新:我刚刚以it's own StackOverflow question 提出了以下问题。现在has an answer fixing the error 如下所述。

我的想法是尝试通过以下新的泛型化函数定义将retainValues 参数更改为transformLine 使其更加通用。但是,我在 IntelliJ 中不断收到突出显示错误“Some [List [String]] 类型的表达式不符合预期的 Option [A] 类型”,并且无法弄清楚如何更改默认值,因此错误消失了。

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

任何有关如何完成这项工作的帮助将不胜感激。

【讨论】:

    【解决方案2】:

    如您所说,只需使用Source.fromFile(...).getLines

    这会返回一个 Iterator,它已经是惰性的(您可以将流用作惰性集合,您希望在其中存储以前检索到的值,以便再次读取它们)

    如果您遇到内存问题,那么问题将在于您在 getLines 之后所做的事情。任何像toList 这样强制严格收集的操作都会导致问题。

    【讨论】:

    • 我猜OutOfMemoryException确实是由之后的操作引起的。谢谢!
    • 当您的业务逻辑需要多次遍历迭代器来计算某些东西时,这可能不太好处理迭代器。您可以使用一次迭代器。似乎处理流会更好。就像这个问题一样:stackoverflow.com/questions/17004455/…
    • 这种方法有错误。它专门破坏包含有效换行符的列值。由于存在很多问题,即使存在 .csv MIME 类型的 RFC,我强烈建议您使用维护良好的 RFC 驱动的本机 Scala 库,它可以最佳地处理此问题,kantan.csv:nrinaudo.github.io/kantan.csv跨度>
    【解决方案3】:

    我希望你不是指 Scala 的 collection.immutable.Stream 和 Stream。这不是你想要的。 Stream 是惰性的,但会做记忆。

    我不知道您打算做什么,但仅逐行读取文件应该可以很好地工作,而无需使用大量内存。

    getLines 应该延迟评估并且不应该崩溃(只要您的文件不超过 2³² 行,afaik)。如果是这样,请在 #scala 上询问或提交错误票(或两者都做)。

    【讨论】:

      猜你喜欢
      • 2014-11-15
      • 2023-02-20
      • 2021-09-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多