【问题标题】:Schedule computation concurrently for all elements of the fs2.Stream为 fs2.Stream 的所有元素同时安排计算
【发布时间】:2020-11-04 02:09:14
【问题描述】:

我有一个 fs2.Stream 包含一些元素(可能是无限的),我想为流的所有元素同时安排一些计算。这是我尝试过的

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()

stream.compile.drain.unsafeRunSync()

程序输出的样子

1
1
1
etc...

这不是预期的。我想为原始流的所有元素交错安排计算,但不要等到第一个流终止(由于无限调度而永远不会发生)。

【问题讨论】:

  • 你的意思是你想让第一个流的元素以 1 秒的间隔按顺序出现吗?还是要 1 和 2 一次出现,然后以 1 秒的间隔重复?
  • flatMap 是顺序的,但您想要并行处理。您可以使用zip 来获得它。
  • @KrzysztofAtłasik 我希望对原始流的所有元素每 1 秒并行运行一次计算
  • 如果您提供预期的结果会更容易,因为很难理解您想要实现什么。您可以将id &lt;- fs2.Stream.emits(List(1, 2)).covary[IO]_ &lt;- fs2.Stream.awakeEvery[IO](1.second) 切换并检查它是否符合您的预期吗?
  • 你可以例如做fs2.Stream.emits(List(1, 2)).covary[IO].delayBy(1.second).repeat。但正如我所说,很难说出你的意图是什么。我认为你会通过编辑你的问题并提供更好的预期结果而受益。

标签: scala functional-programming scala-cats fs2


【解决方案1】:

根据@KrzysztofAtłasik 和@LuisMiguelMejíaSuárez 给出的提示,这是我刚刚提出的解决方案:

val originalStream = fs2.Stream.emits(List(1, 2))

val scheduledComputation = originalStream.covary[IO].map({ id =>
        fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten

@KrzysztofAtłasik 在评论中提出的带有交错的解决方案 id &lt;- fs2.Stream.emits(List(1, 2)).covary[IO]_ &lt;- fs2.Stream.awakeEvery[IO](1.second) 也可以,但它不允许以自己的方式安排每个元素。

要在elementValue 秒内同时安排元素,可以执行以下操作:

val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
                                 //id.seconds
        fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten

【讨论】:

    【解决方案2】:
    val str = for {
      id <- Stream.emits(List(1, 5, 7)).covary[IO]
      res = timer.sleep(id.second) >> IO(println(id))
    } yield res
    
    val stream =  str.parEvalMapUnordered(5)(identity)
    
    stream.compile.drain.unsafeRunSync()
    

     val stream = Stream.emits(List(1, 5, 7))
       .map { id => 
         Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
       .parJoinUnbounded
    
    stream.compile.drain.unsafeRunSync()
    

    【讨论】:

      猜你喜欢
      • 2015-08-07
      • 1970-01-01
      • 2020-10-08
      • 2020-08-09
      • 2021-10-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-09-09
      相关资源
      最近更新 更多