【问题标题】:How to continually call a REST service using non blocking code with Akka如何使用 Akka 的非阻塞代码持续调用 REST 服务
【发布时间】:2021-01-12 17:45:20
【问题描述】:

我正在从 REST 端点访问数据:

"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"

为了每秒访问一次数据,我使用了一个无限循环while(true) { 来调用每秒一次发送给参与者的消息,从而开始调用 REST 请求的过程:

访问数据的actor是:

object ProductTickerRestActor {

  case class StringData(data: String)

}

class ProductTickerRestActor extends Actor {
  
  override def receive: PartialFunction[Any, Unit] = {

    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
        println("rData : "+rData)

      }
      catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }

    case msg => println(s"I cannot understand ${msg.toString}")
  }
}

我使用以下方式启动应用程序:

object ExchangeModelDataApplication {

  def main(args: Array[String]): Unit = {

    val actorSystem = ActorSystemConfig.getActorSystem

    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    while(true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }

}

节流器:

object Throttlers {


  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 1.second)
    .to(Sink.actorRef(priceDataActor, NotUsed))
    .run()
}

如何异步运行以下代码而不是使用无限循环阻塞? :

throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000) 

此外,在这种情况下,节流器可能是多余的,因为无论如何我都在限制循环内的请求。

【问题讨论】:

    标签: scala akka throttling


    【解决方案1】:

    我只会使用 Akka Streams 和 Akka HTTP。使用 Akka 2.6.x,类似的东西足以满足 1 个请求/秒

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.stream.scaladsl._
    
    import scala.concurrent.duration._
    
    object HTTPRepeatedly {
      implicit val system = ActorSystem()
      import system.dispatcher
    
      val sourceFromHttp: Source[String, NotUsed] =
        Source.repeated("test") // Not sure what "test" is actually used for here...
          .throttle(1, 1.second)
          .map { str =>
            HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
          }.mapAsync(1) { req =>
            Http().singleRequest(req)
          }.mapAsync(1)(_.entity.toStrict(1.minute))
          .map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
    }
    

    然后您可以,例如(为简单起见,将其放在 main 内的 HTTPRepeatedly 中,以便隐含在范围内等)

    val done: Future[Done] =
      sourceFromHttp
        .take(10) // stop after 10 requests
        .runWith(Sink.foreach { rData => println(s"rData: $rData") })
    
    scala.concurrent.Await.result(done, 11.minute)
    
    system.terminate()
    

    【讨论】:

      【解决方案2】:

      每秒发送一个请求并不是一个好主意。如果由于某种原因请求被延迟,您将收到大量请求堆积。而是在前一个请求完成后一秒发送下一个请求。

      因为这段代码使用了一个同步的GET请求,你可以在mkString返回后一秒发送下一个请求。

      但是使用同步请求并不是在 Akka 中使用 RESTful API 的好方法。它会阻塞actor receive 方法直到请求完成,这最终会阻塞整个ActorSystem

      改为使用 Akka Http 和 singleRequest 来执行异步请求。

      Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))
      

      这会返回一个Future。在请求完成后一秒钟发出新请求(例如,在 Future 上使用 onComplete)。

      这不仅更安全、更异步,而且与fromUrl相比,它还提供了对 REST API 调用的更多控制

      【讨论】:

        猜你喜欢
        • 2015-11-25
        • 2016-06-25
        • 2022-12-09
        • 1970-01-01
        • 2021-01-17
        • 1970-01-01
        • 2021-12-13
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多