【发布时间】:2020-11-04 02:09:14
【问题描述】:
我有一个 fs2.Stream 包含一些元素(可能是无限的),我想为流的所有元素同时安排一些计算。这是我尝试过的
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val stream = for {
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
_ <- fs2.Stream.awakeEvery[IO](1.second)
_ <- fs2.Stream.eval(IO(println(id)))
} yield ()
stream.compile.drain.unsafeRunSync()
程序输出的样子
1
1
1
etc...
这不是预期的。我想为原始流的所有元素交错安排计算,但不要等到第一个流终止(由于无限调度而永远不会发生)。
【问题讨论】:
-
你的意思是你想让第一个流的元素以 1 秒的间隔按顺序出现吗?还是要 1 和 2 一次出现,然后以 1 秒的间隔重复?
-
flatMap是顺序的,但您想要并行处理。您可以使用zip来获得它。 -
@KrzysztofAtłasik 我希望对原始流的所有元素每 1 秒并行运行一次计算
-
如果您提供预期的结果会更容易,因为很难理解您想要实现什么。您可以将
id <- fs2.Stream.emits(List(1, 2)).covary[IO]与_ <- fs2.Stream.awakeEvery[IO](1.second)切换并检查它是否符合您的预期吗? -
你可以例如做
fs2.Stream.emits(List(1, 2)).covary[IO].delayBy(1.second).repeat。但正如我所说,很难说出你的意图是什么。我认为你会通过编辑你的问题并提供更好的预期结果而受益。
标签: scala functional-programming scala-cats fs2