【发布时间】:2018-01-06 03:05:53
【问题描述】:
我有两个输入流。我想合并两个基于相同 ID 的流元素。这是代码详细信息
implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
case class Foo(id: Int, value: String)
case class Bar(id: Int, value: String)
case class MergeResult(id: Int, fooValue: String, barValue: String)
val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))
我想要得到的结果是MergeResult,它基于Foo和Bar中的相同id。
另外,对于一些 id 不匹配的Foo 和Bar,我想保留在内存中,我想知道是否有一种干净的方法可以做到这一点,因为它是有状态的。
更重要的是,源元素是有序的。如果发现 ID 重复,则策略应先匹配先服务。这意味着如果Foo(1, "foo-1"), Foo(1, "foo-2") 和Bar(1, "Bar-1"),匹配应该是MergeResult(1, "foo-1", "Bar-1")。
我目前正在从 akka 流中寻找一些解决方案。如果有一些好的解决方案,如 Spark、Flink 等,那也会有所帮助。
提前致谢。
【问题讨论】:
-
答案是否相关?
-
是的。我感觉合理。感谢您的回答
标签: scala akka-stream stream-processing