【问题标题】:Grouping a stream of elements into multiple streams将元素流分组为多个流
【发布时间】:2020-11-08 11:44:27
【问题描述】:

假设我们有一个case class MyCaseClass(name: String, value: Int)。给定fs2.Stream[F, MyCaseClass],我想将具有相同name 的元素分组

val sourceStream: fs2.Stream[F, MyCaseClass] = //
val groupedSameNameStream: fs2.Stream[F, fs2.Stream[F, MyCaseClass]] = //

我需要这样做的原因是我想应用有效的转换

val transform: MyCaseClass => F[Unit] = //

对于流的所有元素,如果一组失败,另一组应该继续工作。

这样的事情可以做吗?

【问题讨论】:

  • 你说“继续工作”是什么意思?你能否举个例子,你想如何将此转换应用于groupedSameNameStream
  • 看起来你想要对“生活”流进行某种聚合,但它并不像乍一看那样简单。例如,Spark 正在使用水印做很多魔法以使其正常工作。我不认为 fs2 可以处理如此复杂的情况。
  • 经过一番调查,我发现它可以使用groupWithin 处理类似的事情,但你不会得到流组。会有groups of elements received within a time window, or limited by the number of the elements
  • @SomeName 我认为一般来说不可能(不是因为 fs2 限制)。假设在N 时,您的源流收到了单词aabbaa。你想按这些词分组。在这种情况下,在N 时间,您分组的流将有两个组(每个aa 一个,每个bb 一个)可供任何消费者使用。但是,如果在消费组aa 之后,源流中还会有另一个aa 字。它应该是一个只有一个元素的新组吗?还是应该忽略它,因为aa 的组已经被消费了。

标签: scala functional-programming scala-cats fs2


【解决方案1】:

这是可能的,但需要注意。

如果您接受 Map 具有无限数量的键,并且每个关联的 Queues 数量无限,那么这样做相对很简单。

我们在生产环境中使用了基于 gist by github user kiambogo 的代码(尽管我们的代码已经过调整),并且运行良好:

import fs2.concurrent.Queue
import cats.implicits._
import cats.effect.Concurrent
import cats.effect.concurrent.Ref
 
def groupBy[F[_], A, K](selector: A => F[K])(implicit F: Concurrent[F]): Pipe[F, A, (K, Stream[F, A])] = {
  in =>
  Stream.eval(Ref.of[F, Map[K, Queue[F, Option[A]]]](Map.empty)).flatMap { st =>
    val cleanup = {
      import alleycats.std.all._
      st.get.flatMap(_.traverse_(_.enqueue1(None)))
    }

    (in ++ Stream.eval_(cleanup))
      .evalMap { el =>
        (selector(el), st.get).mapN { (key, queues) =>
          queues.get(key).fold {
            for {
              newQ <- Queue.unbounded[F, Option[A]] // Create a new queue
              _ <- st.modify(x => (x + (key -> newQ), x)) // Update the ref of queues
              _ <- newQ.enqueue1(el.some)
            } yield (key -> newQ.dequeue.unNoneTerminate).some
          }(_.enqueue1(el.some) as None)
        }.flatten
      }.unNone.onFinalize(cleanup)
  }
}

如果我们假设每个 Map 条目的开销为 64 字节(我认为这非常高估了),那么 100,000 个唯一键的基数给我们大约 6.1MiB - 好吧 在 jvm 进程的合理大小范围内。

【讨论】:

  • 很好奇为什么选择器需要在 F 中返回?为什么是A =&gt; F[K] 而不是A =&gt; K
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-30
  • 2019-03-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多