【发布时间】:2020-05-10 05:13:59
【问题描述】:
我有一个返回 fs2.Stream 的函数。
import cats.effect._
import fs2._
def apply(sds: SerialPort, interval: Int)(implicit cs: ContextShift[IO]): Stream[IO, SdsMeasurement] =
for {
blocker <- Stream.resource(Blocker[IO])
stream <- io.readInputStream(IO(sds.getInputStream), 1, blocker)
.through(SdsStateMachine.collectMeasurements())
} yield stream
通常它是一个无限流,除非我向它传递一个测试标志,在这种情况下它应该输出一个值并停止。
val infiniteSource: Stream[IO, SdsMeasurement] = ...
val source = if (isTest) infiniteSource.take(1) else infiniteSource
source.compile.drain
无限流工作正常。它无限地给了我所有的测量值。测试流确实只给了我第一次测量,仅此而已。我遇到的问题是 Stream 在最后一次测量之后没有返回。它永远阻塞。我做错了什么?
注意:我认为我抽象了基本代码,但要了解更多上下文,请查看我的项目:https://github.com/jkransen/fijnstof/blob/ZIO/src/main/scala/nl/kransen/fijnstof/Main.scala
【问题讨论】:
标签: scala fs2 cats-effect