【问题标题】:fs2 Concurrent queue Scala : Lack of insightsfs2并发队列Scala:缺乏洞察力
【发布时间】:2020-09-30 23:51:02
【问题描述】:

我是一个新手,试图掌握 fs2 队列背后的直觉。 我正在尝试做一个从Stream[IO, Int] 提取数据的基本示例。但对我来说,文档还不够,因为它直接深入到高级内容。

这是我到目前为止所做的:

import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue

class QueueInt(q: Queue[IO, Int]) {
  def startPushingtoQueue: Stream[IO, Unit] = {
    Stream(1, 2, 3).covary[IO].through(q.enqueue)
    q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
  }
}

object testingQueues extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {

    val stream = for {
      q <- Queue.bounded(10)
      b = new QueueInt(q)
      _ <- b.startPushingtoQueue.drain
    } yield ()
  }

}

问题 1:我收到了No implicit argument of type Concurrent[F_], 知道我没有使用任何并发效果,我似乎无法弄清楚我错过了什么?

问题 2:如何打印结果。

问题 3:谁能指导我一些资源来学习 fs2

【问题讨论】:

    标签: scala stream queue fs2


    【解决方案1】:

    我在您的代码中发现了几个问题:

    1. 如果您收到有关缺少隐式的错误,您通常可以通过显式声明类型参数来修复它们:
     q <- Queue.bounded[IO, Unit](10) // it will fix your error with implicits
    
    1. 你的 for comprehension 的结果类型是 IO[Unit],但为了使其运行,你必须从 run 方法返回它。您还需要将类型从单位更改为ExitCode
    stream.as(ExitCode.Success)
    
    1. 在您的方法startPushingToQueue 中,您正在创建Steam,但您没有在任何地方分配它。它只会创建流的描述,但不会运行。

    我认为您想要实现的是创建将元素推送到队列的方法和另一个从队列中获取元素并打印它们的方法。请检查我的解决方案:

    import cats.effect.{ ExitCode, IO, IOApp}
    import fs2._
    import fs2.concurrent.Queue
    import scala.concurrent.duration._
    
    class QueueInt(q: Queue[IO, Int])(implicit timer: Timer[IO]) { //I need implicit timer for metered
      def startPushingToQueue: Stream[IO, Unit] = Stream(1, 2, 3)
        .covary[IO]
        .evalTap(n => IO.delay(println(s"Pushing element $n to Queue"))) //eval tap evaluates effect on an element but doesn't change stream
        .metered(500.millis) //it will create 0.5 delay between enqueueing elements of stream,
         // I added it to make visible that elements can be pushed and pulled from queue concurrently
        .through(q.enqueue)
    
      def pullAndPrintElements: Stream[IO, Unit] = q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
    }
    
    object testingQueues extends IOApp {
      override def run(args: List[String]): IO[ExitCode] = {
    
        val program = for {
          q <- Queue.bounded[IO, Int](10)
          b = new QueueInt(q)
          _ <- b.startPushingToQueue.compile.drain.start //start at the end will start running stream in another Fiber
          _ <- b.pullAndPrintElements.compile.drain //compile.draing compiles stream into io byt pulling all elements.
        } yield ()
    
        program.as(ExitCode.Success)
      }
    
    }
    

    在控制台上,您将看到有关从队列中推入和拉出交错的行。 如果您删除 start,您将看到来自 startPushingToQueue 的第一个流在推送所有元素后完成,然后才开始 pullAndPrintElements

    如果您正在寻找学习 fs2 的好资源,我建议您首先查看与 fs2 相关的讲座。比旧的更喜欢新的谈话,因为他们可以引用旧的 API。

    您还应该查看 fs2 文档中的guide

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-08
      • 1970-01-01
      • 2018-05-12
      • 1970-01-01
      • 2021-09-12
      • 2020-10-08
      相关资源
      最近更新 更多