【发布时间】:2020-11-08 11:44:27
【问题描述】:
假设我们有一个case class MyCaseClass(name: String, value: Int)。给定fs2.Stream[F, MyCaseClass],我想将具有相同name 的元素分组
val sourceStream: fs2.Stream[F, MyCaseClass] = //
val groupedSameNameStream: fs2.Stream[F, fs2.Stream[F, MyCaseClass]] = //
我需要这样做的原因是我想应用有效的转换
val transform: MyCaseClass => F[Unit] = //
对于流的所有元素,如果一组失败,另一组应该继续工作。
这样的事情可以做吗?
【问题讨论】:
-
你说“继续工作”是什么意思?你能否举个例子,你想如何将此转换应用于
groupedSameNameStream? -
看起来你想要对“生活”流进行某种聚合,但它并不像乍一看那样简单。例如,Spark 正在使用水印做很多魔法以使其正常工作。我不认为 fs2 可以处理如此复杂的情况。
-
经过一番调查,我发现它可以使用
groupWithin处理类似的事情,但你不会得到流组。会有groups of elements received within a time window, or limited by the number of the elements。 -
@SomeName 我认为一般来说不可能(不是因为 fs2 限制)。假设在
N时,您的源流收到了单词aa、bb、aa。你想按这些词分组。在这种情况下,在N时间,您分组的流将有两个组(每个aa一个,每个bb一个)可供任何消费者使用。但是,如果在消费组aa之后,源流中还会有另一个aa字。它应该是一个只有一个元素的新组吗?还是应该忽略它,因为aa的组已经被消费了。
标签: scala functional-programming scala-cats fs2