【问题标题】:How can I Reuse a queue in another computation ? fs2 stream Scala如何在另一个计算中重用队列? fs2 流 Scala
【发布时间】:2020-10-10 01:10:32
【问题描述】:

我收到 Unit 而不是 Stream[IO, String] 的错误。 我正在尝试在下一个队列中重用队列的结果

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random

class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
  implicit timer: Timer[IO]
) {

  def storeInQueueFirst: Stream[IO, Unit] = {
    Stream(1, 2, 3)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
      .metered(Random.between(1, 20).seconds)
      .through(q1.enqueue)

  }
  def getFromQueueFirst: Stream[IO, Unit] = {
    q1.dequeue
      .evalMap(n => IO.delay(println(s"Pulling from queue Second $n")))

  }
  def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
    s.map { n =>
        n.toString
      }
      .metered(Random.between(1, 20).seconds)
      .through(q2.enqueue)
  }

  def getFromQueueSecond: Stream[IO, Unit] = {
    q2.dequeue
      .evalMap(n => IO.delay(println(s"Pulling from queue second $n")))
  }
}

object Five extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)

      b = new StreamTypeIntToDouble(q1, q2)
      _ <- b.storeInQueueFirst.compile.drain.start
      a <- b.getFromQueueFirst.compile.drain
      _ <- b.storeInQueueSecond(a).compile.drain
      _ <- b.getFromQueueSecond.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}

【问题讨论】:

标签: scala queue code-reuse fs2


【解决方案1】:

尝试更改getFromQueueFirst,使其生成Stream[IO, Int],而不是Stream[IO, Unit]

def getFromQueueFirst: Stream[IO, Int] = {
  q1.dequeue
    evalTap(n => IO.delay(println(s"Pulling from queue Second $n")))
}

然后

val program = for {
  q1 <- Queue.bounded[IO, Int](10)
  q2 <- Queue.bounded[IO, String](10)

  b = new StreamTypeIntToDouble(q1, q2)
  _ <- b.storeInQueueFirst.compile.drain.start
  a <- b.getFromQueueFirst.compile.lastOrError
  _ <- b.storeInQueueSecond(Stream(a)).compile.drain
  _ <- b.getFromQueueSecond.compile.drain
} yield ()

编译。

【讨论】:

  • 为什么当我删除 lastOrError 并把它改为 drain 时它不起作用?你能提供启发吗
  • @MunaAr .drain 返回 IO[Unit],所以 aUnit
  • 不幸的是,即使您的回答,问题仍然存在,我已经打开了一个新问题如果您可以检查一下,那就太好了
猜你喜欢
  • 2021-06-24
  • 2020-09-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-23
  • 1970-01-01
  • 2019-12-25
  • 2018-07-04
相关资源
最近更新 更多