【发布时间】:2023-04-04 09:12:01
【问题描述】:
我目前正在使用 Monix 后端学习和使用 STTP。在处理完所有请求(每个请求都是一个任务)后,我主要是关闭后端。
我创建了与我的问题相似的示例/模拟代码(据我了解,我的问题更普遍,而不是特定于我的代码):
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}
import scala.concurrent.duration.DurationInt
object ObservableTest extends App {
val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val ids: Task[List[Int]] = Task { (1 to 3).toList }
val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
// I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
}
import monix.execution.Scheduler.Implicits.global
val obs = Observable
.fromTask(activities)
.flatMap { listOfFetches =>
Observable.fromIterable(listOfFetches)
}
.throttle(3 second, 1)
.map(_.runToFuture)
obs.subscribe()
}
我的 fetch (api call maker) 函数看起来像:
def fetch(uri: Uri, auth: String)(implicit
backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
) = {
println(uri)
val task = basicRequest
.get(uri)
.header("accept", "application/json")
.header("Authorization", auth)
.response(asString)
.send()
task
}
由于我的主要任务包含其他任务,我稍后需要处理这些任务,我需要找到另一种方法来从外部关闭 Monix 后端。在我消费List[Task[Response[Either[String, String]]]] 中的请求后,有没有一种干净的方法来关闭后端?
【问题讨论】: