【发布时间】:2018-12-25 20:37:20
【问题描述】:
我有一个流正在读取这样的数据:
id | color | shade | 5
1 | red | light
2 | green | dark
3 | blue | light
4 | grey | light
我需要读取第一行,在这种情况下获取整数 (5),然后计算剩余的行,并找出 (true/false) 计数是否匹配。在这种情况下,5 与 4 不匹配,因此它将是 false。
我现在正在这样做,它在一些数据上工作正常,但开始出错并在大型流(超过 1M 记录)上给我 OOM。这就是我正在做的事情
class FirstLine (totalCount: Int)
class ColorLine (id: Int, name: String, shade: String)
class Everything(firstLine: firstLine, List[ColorLine] colors)
val headerResult: Future[FirstLine] =
myRawStr(ctx)
.take(1)
.via(framing("\n"))
.map(_.utf8String)
.map(_.trim)
.map(s => FirstLineParser(s))
.collect {
case Right(fl) => fl
}
.runWith(Sink.head)
val restResult: Future[immutable.Seq[ColorLine]] =
myRawStr(ctx)
.drop(1)
.via(framing("\n"))
.map(_.utf8String)
.map(_.trim)
.map(s => ColorLineParser(s))
.collect {
case Right(color) => color
}
.runWith(Sink.seq)
def validateAndError(everyThing: Everything): Future[List[MyError]] =
validate(everyThing) match {
case Left(errors: Seq[MyError]) =>
val persisted: Future[ValidatedError] = ctx.asScala.self ? (
(ref: ActorRef[ValidatedError]) =>
PersistError(someId,Some(ref)))
persisted.map(_ => errors)
case Right(_) =>
Future.successful(Nil)
}
for {
header <- headerResult
rest <- restResult
res <- validateAndError(Everything(header, rest)
} yield res
问题
有没有办法提高上述代码的效率,使其适用于超过 1M 的记录?
【问题讨论】:
-
您的问题是当您有很多记录时,您无法将它们全部保存在内存中(将所有内容读入
myRawStr本身可能会导致OOM,具体取决于您的文件有多大)。您需要的是一个流式解决方案,它读取输入的一部分,处理它们,然后读取下一个块。这意味着您可以在不断使用内存的情况下处理任意大的输入。一些选项包括monix、fs2 和akka-streams。 -
this blog post 可能会有所帮助。
标签: scala akka akka-stream