【问题标题】:Merge two Stream based on Same ID in Akka Stream在 Akka Stream 中合并两个基于相同 ID 的 Stream
【发布时间】:2018-01-06 03:05:53
【问题描述】:

我有两个输入流。我想合并两个基于相同 ID 的流元素。这是代码详细信息

  implicit val system = ActorSystem("sourceDemo")
  implicit val materializer = ActorMaterializer()

  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
  val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))

我想要得到的结果是MergeResult,它基于FooBar中的相同id。

另外,对于一些 id 不匹配的FooBar,我想保留在内存中,我想知道是否有一种干净的方法可以做到这一点,因为它是有状态的。

更重要的是,源元素是有序的。如果发现 ID 重复,则策略应先匹配先服务。这意味着如果Foo(1, "foo-1"), Foo(1, "foo-2")Bar(1, "Bar-1"),匹配应该是MergeResult(1, "foo-1", "Bar-1")

我目前正在从 akka 流中寻找一些解决方案。如果有一些好的解决方案,如 Spark、Flink 等,那也会有所帮助。

提前致谢。

【问题讨论】:

  • 答案是否相关?
  • 是的。我感觉合理。感谢您的回答

标签: scala akka-stream stream-processing


【解决方案1】:

您准确地描述了一个连接操作。

Akka 流不支持连接操作。您可能会找到一种方法来做到这一点,使用每个流上的窗口和一些演员/状态转换来在它们之间进行查找,但上次我搜索这个时我什么也没找到(不久前),所以你可能处于未知领域.

您只会在更重量级的框架上找到流连接:Flink、Spark Streaming、Kafka 流。原因是 join 从根本上说是一个流与另一个流的查找,这意味着它需要比 Akka 流的设计者想要处理的更复杂的东西(状态管理)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多