【问题标题】:Immutable stream batching不可变流批处理
【发布时间】:2019-10-22 13:22:24
【问题描述】:

this 问题中的解决方案是否有不可变的替代方案,它在流中批量处理数据:

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

生产

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

【问题讨论】:

  • 可能使用折叠
  • @cchantep 对,大概也是子流,因为 fold 只在流的末尾发出

标签: scala akka akka-stream


【解决方案1】:

您的方式很好,不变性和无状态是并发分布式软件编程的一个重要方面。 我在使用来自akka 的 groupBy 时将这个scastie 与您的示例代码分享。 让我知道,如果它会帮助你。 输出是这样的

(3,List(!))
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))

【讨论】:

    猜你喜欢
    • 2017-02-01
    • 2018-01-11
    • 1970-01-01
    • 1970-01-01
    • 2016-06-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-12-07
    相关资源
    最近更新 更多