【问题标题】:Stream Future in Play 2.5在 Play 2.5 中流式传输未来
【发布时间】:2017-10-31 04:01:47
【问题描述】:

我再次尝试更新一些 Play 2.5 之前的代码(基于此 vid)。例如以下曾经是如何流式传输 Future

  Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500)))

我使用AkkaPromise.timeout(已弃用)创建了以下解决方法:

  private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = {
    val promise: Promise[Result] = Promise[Result]()
    actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) }
    promise.future
  }

根据Play Framework Migration Guide; Enumerators 应该重写为源代码,而Source.unfoldAsync 显然等同于Enumerator.generateM,所以我希望这会起作用(其中strFuture[String]):

  def inf = Action { request =>
    val str = keepResponding("stream me", 1.second, 2.second)

    Ok.chunked(Source.unfoldAsync(str))
  }

当然,在查看 unfoldAsync 的案例类签名时,我遇到了 类型不匹配 错误:

final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]])

我可以看到参数不正确,但我不完全理解我应该通过什么/如何传递它。

【问题讨论】:

    标签: scala playframework streaming akka akka-stream


    【解决方案1】:

    unfoldAsync 比 Play! 自己的 generateM 更通用,因为它允许您传递状态 (S) 值。这可以使发出的值取决于先前发出的值。

    下面的例子将通过增加 id 来加载值,直到加载失败:

    val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒
      loadFromId(id)
        .map(s ⇒ Some((id + 1, s)))
        .recover{case _ ⇒ None}
    }
    
    def loadFromId(id: Int): Future[String] = ???
    

    在您的情况下,实际上并不需要内部状态,因此您可以在需要时传递虚拟值,例如

    val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒
      schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x))
    }
    
    def schedule(data: String, delay: FiniteDuration): Future[Result] = {
      akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))}
    }
    

    请注意,您对keepResponding 的原始实现不正确,因为您不能多次完成Promise。 Akka after 模式提供了一种更简单的方法来实现您所需要的。

    但是,请注意,在您的具体情况下,Akka Streams 提供了一个更惯用的解决方案,Source.tick

    val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒
      loadSomeFuture()
    }
    
    def loadSomeFuture(): Future[String] = ???
    

    如果您实际上不需要像示例中那样的异步计算,甚至更简单

    val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me")
    

    【讨论】:

    • 谢谢。您的第 3 和第 4 个示例有效,但这两个示例都需要至少几分钟才能输出第一个结果,这感觉不应该发生。您能想到的任何加快速度的方法吗?同样由于某种原因,我无法让第一个示例正常工作 - 前向引用的错误超出了值源的定义。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-18
    • 1970-01-01
    相关资源
    最近更新 更多