【问题标题】:How can I close the STTP backend after completing my requests?完成请求后如何关闭 STTP 后端?
【发布时间】:2023-04-04 09:12:01
【问题描述】:

我目前正在使用 Monix 后端学习和使用 STTP。在处理完所有请求(每个请求都是一个任务)后,我主要是关闭后端。

我创建了与我的问题相似的示例/模拟代码(据我了解,我的问题更普遍,而不是特定于我的代码):

import sttp.client.asynchttpclient.monix._
import monix.eval.Task
import monix.reactive.Observable
import sttp.client.{Response, UriContext}

import scala.concurrent.duration.DurationInt

object ObservableTest extends App {

  val activities = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val ids: Task[List[Int]] = Task { (1 to 3).toList }
    val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
    val data: Task[List[Task[Response[Either[String, String]]]]] = ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    data.guarantee(backend.close()) // If I close the backend here, I can' generate requests after (when processing the actual requests in the list)
    // I have attempted to return a Task containing a tuple of (data, backend) but closing the backend from outside of the scope did not work as I expected
  }
  import monix.execution.Scheduler.Implicits.global
  val obs = Observable
    .fromTask(activities)
    .flatMap { listOfFetches =>
      Observable.fromIterable(listOfFetches)
    }
    .throttle(3 second, 1)
    .map(_.runToFuture)

  obs.subscribe()
}

我的 fetch (api call maker) 函数看起来像:

  def fetch(uri: Uri, auth: String)(implicit
      backend: SttpBackend[Task, Observable[ByteBuffer], WebSocketHandler]
  ) = {
    println(uri)
    val task = basicRequest
      .get(uri)
      .header("accept", "application/json")
      .header("Authorization", auth)
      .response(asString)
      .send()

    task
  }

由于我的主要任务包含其他任务,我稍后需要处理这些任务,我需要找到另一种方法来从外部关闭 Monix 后端。在我消费List[Task[Response[Either[String, String]]]] 中的请求后,有没有一种干净的方法来关闭后端?

【问题讨论】:

    标签: scala monix sttp


    【解决方案1】:

    问题来自这样一个事实,即在打开 sttp 后端的情况下,您正在计算要执行的任务列表 - List[Task[Response[Either[String, String]]]],但您没有运行它们。因此,我们需要在后端关闭之前按顺序运行这些任务。

    这里要做的关键是创建一个任务的单一描述,在后端仍然打开时运行所有这些请求。

    一旦你计算了data(它本身就是一个任务——一个计算的描述——它在运行时会产生一个任务列表——也是计算的描述),我们需要将它转换成一个单一的、非嵌套Task。这可以通过多种方式完成(例如使用简单的排序),但在您的情况下,这将使用Observable

    AsyncHttpClientMonixBackend().flatMap { implicit backend =>
      val ids: Task[List[Int]] = Task { (1 to 3).toList }
      val f: String => Task[Response[Either[String, String]]] = (i: String) => fetch(uri"$i", "")
      val data: Task[List[Task[Response[Either[String, String]]]]] =
        ids map (_ map (_ => f("https://heloooo.free.beeceptor.com/my/api/path")))
    
      val activities = Observable
        .fromTask(data)
        .flatMap { listOfFetches =>
          Observable.fromIterable(listOfFetches)
        }
        .throttle(3 second, 1)
        .mapEval(identity)
        .completedL
    
      activities.guarantee(
        backend.close()
      )
    }
    

    首先注意Observable.fromTask(...)在最外面的flatMap里面,所以是在后端仍然打开的时候创建的。我们创建 observable,限制它等等,然后是关键的事实:一旦我们有了限制流,我们使用 @987654329 评估每个项目(每个项目是一个 Task[...] - 描述如何发送 na http 请求) @。我们得到Either[String, String] 的流,这是请求的结果。

    最后,我们使用.completedL 将流转换为Task(丢弃结果),等待整个流完成。

    这个最后的任务是通过关闭后端来排序的。如上所述,最终会发生的副作用是:

    1. 打开后端
    2. 创建任务列表 (data)
    3. 创建一个流,它限制data 计算的列表中的元素
    4. 评估流中的每个项目(发送请求)
    5. 关闭后端

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-12-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多