【发布时间】:2021-01-12 17:45:20
【问题描述】:
我正在从 REST 端点访问数据:
"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
为了每秒访问一次数据,我使用了一个无限循环while(true) { 来调用每秒一次发送给参与者的消息,从而开始调用 REST 请求的过程:
访问数据的actor是:
object ProductTickerRestActor {
case class StringData(data: String)
}
class ProductTickerRestActor extends Actor {
override def receive: PartialFunction[Any, Unit] = {
case ProductTickerRestActor.StringData(data) =>
try {
println("in ProductTickerRestActor")
val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
println("rData : "+rData)
}
catch {
case e: Exception =>
println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
}
case msg => println(s"I cannot understand ${msg.toString}")
}
}
我使用以下方式启动应用程序:
object ExchangeModelDataApplication {
def main(args: Array[String]): Unit = {
val actorSystem = ActorSystemConfig.getActorSystem
val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
val throttler = Throttlers.getThrottler(priceDataActor)
while(true) {
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
}
}
节流器:
object Throttlers {
implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)
def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(1, 1.second)
.to(Sink.actorRef(priceDataActor, NotUsed))
.run()
}
如何异步运行以下代码而不是使用无限循环阻塞? :
throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000)
此外,在这种情况下,节流器可能是多余的,因为无论如何我都在限制循环内的请求。
【问题讨论】:
标签: scala akka throttling