【问题标题】:How to read a large stream and compare counts?如何读取大流并比较计数?
【发布时间】:2018-12-25 20:37:20
【问题描述】:

我有一个流正在读取这样的数据:

id | color | shade | 5
1 | red | light
2 | green | dark
3 | blue | light
4 | grey | light

我需要读取第一行,在这种情况下获取整数 (5),然后计算剩余的行,并找出 (true/false) 计数是否匹配。在这种情况下,5 与 4 不匹配,因此它将是 false

我现在正在这样做,它在一些数据上工作正常,但开始出错并在大型流(超过 1M 记录)上给我 OOM。这就是我正在做的事情

class FirstLine (totalCount: Int)
class ColorLine (id: Int, name: String, shade: String)
class Everything(firstLine: firstLine, List[ColorLine] colors)

    val headerResult: Future[FirstLine] =
      myRawStr(ctx)
        .take(1)
        .via(framing("\n"))
        .map(_.utf8String)
        .map(_.trim)
        .map(s => FirstLineParser(s))
        .collect {
          case Right(fl) => fl
        }
        .runWith(Sink.head)

    val restResult: Future[immutable.Seq[ColorLine]] =
      myRawStr(ctx)
        .drop(1)
        .via(framing("\n"))
        .map(_.utf8String)
        .map(_.trim)
        .map(s => ColorLineParser(s))
        .collect {
          case Right(color) => color
        }
        .runWith(Sink.seq)

    def validateAndError(everyThing: Everything): Future[List[MyError]] =
      validate(everyThing) match {
        case Left(errors: Seq[MyError]) =>
          val persisted: Future[ValidatedError] = ctx.asScala.self ? (
              (ref: ActorRef[ValidatedError]) =>
                PersistError(someId,Some(ref)))
          persisted.map(_ => errors)

        case Right(_) =>
          Future.successful(Nil)
      }

    for {
      header <- headerResult
      rest <- restResult
      res <- validateAndError(Everything(header, rest)
    } yield res

问题

有没有办法提高上述代码的效率,使其适用于超过 1M 的记录?

【问题讨论】:

  • 您的问题是当您有很多记录时,您无法将它们全部保存在内存中(将所有内容读入myRawStr 本身可能会导致OOM,具体取决于您的文件有多大)。您需要的是一个流式解决方案,它读取输入的一部分,处理它们,然后读取下一个块。这意味着您可以在不断使用内存的情况下处理任意大的输入。一些选项包括monixfs2akka-streams
  • this blog post 可能会有所帮助。

标签: scala akka akka-stream


【解决方案1】:

最有效的方法是不将restResult 收集为Seq[ColorLine],而是只生成颜色线计数作为结果:

type Count = Long

val zeroCount : Count = 0L

val countColorLine : (Count, ColorLine) => Count = 
  (count, _) => count + 1

val restResultCount: Future[Count] =
  myRawStr(ctx)
    .drop(1)
    .via(framing("\n"))
    .map(_.utf8String)
    .map(_.trim)
    .map(s => ColorLineParser(s))
    .collect {
      case Right(color) => color
    }
    .runFold(zeroCount)(countColorLine)  

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多