2020/08/30 更新:请使用 Scala 库 kantan.csv 来最准确和正确地实现 RFC 4180,它定义了 .csv MIME 类型。 p>
虽然我喜欢创建以下解决方案的学习过程,但请不要使用它,因为我发现它存在许多问题,尤其是在规模上。为了避免下面我的解决方案带来的明显技术债务,选择维护良好的 RFC 驱动的 Scala 原生解决方案应该是您照顾当前和未来客户的方式。
如果您希望逐行处理大文件,同时避免要求将整个文件的内容一次全部加载到内存中,那么您可以使用scala.io.Source 返回的Iterator。
我有一个小函数,tryProcessSource,(包含两个子函数)我正好用于这些类型的用例。该函数最多需要四个参数,其中只有第一个是必需的。其他参数提供了合理的默认值。
这是函数简介(完整的函数实现在底部):
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
第一个参数file: File 是必需的。它只是任何有效的 java.io.File 实例,它指向一个面向行的文本文件,如 CSV。
第二个参数parseLine: (Int, String) => Option[List[String]] 是可选的。如果提供,它必须是一个期望接收两个输入参数的函数; index: Int,unparsedLine: String。然后返回一个Option[List[String]]。该函数可能返回由有效列值组成的Some 包装List[String]。或者它可能会返回一个None,这表明整个流处理过程提前中止。如果未提供此参数,则提供默认值(index, line) => Some(List(line))。此默认值会导致将整行作为单个 String 值返回。
第三个参数filterLine: (Int, List[String]) => Option[Boolean] 是可选的。如果提供,它必须是一个期望接收两个输入参数的函数; index: Int,parsedValues: List[String]。然后返回一个Option[Boolean]。该函数可能会返回一个Some 包装的Boolean,指示是否应将这一特定行包含在输出中。或者它可能会返回一个None,这表明整个流处理过程提前中止。如果未提供此参数,则提供默认值(index, values) => Some(true)。此默认值会导致包含所有行。
第四个也是最后一个参数retainValues: (Int, List[String]) => Option[List[String]] 是可选的。如果提供,它必须是一个期望接收两个输入参数的函数; index: Int,parsedValues: List[String]。然后返回Option[List[String]]。该函数可能会返回一个Some 包装的List[String],其中包含一些子集和/或现有列值的更改。或者它可能会返回一个None,这表明整个流处理过程提前中止。如果未提供此参数,则提供默认值(index, values) => Some(values)。此默认值会生成由第二个参数 parseLine 解析的值。
考虑一个包含以下内容(4 行)的文件:
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
以下调用配置文件...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
...导致tryLinesDefaults(文件的未更改内容)的此输出:
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
以下调用配置文件...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
...导致tryLinesParseOnly 的此输出(每行解析为单独的列值):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
以下调用配置文件...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
...导致tryLinesIrvingTxNoHeader 的此输出(每一行都解析为单独的列值,没有标题,只有 Irving,Tx 中的两行):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
这是整个tryProcessSource函数实现:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
虽然这个解决方案相对简洁,但我花了相当长的时间和许多重构过程才最终到达这里。如果您发现任何可以改进的方法,请告诉我。
更新:我刚刚以it's own StackOverflow question 提出了以下问题。现在has an answer fixing the error 如下所述。
我的想法是尝试通过以下新的泛型化函数定义将retainValues 参数更改为transformLine 使其更加通用。但是,我在 IntelliJ 中不断收到突出显示错误“Some [List [String]] 类型的表达式不符合预期的 Option [A] 类型”,并且无法弄清楚如何更改默认值,因此错误消失了。
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
任何有关如何完成这项工作的帮助将不胜感激。