【发布时间】:2021-08-30 16:05:39
【问题描述】:
我想通过 HTTP 进行流式 JSON 响应。目的是每秒发送给定城市的当前时间。
TL;DR:我需要帮助发送一个效果 F 的结果,并且应该每 1 秒调用一个返回此效果的函数。有没有你知道的简单方法?
我已经尝试了几种方法,但都不起作用。
- 这是第一个:
def timeStreamingRoutes[F[_]: Sync : Timer](times: Times[F]): HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
import dsl._
HttpRoutes.of[F] {
case GET -> Root / "streaming" / city =>
val throttling = Stream.awakeEvery[F](1.second)
for {
timeOrErr <- times.get(city.toUpperCase)
resp <- timeOrErr match {
case Right(time) => Ok(throttling.map(_ => time.asJson))
case Left(error) => BadRequest(throttling.map(_ => error.asJson))
}
} yield resp
}
}
这里,我的times.get(city.toUpperCase) 函数具有以下签名:
def get(city: String): F[Either[Times.CurrentTimeError, Times.CurrentTime]]
,其中CurrentTimeError 和CurrentTime 是我的自定义案例类。问题是我在timeOrErr <- times.get(city.toUpperCase) 行中只有一次时间。因此,它每秒发送完全相同的值(例如,2:43:31、2:43:31 等。我希望它是 2:43:31、2:43:32 等)。而且我不知道如何让这个函数每秒被调用一次。
- 另外,我尝试使用一种稍微不同的技术(以及许多其他与此类似的技术):
def timeStreamingRoutes[F[_]: Sync : Timer](times: Times[F]): HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
import dsl._
HttpRoutes.of[F] {
case GET -> Root / "streaming" / city =>
val throttling = Stream.awakeEvery[F](1.second)
val payload = Stream(
for {
timeOrErr <- times.get(city.toUpperCase)
resp <- timeOrErr match {
case Right(time) => Ok(time.asJson)
case Left(error) => BadRequest(error.asJson)
}
} yield resp
)
val stream = throttling.zipRight(payload)
Ok(stream)
}
}
这里的问题是嵌套单子的地狱。 stream 具有 Stream[F, F[Response[F]]] 类型。而且我不能使它成为正确的F[Response[F]],因为fs2 Stream 不提供sequence 或traverse 之类的功能。如果我尝试返回Ok(stream),那么Circe 无法序列化F,因为它是抽象的,所以它甚至没有被编译。
- 第三种方法是:
HttpRoutes.of[F] {
case GET -> Root / "streaming" / city =>
val throttling = Stream.awakeEvery[F](1.second)
val payload = Stream(
for {
timeOrErr <- times.get(city.toUpperCase)
resp <- timeOrErr match {
case Right(time) => time.asJson
case Left(error) => error.asJson
}
} yield resp
)
val stream = throttling.map(_ => payload)
Ok(stream)
}
嗯,数字 3 也没有编译。主要是因为我不能在payload 中编写单子。也就是说,case Right(time) => time.asJson 和 case Left(error) => error.asJson 必须类似于 case Right(time) => SomethingThatcanBeUsedAsALastWrapperInThisForComprehension(time.asJson)。
不幸的是,官方docs 对此知之甚少。我很高兴听到任何建议!
【问题讨论】:
-
我对 fs2 不熟悉,但我希望您可以使用
mapAsync方法,例如throttling.mapAsync(times.get(city.toUpperCase))。这只是全局的想法,显然对于泛型效果类型,这并不容易。 -
这里的关键是对流的每个元素运行效果,而不是运行一次效果,对流的每个元素使用它的值。
-
@GaëlJ 感谢您的回复!我会尝试调查
mapAsync。顺便说一句,你认为我应该用 Cats 中的具体IO替换抽象F吗?因为F是http4s 自动骨架的剩余部分。我很困惑是否应该在此函数中将其替换为具体的效果类型。 -
如果你不是在构建某种库,只需设置一个具体类型是的!
标签: scala scala-cats cats-effect http4s fs2